SFTP 流式传输入站通道适配器
<int-sftp:inbound-streaming-channel-adapter id="ftpInbound"
channel="ftpChannel"
session-factory="sessionFactory"
filename-pattern="*.txt"
filename-regex=".*\.txt"
filter="filter"
filter-expression="@myFilterBean.check(#root)"
remote-file-separator="/"
comparator="comparator"
max-fetch-size="1"
remote-directory-expression="'foo/bar'">
<int:poller fixed-rate="1000" />
</int-sftp:inbound-streaming-channel-adapter>
你只能使用 filename-pattern
、filename-regex
、filter
或 filter-expression
中的一个。
从 5.0 版本开始,默认情况下,SftpStreamingMessageSource
适配器通过使用基于内存中的 SimpleMetadataStore
的 SftpPersistentAcceptOnceFileListFilter
来防止远程文件的重复。默认情况下,此过滤器也会与文件名模式(或正则表达式)一起应用。如果您需要允许重复项,可以使用 AcceptAllFileListFilter
。您可以通过使用 CompositeFileListFilter
(或 ChainFileListFilter
)来处理任何其他用例。Java 配置稍后显示展示了一种在处理后删除远程文件以避免重复的技术。
有关 SftpPersistentAcceptOnceFileListFilter
的更多信息及其使用方法,请参阅远程持久文件列表过滤器。
你可以使用 max-fetch-size
属性来限制每次轮询时获取的文件数量,当需要获取时。将其设置为 1
并在集群环境中使用持久化过滤器。更多信息,请参见 入站通道适配器:控制远程文件获取。
适配器将远程目录和文件名放在头部 (FileHeaders.REMOTE_DIRECTORY
和 FileHeaders.REMOTE_FILE
)。从 5.0 版开始,FileHeaders.REMOTE_FILE_INFO
头部提供了额外的远程文件信息(以 JSON 格式)。如果您将 SftpStreamingMessageSource
的 fileInfoJson
属性设置为 false
,则头部包含一个 SftpFileInfo
对象。您可以通过使用 SftpFileInfo.getFileInfo()
方法访问底层 SftpClient
提供的 SftpClient.DirEntry
对象。当您使用 XML 配置时,fileInfoJson
属性不可用,但您可以通过将 SftpStreamingMessageSource
注入到您的一个配置类中来设置它。另请参阅 远程文件信息。
使用 Java 配置
以下 Spring Boot 应用程序展示了如何使用 Java 配置入站适配器的示例:
@SpringBootApplication
public class SftpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
@InboundChannelAdapter(channel = "stream")
public MessageSource<InputStream> ftpMessageSource() {
SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template());
messageSource.setRemoteDirectory("sftpSource/");
messageSource.setFilter(new AcceptAllFileListFilter<>());
messageSource.setMaxFetchSize(1);
return messageSource;
}
@Bean
@Transformer(inputChannel = "stream", outputChannel = "data")
public org.springframework.integration.transformer.Transformer transformer() {
return new StreamTransformer("UTF-8");
}
@Bean
public SftpRemoteFileTemplate template() {
return new SftpRemoteFileTemplate(sftpSessionFactory());
}
@ServiceActivator(inputChannel = "data", adviceChain = "after")
@Bean
public MessageHandler handle() {
return System.out::println;
}
@Bean
public ExpressionEvaluatingRequestHandlerAdvice after() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setOnSuccessExpression(
"@template.remove(headers['file_remoteDirectory'] + '/' + headers['file_remoteFile'])");
advice.setPropagateEvaluationFailures(true);
return advice;
}
}
请注意,在这个例子中,转换器下游的消息处理程序有一个 advice
,在处理完成后会删除远程文件。