跳到主要内容

聚合器和重排序器

QWen Plus 中英对照 Aggregators and Resequencers

Aggregator 从概念上讲是 Splitter 的对立面。它将一系列单独的消息聚合为一条消息,因此必然会更复杂。默认情况下,聚合器返回一条包含来自传入消息的有效负载集合的消息。对于 Resequencer,应用相同的规则。以下示例展示了一个典型的拆分器-聚合器模式:

@Bean
public IntegrationFlow splitAggregateFlow() {
return IntegrationFlow.from("splitAggregateInput")
.split()
.channel(MessageChannels.executor(this.taskExecutor()))
.resequence()
.aggregate()
.get();
}
java

split() 方法将列表拆分为单独的消息并发送到 ExecutorChannelresequence() 方法根据消息头中找到的序列详细信息重新排序消息。aggregate() 方法收集这些消息。

但是,您可以通过指定发布策略和相关性策略等来更改默认行为。考虑以下示例:

.aggregate(a ->
a.correlationStrategy(m -> m.getHeaders().get("myCorrelationKey"))
.releaseStrategy(g -> g.size() > 10)
.messageStore(messageStore()))
java

前面的例子关联了具有 myCorrelationKey 标头的消息,并在累积至少 十 个消息后释放这些消息。

类似的 lambda 配置也适用于 resequence() EIP 方法。