分散-聚集
从 4.1 版本开始,Spring Integration 提供了 scatter-gather 企业集成模式的实现。它是一个复合端点,其目标是将消息发送给接收者并汇总结果。正如在 Enterprise Integration Patterns 中所指出的,它是用于诸如“最佳报价”场景的组件,在这些场景中,我们需要向多个供应商请求信息,并决定哪个供应商为我们提供所请求项目的最佳条款。
以前,可以通过使用分立组件来配置模式。此增强功能带来了更便捷的配置。
ScatterGatherHandler 是一个请求-回复端点,它结合了 PublishSubscribeChannel(或 RecipientListRouter)和 AggregatingMessageHandler。请求消息被发送到 scatter 通道,ScatterGatherHandler 等待聚合器发送到 outputChannel 的回复。
功能
Scatter-Gather 模式建议两种场景:“拍卖”和“分发”。在这两种情况下,aggregation 函数是相同的,并提供 AggregatingMessageHandler 所有可用的选项。(实际上,ScatterGatherHandler 仅需要一个 AggregatingMessageHandler 作为构造函数参数。)更多信息请参阅 聚合器。
拍卖
拍卖的 Scatter-Gather 变体使用“发布-订阅”逻辑来处理请求消息,其中“scatter”通道是一个 PublishSubscribeChannel,并且 apply-sequence="true"。但是,此通道可以是任何 MessageChannel 实现(就像 ContentEnricher 中的 request-channel 一样——请参阅内容充实)。然而,在这种情况下,你应该为自己创建一个自定义的 correlationStrategy 来用于 aggregation 函数。
分布
分布式的 Scatter-Gather 变体基于 RecipientListRouter(见 RecipientListRouter),并带有 RecipientListRouter 的所有可用选项。这是 ScatterGatherHandler 构造函数的第二个参数。如果你想只依赖于默认的 correlationStrategy 来处理 recipient-list-router 和 aggregator,你应该指定 apply-sequence="true"。否则,你应该为 aggregator 提供一个自定义的 correlationStrategy。与 PublishSubscribeChannel 变体(拍卖变体)不同的是,recipient-list-router 的 selector 选项可以根据消息来过滤目标供应商。当 apply-sequence="true" 时,默认的 sequenceSize 会被提供,并且 aggregator 可以正确地释放组。分发选项与拍卖选项是互斥的。
applySequence=true 仅在基于 ScatterGatherHandler(MessageHandler scatterer, MessageHandler gatherer) 构造函数配置的普通 Java 配置中是必需的,因为框架无法更改外部提供的组件。为了方便起见,从 6.0 版本开始,XML 和 Java DSL 的 Scatter-Gather 将 applySequence 设置为 true。
对于拍卖和分发这两种变体,请求(scatter)消息都添加了 gatherResultChannel 头,以等待来自 aggregator 的回复消息。
默认情况下,所有供应商应将其结果发送到 replyChannel 头(通常通过省略最终端点的 output-channel 来实现)。但是,也提供了 gatherChannel 选项,让供应商可以将回复发送到该通道以进行聚合。
配置 Scatter-Gather 终端
以下示例显示了 Scatter-Gather 的 Java 配置:
bean definition
请注意,Scatter-Gather 是一个专有名词,在配置中保持不变。
@Bean
public MessageHandler distributor() {
    RecipientListRouter router = new RecipientListRouter();
    router.setApplySequence(true);
    router.setChannels(Arrays.asList(distributionChannel1(), distributionChannel2(),
            distributionChannel3()));
    return router;
}
@Bean
public MessageHandler gatherer() {
	return new AggregatingMessageHandler(
			new ExpressionEvaluatingMessageGroupProcessor("^[payload gt 5] ?: -1D"),
			new SimpleMessageStore(),
			new HeaderAttributeCorrelationStrategy(
			       IntegrationMessageHeaderAccessor.CORRELATION_ID),
			new ExpressionEvaluatingReleaseStrategy("size() == 2"));
}
@Bean
@ServiceActivator(inputChannel = "distributionChannel")
public MessageHandler scatterGatherDistribution() {
	ScatterGatherHandler handler = new ScatterGatherHandler(distributor(), gatherer());
	handler.setOutputChannel(output());
	return handler;
}
在前面的例子中,我们配置了 RecipientListRouter distributor bean,设置了 applySequence="true" 和收件人通道列表。下一个 bean 是用于 AggregatingMessageHandler 的。最后,我们将这两个 bean 注入到 ScatterGatherHandler bean 定义中,并将其标记为 @ServiceActivator 以将散射收集组件连接到集成流中。
以下示例展示了如何通过使用 XML 命名空间来配置 <scatter-gather> 端点:
<scatter-gather
		id=""  // <1>
		auto-startup=""  // <2>
		input-channel=""  // <3>
		output-channel=""  // <4>
		scatter-channel=""  // <5>
		gather-channel=""  // <6>
		order=""  // <7>
		phase=""  // <8>
		send-timeout=""  // <9>
		gather-timeout=""  // <10>
		requires-reply="" > // <11>
			<scatterer/>  // <12>
			<gatherer/>  // <13>
