跳到主要内容

事务

DeepSeek V3 中英对照 Transactions

本节介绍 Spring for Apache Pulsar 如何支持事务。

概述

Spring for Apache Pulsar 的事务支持建立在 Spring Framework 提供的事务支持之上。从高层次来看,事务资源会注册到一个事务管理器中,该管理器随后处理注册资源的事务状态(提交、回滚等)。

Spring for Apache Pulsar 提供了以下内容:

  • PulsarTransactionManager - 用于与普通的 Spring 事务支持一起使用(@TransactionalTransactionTemplate 等)

  • 事务性 PulsarTemplate

  • 事务性 @PulsarListener

  • 与其他事务管理器的事务同步

备注

事务支持尚未添加到 Reactive 组件中

事务支持默认是禁用的。要在使用 Spring Boot 时启用支持,只需设置 spring.pulsar.transaction.enabled 属性。进一步的配置选项将在下面的每个组件部分中详细说明。

使用 PulsarTemplate 进行事务性发布

在事务性的 PulsarTemplate 上,所有的发送操作都会查找当前的活动事务,并将每个发送操作加入到该事务中(如果找到事务的话)。

非事务性使用

默认情况下,事务性的 PulsarTemplate 也可以用于非事务性操作。当未找到现有事务时,它将以非事务性方式继续发送操作。然而,如果模板配置为需要事务,则在事务范围之外使用模板的任何尝试都会导致异常。

提示

事务可以通过 TransactionTemplate@Transactional 方法、调用 executeInTransaction 或通过事务监听器容器启动。

本地事务

我们使用术语“本地”事务来表示一种 Pulsar 原生事务,这种事务不受 Spring 的事务管理设施(即 PulsarTransactionManager)管理或关联。相反,一个“同步”事务则是 PulsarTransactionManager 管理或关联的事务。

你可以使用 PulsarTemplate 在本地事务中执行一系列操作。以下示例展示了如何实现这一点:

var results = pulsarTemplate.executeInTransaction((template) -> {
var rv = new HashMap<String, MessageId>();
rv.put("msg1", template.send(topic, "msg1"));
rv.put("msg2", template.send(topic, "msg2"));
return rv;
});
java

回调函数中的参数是调用 executeInTransaction 方法的模板实例。所有在模板上的操作都会被纳入当前事务中。如果回调函数正常退出,事务将会被提交。如果抛出异常,事务将会被回滚。

备注

如果当前有一个同步事务正在处理中,它将被忽略,并使用一个新的“嵌套”事务。

配置

