此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring Integration 6.5.1! |
读取文件
一个FileReadingMessageSource
可用于使用文件系统中的文件。
这是MessageSource
从文件系统目录创建消息。
以下示例演示如何配置FileReadingMessageSource
:
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:directory="${input.directory}"/>
要防止为某些文件创建消息,您可以提供FileListFilter
.
默认情况下,我们使用以下过滤器:
-
IgnoreHiddenFileListFilter
-
AcceptOnceFileListFilter
这IgnoreHiddenFileListFilter
确保不处理隐藏文件。
请注意,隐藏的确切定义取决于系统。
例如,在基于 UNIX 的系统上,以句点字符开头的文件被视为隐藏。
另一方面,Microsoft Windows 有一个专用的文件属性来指示隐藏文件。
4.2 版引入了 |
这AcceptOnceFileListFilter
确保仅从目录中选取一次文件。
这 从 4.0 版开始,此过滤器需要 从版本 4.1.5 开始,此过滤器具有一个新属性 ( |
持久文件列表过滤器现在具有布尔属性forRecursion
.
将此属性设置为true
,也设置alwaysAcceptDirectories
,这意味着出站网关上的递归作 (ls
和mget
) 现在每次都会遍历完整的目录树。
这是为了解决未检测到目录树深处更改的问题。
另外forRecursion=true
导致文件的完整路径用作元数据存储键;这解决了如果具有相同名称的文件多次出现在不同目录中,则过滤器无法正常工作的问题。
重要提示:这意味着对于顶级目录下的文件,将找不到持久元数据存储中的现有密钥。
因此,该属性是false
默认情况下;这可能会在将来的版本中更改。
以下示例配置FileReadingMessageSource
使用过滤器:
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:inputDirectory="${input.directory}"
p:filter-ref="customFilterBean"/>
读取文件的一个常见问题是,在文件准备就绪之前可能会被检测到(即,其他进程可能仍在写入文件)。
默认值AcceptOnceFileListFilter
并不能阻止这一点。
在大多数情况下,如果文件写入过程在每个文件准备好读取后立即重命名,则可以防止这种情况发生。
一个filename-pattern
或filename-regex
过滤器,该过滤器仅接受已准备就绪的文件(可能基于已知后缀),使用默认值AcceptOnceFileListFilter
,允许这种情况。
这CompositeFileListFilter
启用组合,如以下示例所示:
<bean id="pollableFileSource"
class="org.springframework.integration.file.FileReadingMessageSource"
p:inputDirectory="${input.directory}"
p:filter-ref="compositeFilter"/>
<bean id="compositeFilter"
class="org.springframework.integration.file.filters.CompositeFileListFilter">
<constructor-arg>
<list>
<bean class="o.s.i.file.filters.AcceptOnceFileListFilter"/>
<bean class="o.s.i.file.filters.RegexPatternFileListFilter">
<constructor-arg value="^test.*$"/>
</bean>
</list>
</constructor-arg>
</bean>
如果无法使用临时名称创建文件并重命名为最终名称,Spring Integration 提供了另一种选择。
4.2 版本添加了LastModifiedFileListFilter
.
此过滤器可以配置age
属性,以便过滤器仅传递早于此值的文件。
年龄默认为 60 秒,但您应该选择足够大的年龄以避免提前拾取文件(例如,由于网络故障)。
以下示例演示如何配置LastModifiedFileListFilter
:
<bean id="filter" class="org.springframework.integration.file.filters.LastModifiedFileListFilter">
<property name="age" value="120" />
</bean>
从 4.3.7 版本开始,ChainFileListFilter
(的扩展CompositeFileListFilter
) 的引入,以允许后续筛选器应仅看到上一个筛选器的结果的情况。
(使用CompositeFileListFilter
,所有过滤器都会看到所有文件,但它只传递已通过所有过滤器的文件)。
需要新行为的一个示例是LastModifiedFileListFilter
和AcceptOnceFileListFilter
,当我们不想接受文件时,直到经过一段时间。
使用CompositeFileListFilter
,因为AcceptOnceFileListFilter
在第一次通过时查看所有文件,当另一个过滤器通过时,它不会再传递它。
这CompositeFileListFilter
当模式过滤器与查找辅助文件以指示文件传输完成的自定义过滤器结合使用时,方法非常有用。
模式筛选器可能只传递主文件(例如something.txt
),但“完成”过滤器需要查看(例如)something.done
存在。
假设我们有文件a.txt
,a.done
和b.txt
.
模式过滤器仅通过a.txt
和b.txt
,而“完成”过滤器会看到所有三个文件并仅通过a.txt
.
复合Filter的最终结果是,只有a.txt
被释放。
使用ChainFileListFilter ,如果链中的任何过滤器返回空列表,则不会调用其余过滤器。 |
5.0 版引入了ExpressionFileListFilter
以对文件执行 SpEL 表达式作为上下文求值根对象。
为此,用于文件处理的所有 XML 组件(本地和远程)以及现有的filter
属性,已提供filter-expression
选项,如以下示例所示:
<int-file:inbound-channel-adapter
directory="${inputdir}"
filter-expression="name matches '.text'"
auto-startup="false"/>
5.0.5 版引入了DiscardAwareFileListFilter
对被拒绝的文件感兴趣的实现。
为此,应通过addDiscardCallback(Consumer<File>)
.
在框架中,此功能是从FileReadingMessageSource.WatchServiceDirectoryScanner
,结合LastModifiedFileListFilter
.
与常规不同DirectoryScanner
这WatchService
提供根据目标文件系统上的事件进行处理的文件。
在轮询具有这些文件的内部队列时,LastModifiedFileListFilter
可能会丢弃它们,因为它们相对于其配置的来说太年轻了age
.
因此,我们丢失了该文件以备将来可能考虑。
discard 回调钩子允许我们将文件保留在内部队列中,以便可以根据age
在随后的民意调查中。
这CompositeFileListFilter
还实现了一个DiscardAwareFileListFilter
并将丢弃回调填充到其所有DiscardAwareFileListFilter
代表。
因为CompositeFileListFilter 将文件与所有委托进行匹配,则discardCallback 可以对同一文件多次调用。 |
从 5.1 版本开始,FileReadingMessageSource
不检查目录是否存在,并且直到其start()
被调用(通常通过包装SourcePollingChannelAdapter
).
以前,没有简单的方法可以防止在引用目录时(例如来自测试)或稍后应用权限时出现作系统权限错误。
与LastModifiedFileListFilter
一个RecentFileListFilter
从 6.5 版开始引入策略。
它是本地文件系统的扩展AbstractRecentFileListFilter
.
默认情况下,它接受不超过 1 天的文件。
请参阅其其他实现,了解相应的远程文件协议。
邮件头
从 5.0 版开始,FileReadingMessageSource
(除了payload
作为轮询的File
) 将以下标头填充到出站Message
:
-
FileHeaders.FILENAME
:这File.getName()
要发送的文件。 可用于后续重命名或复制逻辑。 -
FileHeaders.ORIGINAL_FILE
:这File
对象本身。 通常,当我们丢失原始组件时,此标头会自动由框架组件(例如拆分器或转换器)填充File
对象。 但是,为了与任何其他自定义用例的一致性和便利性,此标头对于访问原始文件非常有用。 -
FileHeaders.RELATIVE_PATH
:引入了一个新标头,用于表示文件路径相对于扫描根目录的部分。 当需要在其他位置还原源目录层次结构时,此标头可能很有用。 为此,该DefaultFileNameGenerator
(参见“'生成文件名)可以配置为使用此标头。
目录扫描和轮询
这FileReadingMessageSource
不会立即为目录中的文件生成消息。
它使用内部队列来存储scanner
.
这scanEachPoll
选项用于确保在每次轮询时使用最新的输入目录内容刷新内部队列。
默认情况下 (scanEachPoll = false
)、FileReadingMessageSource
清空其队列,然后再扫描目录。
此默认行为对于减少目录中大量文件的扫描特别有用。
但是,在需要自定义排序的情况下,请务必考虑将此标志设置为true
.
文件的处理顺序可能与预期不同。
默认情况下,队列中的文件以其自然 (path
) 订单。
扫描添加的新文件(即使队列已有文件)也会插入到适当的位置以保持该自然顺序。
要自定义订单,请FileReadingMessageSource
可以接受Comparator<File>
作为构造函数参数。
它由内部 (PriorityBlockingQueue
)根据业务需求重新排序其内容。
因此,要按特定顺序处理文件,您应该提供一个比较器FileReadingMessageSource
而不是对自定义生成的列表进行排序DirectoryScanner
.
5.0 版推出RecursiveDirectoryScanner
以执行文件树访问。
实现基于Files.walk(Path start, int maxDepth, FileVisitOption… options)
功能性。
根目录 (DirectoryScanner.listFiles(File)
) 参数从结果中排除。
所有其他子目录包含和排除均基于目标FileListFilter
实现。
例如,SimplePatternFileListFilter
默认过滤掉目录。
看AbstractDirectoryAwareFileListFilter
及其实现以获取更多信息。
从 5.5 版本开始,FileInboundChannelAdapterSpec 的 Java DSL 有一个方便的recursive(boolean) 使用RecursiveDirectoryScanner 在目标中FileReadingMessageSource 而不是默认的。 |
命名空间支持
使用特定于文件的命名空间可以简化文件读取的配置。 为此,请使用以下模板:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-file="http://www.springframework.org/schema/integration/file"
xsi:schemaLocation="http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file
https://www.springframework.org/schema/integration/file/spring-integration-file.xsd">
</beans>
在此命名空间中,您可以减少FileReadingMessageSource
并将其包装在入站通道适配器中,如下所示:
<int-file:inbound-channel-adapter id="filesIn1"
directory="file:${input.directory}" prevent-duplicates="true" ignore-hidden="true"/>
<int-file:inbound-channel-adapter id="filesIn2"
directory="file:${input.directory}"
filter="customFilterBean" />
<int-file:inbound-channel-adapter id="filesIn3"
directory="file:${input.directory}"
filename-pattern="test*" />
<int-file:inbound-channel-adapter id="filesIn4"
directory="file:${input.directory}"
filename-regex="test[0-9]+\.txt" />
第一个通道适配器示例依赖于默认的FileListFilter
实现:
-
IgnoreHiddenFileListFilter
(不处理隐藏文件) -
AcceptOnceFileListFilter
(防止重复)
因此,您也可以省略prevent-duplicates
和ignore-hidden
属性,因为它们是true
默认情况下。
Spring Integration 4.2 引入了 |
第二个通道适配器示例使用自定义过滤器,第三个使用自定义过滤器filename-pattern
属性添加AntPathMatcher
基于过滤器,第四个使用该过滤器的filename-regex
属性,将基于正则表达式模式的过滤器添加到FileReadingMessageSource
.
这filename-pattern
和filename-regex
每个属性都与常规filter
reference 属性。
但是,您可以使用filter
属性来引用CompositeFileListFilter
它结合了任意数量的过滤器,包括一个或多个基于模式的过滤器,以满足您的特定需求。
当多个进程从同一目录读取时,您可能需要锁定文件以防止它们被并发选取。
为此,您可以使用FileLocker
.
有一个java.nio
基于的实现可用,但也可以实现您自己的锁定方案。
这nio
储物柜可以按如下方式注入:
<int-file:inbound-channel-adapter id="filesIn"
directory="file:${input.directory}" prevent-duplicates="true">
<int-file:nio-locker/>
</int-file:inbound-channel-adapter>
您可以按如下方式配置自定义储物柜:
<int-file:inbound-channel-adapter id="filesIn"
directory="file:${input.directory}" prevent-duplicates="true">
<int-file:locker ref="customLocker"/>
</int-file:inbound-channel-adapter>
当文件入站适配器配置了储物柜时,它负责在允许接收文件之前获取锁。
它不承担解锁文件的责任。
如果您已经处理了文件并保持锁悬挂,则存在内存泄漏。
如果这是一个问题,您应该调用FileLocker.unlock(File file) 在适当的时间自己。 |
如果过滤和锁定文件还不够,您可能需要控制文件的完全列出方式。
要实现此类需求,您可以使用DirectoryScanner
.
此扫描程序可让您准确确定每次轮询中列出的文件。
这也是 Spring Integration 内部用于连接的接口FileListFilter
instances 和FileLocker
到FileReadingMessageSource
.
您可以注入自定义DirectoryScanner
进入<int-file:inbound-channel-adapter/>
在scanner
属性,如以下示例所示:
<int-file:inbound-channel-adapter id="filesIn" directory="file:${input.directory}"
scanner="customDirectoryScanner"/>
这样做使您可以完全自由地选择排序、列表和锁定策略。
同样重要的是要了解过滤器(包括patterns
,regex
,prevent-duplicates
等)和locker
实例实际上由scanner
.
在适配器上设置的任何这些属性随后都会注入到内部scanner
.
对于外部scanner
,则禁止在FileReadingMessageSource
.
必须在该自定义中指定(如果需要)DirectoryScanner
.
换句话说,如果您注入scanner
进入FileReadingMessageSource
,您应该提供filter
和locker
关于这一点scanner
,而不是在FileReadingMessageSource
.
默认情况下,DefaultDirectoryScanner 使用IgnoreHiddenFileListFilter 和AcceptOnceFileListFilter .
若要防止使用它们,您可以配置自己的过滤器(例如AcceptAllFileListFilter ),甚至将其设置为null . |
WatchServiceDirectoryScanner
这FileReadingMessageSource.WatchServiceDirectoryScanner
当将新文件添加到目录时依赖于文件系统事件。
在初始化期间,将注册目录以生成事件。
初始文件列表也是在初始化期间构建的。
在遍历目录树时,遇到的任何子目录也会被注册以生成事件。
在第一次轮询时,返回遍历目录的初始文件列表。
在后续轮询中,将返回来自新创建事件的文件。
如果添加了新的子目录,则其创建事件将用于遍历新子树以查找现有文件并注册找到的任何新子目录。
有一个问题WatchKey 当其内部事件queue 不会在目录修改事件发生时被程序清空。
如果超过队列大小,则StandardWatchEventKinds.OVERFLOW 发出以指示某些文件系统事件可能丢失。
在这种情况下,将完全重新扫描根目录。
为避免重复,请考虑使用适当的FileListFilter (例如AcceptOnceFileListFilter ) 或在处理完成后删除文件。 |
这WatchServiceDirectoryScanner
可以通过FileReadingMessageSource.use-watch-service
选项,它与scanner
选择。
内部FileReadingMessageSource.WatchServiceDirectoryScanner
实例为提供的directory
.
此外,现在WatchService
轮询逻辑可以跟踪StandardWatchEventKinds.ENTRY_MODIFY
和StandardWatchEventKinds.ENTRY_DELETE
.
如果您需要跟踪现有文件和新文件的修改,则应实现ENTRY_MODIFY
events 逻辑中的FileListFilter
.
否则,将以相同的方式处理这些事件中的文件。
这ResettableFileListFilter
实现会选择ENTRY_DELETE
事件。
因此,他们的文件是为remove()
操作。
启用此事件后,过滤器(例如AcceptOnceFileListFilter
删除该文件。
因此,如果出现同名文件,则该文件将通过筛选器并作为消息发送。
为此,该watch-events
属性 (FileReadingMessageSource.setWatchEvents(WatchEventType… watchEvents)
)已被引入。
(WatchEventType
是FileReadingMessageSource
.)
有了这样的选项,我们可以对新文件使用一个下游流逻辑,对修改后的文件使用其他逻辑。
以下示例展示了如何为同一目录中的创建和修改事件配置不同的逻辑:
值得一提的是,ENTRY_DELETE
事件涉及监视目录子目录的重命名作。
更具体地说,ENTRY_DELETE
事件,与上一个目录名称相关,位于ENTRY_CREATE
通知新(重命名)目录的事件。
在某些作系统(如 Windows)上,ENTRY_DELETE
必须注册事件来处理这种情况。
否则,在文件资源管理器中重命名监视的子目录可能会导致在该子目录中未检测到新文件。
<int-file:inbound-channel-adapter id="newFiles"
directory="${input.directory}"
use-watch-service="true"/>
<int-file:inbound-channel-adapter id="modifiedFiles"
directory="${input.directory}"
use-watch-service="true"
filter="acceptAllFilter"
watch-events="MODIFY"/> <!-- The default is CREATE. -->
从 6.1 版开始,FileReadingMessageSource
公开了两个新的WatchService
-相关选项:
-
watchMaxDepth
- 一个参数Files.walkFileTree(Path root, Set attributes, int maxDepth, FileVisitor visitor)
应用程序接口; -
watchDirPredicate
-一个Predicate<Path>
以测试是否应该遍历扫描树中的目录并使用WatchService
以及配置的监视事件类型。
限制内存消耗
您可以使用HeadDirectoryScanner
以限制内存中保留的文件数。
这在扫描大型目录时非常有用。
对于 XML 配置,这是通过设置queue-size
入站通道适配器上的属性。
在 4.2 版之前,此设置与任何其他过滤器的使用不兼容。
任何其他过滤器(包括prevent-duplicates="true"
) 覆盖了用于限制大小的过滤器。
使用 通常,而不是使用 |
使用 Java 配置进行配置
以下 Spring Boot 应用程序显示了如何使用 Java 配置配置出站适配器的示例:
@SpringBootApplication
public class FileReadingJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileReadingJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public MessageChannel fileInputChannel() {
return new DirectChannel();
}
@Bean
@InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(fixedDelay = "1000"))
public MessageSource<File> fileReadingMessageSource() {
FileReadingMessageSource source = new FileReadingMessageSource();
source.setDirectory(new File(INBOUND_PATH));
source.setFilter(new SimplePatternFileListFilter("*.txt"));
return source;
}
@Bean
@Transformer(inputChannel = "fileInputChannel", outputChannel = "processFileChannel")
public FileToStringTransformer fileToStringTransformer() {
return new FileToStringTransformer();
}
}
使用 Java DSL 进行配置
以下 Spring Boot 应用程序显示了如何使用 Java DSL 配置出站适配器的示例:
@SpringBootApplication
public class FileReadingJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileReadingJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow fileReadingFlow() {
return IntegrationFlow
.from(Files.inboundAdapter(new File(INBOUND_PATH))
.patternFilter("*.txt"),
e -> e.poller(Pollers.fixedDelay(1000)))
.transform(Files.toStringTransformer())
.channel("processFileChannel")
.get();
}
}
'tailing'文件
另一个流行的用例是从文件的末尾(或尾部)获取“行”,并在添加新行时捕获新行。
提供了两种实现。
第一个,OSDelegatingFileTailingMessageProducer
,使用本机tail
命令(在具有命令的作系统上)。
这通常是这些平台上最有效的实现。
对于没有tail
命令,第二个实现,ApacheCommonsFileTailingMessageProducer
,使用 Apachecommons-io
Tailer
类。
在这两种情况下,文件系统事件(例如文件不可用和其他事件)都发布为ApplicationEvent
实例使用正常的 Spring 事件发布机制。
此类事件的示例包括:
[message=tail: cannot open '/tmp/somefile' for reading:
No such file or directory, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has become accessible, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has become inaccessible:
No such file or directory, file=/tmp/somefile]
[message=tail: '/tmp/somefile' has appeared;
following end of new file, file=/tmp/somefile]
例如,在轮换文件时,可能会发生上述示例中显示的事件序列。
从 5.0 版开始,一个FileTailingIdleEvent
在文件中没有数据时发出idleEventInterval
.
以下示例显示了此类事件的外观:
[message=Idle timeout, file=/tmp/somefile] [idle time=5438]
并非所有支持tail 命令提供这些状态消息。 |
从这些终结点发出的消息具有以下标头:
-
FileHeaders.ORIGINAL_FILE
:这File
对象 -
FileHeaders.FILENAME
:文件名 (File.getName()
)
在 5.0 之前的版本中,FileHeaders.FILENAME header 包含文件绝对路径的字符串表示。
现在,您可以通过调用getAbsolutePath() 在原始文件头上。 |
以下示例使用默认选项(“-F -n 0”,表示跟随当前末尾的文件名)创建本机适配器。
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
task-executor="exec"
file="/tmp/foo"/>
以下示例创建一个带有“-F -n +0”选项的本机适配器(意味着遵循文件名,发出所有现有行)。
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
native-options="-F -n +0"
task-executor="exec"
file-delay=10000
file="/tmp/foo"/>
如果tail
命令失败(在某些平台上,丢失的文件会导致tail
失败,即使有-F
指定),则每 10 秒重试一次命令。
默认情况下,本机适配器从标准输出中捕获内容,并将内容作为消息发送。
它们还捕获标准误差以引发事件。
从版本 4.3.6 开始,您可以通过将enable-status-reader
自false
,如以下示例所示:
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
enable-status-reader="false"
task-executor="exec"
file="/tmp/foo"/>
在以下示例中,IdleEventInterval
设置为5000
,这意味着,如果五秒钟内没有写任何行,FileTailingIdleEvent
每五秒触发一次:
<int-file:tail-inbound-channel-adapter id="native"
channel="input"
idle-event-interval="5000"
task-executor="exec"
file="/tmp/somefile"/>
当您需要停止适配器时,这会很有用。
以下示例创建 Apachecommons-io
Tailer
适配器,每两秒检查一次文件中的新行,并每十秒检查是否存在丢失的文件:
<int-file:tail-inbound-channel-adapter id="apache"
channel="input"
task-executor="exec"
file="/tmp/bar"
delay="2000"
end="false" (1)
reopen="true" (2)
file-delay="10000"/>
1 | 该文件从头开始就尾随 (end="false" ) 而不是结束(这是默认值)。 |
2 | 每个块都会重新打开该文件(默认值是保持文件打开)。 |
指定delay ,end 或reopen attributes 强制使用 Apachecommons-io 适配器,并使native-options 属性不可用。 |
处理不完整的数据
文件传输方案中的一个常见问题是如何确定传输是否完成,以便您不会开始读取不完整的文件。
解决此问题的常用技术是使用临时名称编写文件,然后以原子方式将其重命名为最终名称。
这种技术与屏蔽临时文件不被消费者获取的过滤器一起提供了一个强大的解决方案。
此技术由写入文件(本地或远程)的 Spring Integration 组件使用。
默认情况下,它们将.writing
添加到文件名,并在传输完成后将其删除。