跳到主要内容
版本:7.0.2

事务支持

DeepSeek V3 中英对照 Transaction Support

本章涵盖 Spring Integration 对事务的支持,主要包括以下主题:

理解消息流中的事务

Spring Integration 提供了多个钩子来满足消息流的事务需求。为了更好地理解这些钩子以及如何从中受益,我们必须首先回顾可用于启动消息流的六种机制,并了解如何在这些机制中满足消息流的事务需求。

以下六种机制会启动消息流(本手册将详细介绍每种机制):

  • 网关代理:一个基础的消息网关。

  • 消息通道:直接与 MessageChannel 方法交互(例如 channel.send(message))。

  • 消息发布者:通过调用 Spring Bean 上的方法作为副产品来启动消息流的方式。

  • 入站通道适配器和网关:通过连接第三方系统与 Spring Integration 消息系统来启动消息流的方式(例如 [JmsMessage] → Jms 入站适配器[SI 消息] → SI 通道)。

  • 调度器:基于预配置调度器分发的事件来启动消息流的方式。

  • 轮询器:类似于调度器,这是基于预配置轮询器分发的调度或基于间隔的事件来启动消息流的方式。

我们可以将这六种机制分为两大类:

  • 由用户进程发起的消息流:此类场景的示例包括调用网关方法或显式向 MessageChannel 发送 Message。换句话说,这些消息流的启动依赖于第三方进程(例如您编写的某些代码)。

  • 由守护进程发起的消息流:此类场景的示例包括轮询器轮询消息队列以使用轮询到的消息启动新消息流,或调度器在预定时间创建新消息并启动消息流。

显然,网关代理、MessageChannel.send(…​)MessagePublisher 都属于第一类,而入站适配器和网关、调度器以及轮询器则属于第二类。

那么,如何在每个类别的不同场景中处理事务需求?Spring Integration 是否需要针对特定场景提供明确的事务支持?或者,您是否可以直接使用 Spring 的事务支持?

Spring 本身提供了一流的(first-class)事务管理支持。因此,我们的目标并非提供新的东西,而是利用 Spring 来从其现有的事务支持中获益。换句话说,作为一个框架,我们必须向 Spring 的事务管理功能暴露钩子(hooks)。然而,由于 Spring Integration 的配置基于 Spring 配置,我们并不总是需要暴露这些钩子,因为 Spring 已经暴露了它们。毕竟,每个 Spring Integration 组件都是一个 Spring Bean。

考虑到这一目标,我们可以再次审视两种场景:由用户进程发起的消息流和由守护进程发起的消息流。

由用户进程发起并在Spring应用上下文中配置的消息流,遵循此类进程的常规事务配置。因此,它们无需通过Spring Integration显式配置来支持事务。事务可以且应当通过Spring的标准事务支持来启动。Spring Integration消息流天然遵循组件的事务语义,因为它本身由Spring配置。例如,网关或服务激活器方法可通过 @Transactional 注解进行声明,或在XML配置中通过切入点表达式定义 TransactionInterceptor 来指定特定方法应具有事务性。关键在于,在这些场景中您对事务配置和边界拥有完全控制权。

然而,当涉及到由守护进程发起的消息流时,情况就有些不同了。虽然这些流程是由开发者配置的,但它们并不直接涉及人类或其他进程来启动。这些是基于触发器的流程,由触发器进程(一个守护进程)根据流程的配置来启动。例如,我们可以让一个调度器在每个周五晚上启动一个消息流。我们也可以配置一个触发器,让它每秒启动一个消息流,等等。因此,我们需要一种方法,让这些基于触发器的进程知道我们打算让产生的消息流具有事务性,这样每当一个新的消息流启动时,就可以创建一个事务上下文。换句话说,我们需要暴露一些事务配置,但只需足够委托给 Spring 已经提供的事务支持(就像我们在其他场景中所做的那样)。

轮询器事务支持

Spring Integration 为轮询器(poller)提供了事务支持。轮询器是一种特殊类型的组件,因为在轮询器任务中,我们可以对本身具有事务性的资源调用 receive() 方法,从而将 receive() 调用包含在事务边界内,这样在任务失败时就可以回滚该操作。如果我们为通道(channel)添加同样的支持,那么额外添加的事务将影响从 send() 调用开始的所有下游组件。这会在没有充分理由的情况下,为事务划分提供相当宽泛的范围,尤其是当 Spring 已经提供了多种方法来满足任何下游组件的事务需求时。然而,将 receive() 方法包含在事务边界内,正是轮询器需要这种支持的“充分理由”。

每当配置轮询器时,都可以通过使用 transactional 子元素及其属性来提供事务性配置,如下例所示:

