FTP 流式传输入站通道适配器
Version 4.3 引入了流式入境通道适配器。此适配器生成有效负载类型为 InputStream 的消息,允许在不写入本地文件系统的情况下获取文件。由于会话保持打开状态,消费应用程序负责在文件被消费后关闭会话。会话在 closeableResource 标头 (IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE) 中提供。标准框架组件,如 FileSplitter 和 StreamTransformer,会自动关闭会话。有关这些组件的更多信息,请参阅文件拆分器和流转换器。以下示例展示了如何配置 inbound-streaming-channel-adapter:
<int-ftp:inbound-streaming-channel-adapter id="ftpInbound"
            channel="ftpChannel"
            session-factory="sessionFactory"
            filename-pattern="*.txt"
            filename-regex=".*\.txt"
            filter="filter"
            filter-expression="@myFilterBean.check(#root)"
            remote-file-separator="/"
            comparator="comparator"
            max-fetch-size="1"
            remote-directory-expression="'foo/bar'">
        <int:poller fixed-rate="1000" />
</int-ftp:inbound-streaming-channel-adapter>
只允许使用 filename-pattern、filename-regex、filter 或 filter-expression 中的一个。
从 5.0 版本开始,默认情况下,FtpStreamingMessageSource 适配器通过基于内存中的 SimpleMetadataStore 的 FtpPersistentAcceptOnceFileListFilter 来防止远程文件的重复。默认情况下,此过滤器也会应用于文件名模式(或正则表达式)。如果您需要允许重复,可以使用 AcceptAllFileListFilter。其他任何用例都可以通过 CompositeFileListFilter(或 ChainFileListFilter)来处理。Java 配置(稍后在文档中)展示了一种在处理完远程文件后删除它的技术,以避免重复。
有关 FtpPersistentAcceptOnceFileListFilter 的更多信息及其使用方法,请参阅远程持久文件列表过滤器。
使用 max-fetch-size 属性来限制每次轮询时获取的文件数量,当需要获取文件时。将其设置为 1 并在集群环境中运行时使用持久过滤器。更多信息,请参见 入站通道适配器:控制远程文件获取。
适配器将远程目录和文件名分别放在 FileHeaders.REMOTE_DIRECTORY 和 FileHeaders.REMOTE_FILE 头中。从 5.0 版开始,FileHeaders.REMOTE_FILE_INFO 头提供了额外的远程文件信息(默认以 JSON 表示)。如果您将 FtpStreamingMessageSource 上的 fileInfoJson 属性设置为 false,则头包含一个 FtpFileInfo 对象。可以通过使用 FtpFileInfo.getFileInfo() 方法访问底层 Apache Net 库提供的 FTPFile 对象。当您使用 XML 配置时,fileInfoJson 属性不可用,但您可以通过将 FtpStreamingMessageSource 注入到其中一个配置类中来设置它。另请参阅 远程文件信息。
从 5.1 版开始,comparator 的通用类型是 FTPFile。之前,它是 AbstractFileInfo<FTPFile>。这是因为排序现在在处理的更早阶段执行,在过滤和应用 maxFetch 之前。
使用Java配置
以下 Spring Boot 应用程序展示了如何使用 Java 配置来配置入站适配器的示例:
@SpringBootApplication
public class FtpJavaApplication {
    public static void main(String[] args) {
        new SpringApplicationBuilder(FtpJavaApplication.class)
            .web(false)
            .run(args);
    }
    @Bean
    @InboundChannelAdapter(channel = "stream")
    public MessageSource<InputStream> ftpMessageSource() {
        FtpStreamingMessageSource messageSource = new FtpStreamingMessageSource(template());
        messageSource.setRemoteDirectory("ftpSource/");
        messageSource.setFilter(new AcceptAllFileListFilter<>());
        messageSource.setMaxFetchSize(1);
        return messageSource;
    }
    @Bean
    @Transformer(inputChannel = "stream", outputChannel = "data")
    public org.springframework.integration.transformer.Transformer transformer() {
        return new StreamTransformer("UTF-8");
    }
    @Bean
    public FtpRemoteFileTemplate template() {
        return new FtpRemoteFileTemplate(ftpSessionFactory());
    }
    @ServiceActivator(inputChannel = "data", adviceChain = "after")
    @Bean
    public MessageHandler handle() {
        return System.out::println;
    }
    @Bean
    public ExpressionEvaluatingRequestHandlerAdvice after() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setOnSuccessExpression(
                "@template.remove(headers['file_remoteDirectory'] + headers['file_remoteFile'])");
        advice.setPropagateEvaluationFailures(true);
        return advice;
    }
}
请注意,在这个例子中,转换器下游的消息处理程序有一个 advice ,在处理完成后会删除远程文件。