跳到主要内容
版本:7.0.2

FTP 入站通道适配器

DeepSeek V3 中英对照 FTP Inbound Channel Adapter

FTP入站通道适配器是一种特殊的监听器,它连接到FTP服务器并监听远程目录事件(例如新文件创建),随后启动文件传输。以下示例展示了如何配置inbound-channel-adapter

<int-ftp:inbound-channel-adapter id="ftpInbound"
channel="ftpChannel"
session-factory="ftpSessionFactory"
auto-create-local-directory="true"
delete-remote-files="true"
filename-pattern="*.txt"
remote-directory="some/remote/path"
remote-file-separator="/"
preserve-timestamp="true"
local-filename-generator-expression="#this.toUpperCase() + '.a'"
scanner="myDirScanner"
local-filter="myFilter"
temporary-file-suffix=".writing"
max-fetch-size="-1"
local-directory=".">
<int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>

如前面的配置所示,你可以使用 inbound-channel-adapter 元素来配置一个 FTP 入站通道适配器,同时为各种属性提供值,例如 local-directoryfilename-pattern(基于简单的模式匹配,而非正则表达式),以及对 session-factory 的引用。

默认情况下,传输的文件会保留与原始文件相同的名称。如果您想覆盖此行为,可以设置 local-filename-generator-expression 属性,该属性允许您提供一个 SpEL 表达式来生成本地文件的名称。与出站网关和适配器不同(其 SpEL 评估上下文的根对象是 Message),此入站适配器在评估时尚未拥有消息,因为它最终会以传输的文件作为其有效负载来生成消息。因此,SpEL 评估上下文的根对象是远程文件的原始名称(一个 String)。

入站通道适配器首先检索本地目录的 File 对象,然后根据轮询器配置发送每个文件。从版本 5.0 开始,现在可以在需要获取新文件时限制从 FTP 服务器获取的文件数量。当目标文件非常大,或者在具有持久文件列表过滤器的集群系统中运行时,这可能是有益的,这一点将在后面讨论。为此,请使用 max-fetch-size。负值(默认值)表示没有限制,将检索所有匹配的文件。更多信息请参见入站通道适配器:控制远程文件获取。从版本 5.0 开始,您还可以通过设置 scanner 属性,为 inbound-channel-adapter 提供自定义的 DirectoryScanner 实现。

从 Spring Integration 3.0 开始,你可以指定 preserve-timestamp 属性(其默认值为 false)。当设置为 true 时,本地文件的修改时间戳将被设置为从服务器检索到的值。否则,它将被设置为当前时间。

从版本 4.2 开始,你可以指定 remote-directory-expression 来代替 remote-directory,让你能够在每次轮询时动态确定目录——例如,remote-directory-expression="@myBean.determineRemoteDir()"

从版本 4.3 开始,你可以省略 remote-directoryremote-directory-expression 属性。它们的默认值为 null。在这种情况下,根据 FTP 协议,客户端的工作目录将被用作默认的远程目录。

有时,仅基于 filename-pattern 属性指定的简单模式进行文件过滤可能不够用。在这种情况下,您可以使用 filename-regex 属性来指定一个正则表达式(例如 filename-regex=".*\.test$")。此外,如果您需要完全控制,可以使用 filter 属性并引用 o.s.i.file.filters.FileListFilter 的任何自定义实现,这是一个用于过滤文件列表的策略接口。此过滤器决定检索哪些远程文件。您还可以通过使用 CompositeFileListFilter 将基于模式的过滤器与其他过滤器(例如 AcceptOnceFileListFilter,以避免同步之前已获取的文件)组合使用。

AcceptOnceFileListFilter 将其状态存储在内存中。如果您希望状态在系统重启后得以保留,请考虑改用 FtpPersistentAcceptOnceFileListFilter。此过滤器将已接受的文件名存储在 MetadataStore 策略的实例中(参见元数据存储)。此过滤器根据文件名和远程修改时间进行匹配。

自 4.0 版本起,此过滤器需要一个 ConcurrentMetadataStore。当与共享数据存储(例如使用 RedisMetadataStoreRedis)结合使用时,它允许跨多个应用程序或服务器实例共享过滤器键。

从版本5.0开始,FtpInboundFileSynchronizer 默认应用了基于内存 SimpleMetadataStoreFtpPersistentAcceptOnceFileListFilter。此过滤器同样适用于 XML 配置中的 regexpattern 选项,以及 Java DSL 中的 FtpInboundChannelAdapterSpec。其他用例可通过 CompositeFileListFilter(或 ChainFileListFilter)进行管理。

前面的讨论涉及在检索文件之前进行过滤。一旦文件被检索到,还会对文件系统上的文件应用额外的过滤器。默认情况下,这是一个 AcceptOnceFileListFilter,如前所述,它在内存中保留状态,并且不考虑文件的修改时间。除非您的应用程序在处理后删除文件,否则适配器在应用程序重启后默认会重新处理磁盘上的文件。