<int:poller max-messages-per-poll="1" fixed-rate="1000">
<transactional transaction-manager="txManager"
isolation="DEFAULT"
propagation="REQUIRED"
read-only="true"
timeout="1000"/>
</poller>

上述配置看起来类似于原生的Spring事务配置。您仍需引用一个事务管理器,并指定事务属性或依赖默认值(例如,如果未指定'transaction-manager'属性,则默认使用名为'transactionManager'的bean)。在内部,该过程被包装在Spring的原生事务中,其中TransactionInterceptor负责处理事务。有关如何配置事务管理器、事务管理器的类型(如JTA、数据源等)以及其他与事务配置相关的详细信息,请参阅Spring Framework参考指南

通过上述配置,该轮询器发起的所有消息流都是事务性的。有关轮询器事务配置的更多信息和详细信息,请参阅轮询与事务

除了事务处理,在运行轮询器时,您可能还需要处理其他几个横切关注点。为了帮助解决这个问题,轮询器元素接受一个 <advice-chain> 子元素,它允许您定义一个自定义的建议实例链,这些实例将应用于轮询器。(更多详情请参见可轮询消息源。)在 Spring Integration 2.0 中,轮询器经过了重构,现在使用代理机制来处理事务问题以及其他横切关注点。这项重构带来的一个重要变化是,我们使 <transactional><advice-chain> 元素互斥。这背后的理由是,如果您需要多个建议,并且其中一个是事务建议,您可以将其包含在 <advice-chain> 中,这样既保持了与之前相同的便利性,又提供了更多的控制,因为您现在可以选择将建议放置在所需的顺序中。以下示例展示了如何做到这一点:

<int:poller max-messages-per-poll="1" fixed-rate="10000">
<advice-chain>
<ref bean="txAdvice"/>
<ref bean="someOtherAdviceBean" />
<beans:bean class="foo.bar.SampleAdvice"/>
</advice-chain>
</poller>

<tx:advice id="txAdvice" transaction-manager="txManager">
<tx:attributes>
<tx:method name="get*" read-only="true"/>
<tx:method name="*"/>
</tx:attributes>
</tx:advice>

前面的示例展示了基于XML的Spring事务通知(txAdvice)的基本配置,并将其包含在轮询器(Poller)定义的<advice-chain>中。如果只需要处理轮询器的事务相关配置,你仍然可以方便地使用<transactional>元素。

事务边界

另一个重要因素是消息流中事务的边界。当启动一个事务时,事务上下文会绑定到当前线程。因此,无论消息流中有多少个端点和通道,只要确保流程在同一线程上继续,事务上下文就会得以保留。一旦通过引入可轮询通道执行器通道,或在某些服务中手动启动新线程来打破这种连续性,事务边界也会随之被打破。本质上,事务会在此处结束,如果线程之间成功完成了交接,流程将被视为成功,即使流程继续并可能在下游某处引发异常,也会发送提交信号。如果此类流程是同步的,该异常可能会被抛回给消息流的发起者(同时也是事务上下文的发起者),事务将导致回滚。折中的方案是在任何线程边界被打破的地方使用事务性通道。例如,可以使用基于队列的通道,该通道委托给事务性消息存储策略,或者使用基于 JMS 的通道。

事务同步

在某些环境中,将操作与涵盖整个流程的事务进行同步会很有帮助。例如,考虑一个位于流程开头的 <file:inbound-channel-adapter/>,它执行一系列数据库更新。如果事务提交,我们可能希望将文件移动到 success 目录;而如果事务回滚,我们可能希望将其移动到 failure 目录。

Spring Integration 2.2 引入了将这些操作与事务同步的能力。此外,如果你没有“真实”的事务,但仍希望在成功或失败时执行不同的操作,可以配置一个 PseudoTransactionManager。更多信息,请参阅伪事务

以下清单展示了此功能的关键策略接口:

public interface TransactionSynchronizationFactory {

TransactionSynchronization create(Object key);
}

public interface TransactionSynchronizationProcessor {

void processBeforeCommit(IntegrationResourceHolder holder);

void processAfterCommit(IntegrationResourceHolder holder);

void processAfterRollback(IntegrationResourceHolder holder);

}

该工厂负责创建 TransactionSynchronization 对象。您可以自行实现,或使用框架提供的实现:DefaultTransactionSynchronizationFactory。此实现返回一个委托给 TransactionSynchronizationProcessor 默认实现(即 ExpressionEvaluatingTransactionSynchronizationProcessor)的 TransactionSynchronization。该处理器支持三种 SpEL 表达式:beforeCommitExpressionafterCommitExpressionafterRollbackExpression

