跳到主要内容
版本:7.0.2

线程屏障

DeepSeek V3 中英对照 Thread Barrier

有时,我们需要暂停消息流线程,直到其他异步事件发生。例如,考虑一个向 RabbitMQ 发布消息的 HTTP 请求。我们可能希望在 RabbitMQ 代理发出消息接收确认之前,不向用户回复。

在 4.2 版本中,Spring Integration 为此目的引入了 <barrier/> 组件。其底层的 MessageHandlerBarrierMessageHandler。该类还实现了 MessageTriggerAction 接口,其中传递给 trigger() 方法的消息会在 handleRequestMessage() 方法(如果存在)中释放对应的线程。

挂起的线程与触发线程通过调用消息上的 CorrelationStrategy 进行关联。当消息发送到 input-channel 时,线程会挂起最多 requestTimeout 毫秒,等待相应的触发消息。默认的关联策略使用 IntegrationMessageHeaderAccessor.CORRELATION_ID 头部。当具有相同关联标识的触发消息到达时,线程将被释放。释放后发送到 output-channel 的消息是通过 MessageGroupProcessor 构建的。默认情况下,该消息是两个有效载荷的 Collection<?>,头部通过 DefaultAggregatingMessageGroupProcessor 进行合并。

警告

如果首先调用 trigger() 方法(或在主线程超时后调用),该方法将最多挂起 triggerTimeout 时间,等待挂起消息到达。如果不希望挂起触发器线程,请考虑改用 TaskExecutor 进行任务移交,以便挂起其线程而非触发器线程。

备注

在 5.4 版本之前,请求和触发消息只有一个 timeout 选项,但在某些使用场景中,为这些操作设置不同的超时时间更为合适。因此,我们引入了 requestTimeouttriggerTimeout 选项。

requires-reply 属性决定了当挂起的线程在触发消息到达前超时时应采取的操作。默认值为 false,这意味着端点将返回 null,流程结束,线程返回给调用者。当设置为 true 时,将抛出 ReplyRequiredException

你可以通过编程方式调用 trigger() 方法(通过使用名称 barrier.handler 获取 bean 引用,其中 barrier 是屏障端点的 bean 名称)。或者,你也可以配置一个 <outbound-channel-adapter/> 来触发释放。

important

同一关联标识只能挂起一个线程。同一关联标识可多次使用,但每次只能同时挂起一个线程。若第二个线程使用相同关联标识尝试挂起,将抛出异常。

以下示例展示了如何使用自定义标头进行关联:

@ServiceActivator(inputChannel="in")
@Bean
public BarrierMessageHandler barrier(MessageChannel out, MessageChannel lateTriggerChannel) {
BarrierMessageHandler barrier = new BarrierMessageHandler(10000);
barrier.setOutputChannel(out());
barrier.setDiscardChannel(lateTriggerChannel);
return barrier;
}

@ServiceActivator (inputChannel="release")
@Bean
public MessageHandler releaser(MessageTriggerAction barrier) {
return barrier::trigger;
}

根据哪个消息先到达,向 in 发送消息的线程或向 release 发送消息的线程会等待最多十秒,直到另一个消息到达。当消息被释放时,out 通道会收到一条消息,该消息结合了调用名为 myOutputProcessor 的自定义 MessageGroupProcessor bean 的结果。如果主线程超时且触发消息稍后到达,你可以配置一个丢弃通道,用于接收延迟的触发消息。如果请求消息未能及时到达,触发消息也会被丢弃。

有关此组件的示例,请参见屏障示例应用程序