此外,如果将 filter 配置为使用 FtpPersistentAcceptOnceFileListFilter,并且远程文件的时间戳发生更改(导致文件被重新获取),默认的本地过滤器将不会处理这个新文件。

有关此过滤器及其使用方式的更多信息,请参阅 远程持久文件列表过滤器

您可以使用 local-filter 属性来配置本地文件系统过滤器的行为。从 4.3.8 版本开始,默认配置了一个 FileSystemPersistentAcceptOnceFileListFilter。该过滤器将已接受的文件名和修改时间戳存储在 MetadataStore 策略的实例中(参见 元数据存储),并检测本地文件修改时间的变化。默认的 MetadataStoreSimpleMetadataStore,它在内存中存储状态。

自 4.1.5 版本起,这些过滤器新增了一个属性 (flushOnUpdate),该属性会在每次更新时刷新元数据存储(前提是该存储实现了 Flushable 接口)。

important

此外,如果你使用分布式的 MetadataStore(例如 Redis),你可以运行同一适配器或应用的多个实例,并确保每个文件仅被处理一次。

实际的本地过滤器是一个 ChainFileListFilter,它包含一个模式过滤器,用于防止处理正在下载的文件(基于 temporary-file-suffix)以及提供的过滤器。文件下载时会带有此后缀(默认为 .writing),传输完成后文件会被重命名为最终名称,从而使其对过滤器“可见”。

remote-file-separator 属性允许您配置一个文件分隔符字符,如果默认的 '/' 不适用于您的特定环境。

有关这些属性的更多详细信息,请参阅 schema

同时,您需要了解FTP入站通道适配器是一种轮询消费者。因此,您必须配置一个轮询器(通过使用全局默认值或本地子元素)。一旦文件被传输,就会生成一个以 java.io.File 作为其有效负载的消息,并发送到由 channel 属性标识的通道。

从 6.2 版本开始,你可以使用 FtpLastModifiedFileListFilter 基于最后修改时间策略来过滤 FTP 文件。该过滤器可以配置一个 age 属性,以便只有早于此值的文件才能通过过滤器。age 的默认值为 60 秒,但你应该选择一个足够大的值,以避免过早地选取文件(例如,由于网络故障)。更多信息请查阅其 Javadoc。

相比之下,从版本6.5开始,引入了 FtpRecentFileListFilter,它只接受那些不早于指定 age 的文件。

更多关于文件过滤与不完整文件的信息

有时,监控(远程)目录中刚出现的文件可能尚未完整。通常,这类文件会以临时扩展名(例如 somefile.txt.writing)写入,待写入过程完成后才进行重命名。大多数情况下,您可能只关心已完成的文件,并希望仅筛选出完整的文件。为处理这类场景,您可以使用 filename-patternfilename-regexfilter 属性提供的筛选支持。以下示例展示了自定义筛选器的实现方式:

<int-ftp:inbound-channel-adapter
channel="ftpChannel"
session-factory="ftpSessionFactory"
filter="customFilter"
local-directory="file:/my_transfers">
remote-directory="some/remote/path"
<int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>

<bean id="customFilter" class="org.example.CustomFilter"/>

入站 FTP 适配器的轮询器配置说明

入站FTP适配器的工作包含两项任务:

  1. 与远程服务器通信,以便将文件从远程目录传输到本地目录。

  2. 对于每个传输的文件,生成一条以该文件为有效负载的消息,并将其发送到由 'channel' 属性标识的通道。这就是为什么它们被称为“通道适配器”而不仅仅是“适配器”。此类适配器的主要工作是生成消息以发送到消息通道。本质上,第二个任务具有优先权,即如果本地目录已有一个或多个文件,它会首先从这些文件生成消息。只有在处理完所有本地文件后,它才会启动远程通信以获取更多文件。

此外,在轮询器上配置触发器时,应特别注意 max-messages-per-poll 属性。对于所有 SourcePollingChannelAdapter 实例(包括 FTP),其默认值为 1。这意味着,一旦一个文件被处理,它就会等待触发器配置所确定的下一个执行时间。如果您碰巧在 local-directory 中有一个或多个文件,它会在启动与远程 FTP 服务器的通信之前处理这些文件。此外,如果 max-messages-per-poll 设置为 1(默认值),它每次只处理一个文件,间隔由触发器定义,实质上表现为“一次轮询 === 一个文件”。

对于典型的文件传输场景,您很可能需要相反的行为:在每次轮询时尽可能处理所有文件,然后才等待下一次轮询。如果是这种情况,请将 max-messages-per-poll 设置为 -1。这样,在每次轮询时,适配器会尝试生成尽可能多的消息。换句话说,它会先处理本地目录中的所有文件,然后连接到远程目录,将所有可用的文件传输到本地进行处理。只有完成这些操作后,轮询操作才被视为完成,轮询器才会等待下一次执行时间。

