文件分割器
FileSplitter 在 4.1.2 版本中被添加,其命名空间支持在 4.2 版本中添加。FileSplitter 基于 BufferedReader.readLine() 将文本文件分割成单独的行。默认情况下,拆分器使用 Iterator 从文件中读取一行就立即将其作为消息发出。将 iterator 属性设置为 false 会导致它在将所有行读入内存后,再将它们作为消息发出。这样做的一个用例可能是,在发送任何包含文件行的消息之前,您希望检测文件上的 I/O 错误。然而,这只适用于相对较短的文件。
入站负载可以是 File、String(一个 File 路径)、InputStream 或 Reader。其他负载类型将保持不变地发出。
以下清单展示了配置 FileSplitter 的可能方式:
- Java DSL
- Kotlin DSL
- Java
- XML
@SpringBootApplication
public class FileSplitterApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileSplitterApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow fileSplitterFlow() {
return IntegrationFlow
.from(Files.inboundAdapter(tmpDir.getRoot())
.filter(new ChainFileListFilter<File>()
.addFilter(new AcceptOnceFileListFilter<>())
.addFilter(new ExpressionFileListFilter<>(
new FunctionExpression<File>(f -> "foo.tmp".equals(f.getName()))))))
.split(Files.splitter()
.markers()
.charset(StandardCharsets.US_ASCII)
.firstLineAsHeader("fileHeader")
.applySequence(true))
.channel(c -> c.queue("fileSplittingResultChannel"))
.get();
}
}
@Bean
fun fileSplitterFlow() =
integrationFlow(
Files.inboundAdapter(tmpDir.getRoot())
.filter(
ChainFileListFilter<File?>()
.addFilter(AcceptOnceFileListFilter())
.addFilter(ExpressionFileListFilter(FunctionExpression { f: File? -> "foo.tmp" == f!!.name }))
)
) {
split(
Files.splitter()
.markers()
.charset(StandardCharsets.US_ASCII)
.firstLineAsHeader("fileHeader")
.applySequence(true)
)
channel { queue("fileSplittingResultChannel") }
}
@Splitter(inputChannel="toSplitter")
@Bean
public MessageHandler fileSplitter() {
FileSplitter splitter = new FileSplitter(true, true);
splitter.setApplySequence(true);
splitter.setOutputChannel(outputChannel);
return splitter;
}
<int-file:splitter id="splitter" // <1>
iterator="" // <2>
markers="" // <3>
markers-json="" // <4>
apply-sequence="" // <5>
requires-reply="" // <6>
charset="" // <7>
first-line-as-header="" // <8>
input-channel="" // <9>
output-channel="" // <10>
send-timeout="" // <11>
auto-startup="" // <12>
order="" // <13>
phase="" /> // <14>
拆分器的 Bean 名称。
设置为
true(默认值)以使用迭代器,或设置为false以在发送行之前将文件加载到内存中。设置为
true以在文件数据前后发出文件开始和文件结束标记消息。标记是带有FileSplitter.FileMarker负载的消息(mark属性中包含START和END值)。当下游流中顺序处理文件且某些行被过滤时,您可能会使用标记。它们使下游处理能够知道文件何时已完全处理。此外,这些消息会添加一个包含START或END的file_marker头部。END标记包含行数。如果文件为空,则仅发出START和END标记,lineCount为0。默认值为false。当为true时,默认情况下apply-sequence为false。另请参阅markers-json(下一个属性)。当
markers为 true 时,将此设置为true以使FileMarker对象转换为 JSON 字符串。(底层使用SimpleJsonSerializer)。设置为
false以禁用消息中包含sequenceSize和sequenceNumber头部。默认值为true,除非markers为true。当为true且markers为true时,标记包含在序列化中。当为true且iterator为true时,sequenceSize头部设置为0,因为大小未知。设置为
true以在文件中没有行时抛出RequiresReplyException。默认值为false。设置将文本数据读入
String负载时要使用的字符集名称。默认值为平台字符集。第一行的头部名称,将作为头部携带在为剩余行发出的消息中。自版本 5.0 起。
设置用于向拆分器发送消息的输入通道。
设置消息发送到的输出通道。
设置发送超时。仅当
output-channel可能阻塞时适用——例如已满的QueueChannel。设置为
false以在刷新上下文时禁用自动启动拆分器。默认值为true。如果
input-channel是<publish-subscribe-channel/>,则设置此端点的顺序。设置拆分器的启动阶段(当
auto-startup为true时使用)。
FileSplitter 同样可以将任何基于文本的 InputStream 拆分为行。从 4.3 版本开始,当与使用 stream 选项来检索文件的 FTP 或 SFTP 流式入站通道适配器或 FTP 或 SFTP 出站网关结合使用时,拆分器会在文件完全消费后自动关闭支持该流的会话。有关这些功能的更多信息,请参阅 FTP 流式入站通道适配器 和 SFTP 流式入站通道适配器,以及 FTP 出站网关 和 SFTP 出站网关。
在使用 Java 配置时,可以使用额外的构造函数,如下例所示:
public FileSplitter(boolean iterator, boolean markers, boolean markersJson)
当 markersJson 为 true 时,标记将以 JSON 字符串的形式表示(使用 SimpleJsonSerializer)。
版本 5.0 引入了 firstLineAsHeader 选项,用于指定内容的第一行是标题行(例如 CSV 文件中的列名)。传递给此属性的参数是标题名称,该标题行将作为后续行所发出消息的标题。该行不包含在序列标题中(如果 applySequence 为 true),也不包含在与 FileMarker.END 关联的 lineCount 中。注意:从版本 5.5 开始,lineCount 也作为 FileHeaders.LINE_COUNT 包含在 FileMarker.END 消息的标题中,因为 FileMarker 可以被序列化为 JSON。如果文件仅包含标题行,则该文件被视为空文件,因此在拆分过程中仅发出 FileMarker 实例(如果启用了标记——否则不会发出任何消息)。默认情况下(如果未设置标题名称),第一行被视为数据,并成为第一个发出消息的有效负载。
如果你需要从文件内容中提取更复杂的标题逻辑(例如,不是第一行、不是整行内容、不是特定标题等),请考虑在 FileSplitter 之前使用标题增强器。请注意,已移至标题的行可能会在正常内容处理流程的下游被过滤掉。
幂等下游处理分割文件
当 apply-sequence 为 true 时,拆分器会在 SEQUENCE_NUMBER 头部添加行号(当 markers 为 true 时,标记符也会被计入行数)。该行号可与幂等接收器配合使用,以避免重启后重复处理已处理过的行。
例如:
@Bean
public ConcurrentMetadataStore store() {
return new ZookeeperMetadataStore();
}
@Bean
public MetadataStoreSelector selector() {
return new MetadataStoreSelector(
message -> message.getHeaders().get(FileHeaders.ORIGINAL_FILE, File.class)
.getAbsolutePath(),
message -> message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)
.toString(),
store())
.compareValues(
(oldVal, newVal) -> Integer.parseInt(oldVal) < Integer.parseInt(newVal));
}
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(selector());
}
@Bean
public IntegrationFlow flow() {
...
.split(new FileSplitter())
...
.handle("lineHandler", e -> e.advice(idempotentReceiverInterceptor()))
...
}