入站通道适配器:控制远程文件获取
配置入站通道适配器时,您应考虑两个属性。与所有轮询器一样,max-messages-per-poll 可用于限制每次轮询时发出的消息数量(如果就绪消息超过配置值)。max-fetch-size(自 5.0 版本起)可限制每次从远程服务器检索的文件数量。
以下场景假设起始状态为一个空的本地目录:
-
max-messages-per-poll=2且max-fetch-size=1:适配器获取一个文件,发出该文件,再获取下一个文件并发出。然后休眠直到下一次轮询。 -
max-messages-per-poll=2且max-fetch-size=2:适配器获取两个文件,然后逐个发出。 -
max-messages-per-poll=2且max-fetch-size=4:适配器最多获取 4 个文件(如果可用),并发出前两个(如果至少有两个)。剩余的两个文件将在下一次轮询中发出。 -
max-messages-per-poll=2且未指定max-fetch-size:适配器获取所有远程文件,并发出前两个(如果至少有两个)。后续文件将在后续轮询中发出(每次两个)。当所有文件都处理完毕后,将再次尝试远程获取以拾取任何新文件。
当您部署多个应用程序实例时,建议设置较小的 max-fetch-size,以避免某个实例“抢占”所有文件,导致其他实例无法获取资源。
max-fetch-size 的另一个用途是,当你希望停止获取远程文件但继续处理已获取的文件时。在 MessageSource 上设置 maxFetchSize 属性(通过编程、JMX 或 控制总线)可以有效地阻止适配器获取更多文件,但允许轮询器继续为先前已获取的文件发出消息。如果在更改属性时轮询器处于活动状态,则更改将在下一次轮询时生效。
从 5.1 版本开始,同步器可以配置 Comparator<?> 参数。这在通过 maxFetchSize 限制获取文件数量时非常有用。
从版本6.4开始,AbstractRemoteFileStreamingMessageSource 现在提供了一个便捷的 clearFetchedCache() API,用于从缓存中移除未处理的远程文件引用。这些引用之所以保留在缓存中,是因为轮询配置不允许在一个周期内处理所有文件,并且目标 SessionFactory 可能在轮询周期之间发生变化,例如通过 RotatingServerAdvice。
从 7.0 版本开始,AbstractInboundFileSynchronizer 在应用 maxFetchSize 切片后会缓存经过筛选的 Session.list(remoteDirectory)。AbstractInboundFileSynchronizer.transferFilesFromRemoteToLocal() 方法的逻辑如下:
-
若
maxFetchSize > 0,系统会针对remoteDirectory获取锁,以避免在缓存相关操作期间不同线程间的竞态条件。由于后续所有同步仅处理内存中的缓存剩余项,因此性能影响微乎其微; -
若
remoteDirectory没有对应的缓存条目,系统会调用Session.list(remoteDirectory),并对返回的所有远程文件进行过滤; -
过滤后的结果会被截取至
maxFetchSize指定的数量; -
随后这些文件条目将被传输到本地目录;
-
过滤后剩余的远程文件会被缓存,供后续同步使用;
-
若
remoteDirectory存在缓存条目,则直接对该列表进行截取(至 maxFetchSize 数量)并遍历以传输至本地目录; -
若其中任一传输失败,
filter会从失败的远程文件处重置,同时缓存也会被清空;因此,下一次同步将从干净的状态重新开始。
另请参阅通用的 SFTP 入站通道适配器 章节,了解有关 FileListFilter 配置的信息。