跳到主要内容
版本:7.0.2

FTP 流式入站通道适配器

DeepSeek V3 中英对照 FTP Streaming Inbound Channel Adapter

版本 4.3 引入了流式入站通道适配器。该适配器会生成一个有效载荷类型为 InputStream 的消息,从而允许在不写入本地文件系统的情况下获取文件。由于会话保持打开状态,消费应用程序需要在文件消费完毕后负责关闭会话。会话会在 closeableResource 头信息(IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE)中提供。标准框架组件,例如 FileSplitterStreamTransformer,会自动关闭会话。有关这些组件的更多信息,请参阅文件拆分器流转换器。以下示例展示了如何配置 inbound-streaming-channel-adapter

<int-ftp:inbound-streaming-channel-adapter id="ftpInbound"
channel="ftpChannel"
session-factory="sessionFactory"
filename-pattern="*.txt"
filename-regex=".*\.txt"
filter="filter"
filter-expression="@myFilterBean.check(#root)"
remote-file-separator="/"
comparator="comparator"
max-fetch-size="1"
remote-directory-expression="'foo/bar'">
<int:poller fixed-rate="1000" />
</int-ftp:inbound-streaming-channel-adapter>

filename-patternfilename-regexfilterfilter-expression 中只能使用一个。

important

从 5.0 版本开始,默认情况下,FtpStreamingMessageSource 适配器通过基于内存 SimpleMetadataStoreFtpPersistentAcceptOnceFileListFilter 来防止远程文件重复处理。默认情况下,此过滤器也会与文件名模式(或正则表达式)一起应用。如果需要允许重复文件,可以使用 AcceptAllFileListFilter。其他任何用例都可以通过 CompositeFileListFilter(或 ChainFileListFilter)来处理。Java 配置(文档后续部分)展示了一种在处理后删除远程文件以避免重复的技术。

有关 FtpPersistentAcceptOnceFileListFilter 的更多信息及其使用方法,请参阅远程持久化文件列表过滤器

使用 max-fetch-size 属性来限制每次轮询时获取的文件数量。在集群环境中运行时,请将其设置为 1 并使用持久化过滤器。更多信息请参阅入站通道适配器:控制远程文件获取

适配器将远程目录和文件名分别放入 FileHeaders.REMOTE_DIRECTORYFileHeaders.REMOTE_FILE 头部信息中。从版本 5.0 开始,FileHeaders.REMOTE_FILE_INFO 头部提供了额外的远程文件信息(默认以 JSON 格式表示)。如果您将 FtpStreamingMessageSource 上的 fileInfoJson 属性设置为 false,则该头部将包含一个 FtpFileInfo 对象。通过使用 FtpFileInfo.getFileInfo() 方法,可以访问由底层 Apache Net 库提供的 FTPFile 对象。当您使用 XML 配置时,fileInfoJson 属性不可用,但您可以通过将 FtpStreamingMessageSource 注入到您的某个配置类中来设置它。另请参阅 远程文件信息

从版本 5.1 开始,comparator 的泛型类型为 FTPFile。在此之前,它是 AbstractFileInfo<FTPFile>。这是因为排序现在在处理流程的更早阶段执行,即在过滤和应用 maxFetch 之前。

使用 Java 配置进行配置

以下Spring Boot应用程序展示了如何通过Java配置来配置入站适配器的示例:

@SpringBootApplication
public class FtpJavaApplication {

public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}

@Bean
@InboundChannelAdapter(channel = "stream")
public MessageSource<InputStream> ftpMessageSource() {
FtpStreamingMessageSource messageSource = new FtpStreamingMessageSource(template());
messageSource.setRemoteDirectory("ftpSource/");
messageSource.setFilter(new AcceptAllFileListFilter<>());
messageSource.setMaxFetchSize(1);
return messageSource;
}

@Bean
@Transformer(inputChannel = "stream", outputChannel = "data")
public org.springframework.integration.transformer.Transformer transformer() {
return new StreamTransformer("UTF-8");
}

@Bean
public FtpRemoteFileTemplate template() {
return new FtpRemoteFileTemplate(ftpSessionFactory());
}

@ServiceActivator(inputChannel = "data", adviceChain = "after")
@Bean
public MessageHandler handle() {
return System.out::println;
}

@Bean
public ExpressionEvaluatingRequestHandlerAdvice after() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setOnSuccessExpression(
"@template.remove(headers['file_remoteDirectory'] + headers['file_remoteFile'])");
advice.setPropagateEvaluationFailures(true);
return advice;
}

}

请注意,在此示例中,transformer 下游的消息处理器包含一个 advice,用于在处理后删除远程文件。