跳到主要内容

入站通道适配器:轮询多个服务器和目录

QWen Plus 中英对照 Inbound Channel Adapters: Polling Multiple Servers and Directories

从 5.0.7 版本开始,RotatingServerAdvice 可用;当配置为轮询器建议时,入站适配器可以轮询多个服务器和目录。按照常规方式配置该建议并将其添加到轮询器的建议链中。使用 DelegatingSessionFactory 来选择服务器,详情请参阅 委托会话工厂。建议的配置由一系列 RotationPolicy.KeyDirectory 对象组成。

@Bean
public RotatingServerAdvice advice() {
List<RotationPolicy.KeyDirectory> keyDirectories = new ArrayList<>();
keyDirectories.add(new RotationPolicy.KeyDirectory("one", "foo"));
keyDirectories.add(new RotationPolicy.KeyDirectory("one", "bar"));
keyDirectories.add(new RotationPolicy.KeyDirectory("two", "baz"));
keyDirectories.add(new RotationPolicy.KeyDirectory("two", "qux"));
keyDirectories.add(new RotationPolicy.KeyDirectory("three", "fiz"));
keyDirectories.add(new RotationPolicy.KeyDirectory("three", "buz"));
return new RotatingServerAdvice(delegatingSf(), keyDirectories);
}
java

此建议将在服务器 one 上轮询目录 foo,直到没有新文件存在,然后移动到目录 bar,再移动到服务器 two 上的目录 baz,等等。

此默认行为可以通过 fair 构造函数参数进行修改:

@Bean
public RotatingServerAdvice advice() {
...
return new RotatingServerAdvice(delegatingSf(), keyDirectories, true);
}
java

在这种情况下,无论之前的轮询是否返回了文件,建议都会移动到下一个服务器/目录。

或者,你可以提供自己的 RotationPolicy 来根据需要重新配置消息源:

public interface RotationPolicy {

void beforeReceive(MessageSource<?> source);

void afterReceive(boolean messageReceived, MessageSource<?> source);

}
java

@Bean
public RotatingServerAdvice advice() {
return new RotatingServerAdvice(myRotationPolicy());
}
java

local-filename-generator-expression 属性(同步器上的 localFilenameGeneratorExpression)现在可以包含 #remoteDirectory 变量。这允许从不同目录检索的文件下载到类似的本地目录:

@Bean
public IntegrationFlow flow() {
return IntegrationFlow.from(Sftp.inboundAdapter(sf())
.filter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "rotate"))
.localDirectory(new File(tmpDir))
.localFilenameExpression("#remoteDirectory + T(java.io.File).separator + #root")
.remoteDirectory("."),
e -> e.poller(Pollers.fixedDelay(1).advice(advice())))
.channel(MessageChannels.queue("files"))
.get();
}
java
important

在使用此建议时,不要在轮询器上配置 TaskExecutor;有关更多信息,请参阅 消息源的条件轮询器

另请参见一个方便的 AbstractRemoteFileStreamingMessageSource.clearFetchedCache() API,当不是所有获取的文件都在单个轮询周期内处理时,但 SessionFactory 可能会切换到不同的会话工厂。