MongoDB Map-Reduce实例2

本文将通过实例介绍怎样运行 Map-Reduce

在 mongo shell 中,db.collection.mapReduce() 方法是 mapReduce 命令的包装。下面的示例使用 db.collection.mapReduce() 方法:

准备数据

使用以下文档创建一个简单的 orders 集合:

db.orders.insertMany([
   { _id: 1, cust_id: "Ant O. Knee", ord_date: new Date("2020-03-01"), price: 25, items: [ { sku: "oranges", qty: 5, price: 2.5 }, { sku: "apples", qty: 5, price: 2.5 } ], status: "A" },
   { _id: 2, cust_id: "Ant O. Knee", ord_date: new Date("2020-03-08"), price: 70, items: [ { sku: "oranges", qty: 8, price: 2.5 }, { sku: "chocolates", qty: 5, price: 10 } ], status: "A" },
   { _id: 3, cust_id: "Busby Bee", ord_date: new Date("2020-03-08"), price: 50, items: [ { sku: "oranges", qty: 10, price: 2.5 }, { sku: "pears", qty: 10, price: 2.5 } ], status: "A" },
   { _id: 4, cust_id: "Busby Bee", ord_date: new Date("2020-03-18"), price: 25, items: [ { sku: "oranges", qty: 10, price: 2.5 } ], status: "A" },
   { _id: 5, cust_id: "Busby Bee", ord_date: new Date("2020-03-19"), price: 50, items: [ { sku: "chocolates", qty: 5, price: 10 } ], status: "A"},
   { _id: 6, cust_id: "Cam Elot", ord_date: new Date("2020-03-19"), price: 35, items: [ { sku: "carrots", qty: 10, price: 1.0 }, { sku: "apples", qty: 10, price: 2.5 } ], status: "A" },
   { _id: 7, cust_id: "Cam Elot", ord_date: new Date("2020-03-20"), price: 25, items: [ { sku: "oranges", qty: 10, price: 2.5 } ], status: "A" },
   { _id: 8, cust_id: "Don Quis", ord_date: new Date("2020-03-20"), price: 75, items: [ { sku: "chocolates", qty: 5, price: 10 }, { sku: "apples", qty: 10, price: 2.5 } ], status: "A" },
   { _id: 9, cust_id: "Don Quis", ord_date: new Date("2020-03-20"), price: 55, items: [ { sku: "carrots", qty: 5, price: 1.0 }, { sku: "apples", qty: 10, price: 2.5 }, { sku: "oranges", qty: 10, price: 2.5 } ], status: "A" },
   { _id: 10, cust_id: "Don Quis", ord_date: new Date("2020-03-23"), price: 25, items: [ { sku: "oranges", qty: 10, price: 2.5 } ], status: "A" }
])

用每项平均数量计算订单和总数量

在本例中,您将对 orders 集合中所有 ord_date 值大于或等于 2020-03-01 的文档执行 map-reduce 操作。操作按 item.sku 字段进行分组,并计算每个 sku 的订单数量和订购总量。然后,操作计算每个 sku 值的每个订单的平均数量,并将结果合并到输出集合中。合并结果时,如果现有文档具有与新结果相同的键,则操作将覆盖现有文档。如果没有具有相同 key 的现有文档,则操作将插入该文档。

(1)定义 map 函数以处理每个输入文档:

    • 在函数中,this 表示 map-reduce 操作正在处理的文档。

    • 对于每个项目,该函数将 sku 与一个新对象值相关联,该对象值包含订单的计数1和项目数量 qty,并发出 sku 和 value 对。

 var mapFunction2 = function() {
    for (var idx = 0; idx < this.items.length; idx++) {
       var key = this.items[idx].sku;
       var value = { count: 1, qty: this.items[idx].qty };
       emit(key, value);
    }
};

