读取文件
FileReadingMessageSource 可用于从文件系统消费文件。这是 MessageSource 的一个实现,它从文件系统目录创建消息。以下示例展示了如何配置 FileReadingMessageSource:
<bean id="pollableFileSource"
class="org.springframework.integration.file.inbound.FileReadingMessageSource"
p:directory="${input.directory}"/>
为了防止为某些文件创建消息,你可以提供一个 FileListFilter。默认情况下,我们使用以下过滤器:
-
IgnoreHiddenFileListFilter -
AcceptOnceFileListFilter
IgnoreHiddenFileListFilter 确保隐藏文件不会被处理。请注意,隐藏的确切定义取决于系统。例如,在基于 UNIX 的系统中,以点字符开头的文件被视为隐藏文件。另一方面,Microsoft Windows 有一个专用的文件属性来指示隐藏文件。
版本 4.2 引入了 IgnoreHiddenFileListFilter。在之前的版本中,隐藏文件是被包含在内的。在默认配置下,首先触发 IgnoreHiddenFileListFilter,然后是 AcceptOnceFileListFilter。
AcceptOnceFileListFilter 确保文件只会从目录中被选取一次。
AcceptOnceFileListFilter 将其状态存储在内存中。如果您希望状态在系统重启后得以保留,可以使用 FileSystemPersistentAcceptOnceFileListFilter。该过滤器将已接受的文件名存储在 MetadataStore 实现中(参见 元数据存储)。此过滤器根据文件名和修改时间进行匹配。
自 4.0 版本起,此过滤器需要一个 ConcurrentMetadataStore。当与共享数据存储(例如使用 RedisMetadataStore 的 Redis)一起使用时,它允许过滤器键在多个应用程序实例之间或跨多个服务器使用的网络文件共享之间共享。
自 4.1.5 版本起,此过滤器新增了一个属性 (flushOnUpdate),该属性会导致它在每次更新时刷新元数据存储(如果存储实现了 Flushable 接口)。
持久化文件列表过滤器现在拥有一个布尔属性 forRecursion。将此属性设置为 true 时,也会同时设置 alwaysAcceptDirectories,这意味着出站网关(ls 和 mget)上的递归操作现在每次都会遍历完整的目录树。这是为了解决深层目录树中的变更未被检测到的问题。此外,forRecursion=true 会导致文件的完整路径被用作元数据存储的键;这解决了当同名文件在不同目录中多次出现时过滤器无法正常工作的问题。重要提示:这意味着对于顶级目录下的文件,持久化元数据存储中现有的键将无法被找到。因此,该属性默认值为 false;这可能会在未来的版本中改变。
以下示例配置了一个带有过滤器的 FileReadingMessageSource:
<bean id="pollableFileSource"
class="org.springframework.integration.file.inbound.FileReadingMessageSource"
p:inputDirectory="${input.directory}"
p:filter-ref="customFilterBean"/>
读取文件时的一个常见问题是,文件可能在尚未就绪时就被检测到(即其他进程可能仍在写入该文件)。默认的 AcceptOnceFileListFilter 无法防止这种情况。在大多数情况下,如果文件写入进程在文件就绪可读时立即重命名文件,就可以避免此问题。通过组合一个仅接受就绪文件(可能基于已知后缀)的 filename-pattern 或 filename-regex 过滤器与默认的 AcceptOnceFileListFilter,可以应对这种情况。CompositeFileListFilter 支持这种组合,如下例所示:
<bean id="pollableFileSource"
class="org.springframework.integration.file.inbound.FileReadingMessageSource"
p:inputDirectory="${input.directory}"
p:filter-ref="compositeFilter"/>
<bean id="compositeFilter"
class="org.springframework.integration.file.filters.CompositeFileListFilter">
<constructor-arg>
<list>
<bean class="o.s.i.file.filters.AcceptOnceFileListFilter"/>
<bean class="o.s.i.file.filters.RegexPatternFileListFilter">
<constructor-arg value="^test.*$"/>
</bean>
</list>
</constructor-arg>
</bean>
如果无法使用临时名称创建文件并重命名为最终名称,Spring Integration 提供了另一种替代方案。版本 4.2 新增了 LastModifiedFileListFilter。该过滤器可通过配置 age 属性,使得只有修改时间早于此值的文件才能通过过滤器。age 的默认值为 60 秒,但您应选择足够大的值,以避免过早选取文件(例如由于网络故障)。以下示例展示了如何配置 LastModifiedFileListFilter:
<bean id="filter" class="org.springframework.integration.file.filters.LastModifiedFileListFilter">
<property name="age" value="120" />
</bean>
从版本 4.3.7 开始,引入了 ChainFileListFilter(CompositeFileListFilter 的扩展),以支持后续过滤器仅能看到前一个过滤器结果的场景。(使用 CompositeFileListFilter 时,所有过滤器都能看到所有文件,但它仅传递通过所有过滤器的文件)。新行为的一个示例是 LastModifiedFileListFilter 和 AcceptOnceFileListFilter 的组合,当我们希望经过一段时间后才接受文件时。使用 CompositeFileListFilter 时,由于 AcceptOnceFileListFilter 在第一次传递时看到所有文件,当其他过滤器通过时,它后来不会再次传递该文件。当模式过滤器与查找辅助文件以指示文件传输完成的自定义过滤器结合使用时,CompositeFileListFilter 方法很有用。模式过滤器可能仅传递主文件(例如 something.txt),但“完成”过滤器需要查看是否存在(例如)something.done。
假设我们有文件 a.txt、a.done 和 b.txt。
模式过滤器仅通过 a.txt 和 b.txt,而“完成”过滤器则看到所有三个文件,并且仅通过 a.txt。复合过滤器的最终结果是仅发布 a.txt。
使用 ChainFileListFilter 时,如果链中的任何过滤器返回空列表,则不会调用剩余的过滤器。
版本 5.0 引入了 ExpressionFileListFilter,用于将文件作为上下文评估根对象执行 SpEL 表达式。为此,所有用于文件处理(本地和远程)的 XML 组件,以及现有的 filter 属性,都已提供 filter-expression 选项,如下例所示:
<int-file:inbound-channel-adapter
directory="${inputdir}"
filter-expression="name matches '.text'"
auto-startup="false"/>
版本 5.0.5 引入了 DiscardAwareFileListFilter 实现,这些实现关注被拒绝的文件。为此,应通过 addDiscardCallback(Consumer<File>) 为此类过滤器实现提供回调。在框架中,此功能与 LastModifiedFileListFilter 结合,由 FileReadingMessageSource.WatchServiceDirectoryScanner 使用。与常规的 DirectoryScanner 不同,WatchService 根据目标文件系统上的事件提供待处理的文件。在轮询包含这些文件的内部队列时,LastModifiedFileListFilter 可能会丢弃它们,因为相对于其配置的 age 而言,这些文件过于"年轻"。因此,我们失去了未来可能考虑该文件的机会。丢弃回调钩子让我们能够将文件保留在内部队列中,以便在后续轮询中根据 age 进行检查。CompositeFileListFilter 也实现了 DiscardAwareFileListFilter,并将其丢弃回调填充到其所有 DiscardAwareFileListFilter 委托中。
由于 CompositeFileListFilter 会针对所有委托对文件进行匹配,因此对于同一个文件,discardCallback 可能会被多次调用。
从版本5.1开始,FileReadingMessageSource 不再检查目录是否存在,也不会在调用其 start() 方法(通常通过包装的 SourcePollingChannelAdapter 调用)之前创建目录。在此之前,当引用目录时(例如在测试中,或在稍后应用权限时),没有简单的方法来防止操作系统权限错误。
与 LastModifiedFileListFilter 相对,从版本 6.5 开始引入了一种 RecentFileListFilter 策略。它是 AbstractRecentFileListFilter 针对本地文件系统的扩展。默认情况下,它会接受修改时间不超过 1 天的文件。有关其他远程文件协议的实现,请参阅其相应的其他实现。
消息头
从 5.0 版本开始,FileReadingMessageSource(除了将轮询到的 File 作为 payload 外)还会向出站 Message 填充以下头部信息:
-
FileHeaders.FILENAME:要发送文件的File.getName()。可用于后续的重命名或复制逻辑。 -
FileHeaders.ORIGINAL_FILE:File对象本身。通常,当我们丢失原始File对象时,此标头由框架组件(例如拆分器或转换器)自动填充。然而,为了与任何其他自定义用例保持一致和方便,此标头可用于访问原始文件。 -
FileHeaders.RELATIVE_PATH:引入的新标头,用于表示相对于扫描根目录的文件路径部分。当需要在其他地方恢复源目录层次结构时,此标头非常有用。为此,可以配置DefaultFileNameGenerator(参见"生成文件名")以使用此标头。
目录扫描与轮询
FileReadingMessageSource 不会立即处理目录中的文件。它使用一个内部队列来存放由 scanner 返回的“合格文件”。scanEachPoll 选项用于确保在每次轮询时,内部队列都会根据输入目录的最新内容进行刷新。默认情况下(scanEachPoll = false),FileReadingMessageSource 会在再次扫描目录之前清空其队列。这种默认行为对于减少对目录中大量文件的扫描特别有用。然而,在需要自定义排序的情况下,必须考虑将此标志设置为 true 的影响。文件处理的顺序可能不符合预期。默认情况下,队列中的文件按其自然(path)顺序处理。即使队列中已有文件,扫描添加的新文件也会插入到适当位置以保持该自然顺序。要自定义顺序,FileReadingMessageSource 可以接受一个 Comparator<File> 作为构造函数参数。内部(PriorityBlockingQueue)使用它来根据业务需求重新排序其内容。因此,要以特定顺序处理文件,您应该向 FileReadingMessageSource 提供一个比较器,而不是对自定义 DirectoryScanner 生成的列表进行排序。
版本 5.0 引入了 RecursiveDirectoryScanner 来执行文件树遍历。该实现基于 Files.walk(Path start, int maxDepth, FileVisitOption… options) 功能。根目录(DirectoryScanner.listFiles(File) 参数)被排除在结果之外。所有其他子目录的包含与排除都基于目标 FileListFilter 实现。例如,SimplePatternFileListFilter 默认会过滤掉目录。更多信息请参阅 AbstractDirectoryAwareFileListFilter 及其实现。
从版本 5.5 开始,Java DSL 的 FileInboundChannelAdapterSpec 提供了一个便捷的 recursive(boolean) 选项,用于在目标 FileReadingMessageSource 中使用 RecursiveDirectoryScanner 替代默认的扫描器。
从版本 7.0 开始,FileReadingMessageSource 可以为其 directory 属性配置一个 SpEL 表达式。每次请求新扫描时都会评估此表达式。当扫描完成后需要更改输入目录,或者当目录基于时间戳轮换时,这种逻辑可能会很有用。
命名空间支持
通过使用文件特定的命名空间,可以简化文件读取的配置。为此,请使用以下模板:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-file="http://www.springframework.org/schema/integration/file"
xsi:schemaLocation="http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file
https://www.springframework.org/schema/integration/file/spring-integration-file.xsd">
</beans>
在此命名空间内,您可以简化 FileReadingMessageSource 并将其包装为入站通道适配器,具体如下:
<int-file:inbound-channel-adapter id="filesIn1"
directory="file:${input.directory}" prevent-duplicates="true" ignore-hidden="true"/>
<int-file:inbound-channel-adapter id="filesIn2"
directory="file:${input.directory}"
filter="customFilterBean" />
<int-file:inbound-channel-adapter id="filesIn3"
directory="file:${input.directory}"
filename-pattern="test*" />
<int-file:inbound-channel-adapter id="filesIn4"
directory="file:${input.directory}"
filename-regex="test[0-9]+\.txt" />
第一个通道适配器示例依赖于默认的 FileListFilter 实现:
-
IgnoreHiddenFileListFilter(不处理隐藏文件) -
AcceptOnceFileListFilter(防止重复处理)
因此,你也可以省略 prevent-duplicates 和 ignore-hidden 属性,因为它们默认值为 true。
Spring Integration 4.2 引入了 ignore-hidden 属性。在之前的版本中,隐藏文件会被包含在内。
第二个通道适配器示例使用了自定义过滤器,第三个示例使用 filename-pattern 属性添加基于 AntPathMatcher 的过滤器,第四个示例使用 filename-regex 属性为 FileReadingMessageSource 添加基于正则表达式模式的过滤器。filename-pattern 和 filename-regex 属性均与常规的 filter 引用属性互斥。但您可以通过 filter 属性引用 CompositeFileListFilter 实例,该实例可组合任意数量的过滤器(包括一个或多个基于模式的过滤器),以满足您的特定需求。
当多个进程从同一目录读取文件时,你可能需要锁定文件以防止它们被并发处理。为此,你可以使用 FileLocker。系统提供了一个基于 java.nio 的实现,但你也可以实现自己的锁定方案。nio 锁定器可以通过以下方式注入:
<int-file:inbound-channel-adapter id="filesIn"
directory="file:${input.directory}" prevent-duplicates="true">
<int-file:nio-locker/>
</int-file:inbound-channel-adapter>
你可以按如下方式配置自定义锁:
<int-file:inbound-channel-adapter id="filesIn"
directory="file:${input.directory}" prevent-duplicates="true">
<int-file:locker ref="customLocker"/>
</int-file:inbound-channel-adapter>
当文件入站适配器配置了锁定时,它负责在允许接收文件之前获取锁。它不承担解锁文件的责任。如果你已经处理了文件但锁仍然存在,就会导致内存泄漏。如果这是一个问题,你应该在适当的时候自己调用 FileLocker.unlock(File file)。
当过滤和锁定文件仍不足以满足需求时,您可能需要完全控制文件的列出方式。为实现此类需求,可以使用 DirectoryScanner 的实现。该扫描器允许您精确决定每次轮询中列出的文件。Spring Integration 在内部也使用此接口将 FileListFilter 实例和 FileLocker 连接到 FileReadingMessageSource。您可以通过 scanner 属性将自定义的 DirectoryScanner 注入到 <int-file:inbound-channel-adapter/> 中,如下例所示:
<int-file:inbound-channel-adapter id="filesIn" directory="file:${input.directory}"
scanner="customDirectoryScanner"/>
这样做可以让你完全自由地选择排序、列出和锁定策略。
同样重要的是要理解,过滤器(包括 patterns、regex、prevent-duplicates 等)和 locker 实例实际上是由 scanner 使用的。在适配器上设置的任何这些属性随后都会被注入到内部的 scanner 中。对于外部 scanner 的情况,FileReadingMessageSource 上禁止使用所有过滤器和锁属性。它们必须(如果需要)在该自定义 DirectoryScanner 上指定。换句话说,如果你向 FileReadingMessageSource 注入一个 scanner,你应该在该 scanner 上提供 filter 和 locker,而不是在 FileReadingMessageSource 上。
默认情况下,DefaultDirectoryScanner 使用一个 IgnoreHiddenFileListFilter 和一个 AcceptOnceFileListFilter。要阻止它们的使用,你可以配置自己的过滤器(例如 AcceptAllFileListFilter),甚至将其设置为 null。
WatchServiceDirectoryScanner
FileReadingMessageSource.WatchServiceDirectoryScanner 依赖文件系统事件来检测目录中新添加的文件。在初始化过程中,目录会被注册以生成事件。初始文件列表也会在初始化期间构建。在遍历目录树时,遇到的任何子目录也会被注册以生成事件。在首次轮询中,返回的是通过遍历目录得到的初始文件列表。在后续轮询中,返回的是来自新创建事件的文件。如果添加了新的子目录,则使用其创建事件来遍历新的子树,以查找现有文件并注册发现的任何新子目录。
当程序未能以目录修改事件发生的速度清空 WatchKey 的内部事件队列时,会出现问题。如果队列大小超出限制,系统会发出 StandardWatchEventKinds.OVERFLOW 事件,表示某些文件系统事件可能已丢失。在这种情况下,根目录会被完全重新扫描。为避免重复处理,请考虑使用适当的 FileListFilter(例如 AcceptOnceFileListFilter)或在处理完成后移除文件。
WatchServiceDirectoryScanner 可通过 FileReadingMessageSource.use-watch-service 选项启用,该选项与 scanner 选项互斥。系统会为指定的 directory 填充一个内部的 FileReadingMessageSource.WatchServiceDirectoryScanner 实例。
此外,现在 WatchService 的轮询逻辑能够追踪 StandardWatchEventKinds.ENTRY_MODIFY 和 StandardWatchEventKinds.ENTRY_DELETE 事件。
如果你需要跟踪现有文件的修改以及新文件,你应在 FileListFilter 中实现 ENTRY_MODIFY 事件的处理逻辑。否则,这些事件中的文件将被以相同方式处理。
ResettableFileListFilter 实现会捕获 ENTRY_DELETE 事件。因此,这些文件会被提供给 remove() 操作。当启用此事件时,诸如 AcceptOnceFileListFilter 之类的过滤器会移除该文件。这样一来,如果出现同名文件,它就能通过过滤器并作为消息发送。
为此,我们引入了 watch-events 属性(FileReadingMessageSource.setWatchEvents(WatchEventType… watchEvents))。(WatchEventType 是 FileReadingMessageSource 中的一个公共内部枚举类型。)通过此选项,我们可以为新增文件使用一种下游流程逻辑,为修改文件使用另一种逻辑。以下示例展示了如何在同一目录中为创建和修改事件配置不同的逻辑:
值得一提的是,ENTRY_DELETE 事件会涉及被监视目录中子目录的重命名操作。具体来说,与旧目录名相关的 ENTRY_DELETE 事件会先于通知新(重命名后)目录的 ENTRY_CREATE 事件发生。在某些操作系统(如 Windows)中,必须注册 ENTRY_DELETE 事件来处理这种情况。否则,在文件资源管理器中重命名被监视的子目录可能导致该子目录中的新文件无法被检测到。
<int-file:inbound-channel-adapter id="newFiles"
directory="${input.directory}"
use-watch-service="true"/>
<int-file:inbound-channel-adapter id="modifiedFiles"
directory="${input.directory}"
use-watch-service="true"
filter="acceptAllFilter"
watch-events="MODIFY"/> <!-- The default is CREATE. -->
从版本 6.1 开始,FileReadingMessageSource 提供了两个新的 WatchService 相关选项:
-
watchMaxDepth-Files.walkFileTree(Path root, Set attributes, int maxDepth, FileVisitor visitor)API 的一个参数; -
watchDirPredicate- 一个Predicate<Path>,用于测试扫描树中的目录是否应被遍历,并注册到WatchService及配置的监视事件类型。
限制内存消耗
您可以使用 HeadDirectoryScanner 来限制内存中保留的文件数量。这在扫描大型目录时非常有用。通过 XML 配置,您可以在入站通道适配器上设置 queue-size 属性来启用此功能。
在 4.2 版本之前,此设置与使用任何其他过滤器不兼容。任何其他过滤器(包括 prevent-duplicates="true")都会覆盖用于限制大小的过滤器。
使用 HeadDirectoryScanner 与 AcceptOnceFileListFilter 不兼容。由于在轮询决策期间会咨询所有过滤器,AcceptOnceFileListFilter 并不知道其他过滤器可能会临时过滤文件。即使之前被 HeadDirectoryScanner.HeadFilter 过滤的文件现在可用,AcceptOnceFileListFilter 仍会过滤它们。
通常,在这种情况下,不应使用 AcceptOnceFileListFilter,而应删除已处理的文件,以便之前被过滤的文件在未来的轮询中可用。
使用 Java 配置进行配置
以下Spring Boot应用程序展示了如何使用Java配置来配置出站适配器的示例:
@SpringBootApplication
public class FileReadingJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileReadingJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public MessageChannel fileInputChannel() {
return new DirectChannel();
}
@Bean
@InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(fixedDelay = "1000"))
public MessageSource<File> fileReadingMessageSource() {
FileReadingMessageSource source = new FileReadingMessageSource();
source.setDirectory(new File(INBOUND_PATH));
source.setFilter(new SimplePatternFileListFilter("*.txt"));
return source;
}
@Bean
@Transformer(inputChannel = "fileInputChannel", outputChannel = "processFileChannel")
public FileToStringTransformer fileToStringTransformer() {
return new FileToStringTransformer();
}
}
使用 Java DSL 进行配置
以下Spring Boot应用程序展示了如何使用Java DSL配置出站适配器的示例:
@SpringBootApplication
public class FileReadingJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileReadingJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow fileReadingFlow() {
return IntegrationFlow
.from(Files.inboundAdapter(new File(INBOUND_PATH))
.patternFilter("*.txt"),
e -> e.poller(Pollers.fixedDelay(1000)))
.transform(Files.toStringTransformer())
.channel("processFileChannel")
.get();
}
}
跟踪文件
另一个常见的用例是从文件末尾(或尾部)获取"行",并在添加新行时捕获它们。这里提供了两种实现方式。第一种是 OSDelegatingFileTailingMessageProducer,它使用原生的 tail 命令(在支持该命令的操作系统上)。这通常是这些平台上最高效的实现方式。对于没有 tail 命令的操作系统,第二种实现方式 ApacheCommonsFileTailingMessageProducer 使用 Apache commons-io 的 Tailer 类。
在这两种情况下,文件系统事件(例如文件不可用及其他事件)都会通过常规的Spring事件发布机制,以ApplicationEvent实例的形式发布。此类事件的示例如下:
[message=tail: cannot open '/tmp/somefile' for reading:
No such file or directory, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has become accessible, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has become inaccessible:
No such file or directory, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has appeared;
following end of new file, file=/tmp/somefile]
例如,当文件被轮转时,可能会发生前面示例中所示的事件序列。
从 5.0 版本开始,当文件在 idleEventInterval 期间没有数据时,会发出 FileTailingIdleEvent。以下示例展示了此类事件的具体形态:
[message=Idle timeout, file=/tmp/somefile] [idle time=5438]
并非所有支持 tail 命令的平台都会提供这些状态信息。
从这些端点发出的消息包含以下标头:
-
FileHeaders.ORIGINAL_FILE:File对象 -
FileHeaders.FILENAME: 文件名 (File.getName())
在 5.0 版本之前,FileHeaders.FILENAME 头信息包含文件绝对路径的字符串表示。现在,您可以通过在原始文件头信息上调用 getAbsolutePath() 方法来获取该字符串表示。
以下示例使用默认选项('-F -n 0',表示从当前文件末尾开始追踪文件名)创建一个原生适配器。
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
task-executor="exec"
file="/tmp/foo"/>
以下示例创建了一个带有 '-F -n +0' 选项的原生适配器(表示跟随文件名,并输出所有现有行)。
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
native-options="-F -n +0"
task-executor="exec"
file-delay=10000
file="/tmp/foo"/>
如果 tail 命令失败(在某些平台上,文件缺失会导致 tail 命令失败,即使指定了 -F 参数),该命令会每隔 10 秒重试一次。
默认情况下,原生适配器会捕获标准输出并将其内容作为消息发送。它们还会捕获标准错误以触发事件。从 4.3.6 版本开始,您可以通过将 enable-status-reader 设置为 false 来丢弃标准错误事件,如下例所示:
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
enable-status-reader="false"
task-executor="exec"
file="/tmp/foo"/>
在以下示例中,IdleEventInterval 被设置为 5000,这意味着如果五秒内没有写入任何行,则每五秒触发一次 FileTailingIdleEvent:
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
idle-event-interval="5000"
task-executor="exec"
file="/tmp/somefile"/>
当你需要停止适配器时,这可能会很有用。
以下示例创建了一个 Apache commons-io Tailer 适配器,该适配器每两秒检查文件中的新行,并每十秒检查缺失文件的存在:
<int-file:tail-inbound-channel-adapter id="apache"
channel="input"
task-executor="exec"
file="/tmp/bar"
delay="2000"
end="false" // <1>
reopen="true" // <2>
file-delay="10000"/>
文件从开头开始追踪 (
end="false"),而非默认的从末尾开始。每个数据块都会重新打开文件(默认是保持文件打开状态)。
指定 delay、end 或 reopen 属性会强制使用 Apache commons-io 适配器,并使 native-options 属性不可用。
处理不完整数据
在文件传输场景中,一个常见问题是如何确定传输已完成,以避免读取不完整的文件。解决此问题的常用技术是使用临时名称写入文件,然后以原子操作方式将其重命名为最终名称。结合使用过滤器来屏蔽临时文件,使其不被消费者获取,这提供了一种稳健的解决方案。Spring Integration 中写入文件(本地或远程)的组件就采用了这种技术。默认情况下,它们会在文件名后附加 .writing,并在传输完成时将其移除。