文件聚合器
从 5.5 版本开始,引入了 FileAggregator 以覆盖在启用 START/END 标记时 FileSplitter 的另一种使用场景。为了方便,FileAggregator 实现了所有三种序列详情策略:
- 
使用带有 FileHeaders.FILENAME属性的HeaderAttributeCorrelationStrategy进行相关键计算。当在FileSplitter上启用标记时,它不会填充序列详细信息头,因为 START/END 标记消息也被包含在序列大小中。FileHeaders.FILENAME仍然会为每个发出的行填充,包括 START/END 标记消息。
- 
FileMarkerReleaseStrategy- 检查组中的FileSplitter.FileMarker.Mark.END消息,然后将FileHeaders.LINE_COUNT头值与组大小减去 2 -FileSplitter.FileMarker实例进行比较。它还实现了方便的GroupConditionProvider接口,以便在AbstractCorrelatingMessageHandler中使用conditionSupplier函数。有关更多信息,请参阅 消息组条件。
- 
FileAggregatingMessageGroupProcessor只是将FileSplitter.FileMarker消息从组中移除,并将剩余的消息收集到一个列表有效负载中以生成。
以下列表显示了配置 FileAggregator 的可能方法:
- Java DSL
- Kotlin DSL
- Java
- XML
@Bean
public IntegrationFlow fileSplitterAggregatorFlow(TaskExecutor taskExecutor) {
    return f -> f
            .split(Files.splitter()
                    .markers()
                    .firstLineAsHeader("firstLine"))
            .channel(c -> c.executor(taskExecutor))
            .filter(payload -> !(payload instanceof FileSplitter.FileMarker),
                    e -> e.discardChannel("aggregatorChannel"))
            .<String, String>transform(String::toUpperCase)
            .channel("aggregatorChannel")
            .aggregate(new FileAggregator())
            .channel(c -> c.queue("resultChannel"));
}
@Bean
fun fileSplitterAggregatorFlow(taskExecutor: TaskExecutor?) =
    integrationFlow {
        split(Files.splitter().markers().firstLineAsHeader("firstLine"))
        channel { executor(taskExecutor) }
        filter<Any>({ it !is FileMarker }) { discardChannel("aggregatorChannel") }
        transform(String::toUpperCase)
        channel("aggregatorChannel")
        aggregate(FileAggregator())
        channel { queue("resultChannel") }
    }
@serviceActivator(inputChannel="toAggregateFile")
@Bean
public AggregatorFactoryBean fileAggregator() {
    AggregatorFactoryBean aggregator = new AggregatorFactoryBean();
    aggregator.setProcessorBean(new FileAggregator());
    aggregator.setOutputChannel(outputChannel);
    return aggregator;
}
<int:chain input-channel="input" output-channel="output">
    <int-file:splitter markers="true"/>
    <int:aggregator>
        <bean class="org.springframework.integration.file.aggregator.FileAggregator"/>
    </int:aggregator>
</int:chain>
如果 FileAggregator 的默认行为不能满足目标逻辑,建议配置具有独立策略的聚合器端点。有关更多信息,请参阅 FileAggregator JavaDocs。