在 MongoDB 中,你可以通过使用 Map-Reduce 来查询 MongoDB,这对于批量处理、数据聚合以及查询语言不能满足你的需求时非常有用。
Spring 通过在 MongoOperations 上提供的方法来简化 Map-Reduce 操作的创建和运行,从而提供了与 MongoDB 的 Map-Reduce 的集成。
MongoOperations 可以将 Map-Reduce 操作的结果转换成 POJO,并与 Spring 的资源抽象集成。这让你可以将你的 JavaScript 文件放在文件系统、classpath、HTTP 服务器或任何其他 Spring 资源实现上,然后通过简单的 URI 风格的语法来引用 JavaScript 资源。例如:classpath:reduce.js。
注意:在文件中外部化 JavaScript 代码通常比在代码中作为 Java 字符串嵌入更值得推荐。如果你愿意,你仍然可以将 JavaScript 代码作为 Java 字符串传递,只是不推荐这么做。
为了了解如何进行 Map-Reduce 操作,我们使用《Mongo DB - The Definitive Guide》一书中的一个例子。在这个例子中,我们创建了三个文档,其值分别为 [a,b]、[b,c] 和 [c,d]。每个文档中的值都与键 "x" 相关,如下例所示(假设这些文档都在一个名为 jmr1 的集合中):
{ "_id" : ObjectId("4e5ff893c0277826074ec533"), "x" : [ "a", "b" ] } { "_id" : ObjectId("4e5ff893c0277826074ec534"), "x" : [ "b", "c" ] } { "_id" : ObjectId("4e5ff893c0277826074ec535"), "x" : [ "c", "d" ] }
下面的 map 函数计算每个文档的数组中每个字母的出现次数:
function () { for (var i = 0; i < this.x.length; i++) { emit(this.x[i], 1); } }
下面的 reduce 函数汇总了所有文档中每个字母的出现情况:
function (key, values) { var sum = 0; for (var i = 0; i < values.length; i++) sum += values[i]; return sum; }
运行前面的函数会生成以下集合:
{ "_id" : "a", "value" : 1 } { "_id" : "b", "value" : 2 } { "_id" : "c", "value" : 2 } { "_id" : "d", "value" : 1 }
假设 map 和 reduce 函数位于 map.js 和 reduce.js 中,并捆绑在你的 jar 中,因此它们在 classpath 上是可用的,你可以按如下方式运行 Map-Reduce 操作:
MapReduceResults<ValueObject> results = mongoOperations.mapReduce("jmr1", "classpath:map.js", "classpath:reduce.js", ValueObject.class); for (ValueObject valueObject : results) { System.out.println(valueObject); }
前面的例子产生了以下输出:
ValueObject [id=a, value=1.0] ValueObject [id=b, value=2.0] ValueObject [id=c, value=2.0] ValueObject [id=d, value=1.0]
MapReduceResults 类实现了 Iterable,并提供了对原始输出以及时间和计数统计的访问。下面列出了 ValueObject 类的代码:
public class ValueObject { private String id; private float value; public String getId() { return id; } public float getValue() { return value; } public void setValue(float value) { this.value = value; } @Override public String toString() { return "ValueObject [id=" + id + ", value=" + value + "]"; } }
默认情况下,使用 INLINE 的输出类型,因此不需要指定输出集合。要指定额外的 Map-Reduce 选项,请使用一个重载方法,该方法需要一个额外的 MapReduceOptions 参数:
MapReduceResults<ValueObject> results = mongoOperations.mapReduce("jmr1", "classpath:map.js", "classpath:reduce.js", new MapReduceOptions().outputCollection("jmr1_out"), ValueObject.class);
可以使用静态导入功能,静态导入 import static org.springframework.data.mongodb.core.mapreduce.MapReduceOptions.options,可以用来使语法稍微紧凑一些,如下例所示:
MapReduceResults<ValueObject> results = mongoOperations.mapReduce("jmr1", "classpath:map.js", "classpath:reduce.js", options().outputCollection("jmr1_out"), ValueObject.class);
您还可以指定一个查询来减少输入Map-Reduce 操作的数据集。下面的例子从 Map-Reduce 操作中删除了包含 [a,b] 的文档::
Query query = new Query(where("x").ne(new String[] { "a", "b" })); MapReduceResults<ValueObject> results = mongoOperations.mapReduce(query, "jmr1", "classpath:map.js", "classpath:reduce.js", options().outputCollection("jmr1_out"), ValueObject.class);
注意,你可以在查询上指定额外的 limit 和 sort 值,但你不能制定 skip 值。
(1)配置类,代码如下:
package com.hxstrive.springdata.mongodb.config; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.data.mongodb.MongoDatabaseFactory; import org.springframework.data.mongodb.core.MongoTemplate; /** * 配置 MongoTemplate * @author hxstrive.com 2022/12/23 */ @Slf4j @Configuration public class AppConfig { @Bean public MongoTemplate mongoTemplate(MongoDatabaseFactory mongoDatabaseFactory) { log.info("mongoTemplate({}, {})", mongoDatabaseFactory); return new MongoTemplate(mongoDatabaseFactory); } }
application.properties 配置文件内容如下:
# Log logging.level.root=debug # MongoDB spring.data.mongodb.uri=mongodb://localhost:27017/test
(2)定义实体,代码如下:
package com.hxstrive.springdata.mongodb.entity; /** * 实体 * @author hxstrive.com */ public class ValueObject { private String id; private float value; public String getId() { return id; } public float getValue() { return value; } public void setValue(float value) { this.value = value; } @Override public String toString() { return "ValueObject [id=" + id + ", value=" + value + "]"; } }
(3)在 resources 目录下面创建 map.js 和 reduce.js 文件,如下:
a、map.js
function () { for (var i = 0; i < this.x.length; i++) { emit(this.x[i], 1); } }
b、reduce.js
function (key, values) { var sum = 0; for (var i = 0; i < values.length; i++) sum += values[i]; return sum; }
(4)测试代码,如下:
package com.hxstrive.springdata.mongodb; import com.hxstrive.springdata.mongodb.entity.ValueObject; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.data.mongodb.core.mapreduce.MapReduceOptions; import org.springframework.data.mongodb.core.mapreduce.MapReduceResults; import org.springframework.data.mongodb.core.query.Query; import static org.springframework.data.mongodb.core.mapreduce.MapReduceOptions.options; import static org.springframework.data.mongodb.core.query.Criteria.where; /** * Map-Reduce 简单示例 * @author hxstrive.com */ @SpringBootTest class MapReduceDemo { @Autowired private MongoTemplate mongoTemplate; @BeforeEach public void init() { // { "_id" : ObjectId("4e5ff893c0277826074ec533"), "x" : [ "a", "b" ] } // { "_id" : ObjectId("4e5ff893c0277826074ec534"), "x" : [ "b", "c" ] } // { "_id" : ObjectId("4e5ff893c0277826074ec535"), "x" : [ "c", "d" ] } mongoTemplate.save("{ \"_id\" : ObjectId(\"4e5ff893c0277826074ec533\"), \"x\" : [ \"a\", \"b\" ] }","jmr1"); mongoTemplate.save("{ \"_id\" : ObjectId(\"4e5ff893c0277826074ec534\"), \"x\" : [ \"b\", \"c\" ] }","jmr1"); mongoTemplate.save("{ \"_id\" : ObjectId(\"4e5ff893c0277826074ec535\"), \"x\" : [ \"c\", \"d\" ] }","jmr1"); } @Test public void mapReduce() { MapReduceResults<ValueObject> results = mongoTemplate.mapReduce( "jmr1", "classpath:map.js", "classpath:reduce.js", ValueObject.class); for (ValueObject valueObject : results) { System.out.println(valueObject); } // 结果: // Sending command '{"mapreduce": "jmr1", "map": {"$code": "function () {\r\n // for (var i = 0; i < this.x.length; i++) {\r\n emit(this.x[i], 1);\r\n }\r\n}"}, // "reduce": {"$code": "function (key, values) {\r\n var sum = 0;\r\n // for (var i = 0; i < values.length; i++)\r\n sum += values[i];\r\n // return sum;\r\n}"}, "out": {"inline": 1}, "query": {}, // "verbose": true, "$db": "test", "lsid": {"id": {"$binary": {"base64": "gzk6IXVFRkCeMxfnA5J4Cg==", // "subType": "04"}}}}' // with request id 8 to database test on connection [connectionId{localValue:3, serverValue:3}] // to server localhost:27017 // // ValueObject [id=c, value=2.0] // ValueObject [id=d, value=1.0] // ValueObject [id=a, value=1.0] // ValueObject [id=b, value=2.0] } @Test public void mapReduce2() { // 使用 MapReduceOptions 指定其他选项 MapReduceResults<ValueObject> results = mongoTemplate.mapReduce( "jmr1", "classpath:map.js", "classpath:reduce.js", new MapReduceOptions().outputCollection("jmr1_out"), ValueObject.class); for (ValueObject valueObject : results) { System.out.println(valueObject); } // 结果: // ValueObject [id=b, value=2.0] // ValueObject [id=a, value=1.0] // ValueObject [id=c, value=2.0] // ValueObject [id=d, value=1.0] } @Test public void mapReduce3() { // 指定一个查询来减少输入Map-Reduce 操作的数据集 Query query = new Query(where("x").ne(new String[] { "a", "b" })); MapReduceResults<ValueObject> results = mongoTemplate.mapReduce(query, "jmr1", "classpath:map.js", "classpath:reduce.js", options().outputCollection("jmr1_out"), ValueObject.class); for (ValueObject valueObject : results) { System.out.println(valueObject); } // 结果: // ValueObject [id=d, value=1.0] // ValueObject [id=c, value=2.0] // ValueObject [id=b, value=1.0] } }