跳到主要内容
版本:7.0.2

文件聚合器

DeepSeek V3 中英对照 File Aggregator

从版本 5.5 开始,引入了 FileAggregator,以覆盖在启用 START/END 标记时 FileSplitter 的另一使用场景。为方便起见,FileAggregator 实现了所有三种序列详情策略:

  • 使用带有 FileHeaders.FILENAME 属性的 HeaderAttributeCorrelationStrategy 来计算关联键。当在 FileSplitter 上启用标记时,它不会填充序列详情头部,因为 START/END 标记消息也包含在序列大小中。对于发出的每一行(包括 START/END 标记消息),FileHeaders.FILENAME 仍会被填充。

  • FileMarkerReleaseStrategy - 检查组中是否存在 FileSplitter.FileMarker.Mark.END 消息,然后将 FileHeaders.LINE_COUNT 头部值与组大小减去 2(即 FileSplitter.FileMarker 实例数)进行比较。它还实现了方便的 GroupConditionProvider 契约,以便 conditionSupplier 函数能在 AbstractCorrelatingMessageHandler 中使用。更多信息请参阅消息组条件

  • FileAggregatingMessageGroupProcessor 仅从组中移除 FileSplitter.FileMarker 消息,并将剩余消息收集到列表负载中以进行生成。

下面的列表展示了配置 FileAggregator 的可能方式:

@Bean
public IntegrationFlow fileSplitterAggregatorFlow(TaskExecutor taskExecutor) {
return f -> f
.split(Files.splitter()
.markers()
.firstLineAsHeader("firstLine"))
.channel(c -> c.executor(taskExecutor))
.filter(payload -> !(payload instanceof FileSplitter.FileMarker),
e -> e.discardChannel("aggregatorChannel"))
.<String, String>transform(String::toUpperCase)
.channel("aggregatorChannel")
.aggregate(new FileAggregator())
.channel(c -> c.queue("resultChannel"));
}

如果 FileAggregator 的默认行为无法满足目标逻辑,建议配置一个具有独立策略的聚合器端点。更多信息请参阅 FileAggregator 的 JavaDocs。