SFTP 入站通道适配器
SFTP 入站通道适配器是一个特殊的监听器,它连接到服务器并监听远程目录事件(例如创建新文件),此时它会发起文件传输。以下示例展示了如何配置 SFTP 入站通道适配器:
<int-sftp:inbound-channel-adapter id="sftpAdapterAutoCreate"
session-factory="sftpSessionFactory"
channel="requestChannel"
filename-pattern="*.txt"
remote-directory="/foo/bar"
preserve-timestamp="true"
local-directory="file:target/foo"
auto-create-local-directory="true"
local-filename-generator-expression="#this.toUpperCase() + '.a'"
scanner="myDirScanner"
local-filter="myFilter"
temporary-file-suffix=".writing"
max-fetch-size="-1"
delete-remote-files="false">
<int:poller fixed-rate="1000"/>
</int-sftp:inbound-channel-adapter>
前面的配置示例展示了如何为各种属性提供值,包括以下内容:
-
local-directory
: 文件将要被传输到的位置 -
remote-directory
: 文件将要被传输的远程源目录 -
session-factory
: 引用我们之前配置的bean
默认情况下,传输的文件携带与原始文件相同的名称。如果你想覆盖此行为,可以设置 local-filename-generator-expression
属性,这使你可以提供一个 SpEL 表达式来生成本地文件的名称。与出站网关和适配器不同,在这些地方 SpEL 评估上下文的根对象是一个 Message
,而这个入站适配器在评估时还没有消息,因为最终它会用传输的文件作为有效负载来生成消息。因此,SpEL 评估上下文的根对象是远程文件的原始名称(一个 String
)。
入站通道适配器首先将文件获取到本地目录,然后根据轮询器配置发出每个文件。从 5.0 版开始,当需要新的文件获取时,您可以限制从 SFTP 服务器获取的文件数量。当目标文件较大或在带有持久文件列表过滤器的集群系统中运行时,这可能会带来好处,在本节后面会讨论这一点。使用 max-fetch-size
来实现此目的。负值(默认值)表示没有限制,并且所有匹配的文件都将被获取。更多信息请参见 入站通道适配器:控制远程文件获取。从 5.0 版开始,您还可以通过设置 scanner
属性为 inbound-channel-adapter
提供自定义 DirectoryScanner
实现。
从 Spring Integration 3.0 开始,你可以指定 preserve-timestamp
属性(默认值为 false
)。当设置为 true
时,本地文件的修改时间戳将被设置为从服务器获取的值。否则,它将被设置为当前时间。
从 4.2 版本开始,你可以指定 remote-directory-expression
而不是 remote-directory
,这让你可以在每次轮询时动态确定目录——例如,remote-directory-expression="@myBean.determineRemoteDir()"
。
有时,基于通过 filename-pattern
属性指定的简单模式的文件过滤可能不够用。如果是这种情况,您可以使用 filename-regex
属性来指定正则表达式(例如,filename-regex=".*\.test$"
)。如果您需要完全控制,可以使用 filter
属性提供对 org.springframework.integration.file.filters.FileListFilter
自定义实现的引用,这是一个用于过滤文件列表的策略接口。此过滤器确定哪些远程文件将被获取。您还可以通过使用 CompositeFileListFilter
将基于模式的过滤器与其他过滤器(如 AcceptOnceFileListFilter
,以避免同步之前已获取的文件)组合使用。
AcceptOnceFileListFilter
在内存中存储其状态。如果您希望状态在系统重启后仍然存在,请考虑改用 SftpPersistentAcceptOnceFileListFilter
。此过滤器将已接受的文件名存储在 MetadataStore
策略的一个实例中(请参阅元数据存储)。此过滤器根据文件名和远程修改时间进行匹配。
自从 4.0 版本以来,此过滤器需要一个 ConcurrentMetadataStore
。当与共享数据存储(例如使用 RedisMetadataStore
的 Redis
)一起使用时,这允许在多个应用程序或服务器实例之间共享过滤器键。
从 5.0 版本开始,默认为 SftpInboundFileSynchronizer
应用带有内存中 SimpleMetadataStore
的 SftpPersistentAcceptOnceFileListFilter
。此过滤器还与 XML 配置中的 regex
或 pattern
选项一起应用,以及通过 Java DSL 中的 SftpInboundChannelAdapterSpec
应用。您可以通过使用 CompositeFileListFilter
(或 ChainFileListFilter
)来处理任何其他用例。
上述讨论是指在检索文件之前对其进行过滤。一旦文件被检索到,文件系统中的文件会应用额外的过滤器。默认情况下,这是 `AcceptOnceFileListFilter` ,如本节所述,在内存中保留状态,并不考虑文件的修改时间。除非您的应用程序在处理后删除文件,否则适配器在应用程序重新启动后,默认会重新处理磁盘上的文件。
另外,如果你配置 filter
使用 SftpPersistentAcceptOnceFileListFilter
,并且远程文件的时间戳发生了变化(导致它被重新获取),默认的本地过滤器不允许这个新文件被处理。
有关此过滤器的更多信息及其使用方法,请参阅 远程持久文件列表过滤器。
你可以使用 local-filter
属性来配置本地文件系统过滤器的行为。从 4.3.8 版本开始,默认配置了一个 FileSystemPersistentAcceptOnceFileListFilter
。此过滤器将接受的文件名和修改的时间戳存储在 MetadataStore
策略的一个实例中(参见元数据存储),并检测本地文件修改时间的变化。默认的 MetadataStore
是一个 SimpleMetadataStore
,它将状态存储在内存中。
自从 4.1.5 版本以来,这些过滤器有了一个叫做 flushOnUpdate
的新属性,该属性会在每次更新时刷新元数据存储(如果存储实现了 Flushable
)。
此外,如果你使用分布式 MetadataStore
(例如 Redis Metadata Store),你可以拥有同一个适配器或应用程序的多个实例,并确保只有一个实例处理一个文件。
实际的本地过滤器是一个 CompositeFileListFilter
,它包含提供的过滤器和一个模式过滤器,该模式过滤器防止处理正在下载中的文件(基于 temporary-file-suffix
)。文件会带有此后缀进行下载(默认是 .writing
),当传输完成时,文件会被重命名为其最终名称,从而使它们对过滤器“可见”。
有关这些属性的更多详细信息,请参阅 schema。
SFTP 入站通道适配器是一个轮询消费者。因此,您必须配置一个轮询器(全局默认轮询器或本地元素轮询器)。一旦文件被传输到本地目录,就会生成并发送一条消息,其有效负载类型为 java.io.File
,该消息会发送到由 channel
属性标识的通道。
从 6.2 版本开始,您可以使用 SftpLastModifiedFileListFilter
根据最后修改策略过滤 SFTP 文件。此过滤器可以配置一个 age
属性,以便只有比该值更旧的文件才能通过过滤器。年龄默认为 60 秒,但您应该选择一个足够大的年龄以避免过早地获取文件(由于例如网络故障)。更多详情请参阅其 Javadoc。
更多关于文件过滤和大文件
有时,刚刚出现在监控(远程)目录中的文件可能不完整。通常,这类文件会用一些临时扩展名编写(例如,名为 something.txt.writing
的文件使用 .writing
),然后在写入过程完成后重命名。在大多数情况下,开发人员只对完整的文件感兴趣,并且希望仅过滤这些文件。要处理这些场景,您可以使用 filename-pattern
、filename-regex
和 filter
属性提供的过滤支持。如果您需要自定义过滤器实现,可以通过设置 filter
属性在适配器中包含引用。以下示例展示了如何操作:
<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
channel="receiveChannel"
session-factory="sftpSessionFactory"
filter="customFilter"
local-directory="file:/local-test-dir"
remote-directory="/remote-test-dir">
<int:poller fixed-rate="1000" max-messages-per-poll="10" task-executor="executor"/>
</int-sftp:inbound-channel-adapter>
<bean id="customFilter" class="org.foo.CustomFilter"/>
从故障中恢复
你应该理解适配器的架构。一个文件同步器获取文件,FileReadingMessageSource
为每个同步的文件发出一条消息。如之前讨论的,涉及两个过滤器。filter
属性(和模式)指的是远程 (SFTP) 文件列表,以避免获取已经获取过的文件。FileReadingMessageSource
使用 local-filter
来确定哪些文件将被发送为消息。
同步器列出远程文件并咨询其过滤器。然后传输这些文件。如果在文件传输期间发生 IO 错误,已经添加到过滤器的任何文件都将被移除,以便它们可以在下次轮询时重新获取。这仅适用于实现 ReversibleFileListFilter
接口的过滤器(例如 AcceptOnceFileListFilter
)。
如果在同步文件后,下游流程在处理文件时发生错误,不会自动回滚过滤器,因此,默认情况下失败的文件不会被重新处理。
如果您希望在失败后重新处理此类文件,可以使用类似的配置来便于将失败的文件从过滤器中移除:
<int-sftp:inbound-channel-adapter id="sftpAdapter"
session-factory="sftpSessionFactory"
channel="requestChannel"
remote-directory-expression="'/sftpSource'"
local-directory="file:myLocalDir"
auto-create-local-directory="true"
filename-pattern="*.txt">
<int:poller fixed-rate="1000">
<int:transactional synchronization-factory="syncFactory" />
</int:poller>
</int-sftp:inbound-channel-adapter>
<bean id="acceptOnceFilter"
class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />
<int:transaction-synchronization-factory id="syncFactory">
<int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>
<bean id="transactionManager"
class="org.springframework.integration.transaction.PseudoTransactionManager" />
上述配置适用于任何 ResettableFileListFilter
。
从 5.0 版本开始,入站通道适配器可以根据生成的本地文件名在本地构建子目录。这也可以是远程子路径。为了能够根据层次结构支持递归读取本地目录以进行修改,你现在可以提供一个带有新的 RecursiveDirectoryScanner
的内部 FileReadingMessageSource
,该扫描器基于 Files.walk()
算法。更多信息请参见 AbstractInboundFileSynchronizingMessageSource.setScanner()。此外,你现在可以通过使用 setUseWatchService()
选项将 AbstractInboundFileSynchronizingMessageSource
切换到基于 WatchService
的 DirectoryScanner
。它还配置了所有 WatchEventType
实例,以响应本地目录中的任何修改。前面显示的重新处理示例是基于 FileReadingMessageSource.WatchServiceDirectoryScanner
的内置功能,在文件从本地目录中删除 (StandardWatchEventKinds.ENTRY_DELETE
) 时使用 ResettableFileListFilter.remove()
。更多信息请参见 WatchServiceDirectoryScanner。
使用 Java 配置进行配置
下面的 Spring Boot 应用程序展示了如何使用 Java 配置入站适配器的示例:
@SpringBootApplication
public class SftpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public SessionFactory<SftpClient.DirEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost("localhost");
factory.setPort(port);
factory.setUser("foo");
factory.setPassword("foo");
factory.setAllowUnknownKeys(true);
factory.setTestSession(true);
return new CachingSessionFactory<>(factory);
}
@Bean
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory("foo");
fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.xml"));
return fileSynchronizer;
}
@Bean
@InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> sftpMessageSource() {
SftpInboundFileSynchronizingMessageSource source =
new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
source.setLocalDirectory(new File("sftp-inbound"));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
source.setMaxFetchSize(1);
return source;
}
@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
}
使用 Java DSL 进行配置
以下 Spring Boot 应用程序展示了如何使用 Java DSL 配置入站适配器的示例:
@SpringBootApplication
public class SftpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow sftpInboundFlow() {
return IntegrationFlow
.from(Sftp.inboundAdapter(this.sftpSessionFactory)
.preserveTimestamp(true)
.remoteDirectory("foo")
.regexFilter(".*\\.txt$")
.localFilenameExpression("#this.toUpperCase() + '.a'")
.localDirectory(new File("sftp-inbound")),
e -> e.id("sftpInboundAdapter")
.autoStartup(true)
.poller(Pollers.fixedDelay(5000)))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
}
处理不完整数据
见 处理不完整数据。
SftpSystemMarkerFilePresentFileListFilter
用于过滤远程系统上没有对应标记文件的远程文件。有关配置信息,请参阅 Javadoc。