SFTP 适配器
SFTP适配器
Spring Integration 提供了对通过 SFTP 进行文件传输操作的支持。
安全文件传输协议(SFTP)是一种网络协议,允许您通过任何可靠的流在互联网上的两台计算机之间传输文件。
SFTP 协议需要一个安全的通道(例如 SSH),并在整个 SFTP 会话中可见客户端的身份。
Spring Integration 通过提供三个客户端侧端点支持通过 SFTP 发送和接收文件:入站通道适配器、出站通道适配器和出站网关。 它还提供了便捷的命名空间配置,用于定义这些客户端组件。
从 6.0 版本开始,过时的 JCraft JSch 客户端已被现代的 Apache MINA SSHD 框架所取代。
这导致了框架组件中出现了大量破坏性变更。
然而,在大多数情况下,此类迁移被隐藏在 Spring Integration API 之后。
变化最剧烈的部分是 DefaultSftpSessionFactory,它现在基于 org.apache.sshd.client.SshClient 并暴露了其部分配置属性。 |
您需要将以下依赖项包含到您的项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-sftp</artifactId>
<version>6.0.9</version>
</dependency>
compile "org.springframework.integration:spring-integration-sftp:6.0.9"
要在您的 XML 配置中包含 SFTP 命名空间,请在根元素中包含以下属性:
xmlns:int-sftp="http://www.springframework.org/schema/integration/sftp"
xsi:schemaLocation="http://www.springframework.org/schema/integration/sftp
https://www.springframework.org/schema/integration/sftp/spring-integration-sftp.xsd"
SFTP 会话工厂
| 从版本 3.0 开始,会话默认不再被缓存。 请参见 SFTP 会话缓存。 |
在配置 SFTP 适配器之前,必须先配置 SFTP 会话工厂。 您可以使用常规 Bean 定义来配置 SFTP 会话工厂,如下例所示:
<beans:bean id="sftpSessionFactory"
class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
<beans:property name="host" value="localhost"/>
<beans:property name="privateKey" value="classpath:META-INF/keys/sftpTest"/>
<beans:property name="privateKeyPassphrase" value="springIntegration"/>
<beans:property name="port" value="22"/>
<beans:property name="user" value="kermit"/>
</beans:bean>
每当适配器从其 SessionFactory 请求会话对象时,都会创建一个新的 SFTP 会话。
在底层,SFTP 会话工厂依赖 Apache MINA SSHD 库来提供 SFTP 功能。
然而,Spring Integration 还支持 SFTP 会话的缓存。 有关更多信息,请参阅 SFTP 会话缓存。
|
The 使用此功能时,您必须将会话工厂包装在缓存会话工厂中,如后文所述,这样在操作完成时连接不会被物理关闭。 如果缓存被重置,则仅在最后一个通道关闭时断开会话。 如果在新操作获取会话时发现连接已断开,则连接将被刷新。 |
现在您需要做的只是将此 SFTP 会话工厂注入到您的适配器中。
| 为 SFTP 会话工厂提供值的一种更实用的方法是使用 Spring 的属性占位符支持。 |
配置属性
以下列表描述了由 DefaultSftpSessionFactory 暴露的所有属性。
isSharedSession (构造函数参数)::当true时,所有请求的SftpSession实例将使用单个SftpClient。
默认值为false。
sftpVersionSelector::SFTP协议选择的实例。
默认值为SftpVersionSelector.CURRENT。
host::要连接的主机的 URL。
必需。
hostConfig::作为用户/主机/端口选项的替代方案,指定一个实例。
可以使用代理跳转属性进行配置。
port::建立 SFTP 连接所使用的端口。
如果未指定,此值默认为 22。
如果已指定,该属性必须为正数。
user::要使用的远程用户。
必需。
knownHostsResource::用于主机密钥存储库的 org.springframework.core.io.Resource。
该资源的格式必须与 OpenSSH known_hosts 文件相同,且为必填项;若 allowUnknownKeys 为 false,则必须预先填充。
password::用于验证远程主机的密码。
如果未提供密码,则必须设置 privateKey 属性。
privateKey::表示用于向远程主机进行身份验证的私钥位置的 org.springframework.core.io.Resource。
如果未提供 privateKey,则必须提供 password 属性。
privateKeyPassphrase::私钥的密码。
如果您设置了 userInfo,则不允许设置 privateKeyPassphrase。
密码短语从该对象中获取。
可选。
timeout::超时属性用作套接字超时参数,以及默认的连接超时。
默认为 0,这意味着不会发生超时。
allowUnknownKeys::设置为 true 以允许连接到具有未知(或已更改)密钥的主机。
其默认值为 'false'。
如果为 false,则需要一个预填充的 knownHosts 文件。
userInteraction::身份验证期间使用的自定义 org.apache.sshd.client.auth.keyboard.UserInteraction。
委托会话工厂
版本 4.2 引入了 DelegatingSessionFactory,允许在运行时选择实际的会话工厂。
在调用 SFTP 端点之前,您可以在该工厂上调用 setThreadKey(),将密钥与当前线程关联。
随后使用该密钥来查找实际要使用的会话工厂。
使用完毕后,您可以通过调用 clearThreadKey() 来清除该密钥。
我们添加了便捷方法,以便您可以更轻松地通过消息流执行此操作,如下例所示:
<bean id="dsf" class="org.springframework.integration.file.remote.session.DelegatingSessionFactory">
<constructor-arg>
<bean class="o.s.i.file.remote.session.DefaultSessionFactoryLocator">
<!-- delegate factories here -->
</bean>
</constructor-arg>
</bean>
<int:service-activator input-channel="in" output-channel="c1"
expression="@dsf.setThreadKey(#root, headers['factoryToUse'])" />
<int-sftp:outbound-gateway request-channel="c1" reply-channel="c2" ... />
<int:service-activator input-channel="c2" output-channel="out"
expression="@dsf.clearThreadKey(#root)" />
当使用会话缓存时(参见SFTP 会话缓存),每个委托对象都应被缓存。
您无法缓存DelegatingSessionFactory本身。 |
从版本 5.0.7 开始,DelegatingSessionFactory 可与 RotatingServerAdvice 配合使用以轮询多个服务器;请参见 入站通道适配器:轮询多个服务器和目录。
SFTP 会话缓存
从 Spring Integration 3.0 版本开始,会话默认不再被缓存。
端点上的 cache-sessions 属性不再受支持。
如果您希望缓存会话,则必须使用 CachingSessionFactory(请参见下一个示例)。 |
在 3.0 之前的版本中,会话默认会自动缓存。
提供了一个 cache-sessions 属性用于禁用自动缓存,但该方案无法配置其他会话缓存属性。
例如,您无法限制创建的会话数量。
为了支持该需求及其他配置选项,我们添加了 CachingSessionFactory。
它提供了 sessionCacheSize 和 sessionWaitTimeout 属性。
顾名思义,sessionCacheSize 属性控制工厂在其缓存中维护的活跃会话数量(默认为无限制)。
如果已达到 sessionCacheSize 阈值,任何尝试获取新会话的请求都将阻塞,直到其中一个缓存中的会话变为可用,或者等待会话的时间过期(默认等待时间为 Integer.MAX_VALUE)。
sessionWaitTimeout 属性用于配置等待时间。
如果您希望缓存会话,请配置您的默认会话工厂(如前文所述),然后将其包装在CachingSessionFactory实例中,您可以在其中提供这些额外属性。
以下示例展示了如何操作:
<bean id="sftpSessionFactory"
class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
<property name="host" value="localhost"/>
</bean>
<bean id="cachingSessionFactory"
class="org.springframework.integration.file.remote.session.CachingSessionFactory">
<constructor-arg ref="sftpSessionFactory"/>
<constructor-arg value="10"/>
<property name="sessionWaitTimeout" value="1000"/>
</bean>
上述示例创建了一个CachingSessionFactory,其sessionCacheSize设置为10,其sessionWaitTimeout设置为一秒(1000毫秒)。
从 Spring Integration 3.0 版本开始,CachingConnectionFactory 提供了一种 resetCache() 方法。
当该方法被调用时,所有空闲会话将立即关闭,正在使用的会话将在其返回到缓存时关闭。
使用 isSharedSession=true 时,通道被关闭,共享会话仅在最后一个通道关闭时才被关闭。
新的会话请求会根据需要建立新会话。
从版本 5.1 开始,CachingSessionFactory 新增了一个属性 testSession。
当该属性为 true 时,系统将通过执行一个针对空路径的 REALPATH 命令来测试会话是否仍然活跃;如果会话已失效,则将其从缓存中移除;若缓存中没有活跃的会话,则会创建一个新的会话。
使用RemoteFileTemplate
Spring Integration 3.0 版本为 SftpSession 对象提供了新的抽象。
该模板提供了发送、检索(作为 InputStream)、删除和重命名文件的方法。
此外,我们还提供了一个 execute 方法,允许调用者在会话中运行多个操作。
在所有情况下,该模板都会负责可靠地关闭会话。
更多信息请参阅 RemoteFileTemplate 的 Javadoc。还有一个 SFTP 的子类:SftpRemoteFileTemplate。
我们在4.1版本中添加了额外方法,包括getClientInstance()。
它提供了对底层ChannelSftp的访问权限,从而能够访问低级API。
版本 5.0 引入了 RemoteFileOperations.invoke(OperationsCallback<F, T> action) 方法。
该方法允许在同一个线程绑定的 Session 范围内调用多个 RemoteFileOperations 调用。
当您需要将 RemoteFileTemplate 的多个高级操作作为一个工作单元执行时,这非常有用。
例如,AbstractRemoteFileOutboundGateway 将其与 mput 命令实现一起使用,在该实现中,我们对提供的目录中的每个文件执行 put 操作,并递归地对其子目录执行该操作。
有关更多信息,请参阅 Javadoc。
SFTP 入站通道适配器
SFTP 入站通道适配器是一种特殊的监听器,它连接到服务器并监听远程目录事件(例如创建新文件),此时它将启动文件传输。 以下示例展示了如何配置 SFTP 入站通道适配器:
<int-sftp:inbound-channel-adapter id="sftpAdapterAutoCreate"
session-factory="sftpSessionFactory"
channel="requestChannel"
filename-pattern="*.txt"
remote-directory="/foo/bar"
preserve-timestamp="true"
local-directory="file:target/foo"
auto-create-local-directory="true"
local-filename-generator-expression="#this.toUpperCase() + '.a'"
scanner="myDirScanner"
local-filter="myFilter"
temporary-file-suffix=".writing"
max-fetch-size="-1"
delete-remote-files="false">
<int:poller fixed-rate="1000"/>
</int-sftp:inbound-channel-adapter>
上述配置示例展示了如何为各种属性提供值,包括以下内容:
-
local-directory: 文件将被传输到的位置 -
remote-directory: 要从中传输文件的远程源目录 -
session-factory: 对之前配置的 bean 的引用
默认情况下,传输的文件会保留与原始文件相同的名称。
如果您希望覆盖此行为,可以设置 local-filename-generator-expression 属性,该属性允许您提供一个 SpEL 表达式来生成本地文件的名称。
与出站网关和适配器不同(在那些场景中,SpEL 求值上下文的根对象是 Message),此入站适配器在求值时尚未拥有消息,因为它最终生成的正是以传输文件为有效负载的消息。
因此,SpEL 求值上下文的根对象是远程文件的原始名称(一个 String)。
入站通道适配器首先将文件检索到本地目录,然后根据轮询配置逐个发出每个文件。
从 5.0 版本开始,当需要新文件检索时,您可以限制从 SFTP 服务器获取的文件数量。
当目标文件较大或在具有持久文件列表过滤器(本节稍后讨论)的集群系统中运行时,这可能会带来好处。
为此请使用 max-fetch-size。
负值(默认值)表示无限制,将检索所有匹配的文件。
有关更多信息,请参阅 入站通道适配器:控制远程文件获取。
自 5.0 版本起,您还可以通过设置 scanner 属性向 inbound-channel-adapter 提供自定义的 DirectoryScanner 实现。
从 Spring Integration 3.0 开始,您可以指定 preserve-timestamp 属性(默认值为 false)。
当设置为 true 时,本地文件的修改时间戳将被设置为从服务器检索到的值。
否则,它将设置为当前时间。
从版本 4.2 开始,您可以使用 remote-directory-expression 替代 remote-directory,从而在每次轮询时动态确定目录——例如 remote-directory-expression="@myBean.determineRemoteDir()"。
有时,仅通过 filename-pattern 属性指定的简单模式进行文件过滤可能不够用。
如果是这种情况,您可以使用 filename-regex 属性指定一个正则表达式(例如 filename-regex=".*\.test$")。
如果您需要完全的控制权,可以使用 filter 属性提供一个对 org.springframework.integration.file.filters.FileListFilter 自定义实现的引用,org.springframework.integration.file.filters.FileListFilter 是一个用于过滤文件列表的策略接口。
该过滤器决定了哪些远程文件将被获取。
您还可以通过使用 CompositeFileListFilter 将基于模式的过滤器与其他过滤器(例如 AcceptOnceFileListFilter,以避免同步之前已获取的文件)组合起来。
AcceptOnceFileListFilter将状态存储在内存中。
如果您希望状态在系统重启后仍然存在,请考虑使用SftpPersistentAcceptOnceFileListFilter代替。
此过滤器将接受的文件名存储在一个MetadataStore策略的实例中(请参阅元数据存储)。
此过滤器基于文件名和远程修改时间进行匹配。
自 4.0 版本起,此过滤器需要一个 ConcurrentMetadataStore。
当与共享数据存储(例如使用 Redis 配合 RedisMetadataStore)一起使用时,它允许过滤器键在多个应用程序或服务器实例之间共享。
从 5.0 版本开始,SftpPersistentAcceptOnceFileListFilter 与内存中的 SimpleMetadataStore 默认应用于 SftpInboundFileSynchronizer。
此过滤器也与应用了 XML 配置中的 regex 或 pattern 选项的情况一起使用,同时也可通过 Java DSL 中的 SftpInboundChannelAdapterSpec 进行应用。
您可以通过使用 CompositeFileListFilter(或 ChainFileListFilter)来处理任何其他用例。
上述讨论涉及在检索文件之前对文件进行过滤。 一旦文件被检索,将对文件系统上的文件应用额外的过滤器。 默认情况下,这是一个`AcceptOnceFileListFilter`,如本节所述,它在内存中保留状态且不考虑文件的修改时间。 除非您的应用程序在处理后删除了文件,否则适配器在应用程序重启后将默认重新处理磁盘上的文件。
此外,如果您配置filter使用SftpPersistentAcceptOnceFileListFilter,并且远程文件的时间戳发生变化(导致重新获取),默认的本地过滤器将不允许处理此新文件。
有关此筛选器的更多信息及其用法,请参阅远程持久文件列表筛选器。
您可以使用 local-filter 属性来配置本地文件系统过滤器的行为。
从版本 4.3.8 开始,默认配置为 FileSystemPersistentAcceptOnceFileListFilter。
该过滤器将接受的文件名和修改时间戳存储在 MetadataStore 策略的实例中(参见 元数据存储),并检测本地文件修改时间的变化。
默认的 MetadataStore 是一个 SimpleMetadataStore,它将状态存储在内存中。
自版本 4.1.5 起,这些过滤器拥有一个名为 flushOnUpdate 的新属性,该属性会导致它们在每次更新时刷新元数据存储(如果该存储实现了 Flushable)。
此外,如果您使用分布式MetadataStore(例如Redis元数据存储),您可以拥有多个相同适配器或应用程序的实例,并确保只有一个实例处理文件。 |
实际的本地过滤器是一个CompositeFileListFilter,它包含提供的过滤器和一个模式过滤器,用于防止处理正在下载过程中的文件(基于temporary-file-suffix)。
文件以下列后缀进行下载(默认为.writing),当传输完成后,文件会被重命名为其最终名称,从而使它们对过滤器“可见”。
有关这些属性的更多详细信息,请参阅模式。
SFTP 入站通道适配器是一个轮询消费者。
因此,您必须配置一个轮询器(可以是全局默认值或本地元素)。
一旦文件被传输到本地目录,就会生成一个负载类型为 java.io.File 的消息,并将其发送到由 channel 属性标识的通道。
有关文件过滤和大文件的更多信息
有时,刚出现在监控(远程)目录中的文件可能尚未完成。
通常,此类文件会使用某种临时扩展名写入(例如名为 something.txt.writing 的文件使用 .writing 作为扩展名),然后在写入过程完成后重命名。
在大多数情况下,开发者只关心已完成的文件,并希望仅过滤这些文件。
为处理这些场景,您可以使用 filename-pattern、filename-regex 和 filter 属性提供的过滤支持。
如果您需要自定义过滤器实现,可以通过设置 filter 属性在适配器中包含引用。
以下示例展示了如何实现:
<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
channel="receiveChannel"
session-factory="sftpSessionFactory"
filter="customFilter"
local-directory="file:/local-test-dir"
remote-directory="/remote-test-dir">
<int:poller fixed-rate="1000" max-messages-per-poll="10" task-executor="executor"/>
</int-sftp:inbound-channel-adapter>
<bean id="customFilter" class="org.foo.CustomFilter"/>
从故障中恢复
您应该理解适配器的架构。
文件同步器会获取文件,并且一个 FileReadingMessageSource 会为每个已同步的文件发送一条消息。
正如前文所述,涉及两个过滤器。
filter 属性(及模式)指的是远程(SFTP)文件列表,以避免获取已经获取过的文件。
FileReadingMessageSource 使用 local-filter 来确定哪些文件应作为消息发送。
同步器会列出远程文件并查询其过滤器。
随后进行文件传输。
如果在文件传输过程中发生IO错误,则会将已添加到过滤器中的任何文件移除,以便它们在下次轮询时能够重新获取。
这仅适用于实现了ReversibleFileListFilter的过滤器(例如AcceptOnceFileListFilter)。
如果在同步文件后,下游流在处理文件时发生错误,则不会自动回滚过滤器,因此默认情况下失败的文件不会被重新处理。
如果您希望在失败后重新处理此类文件,可以使用以下类似的配置,以便从过滤器中移除失败的文件:
<int-sftp:inbound-channel-adapter id="sftpAdapter"
session-factory="sftpSessionFactory"
channel="requestChannel"
remote-directory-expression="'/sftpSource'"
local-directory="file:myLocalDir"
auto-create-local-directory="true"
filename-pattern="*.txt">
<int:poller fixed-rate="1000">
<int:transactional synchronization-factory="syncFactory" />
</int:poller>
</int-sftp:inbound-channel-adapter>
<bean id="acceptOnceFilter"
class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />
<int:transaction-synchronization-factory id="syncFactory">
<int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>
<bean id="transactionManager"
class="org.springframework.integration.transaction.PseudoTransactionManager" />
上述配置适用于任何 ResettableFileListFilter。
从 5.0 版本开始,入站通道适配器可以根据生成的本地文件名在本地构建子目录。
这也可以是一个远程子路径。
为了能够根据层次结构支持递归读取本地目录以进行修改,您现在可以提供一个内部的 FileReadingMessageSource,并基于 Files.walk() 算法使用新的 RecursiveDirectoryScanner。
有关更多信息,请参见 AbstractInboundFileSynchronizingMessageSource.setScanner()。
此外,您现在可以使用 setUseWatchService() 选项将 AbstractInboundFileSynchronizingMessageSource 切换为基于 WatchService 的 DirectoryScanner。
它也已配置为所有 WatchEventType 实例以响应本地目录中的任何修改。
前面所示的重处理示例基于 FileReadingMessageSource.WatchServiceDirectoryScanner 的内置功能,该功能在文件从本地目录被删除(StandardWatchEventKinds.ENTRY_DELETE)时使用 ResettableFileListFilter.remove()。
有关更多信息,请参见 WatchServiceDirectoryScanner。
使用 Java 配置进行配置
以下 Spring Boot 应用程序展示了如何使用 Java 配置入站适配器的示例:
@SpringBootApplication
public class SftpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public SessionFactory<SftpClient.DirEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost("localhost");
factory.setPort(port);
factory.setUser("foo");
factory.setPassword("foo");
factory.setAllowUnknownKeys(true);
factory.setTestSession(true);
return new CachingSessionFactory<>(factory);
}
@Bean
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory("foo");
fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.xml"));
return fileSynchronizer;
}
@Bean
@InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> sftpMessageSource() {
SftpInboundFileSynchronizingMessageSource source =
new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
source.setLocalDirectory(new File("sftp-inbound"));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
source.setMaxFetchSize(1);
return source;
}
@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
}
使用 Java DSL 进行配置
以下 Spring Boot 应用程序示例展示了如何使用 Java DSL 配置入站适配器:
@SpringBootApplication
public class SftpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow sftpInboundFlow() {
return IntegrationFlow
.from(Sftp.inboundAdapter(this.sftpSessionFactory)
.preserveTimestamp(true)
.remoteDirectory("foo")
.regexFilter(".*\\.txt$")
.localFilenameExpression("#this.toUpperCase() + '.a'")
.localDirectory(new File("sftp-inbound")),
e -> e.id("sftpInboundAdapter")
.autoStartup(true)
.poller(Pollers.fixedDelay(5000)))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
}
SFTP 流式入站通道适配器
版本 4.3 引入了流式入站通道适配器。
该适配器生成负载类型为 InputStream 的消息,使您无需写入本地文件系统即可获取文件。
由于会话保持打开状态,消费应用程序负责在文件被消费后关闭会话。
会话通过 closeableResource 头提供(IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE)。
标准框架组件,如 FileSplitter 和 StreamTransformer,会自动关闭会话。
有关这些组件的更多信息,请参阅 文件分割器 和 流转换器。
以下示例展示了如何配置 SFTP 流式入站通道适配器:
<int-sftp:inbound-streaming-channel-adapter id="ftpInbound"
channel="ftpChannel"
session-factory="sessionFactory"
filename-pattern="*.txt"
filename-regex=".*\.txt"
filter="filter"
filter-expression="@myFilterBean.check(#root)"
remote-file-separator="/"
comparator="comparator"
max-fetch-size="1"
remote-directory-expression="'foo/bar'">
<int:poller fixed-rate="1000" />
</int-sftp:inbound-streaming-channel-adapter>
您只能使用 filename-pattern、filename-regex、filter 或 filter-expression 中的一个。
从 5.0 版本开始,默认情况下,SftpStreamingMessageSource 适配器会基于内存中的 SimpleMetadataStore 使用 SftpPersistentAcceptOnceFileListFilter 来防止远程文件重复。
默认情况下,此过滤器也会与文件名模式(或正则表达式)一起应用。
如果您需要允许重复,可以使用 AcceptAllFileListFilter。
您可以通过使用 CompositeFileListFilter(或 ChainFileListFilter)来处理任何其他用例。
稍后显示的 Java 配置 所示 展示了一种在处理后删除远程文件以避免重复的技术。 |
有关 SftpPersistentAcceptOnceFileListFilter 的更多信息及其用法,请参阅 远程持久文件列表过滤器。
您可以使用 max-fetch-size 属性来限制在需要获取时每次轮询所获取的文件数量。
将其设置为 1,并在集群环境中运行时使用持久化过滤器。
有关更多信息,请参阅 入站通道适配器:控制远程文件获取。
适配器将远程目录和文件名放入请求头(分别为FileHeaders.REMOTE_DIRECTORY和FileHeaders.REMOTE_FILE)。
从版本 5.0 开始,FileHeaders.REMOTE_FILE_INFO 请求头提供了额外的远程文件信息(以 JSON 格式)。
如果您将 SftpStreamingMessageSource 上的 fileInfoJson 属性设置为 false,则该请求头将包含一个 SftpFileInfo 对象。
您可以通过调用 SftpFileInfo.getFileInfo() 方法来访问底层 SftpClient 提供的 SftpClient.DirEntry 对象。
在使用 XML 配置时,fileInfoJson 属性不可用,但您可以将 SftpStreamingMessageSource 注入到您的某个配置类中来设置它。
另请参阅 远程文件信息。
使用 Java 配置进行配置
以下 Spring Boot 应用程序展示了如何使用 Java 配置入站适配器的示例:
@SpringBootApplication
public class SftpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
@InboundChannelAdapter(channel = "stream")
public MessageSource<InputStream> ftpMessageSource() {
SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template());
messageSource.setRemoteDirectory("sftpSource/");
messageSource.setFilter(new AcceptAllFileListFilter<>());
messageSource.setMaxFetchSize(1);
return messageSource;
}
@Bean
@Transformer(inputChannel = "stream", outputChannel = "data")
public org.springframework.integration.transformer.Transformer transformer() {
return new StreamTransformer("UTF-8");
}
@Bean
public SftpRemoteFileTemplate template() {
return new SftpRemoteFileTemplate(sftpSessionFactory());
}
@ServiceActivator(inputChannel = "data", adviceChain = "after")
@Bean
public MessageHandler handle() {
return System.out::println;
}
@Bean
public ExpressionEvaluatingRequestHandlerAdvice after() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setOnSuccessExpression(
"@template.remove(headers['file_remoteDirectory'] + headers['file_remoteFile'])");
advice.setPropagateEvaluationFailures(true);
return advice;
}
}
请注意,在此示例中,转换器下游的消息处理器具有一个 advice,用于在处理完成后删除远程文件。
入站通道适配器:轮询多个服务器和目录
从版本 5.0.7 开始,RotatingServerAdvice 可用;当配置为轮询建议时,入站适配器可以轮询多个服务器和目录。
按常规方式配置该建议并将其添加到轮询器的建议链中。
使用 DelegatingSessionFactory 来选择服务器,有关更多信息请参阅 委托会话工厂。
建议配置由一个 RotationPolicy.KeyDirectory 对象列表组成。
@Bean
public RotatingServerAdvice advice() {
List<RotationPolicy.KeyDirectory> keyDirectories = new ArrayList<>();
keyDirectories.add(new RotationPolicy.KeyDirectory("one", "foo"));
keyDirectories.add(new RotationPolicy.KeyDirectory("one", "bar"));
keyDirectories.add(new RotationPolicy.KeyDirectory("two", "baz"));
keyDirectories.add(new RotationPolicy.KeyDirectory("two", "qux"));
keyDirectories.add(new RotationPolicy.KeyDirectory("three", "fiz"));
keyDirectories.add(new RotationPolicy.KeyDirectory("three", "buz"));
return new RotatingServerAdvice(delegatingSf(), keyDirectories);
}
此建议将轮询服务器 foo 上的目录 foo,直到不存在新文件,然后移动到目录 bar,接着移动到服务器 two 上的目录 baz,依此类推。
此默认行为可以通过 fair 构造函数参数进行修改:
@Bean
public RotatingServerAdvice advice() {
...
return new RotatingServerAdvice(delegatingSf(), keyDirectories, true);
}
在这种情况下,无论前一次轮询是否返回文件,建议都会移动到下一个服务器/目录。
或者,您可以提供自己的 RotationPolicy 以根据需要重新配置消息源:
public interface RotationPolicy {
void beforeReceive(MessageSource<?> source);
void afterReceive(boolean messageReceived, MessageSource<?> source);
}
和
@Bean
public RotatingServerAdvice advice() {
return new RotatingServerAdvice(myRotationPolicy());
}
local-filename-generator-expression 属性(在同步器上为 localFilenameGeneratorExpression)现在可以包含 #remoteDirectory 变量。
这允许从不同目录检索的文件下载到本地类似的目录中:
@Bean
public IntegrationFlow flow() {
return IntegrationFlow.from(Sftp.inboundAdapter(sf())
.filter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "rotate"))
.localDirectory(new File(tmpDir))
.localFilenameExpression("#remoteDirectory + T(java.io.File).separator + #root")
.remoteDirectory("."),
e -> e.poller(Pollers.fixedDelay(1).advice(advice())))
.channel(MessageChannels.queue("files"))
.get();
}
当使用此建议时,请勿在轮询器上配置TaskExecutor;有关更多信息,请参阅消息源的有条件轮询器。 |
入站通道适配器:控制远程文件获取
在配置入站通道适配器时,您应该考虑两个属性。
max-messages-per-poll:与所有轮询器一样,可用于限制每次轮询发出的消息数量(如果准备好的消息超过配置值)。
max-fetch-size(自 5.0 版本起)可限制从远程服务器一次性检索的文件数量。
以下场景假设起始状态是一个空的本地目录:
-
max-messages-per-poll=2和max-fetch-size=1:适配器获取一个文件,将其发出,然后获取下一个文件并再次发出。 随后它会休眠直到下一次轮询。 -
max-messages-per-poll=2andmax-fetch-size=2): 适配器获取这两个文件,然后依次发出每个文件。 -
max-messages-per-poll=2和max-fetch-size=4:适配器最多获取 4 个文件(如果可用),并先发出前两个(如果至少有两个)。 接下来的两个文件将在下一次轮询时发出。 -
max-messages-per-poll=2和max-fetch-size未指定:适配器会获取所有远程文件,并发出前两个(如果至少有两个)。 后续文件将在后续的轮询中发出(每次两个)。 当所有文件都被消费后,将再次尝试远程获取,以捕获任何新文件。
当您部署多个应用实例时,我们建议设置一个较小的 max-fetch-size,以避免单个实例“抢占”所有文件而导致其他实例资源饥饿。 |
另一个使用 max-fetch-size 的场景是,当您希望停止获取远程文件,但继续处理已获取的文件时。
在 MessageSource(通过编程方式、JMX 或 控制总线)上设置 maxFetchSize 属性,可有效停止适配器获取更多文件,同时允许轮询器继续为之前已获取的文件发送消息。
如果轮询器在更改该属性时处于活动状态,则更改将在下一次轮询时生效。
从版本 5.1 开始,同步器可以接收一个 Comparator<?>。
当限制获取的文件数量为 maxFetchSize 时,这非常有用。
SFTP 出站通道适配器
SFTP 出站通道适配器是一种特殊的MessageHandler,它连接到远程目录,并为每个作为传入Message负载接收的文件发起文件传输。
它还支持多种文件表示形式,因此您不必局限于File对象。
与 FTP 出站适配器类似,SFTP 出站通道适配器支持以下负载:
-
java.io.File: 实际的文件对象 -
byte[]: 表示文件内容的字节数组 -
java.lang.String: 表示文件内容的文本 -
java.io.InputStream: 要传输到远程文件的数据流 -
org.springframework.core.io.Resource: 用于向远程文件传输数据的一个资源
以下示例展示了如何配置 SFTP 出站通道适配器:
<int-sftp:outbound-channel-adapter id="sftpOutboundAdapter"
session-factory="sftpSessionFactory"
channel="inputChannel"
charset="UTF-8"
remote-file-separator="/"
remote-directory="foo/bar"
remote-filename-generator-expression="payload.getName() + '-mysuffix'"
filename-generator="fileNameGenerator"
use-temporary-filename="true"
chmod="600"
mode="REPLACE"/>
有关这些属性的更多详细信息,请参阅模式。
SpEL 与 SFTP 出站适配器
与 Spring Integration 中的许多其他组件一样,在配置 SFTP 出站通道适配器时,您可以使用 Spring 表达式语言 (SpEL),只需指定两个属性:remote-directory-expression 和 remote-filename-generator-expression(前文已描述)。
表达式求值上下文以消息作为其根对象,这使得您可以使用表达式,根据消息中的数据(来自 'payload' 或 'headers')动态计算文件名或现有目录路径。
在前面的示例中,我们定义了带有表达式值的 remote-filename-generator-expression 属性,该表达式基于文件的原始名称计算文件名,同时附加后缀:'-mysuffix'。
从版本 4.1 开始,在传输文件时您可以指定 mode。
默认情况下,现有文件会被覆盖。
这些模式由 FileExistsMode 枚举定义,其中包括以下值:
-
REPLACE(默认) -
REPLACE_IF_MODIFIED -
APPEND -
APPEND_NO_FLUSH -
IGNORE -
FAIL
使用 IGNORE 和 FAIL 时,文件不会被传输。
FAIL 会导致抛出异常,而 IGNORE 会静默忽略传输(尽管会产生一条 DEBUG 日志条目)。
版本 4.3 引入了 chmod 属性,您可以在上传后使用它来更改远程文件的权限。
您可以使用传统的 Unix 八进制格式(例如,600 仅允许文件所有者进行读写)。
在使用 Java 配置适配器时,可以使用 setChmodOctal("600") 或 setChmod(0600)。
避免写入部分文件
在处理文件传输时,一个常见问题是可能处理了不完整的文件。 文件可能在传输尚未实际完成时就出现在文件系统中。
为了解决这个问题,Spring Integration SFTP 适配器使用一种通用算法:文件先以临时名称传输,待完全传输完成后再进行重命名。
默认情况下,正在传输的每个文件在文件系统中都会显示一个额外的后缀,默认情况下为 .writing。
您可以通过设置 temporary-file-suffix 属性来更改它。
然而,在某些情况下您可能不希望使用此技术(例如,如果服务器不允许重命名文件)。
对于此类情况,您可以通过将 use-temporary-file-name 设置为 false 来禁用此功能(默认值为 true)。
当该属性为 false 时,文件将以最终名称写入,消费应用程序需要其他机制来检测在访问文件之前文件是否已完全上传。
使用 Java 配置进行配置
以下 Spring Boot 应用程序展示了如何使用 Java 配置出站适配器:
@SpringBootApplication
@IntegrationComponentScan
public class SftpJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.sendToSftp(new File("/foo/bar.txt"));
}
@Bean
public SessionFactory<SftpClient.DirEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost("localhost");
factory.setPort(port);
factory.setUser("foo");
factory.setPassword("foo");
factory.setAllowUnknownKeys(true);
factory.setTestSession(true);
return new CachingSessionFactory<SftpClient.DirEntry>(factory);
}
@Bean
@ServiceActivator(inputChannel = "toSftpChannel")
public MessageHandler handler() {
SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
handler.setRemoteDirectoryExpressionString("headers['remote-target-dir']");
handler.setFileNameGenerator(new FileNameGenerator() {
@Override
public String generateFileName(Message<?> message) {
return "handlerContent.test";
}
});
return handler;
}
@MessagingGateway
public interface MyGateway {
@Gateway(requestChannel = "toSftpChannel")
void sendToSftp(File file);
}
}
使用 Java DSL 进行配置
以下 Spring Boot 应用程序展示了如何使用 Java DSL 配置出站适配器:
@SpringBootApplication
public class SftpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow sftpOutboundFlow() {
return IntegrationFlow.from("toSftpChannel")
.handle(Sftp.outboundAdapter(this.sftpSessionFactory, FileExistsMode.FAIL)
.useTemporaryFileName(false)
.remoteDirectory("/foo")
).get();
}
}
SFTP 出站网关
SFTP 出站网关提供了一组有限的命令,允许您与远程 SFTP 服务器进行交互:
-
ls(列出文件) -
nlst(列出文件名) -
get(检索文件) -
mget(检索多个文件) -
rm(移除文件) -
mv(移动和重命名文件) -
put(发送文件) -
mput(发送多个文件)
使用ls命令
ls 列出远程文件并支持以下选项:
-
-1: 获取文件名列表。 默认情况下,将获取FileInfo对象的列表 -
-a: 包含所有文件(包括以 '.' 开头的文件) -
-f: 不要对列表进行排序 -
-dirs: 包含目录(默认排除) -
-links: 包含符号链接(默认排除) -
-R: 递归列出远程目录
此外,文件名过滤以与 inbound-channel-adapter 相同的方式提供。
由 ls 操作产生的消息负载是文件名列表或 FileInfo 对象列表(取决于您是否使用了 -1 开关)。
这些对象提供诸如修改时间、权限等信息。
对ls命令操作的远程目录信息在file_remoteDirectory头中提供。
当使用递归选项(-R)时,fileName包含任何子目录元素,并代表文件的相对路径(相对于远程目录)。
如果您使用-dirs选项,则每个递归目录也将作为列表中的一个元素返回。
在这种情况下,我们建议您不要使用-1选项,因为您将无法区分文件和目录,而在使用FileInfo对象时您可以做到这一点。
如果远程路径列表以 / 符号开头,SFTP 将其视为绝对路径;否则视为当前用户主目录中的相对路径。
使用nlst命令
版本 5 引入对 nlst 命令的支持。
nlst 列出远程文件名,仅支持一个选项:
-
-f: 不要对列表进行排序
由 nlst 操作产生的消息负载是一个文件名列表。
file_remoteDirectory 头部保存了 nlst 命令所作用的远程目录。
SFTP协议不提供列出名称的功能。
此命令等同于带有-1选项的ls命令,此处添加是为了方便使用。
使用get命令
get 检索远程文件,并支持以下选项:
-
-P: 保留远程文件的时间戳。 -
-stream: 将远程文件作为流检索。 -
-D: 传输成功后删除远程文件。 如果传输被忽略,则不会删除远程文件,因为FileExistsMode为IGNORE且本地文件已存在。
The file_remoteDirectory 表头保存远程目录,file_remoteFile 表头保存文件名。
由 get 操作产生的消息负载是一个表示检索到的文件的 File 对象。
如果您使用了 -stream 选项,则负载将是一个 InputStream,而不是 File。
对于文本文件,一个常见的用例是将此操作与 文件分割器 或 流转换器 结合使用。
当以流的形式消费远程文件时,您需要负责在流消费完毕后关闭 Session。
为了方便起见,Session 已提供在 closeableResource 头中,并且 IntegrationMessageHeaderAccessor 提供了便捷方法:
Closeable closeable = new IntegrationMessageHeaderAccessor(message).getCloseableResource();
if (closeable != null) {
closeable.close();
}
以下示例演示如何以流的形式消费文件:
<int-sftp:outbound-gateway session-factory="ftpSessionFactory"
request-channel="inboundGetStream"
command="get"
command-options="-stream"
expression="payload"
remote-directory="ftpTarget"
reply-channel="stream" />
<int-file:splitter input-channel="stream" output-channel="lines" />
如果您在自定义组件中消费输入流,则必须关闭 Session。
您可以在自定义代码中执行此操作,或者将消息的副本路由到 service-activator 并使用 SpEL,如下例所示: |
<int:service-activator input-channel="closeSession"
expression="headers['closeableResource'].close()" />
使用mget命令
mget 根据模式检索多个远程文件,并支持以下选项:
-
-P: 保留远程文件的时间戳。 -
-R: 递归检索整个目录树。 -
-x: 如果没有文件匹配该模式,则抛出异常(否则返回空列表)。 -
-D: 在传输成功后删除每个远程文件。 如果传输被忽略,则不会删除远程文件,因为FileExistsMode为IGNORE且本地文件已存在。
由 mget 操作产生的消息负载是一个 List<File> 对象(即一个包含 File 个对象的 List,每个对象代表一个检索到的文件)。
从版本 5.0 开始,如果 FileExistsMode 为 IGNORE,输出消息的负载将不再包含因文件已存在而未获取的文件。
此前,数组包含所有文件,包括那些已经存在的文件。 |
您使用的表达式决定了远程路径应产生以 * 结尾的结果,例如 myfiles/* 会获取 myfiles 下的完整树结构。
从版本 5.0 开始,您可以使用递归 MGET,并结合 FileExistsMode.REPLACE_IF_MODIFIED 模式,定期将整个远程目录树同步到本地。
此模式会将本地文件的最后修改时间戳设置为远程文件的时间戳,无论是否启用 -P(保留时间戳)选项。
|
关于使用递归的笔记 (
-R)该模式将被忽略,并假定为 如果过滤了子目录,则不会对该子目录执行额外的遍历。 不允许使用 通常,您会在 |
持久化文件列表过滤器现在有一个布尔属性 forRecursion。
将此属性设置为 true,也会设置 alwaysAcceptDirectories,这意味着对外部网关(ls 和 mget)的递归操作现在将每次遍历完整的目录树。
这是为了解决目录树深处更改未被检测到的问题。
此外,forRecursion=true 会导致使用文件的完整路径作为元数据存储键;这解决了如果同一名称的文件出现在不同目录中多次时过滤器无法正常工作的问题。
重要提示:这意味着持久化元数据存储中的现有键将无法在顶层目录下的文件中找到。
因此,该属性默认值为 false;此行为可能在未来的版本中发生变化。
从版本 5.0 开始,您可以通过将 alwaysAcceptDirectorties 设置为 true 来配置 SftpSimplePatternFileListFilter 和 SftpRegexPatternFileListFilter,以始终传递目录。
这样做允许对简单模式进行递归,如下面的示例所示:
<bean id="starDotTxtFilter"
class="org.springframework.integration.sftp.filters.SftpSimplePatternFileListFilter">
<constructor-arg value="*.txt" />
<property name="alwaysAcceptDirectories" value="true" />
</bean>
<bean id="dotStarDotTxtFilter"
class="org.springframework.integration.sftp.filters.SftpRegexPatternFileListFilter">
<constructor-arg value="^.*\.txt$" />
<property name="alwaysAcceptDirectories" value="true" />
</bean>
您可以通过在网关上使用filter属性来提供这些过滤器之一。
另请参阅 出站网关部分成功 (mget 和 mput)。
使用put命令
put 向远程服务器发送文件。
消息的负载可以是 java.io.File、byte[] 或 String。
remote-filename-generator(或表达式)用于命名远程文件。
其他可用属性包括 remote-directory、temporary-remote-directory 及其对应的 *-expression 等价物:use-temporary-file-name 和 auto-create-directory。
有关更多信息,请参阅 架构文档。
由 put 操作产生的消息负载是一个 String,其中包含文件在服务器上传输后的完整路径。
版本 4.3 引入了 chmod 属性,用于在上传后更改远程文件的权限。
您可以使用传统的 Unix 八进制格式(例如,600 仅允许文件所有者进行读写)。
在使用 Java 配置适配器时,可以使用 setChmod(0600)。
使用mput命令
mput 向服务器发送多个文件,并支持以下选项:
-
-R: 递归 — 发送目录及子目录中的所有文件(可能经过过滤)
消息负载必须是一个代表本地目录的 java.io.File(或 String)。
自 5.1 版本起,也支持 File 或 String 的集合。
支持与 put 命令 相同的属性。
此外,您可以使用 mput-pattern、mput-regex、mput-filter 或 mput-filter-expression 之一来过滤本地目录中的文件。
该过滤器支持递归操作,只要子目录本身也通过过滤器即可继续递归。
未通过过滤器的子目录将不会被递归处理。
由 mput 操作生成的消息负载是一个 List<String> 对象(即,由传输产生的远程文件路径的 List)。
另请参阅 出站网关部分成功 (mget 和 mput)。
版本 4.3 引入了 chmod 属性,允许您在上传后更改远程文件的权限。
您可以使用标准的 Unix 八进制格式(例如,600 仅允许文件所有者进行读写)。
在使用 Java 配置适配器时,可以使用 setChmodOctal("600") 或 setChmod(0600)。
使用rm命令
命令 rm 没有选项。
如果移除操作成功,结果消息负载为Boolean.TRUE。
否则,消息负载为Boolean.FALSE。
file_remoteDirectory头包含远程目录,file_remoteFile头包含文件名。
使用mv命令
命令 mv 没有选项。
expression属性定义“源”路径,rename-expression属性定义“目标”路径。
默认情况下,rename-expression为headers['file_renameTo']。
该表达式的求值结果不能为null或空String。
如有必要,将创建所需的任何远程目录。
结果消息的负载为Boolean.TRUE。
file_remoteDirectory头信息包含原始远程目录,file_remoteFile头信息包含文件名。
file_renameTo头信息包含新路径。
从版本 5.5.6 开始,remoteDirectoryExpression 可用于 mv 命令以方便使用。
如果“from”文件不是完整文件路径,则使用 remoteDirectoryExpression 的结果作为远程目录。
“to”文件也适用同样的规则,例如,如果任务只是重命名某个目录中的远程文件。
附加命令信息
get 和 mget 命令支持 local-filename-generator-expression 属性。
它定义了一个 SpEL 表达式,用于在传输过程中生成本地文件的名称。
求值上下文的根对象是请求消息。
remoteFileName 变量也可用。
它特别适用于 mget(例如:local-filename-generator-expression="#remoteFileName.toUpperCase() + headers.foo")。
get和mget命令支持local-directory-expression属性。
它定义了一个 SpEL 表达式,用于在传输期间生成本地目录的名称。
求值上下文的根对象是请求消息。
remoteDirectory变量也可用。
这对于 mget(例如:local-directory-expression="'/tmp/local/' + #remoteDirectory.toUpperCase() + headers.myheader")特别有用。
此属性与local-directory属性互斥。
对于所有命令,网关的 'expression' 属性保存该命令所作用的路径。
对于 mget 命令,表达式可能求值为 *(表示检索所有文件)、somedirectory/*,以及其他以 * 结尾的值。
以下示例展示了一个为 ls 命令配置的网关:
<int-ftp:outbound-gateway id="gateway1"
session-factory="ftpSessionFactory"
request-channel="inbound1"
command="ls"
command-options="-1"
expression="payload"
reply-channel="toSplitter"/>
发送到toSplitter通道的消息负载是一个包含文件名的String对象列表。
如果省略了command-options="-1",则负载将是一个FileInfo对象的列表。
您可以提供以空格分隔的选项列表(例如,command-options="-1 -dirs -links")。
从 4.2 版本开始,GET、MGET、PUT和MPUT命令支持一个FileExistsMode属性(在使用命名空间支持时为mode)。
这会影响当本地文件存在时(GET和MGET)或远程文件存在时(PUT和MPUT)的行为。
支持的模式包括REPLACE、APPEND、FAIL和IGNORE。
为了向后兼容,PUT和MPUT操作的默认模式为REPLACE。
对于GET和MGET操作,默认值为FAIL。
使用 Java 配置进行配置
以下 Spring Boot 应用程序展示了如何使用 Java 配置出站网关的示例:
@SpringBootApplication
public class SftpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler() {
return new SftpOutboundGateway(ftpSessionFactory(), "ls", "'my_remote_dir/'");
}
}
使用 Java DSL 进行配置
下面的 Spring Boot 应用程序展示了如何使用 Java DSL 配置出站网关的示例:
@SpringBootApplication
public class SftpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public SessionFactory<SftpClient.DirEntry> sftpSessionFactory() {
DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
factory.setTestSession(true);
return new CachingSessionFactory<>(sf);
}
@Bean
public QueueChannelSpec remoteFileOutputChannel() {
return MessageChannels.queue();
}
@Bean
public IntegrationFlow sftpMGetFlow() {
return IntegrationFlow.from("sftpMgetInputChannel")
.handle(Sftp.outboundGateway(sftpSessionFactory(),
AbstractRemoteFileOutboundGateway.Command.MGET, "payload")
.options(AbstractRemoteFileOutboundGateway.Option.RECURSIVE)
.regexFileNameFilter("(subSftpSource|.*1.txt)")
.localDirectoryExpression("'myDir/' + #remoteDirectory")
.localFilenameExpression("#remoteFileName.replaceFirst('sftpSource', 'localTarget')"))
.channel("remoteFileOutputChannel")
.get();
}
}
出站网关部分成功 (mget和mput)
当对多个文件执行操作时(使用 mget 和 mput),在传输一个或多个文件后的一段时间内可能会发生异常。
在这种情况下(从版本 4.2 开始),将抛出 PartialSuccessException。
除了常规的 MessagingException 属性(failedMessage 和 cause)之外,此异常还有两个附加属性:
-
partialResults: 成功的转账结果。 -
derivedInput: 从请求消息生成的文件列表(例如,用于mput传输的本地文件)。
这些属性可帮助您确定哪些文件已成功传输,哪些未能成功传输。
在递归mput的情况下,PartialSuccessException可能包含嵌套的PartialSuccessException实例。
考虑以下目录结构:
root/
|- file1.txt
|- subdir/
| - file2.txt
| - file3.txt
|- zoo.txt
如果异常发生在 file3.txt,则网关抛出的 PartialSuccessException 具有 derivedInput 的 file1.txt、subdir 和 zoo.txt,以及 partialResults 的 file1.txt。
其 cause 是另一个带有 derivedInput 的 file2.txt 和 file3.txt,以及 partialResults 的 file2.txt 的 PartialSuccessException。
消息会话回调
从 Spring Integration 4.2 版本开始,您可以使用 MessageSessionCallback<F, T> 实现配合 <int-sftp:outbound-gateway/>(SftpOutboundGateway)在 Session<SftpClient.DirEntry> 上执行任何操作,并使用 requestMessage 上下文。
您可以将其用于任何非标准或底层的 SFTP 操作(或多个操作),例如允许从集成流定义或函数式接口(lambda)实现注入中访问。
以下示例使用了 lambda:
@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler sftpOutboundGateway(SessionFactory<SftpClient.DirEntry> sessionFactory) {
return new SftpOutboundGateway(sessionFactory,
(session, requestMessage) -> session.list(requestMessage.getPayload()));
}
另一个例子可能是对发送或检索的文件数据进行预处理或后处理。
当使用 XML 配置时,<int-sftp:outbound-gateway/> 提供了一个 session-callback 属性,允许您指定 MessageSessionCallback Bean 的名称。
The session-callback is mutually exclusive with the command and expression attributes.
When configuring with Java, the SftpOutboundGateway class offers different constructors. |
Apache Mina SFTP 服务器事件
ApacheMinaSftpEventListener(在版本 5.2 中引入)会监听某些 Apache Mina SFTP 服务器事件,并将其发布为 ApplicationEvent,这些事件可由任何 ApplicationListener bean、@EventListener bean 方法或 事件入站通道适配器 接收。
目前支持的事件包括:
-
SessionOpenedEvent- 已打开客户端会话 -
DirectoryCreatedEvent- 已创建目录 -
FileWrittenEvent- 文件已写入 -
PathMovedEvent- 文件或目录已被重命名 -
PathRemovedEvent- 文件或目录已被删除 -
SessionClosedEvent- 客户端已断开连接
这些类都是 ApacheMinaSftpEvent 的子类;您可以配置一个单一的监听器来接收所有类型的事件。
每个事件的 source 属性是一个 ServerSession,您可以从中获取诸如客户端地址等信息;抽象事件上提供了一个便捷的 getSession() 方法。
要使用监听器(必须是一个 Spring bean)配置服务器,只需将其添加到 SftpSubsystemFactory 中:
server = SshServer.setUpDefaultServer();
...
SftpSubsystemFactory sftpFactory = new SftpSubsystemFactory();
sftpFactory.addSftpEventListener(apacheMinaSftpEventListenerBean);
...
要使用 Spring Integration 事件适配器来消费这些事件:
@Bean
public ApplicationEventListeningMessageProducer eventsAdapter() {
ApplicationEventListeningMessageProducer producer =
new ApplicationEventListeningMessageProducer();
producer.setEventTypes(ApacheMinaSftpEvent.class);
producer.setOutputChannel(eventChannel());
return producer;
}
远程文件信息
从版本 5.2 开始,SftpStreamingMessageSource(SFTP 流式传输入站通道适配器)、SftpInboundFileSynchronizingMessageSource(SFTP 入站通道适配器)以及 SftpOutboundGateway(SFTP 出站网关)的 "read" 命令会在生成的消息中提供额外的消息头,用于包含有关远程文件的信息:
-
FileHeaders.REMOTE_HOST_PORT- 远程会话在文件传输操作期间连接到的主机:端口对; -
FileHeaders.REMOTE_DIRECTORY- 操作已执行的远程目录; -
FileHeaders.REMOTE_FILE- 远程文件名;仅适用于单个文件操作。
由于 SftpInboundFileSynchronizingMessageSource 不会针对远程文件生成消息,而是使用本地副本,因此在同步操作期间,AbstractInboundFileSynchronizer 会以 URI 样式(protocol://host:port/remoteDirectory#remoteFileName)将关于远程文件的信息存储在 MetadataStore(可外部配置)中。
当轮询本地文件时,该元数据由 SftpInboundFileSynchronizingMessageSource 检索。
删除本地文件时,建议移除其对应的元数据条目。
为此,AbstractInboundFileSynchronizer 提供了一个 removeRemoteFileMetadata() 回调。
此外,还有一个 setMetadataStorePrefix() 可用于元数据键中。
建议在同一个 MetadataStore 实例在这些组件间共享时,使此前缀与基于 MetadataStore 的 FileListFilter 实现中使用的前缀不同,以避免条目被覆盖,因为过滤器和 AbstractInboundFileSynchronizer 都使用相同的本地文件名作为元数据条目的键。