此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring Integration 6.5.1! |
SFTP 入站通道适配器
SFTP 入站通道适配器是一个特殊的侦听器,它连接到服务器并侦听远程目录事件(例如正在创建的新文件),此时它会启动文件传输。 以下示例演示如何配置 SFTP 入站通道适配器:
<int-sftp:inbound-channel-adapter id="sftpAdapterAutoCreate"
session-factory="sftpSessionFactory"
channel="requestChannel"
filename-pattern="*.txt"
remote-directory="/foo/bar"
preserve-timestamp="true"
local-directory="file:target/foo"
auto-create-local-directory="true"
local-filename-generator-expression="#this.toUpperCase() + '.a'"
scanner="myDirScanner"
local-filter="myFilter"
temporary-file-suffix=".writing"
max-fetch-size="-1"
delete-remote-files="false">
<int:poller fixed-rate="1000"/>
</int-sftp:inbound-channel-adapter>
前面的配置示例演示了如何为各种属性提供值,包括以下内容:
-
local-directory
:文件将传输到的位置 -
remote-directory
:要从中传输文件的远程源目录 -
session-factory
:对我们之前配置的 bean 的引用
默认情况下,传输的文件与原始文件具有相同的名称。
如果要覆盖此行为,可以将local-filename-generator-expression
属性,它允许您提供 SpEL 表达式来生成本地文件的名称。
与出站网关和适配器不同,其中 SpEL 评估上下文的根对象是Message
,则此入站适配器在评估时尚未收到该消息,因为这是它最终使用传输的文件作为其有效负载生成的内容。
因此,SpEL 求值上下文的根对象是远程文件的原始名称 (String
).
入站通道适配器首先将文件检索到本地目录,然后根据轮询器配置发出每个文件。
从 5.0 版开始,当需要检索新文件时,您可以限制从 SFTP 服务器获取的文件数。
当目标文件很大或在具有持久文件列表过滤器的群集系统中运行时,这非常有用,本节稍后将讨论。
用max-fetch-size
为此目的。
负值(默认值)表示没有限制,并且会检索所有匹配的文件。
有关更多信息,请参阅入站通道适配器:控制远程文件获取。
从 5.0 版本开始,您还可以提供自定义DirectoryScanner
实现到inbound-channel-adapter
通过将scanner
属性。
从 Spring Integration 3.0 开始,您可以指定preserve-timestamp
属性(默认值为false
).
什么时候true
,则本地文件的修改时间戳设置为从服务器检索的值。
否则,它设置为当前时间。
从 4.2 版开始,您可以指定remote-directory-expression
而不是remote-directory
,它允许您动态确定每次轮询的目录 - 例如remote-directory-expression="@myBean.determineRemoteDir()"
.
有时,基于通过filename-pattern
属性可能不够。
如果是这种情况,您可以使用filename-regex
属性来指定正则表达式(例如,filename-regex=".*\.test$"
).
如果您需要完全控制,可以使用filter
属性来提供对org.springframework.integration.file.filters.FileListFilter
,这是一个用于过滤文件列表的策略接口。
此过滤器确定要检索哪些远程文件。
您还可以将基于模式的过滤器与其他过滤器(例如AcceptOnceFileListFilter
,以避免同步之前获取的文件),方法是使用CompositeFileListFilter
.
这AcceptOnceFileListFilter
将其状态存储在内存中。
如果您希望状态在系统重新启动后继续存在,请考虑使用SftpPersistentAcceptOnceFileListFilter
相反。
此过滤器将接受的文件名存储在MetadataStore
策略(请参阅元数据存储)。
此过滤器与文件名和远程修改时间相匹配。
从 4.0 版开始,此过滤器需要ConcurrentMetadataStore
.
与共享数据存储(例如Redis
使用RedisMetadataStore
),这允许在多个应用程序或服务器实例之间共享过滤器键。
从 5.0 版开始,SftpPersistentAcceptOnceFileListFilter
使用内存中的SimpleMetadataStore
默认情况下应用于SftpInboundFileSynchronizer
.
此过滤器也与regex
或pattern
选项,以及通过SftpInboundChannelAdapterSpec
在 Java DSL 中。
您可以使用以下命令来处理任何其他用例CompositeFileListFilter
(或ChainFileListFilter
).
上面的讨论是指在检索文件之前对其进行过滤。 检索文件后,将对文件系统上的文件应用额外的过滤器。 默认情况下,这是一个“AcceptOnceFileListFilter”,如本节所述,它将状态保留在内存中,并且不考虑文件的修改时间。 除非应用程序在处理后删除文件,否则适配器会在应用程序重新启动后默认重新处理磁盘上的文件。
此外,如果您配置filter
使用SftpPersistentAcceptOnceFileListFilter
并且远程文件时间戳更改(导致它被重新获取),默认本地过滤器不允许处理这个新文件。
有关此筛选器及其使用方式的详细信息,请参阅远程持久文件列表筛选器。
您可以使用local-filter
属性来配置本地文件系统过滤器的行为。
从 4.3.8 版本开始,FileSystemPersistentAcceptOnceFileListFilter
默认配置。
此过滤器将接受的文件名和修改的时间戳存储在MetadataStore
策略(请参阅元数据存储)并检测对本地文件修改时间的更改。
默认值MetadataStore
是一个SimpleMetadataStore
将状态存储在内存中。
从 4.1.5 版开始,这些过滤器有一个名为flushOnUpdate
,这会导致它们刷新
元数据存储(如果存储实现Flushable
).
此外,如果您使用分布式MetadataStore (例如 Redis 元数据存储),您可以拥有同一适配器或应用程序的多个实例,并确保只有一个实例处理一个文件。 |
实际的本地过滤器是CompositeFileListFilter
包含提供的过滤器和模式过滤器,该过滤器可防止处理正在下载的文件(基于temporary-file-suffix
).
下载带有此后缀的文件(默认值为.writing
),传输完成后,文件将重命名为最终名称,使它们对过滤器“可见”。
有关这些属性的更多详细信息,请参阅架构。
SFTP 入站通道适配器是轮询使用者。
因此,您必须配置轮询器(全局默认值或本地元素)。
将文件传输到本地目录后,将出现一条带有java.io.File
由于其有效负载类型被生成并发送到channel
属性。
从版本 6.2 开始,您可以使用以下命令根据上次修改的策略过滤 SFTP 文件SftpLastModifiedFileListFilter
.
此过滤器可以配置age
属性,以便过滤器仅传递早于此值的文件。
年龄默认为 60 秒,但您应该选择足够大的年龄以避免提前拾取文件(例如,由于网络故障)。
查看其 Javadoc 以获取更多信息。
相比之下,从 6.5 版开始,SftpRecentFileListFilter
已引入仅接受那些不早于提供的文件age
.
有关文件过滤和大文件的更多信息
有时,刚刚出现在受监视(远程)目录中的文件不完整。
通常,此类文件是使用一些临时扩展名(例如.writing
在名为something.txt.writing
),然后在写入过程完成后重命名。
在大多数情况下,开发人员只对完整的文件感兴趣,并且只想过滤这些文件。
若要处理这些方案,可以使用filename-pattern
,filename-regex
和filter
属性。
如果需要自定义筛选器实现,可以通过将filter
属性。
以下示例显示了如何执行此作:
<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
channel="receiveChannel"
session-factory="sftpSessionFactory"
filter="customFilter"
local-directory="file:/local-test-dir"
remote-directory="/remote-test-dir">
<int:poller fixed-rate="1000" max-messages-per-poll="10" task-executor="executor"/>
</int-sftp:inbound-channel-adapter>
<bean id="customFilter" class="org.foo.CustomFilter"/>
从故障中恢复
您应该了解适配器的体系结构。
文件同步器获取文件,然后FileReadingMessageSource
为每个同步文件发出一条消息。
如前所述,涉及两个过滤器。
这filter
属性(和模式)引用远程 (SFTP) 文件列表,以避免获取已获取的文件。
这FileReadingMessageSource
使用local-filter
以确定哪些文件将作为消息发送。
同步器列出远程文件并查阅其过滤器。
然后传输文件。
如果在文件传输期间发生 IO 错误,则会删除已添加到过滤器中的任何文件,以便有资格在下一次轮询时重新提取它们。
仅当过滤器实现ReversibleFileListFilter
(例如AcceptOnceFileListFilter
).
如果在同步文件后,处理文件的下游流发生错误,则不会发生过滤器的自动回滚,因此默认情况下不会重新处理失败的文件。
如果希望在失败后重新处理此类文件,可以使用类似于以下内容的配置来方便从过滤器中删除失败的文件:
<int-sftp:inbound-channel-adapter id="sftpAdapter"
session-factory="sftpSessionFactory"
channel="requestChannel"
remote-directory-expression="'/sftpSource'"
local-directory="file:myLocalDir"
auto-create-local-directory="true"
filename-pattern="*.txt">
<int:poller fixed-rate="1000">
<int:transactional synchronization-factory="syncFactory" />
</int:poller>
</int-sftp:inbound-channel-adapter>
<bean id="acceptOnceFilter"
class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />
<int:transaction-synchronization-factory id="syncFactory">
<int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>
<bean id="transactionManager"
class="org.springframework.integration.transaction.PseudoTransactionManager" />
上述配置适用于任何ResettableFileListFilter
.
从 5.0 版开始,入站通道适配器可以根据生成的本地文件名在本地构建子目录。
这也可以是一个远程子路径。
为了能够递归地读取本地目录以根据层次结构支持进行修改,您现在可以提供一个内部FileReadingMessageSource
使用新的RecursiveDirectoryScanner
基于Files.walk()
算法。 看AbstractInboundFileSynchronizingMessageSource.setScanner()
了解更多信息。此外,您现在可以切换AbstractInboundFileSynchronizingMessageSource
到WatchService
-基于DirectoryScanner
通过使用setUseWatchService()
选择。 它还配置为所有WatchEventType
实例对本地目录中的任何修改做出反应。前面显示的重新处理示例基于FileReadingMessageSource.WatchServiceDirectoryScanner
,它使用ResettableFileListFilter.remove()
删除文件时 (StandardWatchEventKinds.ENTRY_DELETE
) 从本地目录中。 看WatchServiceDirectoryScanner
了解更多信息。
使用 Java 配置进行配置
以下 Spring Boot 应用程序显示了如何使用 Java 配置入站适配器的示例:
@SpringBootApplication
public class SftpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public SessionFactory<SftpClient.DirEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost("localhost");
factory.setPort(port);
factory.setUser("foo");
factory.setPassword("foo");
factory.setAllowUnknownKeys(true);
factory.setTestSession(true);
return new CachingSessionFactory<>(factory);
}
@Bean
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory("foo");
fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.xml"));
return fileSynchronizer;
}
@Bean
@InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> sftpMessageSource() {
SftpInboundFileSynchronizingMessageSource source =
new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
source.setLocalDirectory(new File("sftp-inbound"));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
source.setMaxFetchSize(1);
return source;
}
@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
}
使用 Java DSL 进行配置
以下 Spring Boot 应用程序显示了如何使用 Java DSL 配置入站适配器的示例:
@SpringBootApplication
public class SftpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow sftpInboundFlow() {
return IntegrationFlow
.from(Sftp.inboundAdapter(this.sftpSessionFactory)
.preserveTimestamp(true)
.remoteDirectory("foo")
.regexFilter(".*\\.txt$")
.localFilenameExpression("#this.toUpperCase() + '.a'")
.localDirectory(new File("sftp-inbound")),
e -> e.id("sftpInboundAdapter")
.autoStartup(true)
.poller(Pollers.fixedDelay(5000)))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
}