从版本 4 开始,MongoDB 支持事务。事务是建立在会话之上的,因此需要一个活动的 ClientSession。
注意,除非您在应用程序上下文中指定 MongoTransactionManager,否则事务支持将被禁用。您可以使用 setSessionSynchronization(ALWAYS) 来参与正在进行的非本机MongoDB 事务。
为了获得对事务的完全程序化控制,你可能想在 MongoOperations 上使用会话回调。下面的示例演示 SessionCallback 中的编程事务控制:
// 获取新的 ClientSession ClientSession session = client.startSession(options); template.withSession(session) .execute(action -> { // 开始事务 session.startTransaction(); try { Step step = // ...; action.insert(step); process(step); action.update(Step.class).apply(Update.set("state", // ... // 如果一切按预期进行,就提交修改,提交事务 session.commitTransaction(); } catch (RuntimeException e) { // 如果抛出异常,事务回滚一切 session.abortTransaction(); } }, ClientSession::close) // 完成后不要忘记关闭会话
上面的示例让你完全控制了事务行为,同时在回调中使用会话范围的 MongoOperations 实例,以确保会话被传递给每个服务器调用。为了避免这种方法带来的一些开销,你可以使用 TransactionTemplate 来消除手动事务的一些噪音。
Spring Data MongoDB 事务支持 TransactionTemplate。下面的示例演示如何创建和使用 TransactionTemplate:
// 在模板API配置期间启用事务同步 template.setSessionSynchronization(ALWAYS); // ... // 使用提供的 PlatformTransactionManager 创建 TransactionTemplate TransactionTemplate txTemplate = new TransactionTemplate(anyTxManager); txTemplate.execute(new TransactionCallbackWithoutResult() { // 在回调中,已经注册了 ClientSession 和事务 @Override protected void doInTransactionWithoutResult(TransactionStatus status) { Step step = // ...; template.insert(step); process(step); template.update(Step.class).apply(Update.set("state", // ... }; });
MongoTransactionManager 是通往众所周知的 Spring 事务支持的网关,它可以让应用程序使用 Spring 的托管事务功能。
MongoTransactionManager 将一个客户端会话绑定到线程上。
MongoTemplate 会检测会话,并相应地对这些与事务相关的资源进行操作。MongoTemplate 也可以参与其他正在进行的事务。
下面的例子展示了如何用 MongoTransactionManager 创建和使用事务:
@Configuration static class Config extends AbstractMongoClientConfiguration { // 在应用程序上下文中注册 MongoTransactionManager @Bean MongoTransactionManager transactionManager(MongoDatabaseFactory dbFactory) { return new MongoTransactionManager(dbFactory); } // ... } @Component public class StateService { // 将方法标记为事务性的 @Transactional void someBusinessFunction(Step step) { template.insert(step); process(step); template.update(Step.class).apply(Update.set("state", // ... }; });
注意,@Transactional(readOnly = true) 建议 MongoTransactionManager 也启动一个事务,将 ClientSession 添加到发出的请求中。
与支持响应式 ClientSession 一样,响应式 MongoTemplate 提供了专门的方法,用于在事务中进行操作,而不必担心根据操作结果提交或停止操作。
使用普通的 MongoDB 响应式驱动程序 API,事务流中的删除可能看起来像这样。例如:
Mono<DeleteResult> result = Mono // 首先,我们显然需要启动会话 .from(client.startSession()) .flatMap(session -> { // 一旦我们手头有了 ClientSession,就开始进行事务 session.startTransaction(); // 通过向操作传递 ClientSession,在事务中进行操作 return Mono.from(collection.deleteMany(session, ...)) // 如果操作异常完成,我们需要停止交易,并保留错误信息 .onErrorResume(e -> Mono.from(session.abortTransaction()).then(Mono.error(e))) // 当然,也可以在成功的情况下提交更改,仍然保留操作结果 .flatMap(val -> Mono.from(session.commitTransaction()).then(Mono.just(val))) // 最后,我们需要确保关闭会话 .doFinally(signal -> session.close()); });
上述操作的罪魁祸首是在保留主流 DeleteResult,而不是通过 commitTransaction() 或 abortTransaction() 发布的事务结果,这导致了相当复杂的设置。
Spring Data MongoDB 事务支持 TransactionalOperator。下面的例子展示了如何创建和使用 TransactionalOperator:
// 启用事务同步以进行事务 template.setSessionSynchronization(ALWAYS); // ... // 使用提供的 ReactiveTransactionManager创建 TransactionalOperator TransactionalOperator rxtx = TransactionalOperator.create(anyTxManager, new DefaultTransactionDefinition()); Step step = // ...; template.insert(step); Mono<Void> process(step) .then(template.update(Step.class).apply(Update.set("state", …)) // TransactionalOperator.transactional(...) 为所有上游操作提供事务管理 .as(rxtx::transactional) (3) .then();
ReactiveMongoTransactionManager 是通往众所周知的 Spring 事务支持的网关。它允许应用程序利用 Spring 的管理事务功能。ReactiveMongoTransactionManager 将客户会话绑定到用户上下文。ReactiveMongoTemplate 会检测会话,并对这些与事务相关的资源进行相应操作。ReactiveMongoTemplate 也可以参与到其他正在进行的事务中。
下面的例子展示了如何用 ReactiveMongoTransactionManager 创建和使用事务:
@Configuration public class Config extends AbstractReactiveMongoConfiguration { // 在应用程序上下文中注册 ReactiveMongoTransactionManager @Bean ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory factory) { return new ReactiveMongoTransactionManager(factory); } // ... } @Service public class StateService { // 将方法标记为事务性 @Transactional Mono<UpdateResult> someBusinessFunction(Step step) { return template.insert(step) .then(process(step)) .then(template.update(Step.class).apply(Update.set("state", …)); }; });
注意,@Transactional(readOnly = true) 建议 ReactiveMongoTransactionManager 也启动一个事务,将 ClientSession 添加到发出的请求中。
在事务内部,MongoDB 服务器有一个稍微不同的行为。
MongoDB驱动程序提供了一个专用的副本集名称配置选项,使驱动程序进入自动检测模式。此选项有助于在事务期间识别主复制集节点和命令路由。
MongoDB 不支持集合操作,例如在一个事务中创建集合。这也影响了第一次使用时发生的即时集合创建。因此,请确保所有需要的结构都已到位。
MongoDB 可以为事务操作期间引发的错误添加特殊标签。这些可能表示短暂的失败,这些失败可能仅仅通过重试操作就会消失。出于这些目的,我们强烈建议使用 Spring Retry。 尽管如此,您可以重写 MongoTransactionManager#doCommit(MongoTransactionObject) 来实现 MongoDB 参考手册中概述的重试提交操作行为。
MongoDB 的计数操作是基于集合统计,可能无法反映事务中的实际情况。当在一个多文档事务中发出计数命令时,服务器会响应错误 50851。一旦 MongoTemplate 检测到一个活动的事务,所有暴露的 count() 方法都会被转换,并使用 $match 和 $count 操作符委托给聚合框架,保留查询设置,如 collation。
在聚集计数助手中使用地理命令时,有一些限制。以下运算符不能使用,必须用不同的运算符代替:
$where → $expr
$near → $geoWithin with $center
$nearSphere → $geoWithin with $centerSphere
使用 Criteria.near(...) 和 Criteria.nearSphere(...) 的查询必须改写为 Criteria.within(...) 各自 Criteria.withinSphere(...)。同样适用于资源库查询方法中的 near 查询关键字,必须改为 within。也请参见 MongoDB JIRA ticket DRIVERS-518 以获得更多参考。
下面的片段显示了会话绑定闭包内的计数用法:
session.startTransaction(); template.withSession(session) .execute(action -> { action.count(query(where("state").is("active")), Step.class) ...
上面的片段具体化为以下命令:
db.collection.aggregate( [ { $match: { state: "active" } }, { $count: "totalEntityCount" } ] )
而不是:
db.collection.find( { state: "active" } ).count()