(2)使用两个参数 --keySKU 和 --countObjVals 定义相应的 reduce 函数:

    • countObjVals 是一个数组,其元素是由 map 函数传递给 reducer 函数的,每一个元素包含 qty 和 count 字段。

    • 该函数将 countObjVals 数组简化为一个包含 count 和 qty 字段的 reducedValue 对象。

    • 在 reducedVal 中,count 字段包含来自单个数组元素的 count 字段的和,而 qty 字段包含来自单个数组元素的 qty 字段的和。

var reduceFunction2 = function(keySKU, countObjVals) {
   reducedVal = { count: 0, qty: 0 };
   for (var idx = 0; idx < countObjVals.length; idx++) {
       reducedVal.count += countObjVals[idx].count;
       reducedVal.qty += countObjVals[idx].qty;
   }
   return reducedVal;
};

(3)定义一个带有两个参数 key 和 reducedVal 的 finalize 函数。函数修改 reducedVal 对象以添加名为 avg 的计算字段,并返回修改后的对象:

var finalizeFunction2 = function (key, reducedVal) {
  reducedVal.avg = reducedVal.qty/reducedVal.count;
  return reducedVal;
};

(4)使用 mapFunction2、reduceFunction2 和 finalizeFunction2 函数对 orders 集合执行 map-reduce 操作。

db.orders.mapReduce(
   mapFunction2,
   reduceFunction2,
   {
     out: { merge: "map_reduce_example2" },
     query: { ord_date: { $gte: new Date("2020-03-01") } },
     finalize: finalizeFunction2
   }
 );

此操作使用 query 字段仅选择那些 ord_date 大于或等于新 Date("2020-03-01") 的文档。然后将结果输出到一个集合 map_reduce_example2。

如果 map_reduce_example2 集合已经存在,那么该操作将把现有的内容与这个 map-reduce 操作的结果合并起来。也就是说,如果现有文档具有与新结果相同的键,则操作将覆盖现有文档。如果没有具有相同键的现有文档,则操作将插入该文档。

(5)查询 map_reduce_example2 集合去验证结果:

db.map_reduce_example2.find().sort( { _id: 1 } )

操作返回以下文档:

{ "_id" : "apples", "value" : { "count" : 3, "qty" : 30, "avg" : 10 } }
{ "_id" : "carrots", "value" : { "count" : 2, "qty" : 15, "avg" : 7.5 } }
{ "_id" : "chocolates", "value" : { "count" : 3, "qty" : 15, "avg" : 5 } }
{ "_id" : "oranges", "value" : { "count" : 6, "qty" : 58, "avg" : 9.666666666666666 } }
{ "_id" : "pears", "value" : { "count" : 1, "qty" : 10, "avg" : 10 } }

实例:将上面讲解的 map 和 reduce 函数放入到 map-reduce-demo2.js 文件中,然后使用 mongo 命令去运行脚本。脚本内容如下:

// 权限验证
db.auth("test", "123456");
// Map函数
var mapFunction2 = function() {
    for (var idx = 0; idx < this.items.length; idx++) {
       var key = this.items[idx].sku;
       var value = { count: 1, qty: this.items[idx].qty };
       emit(key, value);
    }
};
// Reduce函数
var reduceFunction2 = function(keySKU, countObjVals) {
    reducedVal = { count: 0, qty: 0 };
    for (var idx = 0; idx < countObjVals.length; idx++) {
        reducedVal.count += countObjVals[idx].count;
        reducedVal.qty += countObjVals[idx].qty;
    }
    return reducedVal;
};
// finalize函数
var finalizeFunction2 = function (key, reducedVal) {
    reducedVal.avg = reducedVal.qty/reducedVal.count;
    return reducedVal;
};
// 执行 map-reduce 操作
db.orders.mapReduce(
    mapFunction2,
    reduceFunction2,
    {
      out: { merge: "map_reduce_example2" },
      query: { ord_date: { $gte: new Date("2020-03-01") } },
      finalize: finalizeFunction2
    }
);
// 输出结果
var results = db.map_reduce_example2.find().sort({ _id: 1 });
while(results.hasNext()) {
    printjson( results.next() );
}

