Spring Data MongoDB 教程

Map-Reduce 操作

在 MongoDB 中,你可以通过使用 Map-Reduce 来查询 MongoDB,这对于批量处理、数据聚合以及查询语言不能满足你的需求时非常有用。

Spring 通过在 MongoOperations 上提供的方法来简化 Map-Reduce 操作的创建和运行,从而提供了与 MongoDB 的 Map-Reduce 的集成。

MongoOperations 可以将 Map-Reduce 操作的结果转换成 POJO,并与 Spring 的资源抽象集成。这让你可以将你的 JavaScript 文件放在文件系统、classpath、HTTP 服务器或任何其他 Spring 资源实现上,然后通过简单的 URI 风格的语法来引用 JavaScript 资源。例如:classpath:reduce.js。

注意:在文件中外部化 JavaScript 代码通常比在代码中作为 Java 字符串嵌入更值得推荐。如果你愿意,你仍然可以将 JavaScript 代码作为 Java 字符串传递,只是不推荐这么做。

示例

为了了解如何进行 Map-Reduce 操作,我们使用《Mongo DB - The Definitive Guide》一书中的一个例子。在这个例子中,我们创建了三个文档,其值分别为 [a,b]、[b,c] 和 [c,d]。每个文档中的值都与键 "x" 相关,如下例所示(假设这些文档都在一个名为 jmr1 的集合中):

{ "_id" : ObjectId("4e5ff893c0277826074ec533"), "x" : [ "a", "b" ] }
{ "_id" : ObjectId("4e5ff893c0277826074ec534"), "x" : [ "b", "c" ] }
{ "_id" : ObjectId("4e5ff893c0277826074ec535"), "x" : [ "c", "d" ] }

下面的 map 函数计算每个文档的数组中每个字母的出现次数:

function () {
   for (var i = 0; i < this.x.length; i++) {
       emit(this.x[i], 1);
   }
}

下面的 reduce 函数汇总了所有文档中每个字母的出现情况:

function (key, values) {
   var sum = 0;
   for (var i = 0; i < values.length; i++)
       sum += values[i];
   return sum;
}

运行前面的函数会生成以下集合:

{ "_id" : "a", "value" : 1 }
{ "_id" : "b", "value" : 2 }
{ "_id" : "c", "value" : 2 }
{ "_id" : "d", "value" : 1 }

假设 map 和 reduce 函数位于 map.js 和 reduce.js 中,并捆绑在你的 jar 中,因此它们在 classpath 上是可用的,你可以按如下方式运行 Map-Reduce 操作:

MapReduceResults<ValueObject> results = mongoOperations.mapReduce("jmr1", "classpath:map.js", "classpath:reduce.js", ValueObject.class);
for (ValueObject valueObject : results) {
 System.out.println(valueObject);
}

前面的例子产生了以下输出:

ValueObject [id=a, value=1.0]
ValueObject [id=b, value=2.0]
ValueObject [id=c, value=2.0]
ValueObject [id=d, value=1.0]

MapReduceResults 类实现了 Iterable,并提供了对原始输出以及时间和计数统计的访问。下面列出了 ValueObject 类的代码:

public class ValueObject {

 private String id;
 private float value;

 public String getId() {
   return id;
 }

 public float getValue() {
   return value;
 }

 public void setValue(float value) {
   this.value = value;
 }

 @Override
 public String toString() {
   return "ValueObject [id=" + id + ", value=" + value + "]";
 }
}

默认情况下,使用 INLINE 的输出类型,因此不需要指定输出集合。要指定额外的 Map-Reduce 选项,请使用一个重载方法,该方法需要一个额外的 MapReduceOptions 参数:

MapReduceResults<ValueObject> results = mongoOperations.mapReduce("jmr1", "classpath:map.js", "classpath:reduce.js",
       new MapReduceOptions().outputCollection("jmr1_out"), ValueObject.class);

可以使用静态导入功能,静态导入 import static org.springframework.data.mongodb.core.mapreduce.MapReduceOptions.options,可以用来使语法稍微紧凑一些,如下例所示:

MapReduceResults<ValueObject> results = mongoOperations.mapReduce("jmr1", "classpath:map.js", "classpath:reduce.js",
       options().outputCollection("jmr1_out"), ValueObject.class);

您还可以指定一个查询来减少输入Map-Reduce 操作的数据集。下面的例子从 Map-Reduce 操作中删除了包含 [a,b] 的文档::

Query query = new Query(where("x").ne(new String[] { "a", "b" }));
MapReduceResults<ValueObject> results = mongoOperations.mapReduce(query, "jmr1", "classpath:map.js", "classpath:reduce.js",
       options().outputCollection("jmr1_out"), ValueObject.class);

注意,你可以在查询上指定额外的 limit 和 sort 值,但你不能制定 skip 值。

完整代码

(1)配置类,代码如下:

package com.hxstrive.springdata.mongodb.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.mongodb.MongoDatabaseFactory;
import org.springframework.data.mongodb.core.MongoTemplate;

/**
* 配置 MongoTemplate
* @author hxstrive.com 2022/12/23
*/
@Slf4j
@Configuration
public class AppConfig {

   @Bean
   public MongoTemplate mongoTemplate(MongoDatabaseFactory mongoDatabaseFactory) {
       log.info("mongoTemplate({}, {})", mongoDatabaseFactory);
       return new MongoTemplate(mongoDatabaseFactory);
   }

}

