跳到主要内容

事务

DeepSeek V3 中英对照 Transactions

Spring Rabbit 框架支持在同步和异步使用场景中自动管理事务,提供了多种不同的语义,这些语义可以像 Spring 事务的现有用户所熟悉的那样,通过声明式的方式进行选择。这使得许多(如果不是大多数)常见的消息传递模式易于实现。

有两种方式可以向框架传递所需的事务语义。在 RabbitTemplateSimpleMessageListenerContainer 中,都有一个标志 channelTransacted,如果设置为 true,则告诉框架使用事务性通道,并在所有操作(发送或接收)结束时执行提交或回滚(取决于结果),异常则表示回滚。另一种信号是提供一个外部事务,使用 Spring 的 PlatformTransactionManager 实现之一作为当前操作的上下文。如果在框架发送或接收消息时已经有一个事务正在进行,并且 channelTransacted 标志为 true,则消息事务的提交或回滚将推迟到当前事务结束时执行。如果 channelTransacted 标志为 false,则消息操作不应用事务语义(它将自动确认)。

channelTransacted 标志是一个配置时的设置。它在创建 AMQP 组件时声明并处理一次,通常是在应用程序启动时。外部事务在原则上更加动态,因为系统在运行时响应当前线程状态。然而,在实践中,当事务以声明方式分层到应用程序上时,它通常也是一个配置设置。

对于使用 RabbitTemplate 的同步用例,外部事务由调用者提供,可以根据偏好以声明式或命令式的方式提供(通常的 Spring 事务模型)。以下示例展示了一种声明式方法(通常更受欢迎,因为它是非侵入式的),其中模板已配置为 channelTransacted=true

@Transactional
public void doSomething() {
String incoming = rabbitTemplate.receiveAndConvert();
// do some more database processing...
String outgoing = processInDatabaseAndExtractReply(incoming);
rabbitTemplate.convertAndSend(outgoing);
}
java

在前面的示例中,接收到一个 String 类型的有效负载,并在标记为 @Transactional 的方法内将其转换并作为消息体发送。如果数据库处理因异常而失败,传入的消息将返回到代理(broker),并且传出的消息不会被发送。这适用于在事务方法链中使用 RabbitTemplate 的任何操作(除非,例如,直接操作 Channel 以提前提交事务)。

对于使用 SimpleMessageListenerContainer 的异步场景,如果需要外部事务,则必须在容器设置监听器时请求该事务。为了表示需要外部事务,用户在配置容器时提供一个 PlatformTransactionManager 的实现。以下示例展示了如何做到这一点:

@Configuration
public class ExampleExternalTransactionAmqpConfiguration {

@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setTransactionManager(transactionManager());
container.setChannelTransacted(true);
container.setQueueName("some.queue");
container.setMessageListener(exampleListener());
return container;
}

}
java

在前面的示例中,事务管理器被添加为从另一个 bean 定义(未显示)中注入的依赖项,并且 channelTransacted 标志也被设置为 true。这样做的效果是,如果监听器因异常而失败,事务将回滚,消息也将返回到代理。重要的是,如果事务提交失败(例如,由于数据库约束错误或连接问题),AMQP 事务也会回滚,消息将返回到代理。这有时被称为“尽力而为的一阶段提交”,是实现可靠消息传递的一个非常强大的模式。如果在前面的示例中将 channelTransacted 标志设置为 false(默认值),监听器仍然会提供外部事务,但所有消息操作将自动确认,因此即使业务操作回滚,消息操作也会提交。

条件回滚

在 1.6.6 版本之前,当使用外部事务管理器(如 JDBC)时,向容器的 transactionAttribute 添加回滚规则是无效的。异常总是会回滚事务。

此外,当在容器的通知链中使用事务通知时,条件回滚并不十分有用,因为所有监听器异常都被包装在 ListenerExecutionFailedException 中。

第一个问题已得到修正,规则现在能够正确应用。此外,现在提供了 ListenerFailedRuleBasedTransactionAttribute。它是 RuleBasedTransactionAttribute 的子类,唯一的不同在于它能够识别 ListenerExecutionFailedException 并使用此类异常的原因来应用规则。该事务属性可以直接在容器中使用,也可以通过事务通知使用。