以下事务设置可以直接在 PulsarTemplate 上使用(通过 transactions 字段):

  • enabled - 模板是否支持事务(默认 false

  • required - 模板是否需要事务(默认 false

  • timeout - 事务超时的持续时间(默认 null

在不使用 Spring Boot 的情况下,你可以在提供的模板上调整这些设置。然而,当使用 Spring Boot 时,模板是自动配置的,没有机制可以影响这些属性。在这种情况下,你可以注册一个 PulsarTemplateCustomizer bean,用于调整设置。以下示例展示了如何在自动配置的模板上设置超时时间:

@Bean
PulsarTemplateCustomizer<?> templateCustomizer() {
return (template) -> template.transactions().setTimeout(Duration.ofSeconds(45));
}
java

使用 @PulsarListener 进行事务性接收

当启用了监听器事务时,带有 @PulsarListener 注解的监听器方法会在一个同步事务的范围内被调用。

DefaultPulsarMessageListenerContainer 使用一个配置了 PulsarTransactionManager 的 Spring TransactionTemplate 来在方法调用之前启动事务。

每个接收到的消息的确认都被列入作用域事务中。

消费-处理-生产场景

一个常见的事务模式是消费者从一个 Pulsar topic 读取消息,转换这些消息,最后生产者将结果消息写入另一个 Pulsar topic。当启用事务并且你的监听器方法使用事务性的 PulsarTemplate 来生成转换后的消息时,框架支持这种用例。

给定以下监听器方法:

@PulsarListener(topics = "my-input-topic") 1
void listen(String msg) { 2
var transformedMsg = msg.toUpperCase(Locale.ROOT); 3
this.transactionalTemplate.send("my-output-topic", transformedMsg); 4
} // <5> // <6>
java

启用监听器事务时,会发生以下交互:

  • 监听器容器启动新的事务,并在事务范围内调用监听器方法

  • 监听器方法接收消息

  • 监听器方法转换消息

  • 监听器方法使用事务模板发送转换后的消息,事务模板将发送操作加入当前活动事务中

  • 监听器容器自动确认消息,并将确认操作加入当前活动事务中

  • 监听器容器(通过 TransactionTemplate)提交事务

如果你没有使用 @PulsarListener,而是直接使用监听器容器,同样会提供如上所述的相同的事务支持。请记住,@PulsarListener 只是一个方便的方式,用于将 Java 方法注册为监听器容器的消息监听器。

带有记录监听器的事务

上面的例子使用了记录监听器(record listener)。当使用记录监听器时,每次调用监听器方法时都会创建一个新的事务,这相当于每条消息对应一个事务。

备注

由于事务边界是针对每条消息的,并且每条消息的确认都参与到每个事务中,因此批量确认模式不能与事务性记录监听器一起使用。

批量监听器的事务处理

在使用批量监听器时,每次调用监听器方法时都会创建一个新的事务,这相当于每批消息对应一个事务。

备注

事务性批处理监听器目前不支持自定义错误处理程序。

配置

监听器容器工厂

以下事务设置可以直接在 ConcurrentPulsarListenerContainerFactory 创建监听器容器时使用的 PulsarContainerProperties 上进行配置。这些设置会影响所有的监听器容器,包括 @PulsarListener 使用的容器。

  • enabled - 容器是否支持事务(默认值为 false

  • required - 容器是否需要事务(默认值为 false

  • timeout - 事务超时时间(默认值为 null

  • transactionDefinition - 一个蓝图事务定义,其属性将被复制到容器的事务模板中(默认值为 null

  • transactionManager - 用于启动事务的事务管理器

在不使用 Spring Boot 时,你可以在你提供的容器工厂上调整这些设置。然而,在使用 Spring Boot 时,容器工厂是自动配置的。在这种情况下,你可以注册一个 org.springframework.boot.autoconfigure.pulsar.PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>> bean 来访问并自定义容器属性。以下示例展示了如何在容器工厂上设置超时:

@Bean
PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>> containerCustomizer() {
return (containerFactory) -> containerFactory.getContainerProperties().transactions().setTimeout(Duration.ofSeconds(45));
}
java

@PulsarListener

默认情况下,每个监听器都会遵循其对应的监听器容器工厂的事务设置。然而,用户可以在每个 @PulsarListener 上设置 transactional 属性,以覆盖容器工厂的设置,如下所示:

  • 如果容器工厂启用了事务,那么设置 transactional = false 将为单个监听器禁用事务。

  • 如果容器工厂启用了事务并且事务是必需的,那么尝试设置 transactional = false 将会抛出一个异常,提示事务是必需的。

  • 如果容器工厂禁用了事务,那么尝试设置 transactional = true 将会被忽略,并且会记录一条警告日志。

使用 PulsarTransactionManager

PulsarTransactionManager 是 Spring Framework 的 PlatformTransactionManager 的一个实现。你可以将 PulsarTransactionManager 与 Spring 的正常事务支持(如 @TransactionalTransactionTemplate 等)一起使用。

如果事务处于活动状态,那么在事务范围内执行的任何 PulsarTemplate 操作都会加入并参与当前事务。事务管理器会根据操作的成功或失败来决定提交或回滚事务。

提示

你可能不需要直接使用 PulsarTransactionManager,因为大多数事务性用例都由 PulsarTemplate@PulsarListener 覆盖。

Pulsar 与其他事务管理器的集成

仅生产者事务

如果你想在单个事务中将记录发送到 Pulsar 并执行一些数据库更新,你可以使用带有 DataSourceTransactionManager 的普通 Spring 事务管理。

备注

以下示例假设有一个名为 "dataSourceTransactionManager" 的 DataSourceTransactionManager bean 已注册

@Transactional("dataSourceTransactionManager")
public void myServiceMethod() {
var msg = calculateMessage();
this.pulsarTemplate.send("my-topic", msg);
this.jdbcTemplate.execute("insert into my_table (data) values ('%s')".formatted(msg));
}
java

@Transactional 注解的拦截器会启动数据库事务,而 PulsarTemplate 将与数据库事务管理器同步一个事务;每次发送操作都会参与该事务。当方法退出时,数据库事务将提交,随后 Pulsar 事务也会提交。

如果您希望先提交 Pulsar 事务,并且仅在 Pulsar 事务成功时才提交数据库事务,可以使用嵌套的 @Transactional 方法,其中外部方法配置为使用 DataSourceTransactionManager,而内部方法配置为使用 PulsarTransactionManager

@Transactional("dataSourceTransactionManager")
public void myServiceMethod() {
var msg = calculateMessage();
this.jdbcTemplate.execute("insert into my_table (data) values ('%s')".formatted(msg));
this.sendToPulsar(msg);
}

@Transactional("pulsarTransactionManager")
public void sendToPulsar(String msg) {
this.pulsarTemplate.send("my-topic", msg);
}
java

消费者 + 生产者事务

如果你想从 Pulsar 消费记录、向 Pulsar 发送记录,并在事务中执行一些数据库更新,你可以将普通的 Spring 事务管理(使用 DataSourceTransactionManager)与容器启动的事务结合起来。

在以下示例中,监听器容器启动了 Pulsar 事务,而 @Transactional 注解启动了数据库事务。数据库事务会首先提交;如果 Pulsar 事务提交失败,记录将会被重新投递,因此数据库更新应该是幂等的。

@PulsarListener(topics = "my-input-topic")
@Transactional("dataSourceTransactionManager")
void listen(String msg) {
var transformedMsg = msg.toUpperCase(Locale.ROOT);
this.pulsarTemplate.send("my-output-topic", transformedMsg);
this.jdbcTemplate.execute("insert into my_table (data) values ('%s')".formatted(transformedMsg));
}
java