聚合器与重排序器
Aggregator(聚合器)在概念上是Splitter(拆分器)的反向操作。它将一系列独立的消息聚合成一条单一消息,因此必然更为复杂。默认情况下,聚合器返回的消息包含来自传入消息的有效负载集合。同样的规则也适用于Resequencer(重排器)。以下示例展示了拆分器-聚合器模式的典型示例:
@Bean
public IntegrationFlow splitAggregateFlow() {
return IntegrationFlow.from("splitAggregateInput")
.split()
.channel(MessageChannels.executor(this.taskExecutor()))
.resequence()
.aggregate()
.get();
}
split() 方法将列表拆分为独立的消息,并将它们发送到 ExecutorChannel。resequence() 方法根据消息头中的序列详情对消息进行重新排序。aggregate() 方法则负责收集这些消息。
然而,你可以通过指定发布策略和关联策略等方式来改变默认行为。考虑以下示例:
.aggregate(a ->
a.correlationStrategy(m -> m.getHeaders().get("myCorrelationKey"))
.releaseStrategy(g -> g.size() > 10)
.messageStore(messageStore()))
前面的示例会关联那些带有 myCorrelationKey 消息头的消息,并在累积至少十条消息后一次性释放它们。
resequence() EIP 方法也提供了类似的 lambda 配置。