Spring Data MongoDB 教程

MongoDB 事务

从版本 4 开始,MongoDB 支持事务。事务是建立在会话之上的,因此需要一个活动的 ClientSession。

注意,除非您在应用程序上下文中指定 MongoTransactionManager,否则事务支持将被禁用。您可以使用 setSessionSynchronization(ALWAYS) 来参与正在进行的非本机MongoDB 事务。

为了获得对事务的完全程序化控制,你可能想在 MongoOperations 上使用会话回调。下面的示例演示 SessionCallback 中的编程事务控制:

// 获取新的 ClientSession
ClientSession session = client.startSession(options);

template.withSession(session)
   .execute(action -> {
       // 开始事务
       session.startTransaction();

       try {
           Step step = // ...;
           action.insert(step);
           process(step);
           action.update(Step.class).apply(Update.set("state", // ...

           // 如果一切按预期进行,就提交修改,提交事务
           session.commitTransaction();
       } catch (RuntimeException e) {
           // 如果抛出异常,事务回滚一切
           session.abortTransaction();
       }
   }, ClientSession::close) // 完成后不要忘记关闭会话

上面的示例让你完全控制了事务行为,同时在回调中使用会话范围的 MongoOperations 实例,以确保会话被传递给每个服务器调用。为了避免这种方法带来的一些开销,你可以使用 TransactionTemplate 来消除手动事务的一些噪音。

使用 TransactionTemplate 的事务

Spring Data MongoDB 事务支持 TransactionTemplate。下面的示例演示如何创建和使用 TransactionTemplate:

// 在模板API配置期间启用事务同步
template.setSessionSynchronization(ALWAYS);

// ...

// 使用提供的 PlatformTransactionManager 创建 TransactionTemplate
TransactionTemplate txTemplate = new TransactionTemplate(anyTxManager);

txTemplate.execute(new TransactionCallbackWithoutResult() {

   // 在回调中,已经注册了 ClientSession 和事务
   @Override
   protected void doInTransactionWithoutResult(TransactionStatus status) {
       Step step = // ...;
       template.insert(step);
       process(step);
       template.update(Step.class).apply(Update.set("state", // ...
   };
});

使用 MongoTransactionManager 的事务

MongoTransactionManager 是通往众所周知的 Spring 事务支持的网关,它可以让应用程序使用 Spring 的托管事务功能。

MongoTransactionManager 将一个客户端会话绑定到线程上。

MongoTemplate 会检测会话,并相应地对这些与事务相关的资源进行操作。MongoTemplate 也可以参与其他正在进行的事务。

下面的例子展示了如何用 MongoTransactionManager 创建和使用事务:

@Configuration
static class Config extends AbstractMongoClientConfiguration {
   // 在应用程序上下文中注册 MongoTransactionManager
   @Bean
   MongoTransactionManager transactionManager(MongoDatabaseFactory dbFactory) {
       return new MongoTransactionManager(dbFactory);
   }
   // ...
}

@Component
public class StateService {
   // 将方法标记为事务性的
   @Transactional
   void someBusinessFunction(Step step) {
       template.insert(step);
       process(step);
       template.update(Step.class).apply(Update.set("state", // ...
   };
});

注意,@Transactional(readOnly = true) 建议 MongoTransactionManager 也启动一个事务,将 ClientSession 添加到发出的请求中。

响应式事务

与支持响应式 ClientSession 一样,响应式 MongoTemplate 提供了专门的方法,用于在事务中进行操作,而不必担心根据操作结果提交或停止操作。

使用普通的 MongoDB 响应式驱动程序 API,事务流中的删除可能看起来像这样。例如:

Mono<DeleteResult> result = Mono
   // 首先,我们显然需要启动会话
   .from(client.startSession())
   .flatMap(session -> {
       // 一旦我们手头有了 ClientSession,就开始进行事务
       session.startTransaction();
       // 通过向操作传递 ClientSession,在事务中进行操作
       return Mono.from(collection.deleteMany(session, ...))
           // 如果操作异常完成,我们需要停止交易,并保留错误信息
           .onErrorResume(e -> Mono.from(session.abortTransaction()).then(Mono.error(e)))
           // 当然,也可以在成功的情况下提交更改,仍然保留操作结果
           .flatMap(val -> Mono.from(session.commitTransaction()).then(Mono.just(val)))
           // 最后,我们需要确保关闭会话
           .doFinally(signal -> session.close());
     });

上述操作的罪魁祸首是在保留主流 DeleteResult,而不是通过 commitTransaction() 或 abortTransaction() 发布的事务结果,这导致了相当复杂的设置。

使用 TransactionalOperator 的事务

Spring Data MongoDB 事务支持 TransactionalOperator。下面的例子展示了如何创建和使用 TransactionalOperator:

// 启用事务同步以进行事务
template.setSessionSynchronization(ALWAYS);

// ...

// 使用提供的 ReactiveTransactionManager创建 TransactionalOperator
TransactionalOperator rxtx = TransactionalOperator.create(anyTxManager,
                                  new DefaultTransactionDefinition());

Step step = // ...;
template.insert(step);

Mono<Void> process(step)
   .then(template.update(Step.class).apply(Update.set("state", …))
   // TransactionalOperator.transactional(...) 为所有上游操作提供事务管理
   .as(rxtx::transactional)                                                         (3)
   .then();

使用 ReactiveMongoTransactionManager 的事务

ReactiveMongoTransactionManager 是通往众所周知的 Spring 事务支持的网关。它允许应用程序利用 Spring 的管理事务功能。ReactiveMongoTransactionManager 将客户会话绑定到用户上下文。ReactiveMongoTemplate 会检测会话,并对这些与事务相关的资源进行相应操作。ReactiveMongoTemplate 也可以参与到其他正在进行的事务中。

下面的例子展示了如何用 ReactiveMongoTransactionManager 创建和使用事务:

@Configuration
public class Config extends AbstractReactiveMongoConfiguration {
   // 在应用程序上下文中注册 ReactiveMongoTransactionManager
   @Bean
   ReactiveMongoTransactionManager transactionManager(ReactiveMongoDatabaseFactory factory) {
       return new ReactiveMongoTransactionManager(factory);
   }
   // ...
}

@Service
public class StateService {
   // 将方法标记为事务性
   @Transactional
   Mono<UpdateResult> someBusinessFunction(Step step) {
       return template.insert(step)
           .then(process(step))
           .then(template.update(Step.class).apply(Update.set("state", …));
   };
});

注意,@Transactional(readOnly = true) 建议 ReactiveMongoTransactionManager 也启动一个事务,将 ClientSession 添加到发出的请求中。

事务内部的特殊行为

在事务内部,MongoDB 服务器有一个稍微不同的行为。

连接设置

MongoDB驱动程序提供了一个专用的副本集名称配置选项,使驱动程序进入自动检测模式。此选项有助于在事务期间识别主复制集节点和命令路由。

集合操作

MongoDB 不支持集合操作,例如在一个事务中创建集合。这也影响了第一次使用时发生的即时集合创建。因此,请确保所有需要的结构都已到位。

瞬态错误

MongoDB 可以为事务操作期间引发的错误添加特殊标签。这些可能表示短暂的失败,这些失败可能仅仅通过重试操作就会消失。出于这些目的,我们强烈建议使用 Spring Retry。 尽管如此,您可以重写 MongoTransactionManager#doCommit(MongoTransactionObject) 来实现 MongoDB 参考手册中概述的重试提交操作行为。

计数

MongoDB 的计数操作是基于集合统计,可能无法反映事务中的实际情况。当在一个多文档事务中发出计数命令时,服务器会响应错误 50851。一旦 MongoTemplate 检测到一个活动的事务,所有暴露的 count() 方法都会被转换,并使用 $match 和 $count 操作符委托给聚合框架,保留查询设置,如 collation。

在聚集计数助手中使用地理命令时,有一些限制。以下运算符不能使用,必须用不同的运算符代替:

  • $where → $expr

  • $near → $geoWithin with $center

  • $nearSphere → $geoWithin with $centerSphere

使用 Criteria.near(...) 和 Criteria.nearSphere(...) 的查询必须改写为 Criteria.within(...) 各自 Criteria.withinSphere(...)。同样适用于资源库查询方法中的 near 查询关键字,必须改为 within。也请参见 MongoDB JIRA ticket DRIVERS-518 以获得更多参考。

下面的片段显示了会话绑定闭包内的计数用法:

session.startTransaction();

template.withSession(session)
   .execute(action -> {
       action.count(query(where("state").is("active")), Step.class)
       ...

上面的片段具体化为以下命令:

db.collection.aggregate(
  [
     { $match: { state: "active" } },
     { $count: "totalEntityCount" }
  ]
)

而不是:

db.collection.find( { state: "active" } ).count()
说说我的看法
全部评论(
没有评论
关于
本网站专注于 Java、数据库(MySQL、Oracle)、Linux、软件架构及大数据等多领域技术知识分享。涵盖丰富的原创与精选技术文章,助力技术传播与交流。无论是技术新手渴望入门,还是资深开发者寻求进阶,这里都能为您提供深度见解与实用经验,让复杂编码变得轻松易懂,携手共赴技术提升新高度。如有侵权,请来信告知:hxstrive@outlook.com
公众号