Scatter-Gather
此前,该模式可通过分立元件进行配置。此项增强功能带来了更便捷的配置方式。
ScatterGatherHandler 是一个请求-回复端点,它结合了 PublishSubscribeChannel(或 RecipientListRouter)和 AggregatingMessageHandler。请求消息被发送到 scatter 通道,然后 ScatterGatherHandler 等待聚合器发送到 outputChannel 的回复。
功能
Scatter-Gather 模式包含两种场景:"auction"(竞拍)和 "distribution"(分发)。在这两种情况下,aggregation(聚合)函数是相同的,并且提供了 AggregatingMessageHandler 所有可用的选项。(实际上,ScatterGatherHandler 仅需要一个 AggregatingMessageHandler 作为构造函数参数。)更多信息请参阅 聚合器。
拍卖
拍卖的 Scatter-Gather 变体对请求消息采用“发布-订阅”逻辑,其中“分散”通道是一个设置了 apply-sequence="true" 的 PublishSubscribeChannel。不过,该通道可以是任何 MessageChannel 实现(正如 ContentEnricher 中的 request-channel 一样——参见 内容增强器)。然而,在这种情况下,您需要为 aggregation 函数创建自定义的 correlationStrategy。
发行版
Scatter-Gather 的分布模式变体基于 RecipientListRouter(参见 RecipientListRouter),并支持 RecipientListRouter 的所有可用选项。这些选项作为 ScatterGatherHandler 构造函数的第二个参数传入。若希望仅依赖 recipient-list-router 和 aggregator 的默认 correlationStrategy,则应设置 apply-sequence="true";否则,需为 aggregator 提供自定义的 correlationStrategy。与 PublishSubscribeChannel 变体(即拍卖模式变体)不同,recipient-list-router 的 selector 选项允许基于消息内容筛选目标供应商。当设置 apply-sequence="true" 时,系统将提供默认的 sequenceSize,确保 aggregator 能正确释放消息组。需注意,分布模式与拍卖模式是互斥的。
仅当基于 ScatterGatherHandler(MessageHandler scatterer, MessageHandler gatherer) 构造函数的纯 Java 配置需要设置 applySequence=true,因为框架无法修改外部提供的组件。为方便起见,从版本 6.0 开始,XML 和 Java DSL 中的 Scatter-Gather 已将 applySequence 默认设置为 true。
对于拍卖和分发两种变体,请求(分散)消息都会通过添加 gatherResultChannel 头部来等待来自 aggregator 的回复消息。
默认情况下,所有供应方应将结果发送至 replyChannel 头信息(通常通过省略最终端点的 output-channel 实现)。但同时也提供了 gatherChannel 选项,允许供应方将回复发送至该通道进行聚合。
配置 Scatter-Gather 端点
以下示例展示了 Scatter-Gather 的 Bean 定义的 Java 配置:
@Bean
public MessageHandler distributor() {
RecipientListRouter router = new RecipientListRouter();
router.setApplySequence(true);
router.setChannels(Arrays.asList(distributionChannel1(), distributionChannel2(),
distributionChannel3()));
return router;
}
@Bean
public MessageHandler gatherer() {
return new AggregatingMessageHandler(
new ExpressionEvaluatingMessageGroupProcessor("^[payload gt 5] ?: -1D"),
new SimpleMessageStore(),
new HeaderAttributeCorrelationStrategy(
IntegrationMessageHeaderAccessor.CORRELATION_ID),
new ExpressionEvaluatingReleaseStrategy("size() == 2"));
}
@Bean
@ServiceActivator(inputChannel = "distributionChannel")
public MessageHandler scatterGatherDistribution() {
ScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer());
handler.setOutputChannel(output());
return handler;
}
在前面的示例中,我们为 RecipientListRouter 的 distributor bean 配置了 applySequence="true" 和接收者通道列表。下一个 bean 用于 AggregatingMessageHandler。最后,我们将这两个 bean 注入到 ScatterGatherHandler bean 定义中,并将其标记为 @ServiceActivator,以便将分散-收集组件连接到集成流中。
以下示例展示了如何使用 XML 命名空间配置 <scatter-gather> 端点:
<scatter-gather
id="" // <1>
auto-startup="" // <2>
input-channel="" // <3>
output-channel="" // <4>
scatter-channel="" // <5>
gather-channel="" // <6>
order="" // <7>
phase="" // <8>
send-timeout="" // <9>
gather-timeout="" // <10>
requires-reply="" > // <11>
<scatterer/> // <12>
<gatherer/> // <13>
</scatter-gather>
端点ID。
ScatterGatherHandlerBean 以id + '.handler'的别名注册。RecipientListRouterBean 以id + '.scatterer'的别名注册。AggregatingMessageHandlerBean 以id + '.gatherer'的别名注册。可选。(BeanFactory会生成默认的id值。)生命周期属性,指示端点是否应在应用上下文初始化期间启动。此外,
ScatterGatherHandler也实现了Lifecycle接口,并会启动和停止gatherEndpoint(如果提供了gather-channel,该端点会在内部创建)。可选。(默认值为true。)接收请求消息以便在
ScatterGatherHandler中处理它们的通道。必需。ScatterGatherHandler将聚合结果发送到的通道。可选。(传入消息可以在replyChannel消息头中自行指定回复通道。)在拍卖场景中用于发送分散消息的通道。可选。与
<scatterer>子元素互斥。用于接收来自每个供应商的回复以进行聚合的通道。它被用作分散消息中的
replyChannel头。可选。默认情况下,会创建FixedSubscriberChannel。当多个处理器订阅到同一个
DirectChannel时(用于负载均衡目的),此组件的顺序。可选。指定端点应启动和停止的阶段。启动顺序从低到高进行,关闭顺序从高到低进行。默认情况下,此值为
Integer.MAX_VALUE,意味着此容器尽可能晚启动并尽可能早停止。可选。向
output-channel发送回复Message时的等待超时间隔。默认情况下,send()会阻塞一秒。仅当输出通道存在某些“发送”限制时才适用——例如,具有固定“容量”且已满的QueueChannel。在这种情况下,会抛出MessageDeliveryException。对于AbstractSubscribableChannel实现,send-timeout会被忽略。对于group-timeout(-expression),来自计划过期任务的MessageDeliveryException会导致此任务被重新调度。可选。允许您指定分散-收集在返回前等待回复消息的时间。默认情况下,它等待
30秒。如果回复超时,则返回 'null'。可选。指定分散-收集是否必须返回非空值。此值默认为
true。因此,当底层聚合器在gather-timeout后返回空值时,会抛出ReplyRequiredException。请注意,如果可能返回null,则应指定gather-timeout以避免无限期等待。<recipient-list-router>选项。可选。与scatter-channel属性互斥。<aggregator>选项。必需。
从版本 6.5.3 开始,当为 ScatterGatherHandler 配置了 async = true 选项时,请求消息处理线程不再阻塞等待内部 ((PollableChannel) gatherResultChannel).receive(this.gatherTimeout) 操作的结果。相反,会返回一个 reactor.core.publisher.Mono 作为回复对象,该对象基于最终从 gatherResultChannel 产生的结果。然后,该 Mono 会根据框架中的 Reactive Streams 支持 进行处理。
错误处理
由于Scatter-Gather是一个多请求-回复组件,其错误处理具有额外的复杂性。在某些情况下,如果ReleaseStrategy允许流程以少于请求数量的回复完成,那么捕获并忽略下游异常可能是更好的选择。在其他情况下,当发生错误时,应考虑从子流返回类似“补偿消息”的内容。
每个异步子流程都应配置一个 errorChannel 头,以便从 MessagePublishingErrorHandler 正确发送错误消息。否则,错误将被发送到全局 errorChannel 并采用通用的错误处理逻辑。有关异步错误处理的更多信息,请参阅错误处理。
同步流可以使用 ExpressionEvaluatingRequestHandlerAdvice 来忽略异常或返回补偿消息。当某个子流向 ScatterGatherHandler 抛出异常时,该异常会直接重新抛向上游。这样一来,所有其他子流的处理都将白费功夫,并且它们的回复在 ScatterGatherHandler 中会被忽略。虽然有时这可能是预期行为,但在大多数情况下,更好的做法是在特定的子流中处理错误,而不影响其他所有子流以及收集器的预期结果。
从 5.1.3 版本开始,ScatterGatherHandler 提供了 errorChannelName 选项。该选项会被填充到分散消息的 errorChannel 头部,在发生异步错误时使用,也可在常规的同步子流程中用于直接发送错误消息。
以下示例配置通过返回补偿消息来演示异步错误处理:
@Bean
public IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskExecutor) {
return f -> f
.scatterGather(
scatterer -> scatterer
.recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1"))
.recipientFlow(f2 -> f2
.channel(c -> c.executor(taskExecutor))
.transform(p -> {
throw new RuntimeException("Sub-flow#2");
})),
null,
s -> s.errorChannel("scatterGatherErrorChannel"));
}
@ServiceActivator(inputChannel = "scatterGatherErrorChannel")
public Message<?> processAsyncScatterError(MessagingException payload) {
return MessageBuilder.withPayload(payload.getCause().getCause())
.copyHeaders(payload.getFailedMessage().getHeaders())
.build();
}
为了生成适当的回复,我们必须从 MessagingException 的 failedMessage 中复制头部信息(包括 replyChannel 和 errorChannel),该异常已通过 MessagePublishingErrorHandler 发送至 scatterGatherErrorChannel。这样,目标异常就能返回给 ScatterGatherHandler 的收集器,以完成回复消息组的处理。此类异常的 payload 可以在收集器的 MessageGroupProcessor 中被过滤掉,或者在分散-聚合端点之后,通过下游的其他方式进行进一步处理。
在将分散结果发送到收集器之前,ScatterGatherHandler 会恢复请求消息的头部,包括回复通道和错误通道(如果存在)。这样,即使分散接收子流程中应用了异步处理,来自 AggregatingMessageHandler 的错误也会传播给调用者。为了确保操作成功,必须将 gatherResultChannel、originalReplyChannel 和 originalErrorChannel 头部传回分散接收子流程的回复中。在这种情况下,必须为 ScatterGatherHandler 配置一个合理且有限的 gatherTimeout。否则,默认情况下它将永远阻塞,等待收集器的回复。