跳到主要内容
版本:7.0.2

文件分割器

DeepSeek V3 中英对照 File Splitter

FileSplitter 在 4.1.2 版本中被添加,其命名空间支持在 4.2 版本中添加。FileSplitter 基于 BufferedReader.readLine() 将文本文件分割成单独的行。默认情况下,拆分器使用 Iterator 从文件中读取一行就立即将其作为消息发出。将 iterator 属性设置为 false 会导致它在将所有行读入内存后,再将它们作为消息发出。这样做的一个用例可能是,在发送任何包含文件行的消息之前,您希望检测文件上的 I/O 错误。然而,这只适用于相对较短的文件。

入站负载可以是 FileString(一个 File 路径)、InputStreamReader。其他负载类型将保持不变地发出。

以下清单展示了配置 FileSplitter 的可能方式:

@SpringBootApplication
public class FileSplitterApplication {

public static void main(String[] args) {
new SpringApplicationBuilder(FileSplitterApplication.class)
.web(false)
.run(args);
}

@Bean
public IntegrationFlow fileSplitterFlow() {
return IntegrationFlow
.from(Files.inboundAdapter(tmpDir.getRoot())
.filter(new ChainFileListFilter<File>()
.addFilter(new AcceptOnceFileListFilter<>())
.addFilter(new ExpressionFileListFilter<>(
new FunctionExpression<File>(f -> "foo.tmp".equals(f.getName()))))))
.split(Files.splitter()
.markers()
.charset(StandardCharsets.US_ASCII)
.firstLineAsHeader("fileHeader")
.applySequence(true))
.channel(c -> c.queue("fileSplittingResultChannel"))
.get();
}

}
  • 拆分器的 Bean 名称。

  • 设置为 true(默认值)以使用迭代器,或设置为 false 以在发送行之前将文件加载到内存中。

  • 设置为 true 以在文件数据前后发出文件开始和文件结束标记消息。标记是带有 FileSplitter.FileMarker 负载的消息(mark 属性中包含 STARTEND 值)。当下游流中顺序处理文件且某些行被过滤时,您可能会使用标记。它们使下游处理能够知道文件何时已完全处理。此外,这些消息会添加一个包含 STARTENDfile_marker 头部。END 标记包含行数。如果文件为空,则仅发出 STARTEND 标记,lineCount0。默认值为 false。当为 true 时,默认情况下 apply-sequencefalse。另请参阅 markers-json(下一个属性)。

  • markers 为 true 时,将此设置为 true 以使 FileMarker 对象转换为 JSON 字符串。(底层使用 SimpleJsonSerializer)。

  • 设置为 false 以禁用消息中包含 sequenceSizesequenceNumber 头部。默认值为 true,除非 markerstrue。当为 truemarkerstrue 时,标记包含在序列化中。当为 trueiteratortrue 时,sequenceSize 头部设置为 0,因为大小未知。

  • 设置为 true 以在文件中没有行时抛出 RequiresReplyException。默认值为 false

  • 设置将文本数据读入 String 负载时要使用的字符集名称。默认值为平台字符集。

  • 第一行的头部名称,将作为头部携带在为剩余行发出的消息中。自版本 5.0 起。

  • 设置用于向拆分器发送消息的输入通道。

  • 设置消息发送到的输出通道。

  • 设置发送超时。仅当 output-channel 可能阻塞时适用——例如已满的 QueueChannel

  • 设置为 false 以在刷新上下文时禁用自动启动拆分器。默认值为 true

  • 如果 input-channel<publish-subscribe-channel/>,则设置此端点的顺序。

  • 设置拆分器的启动阶段(当 auto-startuptrue 时使用)。

FileSplitter 同样可以将任何基于文本的 InputStream 拆分为行。从 4.3 版本开始,当与使用 stream 选项来检索文件的 FTP 或 SFTP 流式入站通道适配器或 FTP 或 SFTP 出站网关结合使用时,拆分器会在文件完全消费后自动关闭支持该流的会话。有关这些功能的更多信息,请参阅 FTP 流式入站通道适配器SFTP 流式入站通道适配器,以及 FTP 出站网关SFTP 出站网关

在使用 Java 配置时,可以使用额外的构造函数,如下例所示:

public FileSplitter(boolean iterator, boolean markers, boolean markersJson)

markersJson 为 true 时,标记将以 JSON 字符串的形式表示(使用 SimpleJsonSerializer)。

版本 5.0 引入了 firstLineAsHeader 选项,用于指定内容的第一行是标题行(例如 CSV 文件中的列名)。传递给此属性的参数是标题名称,该标题行将作为后续行所发出消息的标题。该行不包含在序列标题中(如果 applySequence 为 true),也不包含在与 FileMarker.END 关联的 lineCount 中。注意:从版本 5.5 开始,lineCount 也作为 FileHeaders.LINE_COUNT 包含在 FileMarker.END 消息的标题中,因为 FileMarker 可以被序列化为 JSON。如果文件仅包含标题行,则该文件被视为空文件,因此在拆分过程中仅发出 FileMarker 实例(如果启用了标记——否则不会发出任何消息)。默认情况下(如果未设置标题名称),第一行被视为数据,并成为第一个发出消息的有效负载。

如果你需要从文件内容中提取更复杂的标题逻辑(例如,不是第一行、不是整行内容、不是特定标题等),请考虑在 FileSplitter 之前使用标题增强器。请注意,已移至标题的行可能会在正常内容处理流程的下游被过滤掉。

幂等下游处理分割文件

apply-sequence 为 true 时,拆分器会在 SEQUENCE_NUMBER 头部添加行号(当 markers 为 true 时,标记符也会被计入行数)。该行号可与幂等接收器配合使用,以避免重启后重复处理已处理过的行。

例如:

@Bean
public ConcurrentMetadataStore store() {
return new ZookeeperMetadataStore();
}

@Bean
public MetadataStoreSelector selector() {
return new MetadataStoreSelector(
message -> message.getHeaders().get(FileHeaders.ORIGINAL_FILE, File.class)
.getAbsolutePath(),
message -> message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)
.toString(),
store())
.compareValues(
(oldVal, newVal) -> Integer.parseInt(oldVal) < Integer.parseInt(newVal));
}

@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(selector());
}

@Bean
public IntegrationFlow flow() {
...
.split(new FileSplitter())
...
.handle("lineHandler", e -> e.advice(idempotentReceiverInterceptor()))
...
}