跳到主要内容
版本:7.0.2

事务支持

DeepSeek V3 中英对照 Transaction Support

从 5.0 版本开始,通过 HandleMessageAdvice 实现,引入了一个新的 TransactionHandleMessageAdvice,使整个下游流程具有事务性。当在 <request-handler-advice-chain> 元素中使用常规的 TransactionInterceptor 时(例如,通过配置 <tx:advice>),启动的事务仅适用于内部的 AbstractReplyProducingMessageHandler.handleRequestMessage(),而不会传播到下游流程。

为了简化 XML 配置,除了 <request-handler-advice-chain> 之外,所有 <outbound-gateway><service-activator> 及相关组件都添加了 <transactional> 元素。以下示例展示了 <transactional> 的用法:

<int-jdbc:outbound-gateway query="select * from things where id=:headers[id]">
<int-jdbc:transactional/>
</int-jdbc:outbound-gateway>

<bean id="transactionManager" class="org.mockito.Mockito" factory-method="mock">
<constructor-arg value="org.springframework.transaction.TransactionManager"/>
</bean>

如果你熟悉 JPA 集成组件,这样的配置并不陌生,但现在我们可以从流程中的任意点启动事务——不仅限于 <poller> 或消息驱动通道适配器(例如 JMS)。

通过使用 TransactionInterceptorBuilder 可以简化 Java 配置,并且生成的 Bean 名称可以在消息注解adviceChain 属性中使用,如下例所示:

@Bean
public ConcurrentMetadataStore store() {
return new SimpleMetadataStore(hazelcastInstance()
.getMap("idempotentReceiverMetadataStore"));
}

@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(
new MetadataStoreSelector(
message -> message.getPayload().toString(),
message -> message.getPayload().toString().toUpperCase(), store()));
}

@Bean
public TransactionInterceptor transactionInterceptor() {
return new TransactionInterceptorBuilder(true)
.transactionManager(this.transactionManager)
.isolation(Isolation.READ_COMMITTED)
.propagation(Propagation.REQUIRES_NEW)
.build();
}

@Bean
@org.springframework.integration.annotation.Transformer(inputChannel = "input",
outputChannel = "output",
adviceChain = { "idempotentReceiverInterceptor",
"transactionInterceptor" })
public Transformer transformer() {
return message -> message;
}

注意 TransactionInterceptorBuilder 构造函数中的 true 参数。它会导致创建一个 TransactionHandleMessageAdvice,而不是常规的 TransactionInterceptor

Java DSL 通过端点配置中的 .transactional() 选项支持 Advice,如下例所示:

@Bean
public IntegrationFlow updatingGatewayFlow() {
return f -> f
.handle(Jpa.updatingGateway(this.entityManagerFactory),
e -> e.transactional(true))
.channel(c -> c.queue("persistResults"));
}