跳到主要内容

线程屏障

QWen Plus 中英对照 Thread Barrier

有时,我们需要挂起一个消息流线程,直到某些其他异步事件发生。例如,考虑一个将消息发布到 RabbitMQ 的 HTTP 请求。我们可能希望在 RabbitMQ 代理发出消息已收到的确认之前,不回复用户。

在版本 4.2 中,Spring Integration 引入了 <barrier/> 组件来实现此目的。底层的 MessageHandlerBarrierMessageHandler。这个类还实现了 MessageTriggerAction,在这个接口中,传递给 trigger() 方法的消息会在 handleRequestMessage() 方法中释放相应的线程(如果存在)。

挂起的线程和触发线程通过在消息上调用 CorrelationStrategy 来关联。当消息发送到 input-channel 时,线程最多会挂起 requestTimeout 毫秒,等待相应的触发消息。默认的相关策略使用 IntegrationMessageHeaderAccessor.CORRELATION_ID 头。当具有相同相关性的触发消息到达时,线程会被释放。释放后发送到 output-channel 的消息是通过使用 MessageGroupProcessor 构建的。默认情况下,消息是两个有效负载的 Collection<?>,并且头信息是通过使用 DefaultAggregatingMessageGroupProcessor 合并的。

警告

如果首先调用 trigger() 方法(或在主线程超时后调用),它将被暂停最多 triggerTimeout 时间,等待挂起的消息到达。如果你不希望挂起触发线程,可以考虑交给 TaskExecutor 处理,这样它的线程会被挂起。

备注

在 5.4 版本之前,请求消息和触发消息只有一个 timeout 选项,但在某些使用场景中,为这些动作设置不同的超时时间会更好。因此,引入了 requestTimeouttriggerTimeout 选项。

requires-reply 属性确定如果挂起的线程在触发消息到达之前超时应采取的操作。默认情况下,它是 false,这意味着端点返回 null,流程结束,线程返回给调用者。当设置为 true 时,会抛出 ReplyRequiredException

你可以通过编程方式调用 trigger() 方法(通过名称获取bean引用,barrier.handler — 其中 barrier 是屏障端点的bean名称)。或者,你可以配置一个 <outbound-channel-adapter/> 来触发释放。

important

只能有一个线程可以使用相同的关联被挂起。相同的关联可以被多次使用,但同时只能使用一次。如果第二个线程带着相同的关联到达,则会抛出一个异常。

以下示例展示了如何使用自定义标题进行关联:

@ServiceActivator(inputChannel="in")
@Bean
public BarrierMessageHandler barrier(MessageChannel out, MessageChannel lateTriggerChannel) {
BarrierMessageHandler barrier = new BarrierMessageHandler(10000);
barrier.setOutputChannel(out());
barrier.setDiscardChannel(lateTriggerChannel);
return barrier;
}

@ServiceActivator (inputChannel="release")
@Bean
public MessageHandler releaser(MessageTriggerAction barrier) {
return barrier::trigger;
}
java

取决于哪一个消息先到达,发送消息到 in 的线程或发送消息到 release 的线程会等待最多十秒直到另一个消息到达。当消息被释放时,out 通道会发送一个消息,该消息结合了调用自定义 MessageGroupProcessor bean(名为 myOutputProcessor)的结果。如果主线程超时并且触发器稍后到达,您可以配置一个丢弃通道,将迟到的触发器发送到该通道。如果请求消息未能及时到达,触发消息也会被丢弃。

对于此组件的示例,请参阅 屏障示例应用程序