文件分割器
FileSplitter 在版本 4.1.2 中被添加,其命名空间支持在版本 4.2 中被添加。FileSplitter 基于 BufferedReader.readLine() 将文本文件拆分为单个行。默认情况下,拆分器使用 Iterator 逐次发出从文件中读取的行。将 iterator 属性设置为 false 会导致它在发出它们作为消息之前将所有行读入内存。这种用例的一个例子可能是如果你想在发送包含行的消息之前检测文件上的 I/O 错误。然而,这仅适用于相对较短的文件。
入站有效负载可以是 File、String(一个 File 路径)、InputStream 或 Reader。其他有效负载类型将保持不变发出。
以下列表显示了配置 FileSplitter 的可能方法:
- Java DSL
- Kotlin DSL
- Java
- XML
@SpringBootApplication
public class FileSplitterApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileSplitterApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow fileSplitterFlow() {
return IntegrationFlow
.from(Files.inboundAdapter(tmpDir.getRoot())
.filter(new ChainFileListFilter<File>()
.addFilter(new AcceptOnceFileListFilter<>())
.addFilter(new ExpressionFileListFilter<>(
new FunctionExpression<File>(f -> "foo.tmp".equals(f.getName()))))))
.split(Files.splitter()
.markers()
.charset(StandardCharsets.US_ASCII)
.firstLineAsHeader("fileHeader")
.applySequence(true))
.channel(c -> c.queue("fileSplittingResultChannel"))
.get();
}
}
@Bean
fun fileSplitterFlow() =
integrationFlow(
Files.inboundAdapter(tmpDir.getRoot())
.filter(
ChainFileListFilter<File?>()
.addFilter(AcceptOnceFileListFilter())
.addFilter(ExpressionFileListFilter(FunctionExpression { f: File? -> "foo.tmp" == f!!.name }))
)
) {
split(
Files.splitter()
.markers()
.charset(StandardCharsets.US_ASCII)
.firstLineAsHeader("fileHeader")
.applySequence(true)
)
channel { queue("fileSplittingResultChannel") }
}
@Splitter(inputChannel="toSplitter")
@Bean
public MessageHandler fileSplitter() {
FileSplitter splitter = new FileSplitter(true, true);
splitter.setApplySequence(true);
splitter.setOutputChannel(outputChannel);
return splitter;
}
<int-file:splitter id="splitter" // <1>
iterator="" // <2>
markers="" // <3>
markers-json="" // <4>
apply-sequence="" // <5>
requires-reply="" // <6>
charset="" // <7>
first-line-as-header="" // <8>
input-channel="" // <9>
output-channel="" // <10>
send-timeout="" // <11>
auto-startup="" // <12>
order="" // <13>
phase="" /> // <14>
分割器的 bean 名称。
设置为
true(默认值)以使用迭代器,或设置为false以在发送行之前将文件加载到内存中。设置为
true以在文件数据前后发出开始和结束文件标记消息。标记是带有FileSplitter.FileMarker负载的消息(mark属性中的值为START和END)。当顺序处理下游流程中的文件时,其中一些行被过滤掉,可以使用标记来让下游处理知道何时一个文件已完全处理。此外,会向这些消息添加包含START或END的file_marker标头。END标记包括行计数。如果文件为空,则只发出START和END标记,且lineCount为0。默认值为false。当设置为true时,默认情况下apply-sequence为false。另请参阅markers-json(下一个属性)。当
markers为 true 时,设置此选项为true可将FileMarker对象转换为 JSON 字符串。(底层使用SimpleJsonSerializer)。设置为
false以禁用在消息中包含sequenceSize和sequenceNumber标头。默认值为true,除非markers为true。当true且markers为true时,标记会被包含在排序中。当true且iterator为true时,由于大小未知,sequenceSize标头被设置为0。设置为
true,如果文件中没有行,则会抛出RequiresReplyException。默认值为false。设置读取文本数据到
String负载时使用的字符集名称。默认值是平台字符集。第一行作为剩余行发出的消息标头的标头名称。自 5.0 版起。
设置用于向分割器发送消息的输入通道。
设置消息发送到的输出通道。
设置发送超时时间。仅适用于
output-channel可能阻塞的情况——例如满的QueueChannel。设置为
false以禁用在刷新上下文时自动启动分割器。默认值为true。如果
input-channel是<publish-subscribe-channel/>,则设置此端点的顺序。设置分割器的启动阶段(当
auto-startup为true时使用)。
FileSplitter 还可以将任何基于文本的 InputStream 拆分为行。从 4.3 版开始,当与使用 stream 选项检索文件的 FTP 或 SFTP 流式传输入站通道适配器或 FTP 或 SFTP 出站网关一起使用时,拆分器会在文件被完全消费后自动关闭支持流的会话。有关这些功能的更多信息,请参阅 FTP 流式传输入站通道适配器和 SFTP 流式传输入站通道适配器,以及 FTP 出站网关和 SFTP 出站网关。
当使用 Java 配置时,会提供一个额外的构造函数,如下例所示:
public FileSplitter(boolean iterator, boolean markers, boolean markersJson)
当 markersJson 为 true 时,标记以 JSON 字符串的形式表示(使用 SimpleJsonSerializer)。
版本 5.0 引入了 firstLineAsHeader 选项,用于指定内容的第一行是标题(例如 CSV 文件中的列名)。传递给此属性的参数是第一行作为剩余行发出的消息中的标题所使用的标题名称。此行不包含在序列标题中(如果 applySequence 为 true)也不包含在与 FileMarker.END 关联的 lineCount 中。注意:从版本 5.5 开始,lineCount 也作为 FileHeaders.LINE_COUNT 包含在 FileMarker.END 消息的头部中,因为 FileMarker 可以序列化为 JSON。如果文件仅包含标题行,则该文件被视为为空,因此在拆分期间只发出 FileMarker 实例(如果启用了标记——否则不发出任何消息)。默认情况下(如果没有设置标题名称),第一行被视为数据,并成为第一个发出的消息的有效负载。
如果你需要从文件内容中提取标题的更复杂的逻辑(不是第一行,不是整行的内容,不是某个特定的标题,等等),请考虑在 FileSplitter 之前使用 header enricher。请注意,已经被移到标题中的行可能会从正常内容处理流程中被过滤掉。
幂等下游处理拆分文件
当 apply-sequence 为 true 时,拆分器会在 SEQUENCE_NUMBER 标头中添加行号(当 markers 为 true 时,标记会被计为行)。行号可以与 幂等接收器 一起使用,在重启后避免重新处理行。
例如:
@Bean
public ConcurrentMetadataStore store() {
return new ZookeeperMetadataStore();
}
@Bean
public MetadataStoreSelector selector() {
return new MetadataStoreSelector(
message -> message.getHeaders().get(FileHeaders.ORIGINAL_FILE, File.class)
.getAbsolutePath(),
message -> message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)
.toString(),
store())
.compareValues(
(oldVal, newVal) -> Integer.parseInt(oldVal) < Integer.parseInt(newVal));
}
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(selector());
}
@Bean
public IntegrationFlow flow() {
...
.split(new FileSplitter())
...
.handle("lineHandler", e -> e.advice(idempotentReceiverInterceptor()))
...
}