事务支持
本章介绍 Spring Integration 对事务的支持。它涵盖了以下主题:
了解消息流中的事务
Spring Integration 暴露了几个钩子来解决您的消息流的事务需求。为了更好地理解这些钩子以及您如何从中受益,我们必须首先回顾一下您可以用来启动消息流的六种机制,并了解如何在每种机制内解决这些流的事务需求。
以下六种机制发起消息流(每种机制的详细信息在本手册中均有提供):
-
网关代理:一个基本的消息网关。
-
消息通道:与
MessageChannel
方法的直接交互(例如,channel.send(message)
)。 -
消息发布者:通过调用 Spring bean 上的方法来发起消息流的方式。
-
入站通道适配器和网关:通过将第三方系统与 Spring Integration 消息系统连接起来发起消息流的方式(例如,
[JmsMessage] → Jms 入站适配器 [SI Message] → SI 通道
)。 -
调度器:基于预配置调度器分发的调度事件来发起消息流的方式。
-
轮询器:类似于调度器,这是基于预配置轮询器分发的调度或间隔事件来发起消息流的方式。
我们可以将这六种机制分为两大类:
-
由用户进程发起的消息流:此类别的示例场景包括调用网关方法或显式地向
MessageChannel
发送Message
。换句话说,这些消息流依赖于第三方进程(例如您编写的一些代码)来发起。 -
由守护进程发起的消息流:此类别的示例场景包括轮询器轮询消息队列以使用轮询到的消息发起新的消息流,或者调度程序通过创建新消息并在预定义时间启动消息流来调度进程。
显然,网关代理、MessageChannel.send(…)
和 MessagePublisher
都属于第一类,而传入适配器和网关、调度器和轮询器属于第二类。
那么,你如何在每个类别中的各种场景中解决事务需求,Spring Integration 是否需要为特定场景提供明确的事务相关支持?或者你可以使用 Spring 的事务支持吗?
Spring 本身提供了对事务管理的一流支持。因此,我们的目标不是提供新的东西,而是使用 Spring 来利用其现有的事务支持功能。换句话说,作为框架,我们必须公开 Spring 事务管理功能的钩子。然而,由于 Spring Integration 配置基于 Spring 配置,我们并不总是需要公开这些钩子,因为 Spring 已经公开了它们。毕竟,每个 Spring Integration 组件都是一个 Spring Bean。
有了这个目标,我们可以再次考虑两种场景:由用户进程发起的消息流和由守护进程发起的消息流。
由用户进程发起并在 Spring 应用程序上下文中配置的消息流受此类进程的常规事务配置约束。因此,它们不需要通过 Spring Integration 显式配置以支持事务。事务应该通过 Spring 的标准事务支持来启动。Spring Integration 消息流自然遵循组件的事务语义,因为它本身是由 Spring 配置的。例如,网关或服务激活器方法可以使用 @Transactional
进行注解,或者可以在 XML 配置中定义 TransactionInterceptor
,并使用切入点表达式指向应为事务性的特定方法。总之,在这些场景中,您对事务配置和边界有完全的控制权。
但是,当涉及到由守护进程发起的消息流时,情况有些不同。虽然这些流是由开发人员配置的,但它们的发起并不直接涉及人类或其他进程。这些是基于触发器的流,根据进程的配置由触发器进程(守护进程)发起。例如,我们可以有一个调度程序在每个周五晚上发起一个消息流。我们还可以配置一个每秒发起一次消息流的触发器,等等。因此,我们需要一种方法让这些基于触发器的进程知道我们希望生成的消息流是事务性的,以便在每次发起新的消息流时创建一个事务上下文。换句话说,我们需要暴露一些事务配置,但只够委托给 Spring 已经提供的事务支持(如同我们在其他场景中所做的)。
轮询器事务支持
Spring Integration 为轮询器提供了事务支持。轮询器是一种特殊的组件,因为在轮询器任务内,我们可以对本身是事务性的资源调用 receive()
,从而将 receive()
调用包含在事务的边界内,这使得在任务失败时它可以回滚。如果我们为通道添加相同的支持,则添加的事务将影响从 send()
调用开始的所有下游组件。这为事务界定提供了一个相当广泛的范围,但没有强烈的原因,特别是在 Spring 已经提供了几种解决任何下游组件事务需求的方法的情况下。然而,将 receive()
方法包含在事务边界内是轮询器的“强烈原因”。
每当您配置一个 Poller 时,您都可以通过使用 transactional
子元素及其属性来提供事务性配置,如下例所示:
<int:poller max-messages-per-poll="1" fixed-rate="1000">
<transactional transaction-manager="txManager"
isolation="DEFAULT"
propagation="REQUIRED"
read-only="true"
timeout="1000"/>
</poller>
上述配置类似于原生的 Spring 事务配置。你仍然需要提供对事务管理器的引用,并且要么指定事务属性,要么依赖默认设置(例如,如果未指定 'transaction-manager' 属性,则默认为名为 'transactionManager' 的 bean)。在内部,该过程被包装在 Spring 的原生事务中,其中 TransactionInterceptor
负责处理事务。有关如何配置事务管理器、事务管理器的类型(如 JTA、Datasource 等)以及其他与事务配置相关的详细信息,请参阅 Spring 框架参考指南。
通过上述配置,由该轮询器发起的所有消息流都是事务性的。有关轮询器事务配置的更多信息和详细信息,请参见 轮询和事务。
除了事务之外,在运行轮询器时,你可能还需要处理一些其他的横切关注点。为了帮助解决这些问题,轮询器元素接受一个 <advice-chain>
子元素,这使得你可以定义一个自定义的建议实例链来应用于轮询器。(更多详情请参阅 可轮询消息源)。在 Spring Integration 2.0 中,轮询器经历了一次重构,并现在使用代理机制来处理事务性关注点以及其他横切关注点。从这一努力中演变出的一个重要变化是我们使 <transactional>
和 <advice-chain>
元素互斥。其背后的逻辑是,如果你需要多个建议并且其中一个为事务建议,你可以像以前一样方便地将其包含在 <advice-chain>
中,但具有更多的控制力,因为你可以选择将建议放置在所需的顺序中。以下示例展示了如何做到这一点:
<int:poller max-messages-per-poll="1" fixed-rate="10000">
<advice-chain>
<ref bean="txAdvice"/>
<ref bean="someOtherAdviceBean" />
<beans:bean class="foo.bar.SampleAdvice"/>
</advice-chain>
</poller>
<tx:advice id="txAdvice" transaction-manager="txManager">
<tx:attributes>
<tx:method name="get*" read-only="true"/>
<tx:method name="*"/>
</tx:attributes>
</tx:advice>
前面的示例展示了 Spring 事务建议 (txAdvice
) 的基本 XML 配置,并将其包含在轮询器定义的 <advice-chain>
中。如果您只需要处理轮询器的事务问题,仍然可以使用 <transactional>
元素作为便利手段。
事务边界
另一个重要因素是消息流中事务的边界。当一个事务开始时,事务上下文会绑定到当前线程。因此,无论你在消息流中有多少个端点和通道,只要确保流程在同一线程上继续,事务上下文就会保持不变。一旦通过引入 轮询通道 或 执行器通道 或在某些服务中手动启动新线程而中断它,事务边界也会被打破。实际上,事务会在那时结束,并且如果线程之间已经成功交接,即使流将继续并可能在下游某处导致异常,也会被认为是成功的,并发送 COMMIT 信号。如果这样的流是同步的,则该异常可以抛回到消息流的发起者,该发起者也是事务上下文的发起者,事务将导致回滚(ROLLBACK)。折中的办法是在任何线程边界被打破的地方使用事务性通道。例如,你可以使用基于队列的通道,该通道委托给事务性 MessageStore 策略,或者你可以使用基于 JMS 的通道。
事务同步
在某些环境中,将操作与涵盖整个流程的事务同步会有所帮助。例如,考虑一个位于流程开始处的 <file:inbound-channel-adapter/>
,它执行多个数据库更新。如果事务提交,我们可能想要将文件移动到 success
目录,而如果事务回滚,我们可能想要将其移动到 failure
目录。
Spring Integration 2.2 引入了使用事务同步这些操作的能力。此外,如果你没有 '真实' 的事务,但仍然希望在成功或失败时执行不同的操作,你可以配置一个 PseudoTransactionManager
。有关更多信息,请参见 伪事务。
以下列表显示了此功能的关键策略接口:
public interface TransactionSynchronizationFactory {
TransactionSynchronization create(Object key);
}
public interface TransactionSynchronizationProcessor {
void processBeforeCommit(IntegrationResourceHolder holder);
void processAfterCommit(IntegrationResourceHolder holder);
void processAfterRollback(IntegrationResourceHolder holder);
}
工厂负责创建一个 TransactionSynchronization 对象。你可以实现自己的,或者使用框架提供的:DefaultTransactionSynchronizationFactory
。此实现返回一个 TransactionSynchronization
,它委托给 TransactionSynchronizationProcessor
的默认实现:ExpressionEvaluatingTransactionSynchronizationProcessor
。此处理器支持三个 SpEL 表达式:beforeCommitExpression
、afterCommitExpression
和 afterRollbackExpression
。
这些操作对于熟悉事务的人来说应该是不言自明的。在每种情况下,#root
变量都是原始的 Message
。在某些情况下,根据轮询器轮询的 MessageSource
,会提供其他 SpEL 变量。例如,MongoDbMessageSource
提供了 #mongoTemplate
变量,它引用了消息源的 MongoTemplate
。同样,RedisStoreMessageSource
提供了 #store
变量,它引用了由轮询创建的 RedisStore
。
要为特定的轮询器启用此功能,可以通过使用 synchronization-factory
属性在轮询器的 <transactional/>
元素上提供 TransactionSynchronizationFactory
的引用。
从 5.0 版本开始,Spring Integration 提供了 PassThroughTransactionSynchronizationFactory
,当没有配置 TransactionSynchronizationFactory
但建议链中存在类型为 TransactionInterceptor
的建议时,它会默认应用于轮询端点。在使用任何开箱即用的 TransactionSynchronizationFactory
实现时,轮询端点将轮询的消息绑定到当前事务上下文,并在事务建议之后抛出异常时将其作为 MessagingException
中的 failedMessage
提供。当使用不实现 TransactionInterceptor
的自定义事务建议时,您可以显式地配置 PassThroughTransactionSynchronizationFactory
以实现这种行为。在这两种情况下,MessagingException
成为发送到 errorChannel
的 ErrorMessage
的有效载荷,而原因是建议抛出的原始异常。以前,ErrorMessage
的有效载荷是建议抛出的原始异常,并且没有提供对 failedMessage
信息的引用,这使得确定事务提交问题的原因变得困难。
为了简化这些组件的配置,Spring Integration 为默认工厂提供了命名空间支持。以下示例展示了如何使用命名空间来配置文件入站通道适配器:
<int-file:inbound-channel-adapter id="inputDirPoller"
channel="someChannel"
directory="/foo/bar"
filter="filter"
comparator="testComparator">
<int:poller fixed-rate="5000">
<int:transactional transaction-manager="transactionManager" synchronization-factory="syncFactory" />
</int:poller>
</int-file:inbound-channel-adapter>
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit expression="payload.renameTo(new java.io.File('/success/' + payload.name))"
channel="committedChannel" />
<int:after-rollback expression="payload.renameTo(new java.io.File('/failed/' + payload.name))"
channel="rolledBackChannel" />
</int:transaction-synchronization-factory>
SpEL 评估的结果作为有效负载发送到 committedChannel
或 rolledBackChannel
(在本例中,这将是 Boolean.TRUE
或 Boolean.FALSE
—— java.io.File.renameTo()
方法调用的结果)。
如果你想发送整个有效载荷以进行进一步的 Spring Integration 处理,请使用 'payload' 表达式。
重要的是要理解这会将操作与一个事务同步。它并不会使本质上不是事务性的资源变成事务性。相反,事务(无论是 JDBC 还是其他)会在轮询之前开始,并在流程完成时提交或回滚,然后执行同步操作。
如果你提供了一个自定义的 TransactionSynchronizationFactory
,它负责创建资源同步,使得绑定的资源在事务完成时自动解除绑定。默认的 TransactionSynchronizationFactory
通过返回 ResourceHolderSynchronization
的子类来实现这一点,默认情况下 shouldUnbindAtCompletion()
返回 true
。
除了 after-commit
和 after-rollback
表达式之外,还支持 before-commit
。在这种情况下,如果评估(或下游处理)抛出异常,则事务将回滚而不是提交。
伪事务
在阅读了事务同步部分后,您可能会认为即使轮询器下游没有“真实”的事务性资源(例如 JDBC),当流程完成时,采取这些“成功”或“失败”操作也是有用的。例如,考虑一个“<file:inbound-channel-adapter/>”后面跟着一个“<ftp:outbout-channel-adapter/>”。这两个组件都不是事务性的,但我们可能希望根据 FTP 传输的成功或失败将输入文件移动到不同的目录。
为了提供此功能,框架提供了一个 PseudoTransactionManager
,即使没有涉及真实的事务资源,也能启用上述配置。如果流程正常完成,则会调用 beforeCommit
和 afterCommit
同步。在失败时,则会调用 afterRollback
同步。因为它不是真正的事务,所以不会发生实际的提交或回滚。伪事务是一种用于启用同步功能的工具。
要使用 PseudoTransactionManager
,你可以将其定义为一个 <bean/>,就像配置一个真实的事务管理器一样。以下示例展示了如何进行配置:
<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager" />
响应式事务
从 5.3 版本开始,ReactiveTransactionManager
也可以与 TransactionInterceptor
通知一起用于返回反应式类型的端点。这包括产生带有 Flux
或 Mono
负载的消息的 MessageSource
和 ReactiveMessageHandler
实现(例如 ReactiveMongoDbMessageSource
)。所有其他产生回复的消息处理器实现可以在其回复负载也是某种反应式类型时依赖于 ReactiveTransactionManager
。