文件分割器
FileSplitter
在版本 4.1.2 中被添加,其命名空间支持在版本 4.2 中被添加。FileSplitter
基于 BufferedReader.readLine()
将文本文件拆分为单个行。默认情况下,拆分器使用 Iterator
逐次发出从文件中读取的行。将 iterator
属性设置为 false
会导致它在发出它们作为消息之前将所有行读入内存。这种用例的一个例子可能是如果你想在发送包含行的消息之前检测文件上的 I/O 错误。然而,这仅适用于相对较短的文件。
入站有效负载可以是 File
、String
(一个 File
路径)、InputStream
或 Reader
。其他有效负载类型将保持不变发出。
以下列表显示了配置 FileSplitter
的可能方法:
- Java DSL
- Kotlin DSL
- Java
- XML
@SpringBootApplication
public class FileSplitterApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileSplitterApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow fileSplitterFlow() {
return IntegrationFlow
.from(Files.inboundAdapter(tmpDir.getRoot())
.filter(new ChainFileListFilter<File>()
.addFilter(new AcceptOnceFileListFilter<>())
.addFilter(new ExpressionFileListFilter<>(
new FunctionExpression<File>(f -> "foo.tmp".equals(f.getName()))))))
.split(Files.splitter()
.markers()
.charset(StandardCharsets.US_ASCII)
.firstLineAsHeader("fileHeader")
.applySequence(true))
.channel(c -> c.queue("fileSplittingResultChannel"))
.get();
}
}
@Bean
fun fileSplitterFlow() =
integrationFlow(
Files.inboundAdapter(tmpDir.getRoot())
.filter(
ChainFileListFilter<File?>()
.addFilter(AcceptOnceFileListFilter())
.addFilter(ExpressionFileListFilter(FunctionExpression { f: File? -> "foo.tmp" == f!!.name }))
)
) {
split(
Files.splitter()
.markers()
.charset(StandardCharsets.US_ASCII)
.firstLineAsHeader("fileHeader")
.applySequence(true)
)
channel { queue("fileSplittingResultChannel") }
}
@Splitter(inputChannel="toSplitter")
@Bean
public MessageHandler fileSplitter() {
FileSplitter splitter = new FileSplitter(true, true);
splitter.setApplySequence(true);
splitter.setOutputChannel(outputChannel);
return splitter;
}
<int-file:splitter id="splitter" // <1>
iterator="" // <2>
markers="" // <3>
markers-json="" // <4>
apply-sequence="" // <5>
requires-reply="" // <6>
charset="" // <7>
first-line-as-header="" // <8>
input-channel="" // <9>
output-channel="" // <10>
send-timeout="" // <11>
auto-startup="" // <12>
order="" // <13>
phase="" /> // <14>
分割器的 bean 名称。
设置为
true
(默认值)以使用迭代器,或设置为false
以在发送行之前将文件加载到内存中。设置为
true
以在文件数据前后发出开始和结束文件标记消息。标记是带有FileSplitter.FileMarker
负载的消息(mark
属性中的值为START
和END
)。当顺序处理下游流程中的文件时,其中一些行被过滤掉,可以使用标记来让下游处理知道何时一个文件已完全处理。此外,会向这些消息添加包含START
或END
的file_marker
标头。END
标记包括行计数。如果文件为空,则只发出START
和END
标记,且lineCount
为0
。默认值为false
。当设置为true
时,默认情况下apply-sequence
为false
。另请参阅markers-json
(下一个属性)。当
markers
为 true 时,设置此选项为true
可将FileMarker
对象转换为 JSON 字符串。(底层使用SimpleJsonSerializer
)。设置为
false
以禁用在消息中包含sequenceSize
和sequenceNumber
标头。默认值为true
,除非markers
为true
。当true
且markers
为true
时,标记会被包含在排序中。当true
且iterator
为true
时,由于大小未知,sequenceSize
标头被设置为0
。设置为
true
,如果文件中没有行,则会抛出RequiresReplyException
。默认值为false
。设置读取文本数据到
String
负载时使用的字符集名称。默认值是平台字符集。第一行作为剩余行发出的消息标头的标头名称。自 5.0 版起。
设置用于向分割器发送消息的输入通道。
设置消息发送到的输出通道。
设置发送超时时间。仅适用于
output-channel
可能阻塞的情况——例如满的QueueChannel
。设置为
false
以禁用在刷新上下文时自动启动分割器。默认值为true
。如果
input-channel
是<publish-subscribe-channel/>
,则设置此端点的顺序。设置分割器的启动阶段(当
auto-startup
为true
时使用)。
FileSplitter
还可以将任何基于文本的 InputStream
拆分为行。从 4.3 版开始,当与使用 stream
选项检索文件的 FTP 或 SFTP 流式传输入站通道适配器或 FTP 或 SFTP 出站网关一起使用时,拆分器会在文件被完全消费后自动关闭支持流的会话。有关这些功能的更多信息,请参阅 FTP 流式传输入站通道适配器和 SFTP 流式传输入站通道适配器,以及 FTP 出站网关和 SFTP 出站网关。
当使用 Java 配置时,会提供一个额外的构造函数,如下例所示:
public FileSplitter(boolean iterator, boolean markers, boolean markersJson)
当 markersJson
为 true 时,标记以 JSON 字符串的形式表示(使用 SimpleJsonSerializer
)。
版本 5.0 引入了 firstLineAsHeader
选项,用于指定内容的第一行是标题(例如 CSV 文件中的列名)。传递给此属性的参数是第一行作为剩余行发出的消息中的标题所使用的标题名称。此行不包含在序列标题中(如果 applySequence
为 true)也不包含在与 FileMarker.END
关联的 lineCount
中。注意:从版本 5.5 开始,lineCount
也作为 FileHeaders.LINE_COUNT
包含在 FileMarker.END
消息的头部中,因为 FileMarker
可以序列化为 JSON。如果文件仅包含标题行,则该文件被视为为空,因此在拆分期间只发出 FileMarker
实例(如果启用了标记——否则不发出任何消息)。默认情况下(如果没有设置标题名称),第一行被视为数据,并成为第一个发出的消息的有效负载。
如果你需要从文件内容中提取标题的更复杂的逻辑(不是第一行,不是整行的内容,不是某个特定的标题,等等),请考虑在 FileSplitter
之前使用 header enricher。请注意,已经被移到标题中的行可能会从正常内容处理流程中被过滤掉。
幂等下游处理拆分文件
当 apply-sequence
为 true 时,拆分器会在 SEQUENCE_NUMBER
标头中添加行号(当 markers
为 true 时,标记会被计为行)。行号可以与 幂等接收器 一起使用,在重启后避免重新处理行。
例如:
@Bean
public ConcurrentMetadataStore store() {
return new ZookeeperMetadataStore();
}
@Bean
public MetadataStoreSelector selector() {
return new MetadataStoreSelector(
message -> message.getHeaders().get(FileHeaders.ORIGINAL_FILE, File.class)
.getAbsolutePath(),
message -> message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)
.toString(),
store())
.compareValues(
(oldVal, newVal) -> Integer.parseInt(oldVal) < Integer.parseInt(newVal));
}
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(selector());
}
@Bean
public IntegrationFlow flow() {
...
.split(new FileSplitter())
...
.handle("lineHandler", e -> e.advice(idempotentReceiverInterceptor()))
...
}