或者,您可以将 'max-messages-per-poll' 值设置为一个正数,该值表示每次轮询时从文件创建消息的上限。例如,值为 10 意味着每次轮询时,它尝试处理不超过十个文件。

从故障中恢复

理解适配器的架构至关重要。其中包含一个用于获取文件的文件同步器,以及一个为每个同步文件发送消息的 FileReadingMessageSource。如前所述,该过程涉及两个过滤器。filter 属性(及其模式)作用于远程(FTP)文件列表,以避免重复获取已获取过的文件。而 local-filter 则由 FileReadingMessageSource 使用,用于确定哪些文件应作为消息发送。

同步器会列出远程文件并参考其过滤器。随后,文件被传输。如果在文件传输过程中发生IO错误,任何已添加到过滤器中的文件将被移除,以便它们有资格在下一次轮询时重新获取。这仅适用于实现了 ReversibleFileListFilter(例如 AcceptOnceFileListFilter)的过滤器。

如果在同步文件后,下游流程处理文件时发生错误,过滤器不会自动回滚,因此默认情况下不会重新处理失败的文件。

若希望在失败后重新处理此类文件,您可以使用类似以下配置,以便从过滤器中移除失败的文件:

<int-ftp:inbound-channel-adapter id="ftpAdapter"
session-factory="ftpSessionFactory"
channel="requestChannel"
remote-directory-expression="'/ftpSource'"
local-directory="file:myLocalDir"
auto-create-local-directory="true"
filename-pattern="*.txt">
<int:poller fixed-rate="1000">
<int:transactional synchronization-factory="syncFactory" />
</int:poller>
</int-ftp:inbound-channel-adapter>

<bean id="acceptOnceFilter"
class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />

<int:transaction-synchronization-factory id="syncFactory">
<int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>

<bean id="transactionManager"
class="org.springframework.integration.transaction.PseudoTransactionManager" />

上述配置适用于任何 ResettableFileListFilter

从 5.0 版本开始,入站通道适配器可以在本地构建与生成的本地文件名相对应的子目录。这也可以是远程子路径。为了能够根据层次结构支持递归读取本地目录以进行修改,您现在可以为内部的 FileReadingMessageSource 提供一个新的 RecursiveDirectoryScanner,它基于 Files.walk() 算法。更多信息请参阅 AbstractInboundFileSynchronizingMessageSource.setScanner()。此外,您现在可以通过使用 setUseWatchService() 选项将 AbstractInboundFileSynchronizingMessageSource 切换到基于 WatchServiceDirectoryScanner。它还被配置为对所有 WatchEventType 实例作出反应,以响应本地目录中的任何修改。前面展示的重处理示例基于 FileReadingMessageSource.WatchServiceDirectoryScanner 的内置功能,当文件从本地目录中删除(StandardWatchEventKinds.ENTRY_DELETE)时,执行 ResettableFileListFilter.remove()。更多信息请参阅 WatchServiceDirectoryScanner

使用 Java 配置进行配置

以下Spring Boot应用程序展示了如何使用Java配置来配置入站适配器的示例:

@SpringBootApplication
public class FtpJavaApplication {

public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}

@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
sf.setTestSession(true);
return new CachingSessionFactory<FTPFile>(sf);
}

@Bean
public FtpInboundFileSynchronizer ftpInboundFileSynchronizer() {
FtpInboundFileSynchronizer fileSynchronizer = new FtpInboundFileSynchronizer(ftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory("foo");
fileSynchronizer.setFilter(new FtpSimplePatternFileListFilter("*.xml"));
return fileSynchronizer;
}

@Bean
@InboundChannelAdapter(channel = "ftpChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> ftpMessageSource() {
FtpInboundFileSynchronizingMessageSource source =
new FtpInboundFileSynchronizingMessageSource(ftpInboundFileSynchronizer());
source.setLocalDirectory(new File("ftp-inbound"));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
source.setMaxFetchSize(1);
return source;
}

@Bean
@ServiceActivator(inputChannel = "ftpChannel")
public MessageHandler handler() {
return new MessageHandler() {

@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}

};
}

}

使用 Java DSL 进行配置

以下Spring Boot应用程序展示了如何使用Java DSL配置入站适配器的示例:

@SpringBootApplication
public class FtpJavaApplication {

public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}

@Bean
public IntegrationFlow ftpInboundFlow() {
return IntegrationFlow
.from(Ftp.inboundAdapter(this.ftpSessionFactory)
.preserveTimestamp(true)
.remoteDirectory("foo")
.regexFilter(".*\\.txt$")
.localFilename(f -> f.toUpperCase() + ".a")
.localDirectory(new File("d:\\ftp_files")),
e -> e.id("ftpInboundAdapter")
.autoStartup(true)
.poller(Pollers.fixedDelay(5000)))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
}

处理不完整数据

FtpSystemMarkerFilePresentFileListFilter 用于过滤远程系统中没有对应标记文件的远程文件。有关配置信息,请参阅 Javadoc(并可浏览其父类)。