执行 “mongo .map-reduce-demo2.js” 命令,去运行脚本。输出内容如下:

D:MongoDB-Scriptmap-reduce> mongo .map-reduce-demo2.js
MongoDB shell version v4.0.2-143-g7ea530946f
connecting to:mongodb:Implicit session:session{"id":UUID("04527daf-44dd-439b-a243-104d1a109b40")}
MongoDB server version:4.0.2-143-g7ea530946f
{"_id":"apples","value":{"count":4,"qty":35,"avg":8.75}}
{"_id":"carrots","value":{"count":2,"qty":15,"avg":7.5}}
{"_id":"chocolates","value":{"count":3,"qty":15,"avg":5}}
{"_id":"oranges","value":{"count":7,"qty":63,"avg":9}}
{"_id":"pears","value":{"count":1,"qty":10,"avg":10}}

聚合替代

使用可用的聚合管道操作符,您可以在不定义自定义函数的情况下重写 map-reduce 操作:

db.orders.aggregate( [
   { $match: { ord_date: { $gte: new Date("2020-03-01") } } },
   { $unwind: "$items" },
   { $group: { _id: "$items.sku", qty: { $sum: "$items.qty" }, orders_ids: { $addToSet: "$_id" } }  },
   { $project: { value: { count: { $size: "$orders_ids" }, qty: "$qty", avg: { $divide: [ "$qty", { $size: "$orders_ids" } ] } } } },
   { $merge: { into: "agg_alternative_3", on: "_id", whenMatched: "replace",  whenNotMatched: "insert" } }
] )

(1)$match 阶段只选择那些 ord_date 大于或等于新 Date("2020-03-01") 的文档。

(2)$unwinds 阶段通过 items 数组字段分解文档,为每个数组元素输出一个文档。例如:

{ "_id" : 1, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-01T00:00:00Z"), "price" : 25, "items" : { "sku" : "oranges", "qty" : 5, "price" : 2.5 }, "status" : "A" }
{ "_id" : 1, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-01T00:00:00Z"), "price" : 25, "items" : { "sku" : "apples", "qty" : 5, "price" : 2.5 }, "status" : "A" }
{ "_id" : 2, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 70, "items" : { "sku" : "oranges", "qty" : 8, "price" : 2.5 }, "status" : "A" }
{ "_id" : 2, "cust_id" : "Ant O. Knee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 70, "items" : { "sku" : "chocolates", "qty" : 5, "price" : 10 }, "status" : "A" }
{ "_id" : 3, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 50, "items" : { "sku" : "oranges", "qty" : 10, "price" : 2.5 }, "status" : "A" }
{ "_id" : 3, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-08T00:00:00Z"), "price" : 50, "items" : { "sku" : "pears", "qty" : 10, "price" : 2.5 }, "status" : "A" }
{ "_id" : 4, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-18T00:00:00Z"), "price" : 25, "items" : { "sku" : "oranges", "qty" : 10, "price" : 2.5 }, "status" : "A" }
{ "_id" : 5, "cust_id" : "Busby Bee", "ord_date" : ISODate("2020-03-19T00:00:00Z"), "price" : 50, "items" : { "sku" : "chocolates", "qty" : 5, "price" : 10 }, "status" : "A" }
...

(3)$group 阶段按 items.sku 分组,计算每个 sku:

    • qty 字段。qty字段包含每个item .sku 订购的总数量。

    • orders_ids 数组。orders_ids 字段包含一个不同的 order _id 数组,其中 order _id 是针对 item .sku 的。

{ "_id" : "chocolates", "qty" : 15, "orders_ids" : [ 2, 5, 8 ] }
{ "_id" : "oranges", "qty" : 63, "orders_ids" : [ 4, 7, 3, 2, 9, 1, 10 ] }
{ "_id" : "carrots", "qty" : 15, "orders_ids" : [ 6, 9 ] }
{ "_id" : "apples", "qty" : 35, "orders_ids" : [ 9, 8, 1, 6 ] }
{ "_id" : "pears", "qty" : 10, "orders_ids" : [ 3 ] }

(4)$project 阶段对输出文档进行整形,以将 map-reduce 的输出映射为两个字段 _id 和 value。$project 设置:

    • value.count 记录 orders_ids 数组大小。

    • value.qty 为输入文档 qty 字段的值。

    • value.avg 为每个订单的平均数量。

{ "_id" : "apples", "value" : { "count" : 4, "qty" : 35, "avg" : 8.75 } }
{ "_id" : "pears", "value" : { "count" : 1, "qty" : 10, "avg" : 10 } }
{ "_id" : "chocolates", "value" : { "count" : 3, "qty" : 15, "avg" : 5 } }
{ "_id" : "oranges", "value" : { "count" : 7, "qty" : 63, "avg" : 9 } }
{ "_id" : "carrots", "value" : { "count" : 2, "qty" : 15, "avg" : 7.5 } }

(5)最后,$merge 将输出写入集合 agg_Alternative_3。如果现有文档具有与新结果相同的键_id,则操作将覆盖现有文档。如果没有具有相同键的现有文档,则操作将插入该文档。

(6)查询 agg_alternative_3 集合验证结果:

db.agg_alternative_3.find().sort( { _id: 1 } )

操作返回以下文档:

{ "_id" : "apples", "value" : { "count" : 4, "qty" : 35, "avg" : 8.75 } }
{ "_id" : "carrots", "value" : { "count" : 2, "qty" : 15, "avg" : 7.5 } }
{ "_id" : "chocolates", "value" : { "count" : 3, "qty" : 15, "avg" : 5 } }
{ "_id" : "oranges", "value" : { "count" : 7, "qty" : 63, "avg" : 9 } }
{ "_id" : "pears", "value" : { "count" : 1, "qty" : 10, "avg" : 10 } }

实例:我们可以使用聚合操作符替换 map-reduce 操作,如下:

C:UsersAdministrator> mongo
MongoDB shell version v4.0.2-143-g7ea530946f
connecting to: mongodb://127.0.0.1:27017
Implicit session: session { "id" : UUID("b7db8e50-c7d7-4d9b-983e-b4e2400298aa") }
MongoDB server version: 4.2.6
> db.orders.aggregate([
...    { $match: { ord_date: { $gte: new Date("2020-03-01") } } },
...    { $unwind: "$items" },
...    { $group: { _id: "$items.sku", qty: { $sum: "$items.qty" }, orders_ids: { $addToSet: "$_id" } }  },
...    { $project: { value: { count: { $size: "$orders_ids" }, qty: "$qty", avg: { $divide: [ "$qty", { $size: "$orders_ids" } ] } } } },
...    { $merge: { into: "agg_alternative_3", on: "_id", whenMatched: "replace",  whenNotMatched: "insert" } }
... ]);
> db.agg_alternative_3.find()
{ "_id" : "pears", "value" : { "count" : 1, "qty" : 10, "avg" : 10 } }
{ "_id" : "oranges", "value" : { "count" : 7, "qty" : 63, "avg" : 9 } }
{ "_id" : "apples", "value" : { "count" : 4, "qty" : 35, "avg" : 8.75 } }
{ "_id" : "carrots", "value" : { "count" : 2, "qty" : 15, "avg" : 7.5 } }
{ "_id" : "chocolates", "value" : { "count" : 3, "qty" : 15, "avg" : 5 } }
>

注意:$merge 阶段操作符是在 MongoDB 4.2 版本引入的。

沉浸于现实的忙碌之中,没有时间和精力思念过去,成功也就不会太远了。——雷音
0 不喜欢
说说我的看法 -
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
公众号