以下示例使用了此规则:

@Bean
public AbstractMessageListenerContainer container() {
...
container.setTransactionManager(transactionManager);
RuleBasedTransactionAttribute transactionAttribute =
new ListenerFailedRuleBasedTransactionAttribute();
transactionAttribute.setRollbackRules(Collections.singletonList(
new NoRollbackRuleAttribute(DontRollBackException.class)));
container.setTransactionAttribute(transactionAttribute);
...
}
java

关于接收消息回滚的说明

AMQP 事务仅适用于发送到代理的消息和确认。因此,当 Spring 事务发生回滚且已接收到消息时,Spring AMQP 不仅需要回滚事务,还必须手动拒绝消息(类似于 nack,但规范中并不这样称呼)。消息拒绝时采取的操作与事务无关,取决于 defaultRequeueRejected 属性(默认值为 true)。有关拒绝失败消息的更多信息,请参阅消息监听器和异步情况

有关 RabbitMQ 事务及其限制的更多信息,请参阅 RabbitMQ Broker Semantics

备注

在 RabbitMQ 2.7.0 之前,这类消息(以及任何在通道关闭或中止时未被确认的消息)会被放在 Rabbit 代理队列的末尾。自 2.7.0 版本以来,被拒绝的消息会放在队列的前面,类似于 JMS 回滚消息的方式。

备注

之前,在事务回滚时消息的重新入队行为在本地事务和提供 TransactionManager 的情况下是不一致的。在前一种情况下,应用了正常的重新入队逻辑(AmqpRejectAndDontRequeueExceptiondefaultRequeueRejected=false)(参见 消息监听器和异步情况)。而在使用事务管理器时,消息在回滚时会无条件地重新入队。从 2.0 版本开始,行为变得一致,在这两种情况下都会应用正常的重新入队逻辑。要恢复到之前的行为,可以将容器的 alwaysRequeueWithTxManagerRollback 属性设置为 true。参见 消息监听器容器配置

使用 RabbitTransactionManager

RabbitTransactionManager 是一种替代方案,用于在外部事务中执行 Rabbit 操作,并与之同步。该事务管理器是 PlatformTransactionManager 接口的一个实现,应与单个 Rabbit ConnectionFactory 一起使用。

important

此策略无法提供 XA 事务 — 例如,为了在消息传递和数据库访问之间共享事务。

应用程序代码需要通过 ConnectionFactoryUtils.getTransactionalResourceHolder(ConnectionFactory, boolean) 来获取事务性的 Rabbit 资源,而不是通过标准的 Connection.createChannel() 调用并随后创建 Channel。当使用 Spring AMQP 的 RabbitTemplate 时,它会自动检测线程绑定的 Channel 并自动参与其事务。

使用 Java 配置,你可以通过以下 bean 来设置一个新的 RabbitTransactionManager

@Bean
public RabbitTransactionManager rabbitTransactionManager() {
return new RabbitTransactionManager(connectionFactory);
}
java

如果你更喜欢使用 XML 配置,你可以在你的 XML Application Context 文件中声明以下 bean:

<bean id="rabbitTxManager"
class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
<property name="connectionFactory" ref="connectionFactory"/>
</bean>
xml

事务同步

将 RabbitMQ 事务与其他(例如 DBMS)事务同步提供了“尽力而为的单阶段提交”语义。在事务同步的完成阶段,RabbitMQ 事务可能会提交失败。spring-tx 基础设施会将其记录为错误,但不会向调用代码抛出异常。从 2.3.10 版本开始,你可以在处理事务的同一线程上提交事务后调用 ConnectionUtils.checkAfterCompletion()。如果没有发生异常,它将简单地返回;否则,它将抛出一个 AfterCompletionFailedException,该异常将包含一个表示完成同步状态的属性。

通过调用 ConnectionFactoryUtils.enableAfterCompletionFailureCapture(true) 来启用此功能;这是一个全局标志,适用于所有线程。