application.properties 配置文件内容如下:

# Log
logging.level.root=debug

# MongoDB
spring.data.mongodb.uri=mongodb://localhost:27017/test

(2)定义实体,代码如下:

package com.hxstrive.springdata.mongodb.entity;

/**
* 实体
* @author hxstrive.com
*/
public class ValueObject {

   private String id;
   private float value;

   public String getId() {
       return id;
   }

   public float getValue() {
       return value;
   }

   public void setValue(float value) {
       this.value = value;
   }

   @Override
   public String toString() {
       return "ValueObject [id=" + id + ", value=" + value + "]";
   }
}

(3)在 resources 目录下面创建 map.js 和 reduce.js 文件,如下:

a、map.js

function () {
   for (var i = 0; i < this.x.length; i++) {
       emit(this.x[i], 1);
   }
}

b、reduce.js

function (key, values) {
   var sum = 0;
   for (var i = 0; i < values.length; i++)
       sum += values[i];
   return sum;
}

(4)测试代码,如下:

package com.hxstrive.springdata.mongodb;

import com.hxstrive.springdata.mongodb.entity.ValueObject;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.data.mongodb.core.mapreduce.MapReduceOptions;
import org.springframework.data.mongodb.core.mapreduce.MapReduceResults;
import org.springframework.data.mongodb.core.query.Query;

import static org.springframework.data.mongodb.core.mapreduce.MapReduceOptions.options;
import static org.springframework.data.mongodb.core.query.Criteria.where;

/**
* Map-Reduce 简单示例
* @author hxstrive.com
*/
@SpringBootTest
class MapReduceDemo {

   @Autowired
   private MongoTemplate mongoTemplate;

   @BeforeEach
   public void init() {
       // { "_id" : ObjectId("4e5ff893c0277826074ec533"), "x" : [ "a", "b" ] }
       // { "_id" : ObjectId("4e5ff893c0277826074ec534"), "x" : [ "b", "c" ] }
       // { "_id" : ObjectId("4e5ff893c0277826074ec535"), "x" : [ "c", "d" ] }
       mongoTemplate.save("{ \"_id\" : ObjectId(\"4e5ff893c0277826074ec533\"), \"x\" : [ \"a\", \"b\" ] }","jmr1");
       mongoTemplate.save("{ \"_id\" : ObjectId(\"4e5ff893c0277826074ec534\"), \"x\" : [ \"b\", \"c\" ] }","jmr1");
       mongoTemplate.save("{ \"_id\" : ObjectId(\"4e5ff893c0277826074ec535\"), \"x\" : [ \"c\", \"d\" ] }","jmr1");
   }

   @Test
   public void mapReduce() {
       MapReduceResults<ValueObject> results = mongoTemplate.mapReduce(
               "jmr1", "classpath:map.js", "classpath:reduce.js",
               ValueObject.class);
       for (ValueObject valueObject : results) {
           System.out.println(valueObject);
       }
       // 结果:
       // Sending command '{"mapreduce": "jmr1", "map": {"$code": "function () {\r\n
       // for (var i = 0; i < this.x.length; i++) {\r\n        emit(this.x[i], 1);\r\n    }\r\n}"},
       // "reduce": {"$code": "function (key, values) {\r\n    var sum = 0;\r\n
       // for (var i = 0; i < values.length; i++)\r\n        sum += values[i];\r\n
       // return sum;\r\n}"}, "out": {"inline": 1}, "query": {},
       // "verbose": true, "$db": "test", "lsid": {"id": {"$binary": {"base64": "gzk6IXVFRkCeMxfnA5J4Cg==",
       // "subType": "04"}}}}'
       // with request id 8 to database test on connection [connectionId{localValue:3, serverValue:3}]
       // to server localhost:27017
       //
       // ValueObject [id=c, value=2.0]
       // ValueObject [id=d, value=1.0]
       // ValueObject [id=a, value=1.0]
       // ValueObject [id=b, value=2.0]
   }

   @Test
   public void mapReduce2() {
       // 使用 MapReduceOptions 指定其他选项
       MapReduceResults<ValueObject> results = mongoTemplate.mapReduce(
               "jmr1", "classpath:map.js", "classpath:reduce.js",
               new MapReduceOptions().outputCollection("jmr1_out"), ValueObject.class);
       for (ValueObject valueObject : results) {
           System.out.println(valueObject);
       }
       // 结果:
       // ValueObject [id=b, value=2.0]
       // ValueObject [id=a, value=1.0]
       // ValueObject [id=c, value=2.0]
       // ValueObject [id=d, value=1.0]
   }

   @Test
   public void mapReduce3() {
       // 指定一个查询来减少输入Map-Reduce 操作的数据集
       Query query = new Query(where("x").ne(new String[] { "a", "b" }));
       MapReduceResults<ValueObject> results = mongoTemplate.mapReduce(query,
               "jmr1", "classpath:map.js", "classpath:reduce.js",
               options().outputCollection("jmr1_out"), ValueObject.class);
       for (ValueObject valueObject : results) {
           System.out.println(valueObject);
       }
       // 结果:
       // ValueObject [id=d, value=1.0]
       // ValueObject [id=c, value=2.0]
       // ValueObject [id=b, value=1.0]
   }

}
说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
公众号