这些操作对于熟悉事务的人来说应该是不言自明的。在每种情况下,#root 变量都是原始的 Message。在某些情况下,根据轮询器轮询的 MessageSource,还会提供其他 SpEL 变量。例如,MongoDbMessageSource 提供了 #mongoTemplate 变量,该变量引用了消息源的 MongoTemplate。类似地,RedisStoreMessageSource 提供了 #store 变量,该变量引用了轮询创建的 RedisStore

要为特定的轮询器启用此功能,您可以通过在轮询器的 <transactional/> 元素上使用 synchronization-factory 属性,提供对 TransactionSynchronizationFactory 的引用。

自5.0版本起,Spring Integration 提供了 PassThroughTransactionSynchronizationFactory,当未配置 TransactionSynchronizationFactory 但通知链中存在 TransactionInterceptor 类型的通知时,该工厂会默认应用于轮询端点。当使用任何开箱即用的 TransactionSynchronizationFactory 实现时,轮询端点会将轮询到的消息绑定到当前事务上下文,并在事务通知后抛出异常时,将其作为 failedMessage 包含在 MessagingException 中。当使用未实现 TransactionInterceptor 的自定义事务通知时,您可以显式配置 PassThroughTransactionSynchronizationFactory 以实现此行为。无论哪种情况,MessagingException 都会成为发送到 errorChannelErrorMessage 的有效载荷,而原因是通知抛出的原始异常。在此之前,ErrorMessage 的有效载荷是通知抛出的原始异常,并未提供对 failedMessage 信息的引用,这使得难以确定事务提交问题的原因。

为简化这些组件的配置,Spring Integration 为默认工厂提供了命名空间支持。以下示例展示了如何使用命名空间配置文件入站通道适配器:

<int-file:inbound-channel-adapter id="inputDirPoller"
channel="someChannel"
directory="/foo/bar"
filter="filter"
comparator="testComparator">
<int:poller fixed-rate="5000">
<int:transactional transaction-manager="transactionManager" synchronization-factory="syncFactory" />
</int:poller>
</int-file:inbound-channel-adapter>

<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit expression="payload.renameTo(new java.io.File('/success/' + payload.name))"
channel="committedChannel" />
<int:after-rollback expression="payload.renameTo(new java.io.File('/failed/' + payload.name))"
channel="rolledBackChannel" />
</int:transaction-synchronization-factory>

SpEL表达式求值的结果将作为有效负载发送到 committedChannelrolledBackChannel(在本例中,这将是 Boolean.TRUEBoolean.FALSE,即 java.io.File.renameTo() 方法调用的结果)。

如果您希望发送整个有效载荷以进行进一步的Spring集成处理,请使用'payload'表达式。

important

重要的是要理解,这会将操作与事务同步。它并不会使一个本质上非事务性的资源变得具有事务性。相反,事务(无论是JDBC还是其他类型)会在轮询之前启动,并在流程完成时提交或回滚,随后执行同步操作。

如果你提供了一个自定义的 TransactionSynchronizationFactory,它负责创建一个资源同步,使得绑定的资源在事务完成时自动解绑。默认的 TransactionSynchronizationFactory 通过返回一个 ResourceHolderSynchronization 的子类来实现这一点,其默认的 shouldUnbindAtCompletion() 方法返回 true

除了 after-commitafter-rollback 表达式外,before-commit 也同样受支持。在这种情况下,如果表达式求值(或下游处理)抛出异常,事务将被回滚而非提交。

伪交易

阅读完事务同步章节后,你可能会想到,即使轮询器下游没有“真正的”事务性资源(例如 JDBC),在流完成时执行这些“成功”或“失败”操作也会很有用。例如,考虑一个“<file:inbound-channel-adapter/>”后接一个“<ftp:outbout-channel-adapter/>”的场景。这两个组件都不是事务性的,但我们可能希望根据 FTP 传输的成功或失败,将输入文件移动到不同的目录。

为实现此功能,该框架提供了一个PseudoTransactionManager,使得即使在没有实际事务资源参与的情况下也能进行上述配置。当流程正常完成时,会调用beforeCommitafterCommit同步回调;若执行失败,则调用afterRollback同步回调。由于这不是真实的事务,因此不会发生实际的提交或回滚操作。伪事务机制是用于启用同步功能的一种载体。

要使用 PseudoTransactionManager,你可以将其定义为 <bean/>,配置方式与配置真实事务管理器相同。以下示例展示了如何操作:

<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager" />

响应式事务

从版本5.3开始,ReactiveTransactionManager 也可以与 TransactionInterceptor 通知结合使用,用于返回响应式类型的端点。这包括 MessageSourceReactiveMessageHandler 实现(例如 ReactiveMongoDbMessageSource),它们生成带有 FluxMono 负载的消息。所有其他生成回复的消息处理器实现,当它们的回复负载也是某种响应式类型时,可以依赖 ReactiveTransactionManager