</scatter-gather>
- 端点的 id。 - ScatterGatherHandlerbean 以- id + '.handler'的别名注册。- RecipientListRouterbean 以- id + '.scatterer'的别名注册。- AggregatingMessageHandlerbean 以- id + '.gatherer'的别名注册。可选。(- BeanFactory生成默认的- id值。)
- 生命周期属性,指示端点是否应在应用程序上下文初始化期间启动。此外, - ScatterGatherHandler还实现了- Lifecycle并启动和停止- gatherEndpoint,如果提供了- gather-channel,则会内部创建- gatherEndpoint。可选。(默认值为- true。)
- 用于接收请求消息的通道,以便在 - ScatterGatherHandler中处理它们。必需。
- ScatterGatherHandler发送聚合结果的通道。可选。(传入的消息可以在- replyChannel消息头中指定自己的回复通道)。
- 用于拍卖场景发送散射消息的通道。可选。与 - <scatterer>子元素互斥。
- 用于从每个供应商接收回复以进行聚合的通道。它用作散射消息中的 - replyChannel头。可选。默认情况下,创建- FixedSubscriberChannel。
- 当多个处理器订阅相同的 - DirectChannel时(用于负载均衡目的),此组件的顺序。可选。
- 指定端点应启动和停止的阶段。启动顺序从最低到最高进行,而关闭顺序从最高到最低。默认情况下,此值为 - Integer.MAX_VALUE,意味着此容器尽可能晚地启动并尽可能早地停止。可选。
- 在向 - output-channel发送回复- Message时等待的超时间隔。默认情况下,- send()阻塞一秒钟。仅当输出通道有某些 '发送' 限制时才适用——例如,具有固定 '容量' 已满的- QueueChannel。在这种情况下,会抛出- MessageDeliveryException。对于- AbstractSubscribableChannel实现,- send-timeout被忽略。对于- group-timeout(-expression),来自计划过期任务的- MessageDeliveryException将导致此任务重新安排。可选。
- 允许您指定散射-收集等待回复消息的时间。默认情况下,它等待 - 30秒。如果回复超时,则返回 'null'。可选。
- 指定散射-收集是否必须返回非空值。此值默认为 - true。因此,在- gather-timeout后,当底层聚合器返回 null 值时,将抛出- ReplyRequiredException。注意,如果 'null' 是一种可能性,则应指定- gather-timeout以避免无限期等待。
- <recipient-list-router>选项。可选。与- scatter-channel属性互斥。
- <aggregator>选项。必需。
错误处理
由于 Scatter-Gather 是一个多请求-回复组件,错误处理有一些额外的复杂性。在某些情况下,如果 ReleaseStrategy 允许进程以少于请求数量的回复完成,则最好只是捕获并忽略下游异常。在其他情况下,当发生错误时,应该考虑类似“补偿消息”的机制从子流返回。
每个异步子流都应该配置一个 errorChannel 头,以便从 MessagePublishingErrorHandler 正确发送错误消息。否则,错误将被发送到全局的 errorChannel,并使用通用的错误处理逻辑。有关异步错误处理的更多信息,请参阅错误处理。
同步流可以使用 ExpressionEvaluatingRequestHandlerAdvice 来忽略异常或返回补偿消息。当从其中一个子流向 ScatterGatherHandler 抛出异常时,它只是重新抛出到上游。这样一来,所有其他子流将白费功夫,它们的回复将在 ScatterGatherHandler 中被忽略。有时这可能是预期的行为,但在大多数情况下,最好是在特定的子流中处理错误而不影响所有其他子流和聚合器中的期望。
从 5.1.3 版本开始,ScatterGatherHandler 提供了 errorChannelName 选项。它被填充到散射消息的 errorChannel 头中,在异步错误发生时使用,或者可以在常规同步子流中直接发送错误消息时使用。
下面的示例配置通过返回补偿消息来演示异步错误处理:
@Bean
public IntegrationFlow scatterGatherAndExecutorChannelSubFlow(TaskExecutor taskExecutor) {
    return f -> f
            .scatterGather(
                    scatterer -> scatterer
                            .recipientFlow(f1 -> f1.transform(p -> "Sub-flow#1"))
                            .recipientFlow(f2 -> f2
                                    .channel(c -> c.executor(taskExecutor))
                                    .transform(p -> {
                                        throw new RuntimeException("Sub-flow#2");
                                    })),
                    null,
                    s -> s.errorChannel("scatterGatherErrorChannel"));
}
@ServiceActivator(inputChannel = "scatterGatherErrorChannel")
public Message<?> processAsyncScatterError(MessagingException payload) {
    return MessageBuilder.withPayload(payload.getCause().getCause())
            .copyHeaders(payload.getFailedMessage().getHeaders())
            .build();
}
为了产生一个适当的回复,我们必须从 failedMessage 中复制报头(包括 replyChannel 和 errorChannel),该 failedMessage 是由 MessagePublishingErrorHandler 发送到 scatterGatherErrorChannel 的 MessagingException。这样,目标异常会被返回到 ScatterGatherHandler 的收集器中,以完成回复消息的组处理。这种异常 payload 可以在收集器的 MessageGroupProcessor 中过滤掉,或者在 scatter-gather 终端之后以其他方式下游处理。
在将散射结果发送给收集器之前,ScatterGatherHandler 会恢复请求消息头,包括回复和错误通道(如果有的话)。这样,即使在散射接收子流中应用了异步交接,AggregatingMessageHandler 的错误也会被传播给调用者。为了成功操作,必须将 gatherResultChannel、originalReplyChannel 和 originalErrorChannel 头信息重新传递回来自散射接收子流的回复。在这种情况下,必须为 ScatterGatherHandler 配置一个合理的、有限的 gatherTimeout。否则,默认情况下它将一直阻塞等待来自收集器的回复。