FTP/FTPS适配器
FTP/FTPS 调用器
Spring Integration 提供了对通过 FTP 和 FTPS 进行文件传输操作的支持。
文件传输协议(FTP)是一种简单的网络协议,允许您在互联网上的两台计算机之间传输文件。 FTPS 代表“基于 SSL 的 FTP”。
您需要将以下依赖项包含到您的项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-ftp</artifactId>
<version>6.0.9</version>
</dependency>
compile "org.springframework.integration:spring-integration-ftp:6.0.9"
在 FTP 通信中涉及两个角色:客户端和服务器。 要使用 FTP 或 FTPS 传输文件,您需要一个客户端来发起与运行 FTP 服务器的远程计算机的连接。 连接建立后,客户端可以选择发送或接收文件的副本。
Spring Integration 支持通过提供三个客户端端点:入站通道适配器、出站通道适配器和出站网关,来发送和接收通过 FTP 或 FTPS 的文件。 它还提供了基于命名空间的便捷配置选项,用于定义这些客户端组件。
要使用 FTP 命名空间,请在 XML 文件的头部添加以下内容:
xmlns:int-ftp="http://www.springframework.org/schema/integration/ftp"
xsi:schemaLocation="http://www.springframework.org/schema/integration/ftp
https://www.springframework.org/schema/integration/ftp/spring-integration-ftp.xsd"
FTP 会话工厂
Spring Integration 提供了可用于创建 FTP(或 FTPS)会话的工厂类。
默认工厂
| 从版本 3.0 开始,会话默认不再被缓存。 请参见 FTP 会话缓存。 |
在配置 FTP 适配器之前,必须先配置 FTP 会话工厂。
您可以使用常规 Bean 定义来配置 FTP 会话工厂,其中实现类为 o.s.i.ftp.session.DefaultFtpSessionFactory。
以下示例展示了一个基本配置:
<bean id="ftpClientFactory"
class="org.springframework.integration.ftp.session.DefaultFtpSessionFactory">
<property name="host" value="localhost"/>
<property name="port" value="22"/>
<property name="username" value="kermit"/>
<property name="password" value="frog"/>
<property name="clientMode" value="0"/>
<property name="fileType" value="2"/>
<property name="bufferSize" value="100000"/>
</bean>
对于 FTPS 连接,您可以使用 o.s.i.ftp.session.DefaultFtpsSessionFactory 代替。
下面的示例展示了一个完整的配置:
<bean id="ftpClientFactory"
class="org.springframework.integration.ftp.session.DefaultFtpsSessionFactory">
<property name="host" value="localhost"/>
<property name="port" value="22"/>
<property name="username" value="oleg"/>
<property name="password" value="password"/>
<property name="clientMode" value="1"/>
<property name="fileType" value="2"/>
<property name="useClientMode" value="true"/>
<property name="cipherSuites" value="a,b.c"/>
<property name="keyManager" ref="keyManager"/>
<property name="protocol" value="SSL"/>
<property name="trustManager" ref="trustManager"/>
<property name="prot" value="P"/>
<property name="needClientAuth" value="true"/>
<property name="authValue" value="oleg"/>
<property name="sessionCreation" value="true"/>
<property name="protocols" value="SSL, TLS"/>
<property name="implicit" value="true"/>
</bean>
如果您遇到连接问题,并希望跟踪会话创建以及查看哪些会话被轮询,您可以通过将日志记录器设置为 TRACE 级别(例如 log4j.category.org.springframework.integration.file=TRACE)来启用会话跟踪。 |
现在您只需将这些会话工厂注入到您的适配器中。 适配器使用的协议(FTP 或 FTPS)取决于已注入到该适配器的会话工厂类型。
| 为 FTP 或 FTPS 会话工厂提供值的一种更实用的方法是使用 Spring 的属性占位符支持(参见 https://docs.spring.io/spring/docs/current/spring-framework-reference/core.html#beans-factory-placeholderconfigurer)。 |
高级配置
DefaultFtpSessionFactory 提供了对底层客户端 API 的抽象,该 API(自 Spring Integration 2.0 起)为 Apache Commons Net。
这使您无需处理 org.apache.commons.net.ftp.FTPClient 的低级配置细节。
会话工厂上暴露了几个常用属性(自版本 4.0 起,现在包括 connectTimeout、defaultTimeout 和 dataTimeout)。
然而,有时您需要访问更低级别的 FTPClient 配置以实现更高级的配置(例如设置主动模式的端口范围)。
为此,AbstractFtpSessionFactory(所有 FTP 会话工厂的基类)以如下清单中所示的两个后处理方法的形式提供了钩子:
/**
* Will handle additional initialization after client.connect() method was invoked,
* but before any action on the client has been taken
*/
protected void postProcessClientAfterConnect(T t) throws IOException {
// NOOP
}
/**
* Will handle additional initialization before client.connect() method was invoked.
*/
protected void postProcessClientBeforeConnect(T client) throws IOException {
// NOOP
}
如您所见,这两个方法没有默认实现。
然而,通过扩展 DefaultFtpSessionFactory,您可以重写这些方法以提供更高级的 FTPClient 配置,如下例所示:
public class AdvancedFtpSessionFactory extends DefaultFtpSessionFactory {
protected void postProcessClientBeforeConnect(FTPClient ftpClient) throws IOException {
ftpClient.setActivePortRange(4000, 5000);
}
}
FTPS 与共享 SSL 会话
当使用基于 SSL 或 TLS 的 FTP 时,某些服务器要求在控制连接和数据连接上使用相同的 SSLSession。
这是为了防止“窃取”数据连接。
有关更多信息,请参阅 https://scarybeastsecurity.blogspot.cz/2009/02/vsftpd-210-released.html。
目前,Apache FTPSClient 不支持此功能。 请参阅 NET-408。
以下解决方案由Stack Overflow提供,它使用了sun.security.ssl.SSLSessionContextImpl上的反射,因此在其他 JVM 上可能无法工作。
该 Stack Overflow 答案提交于 2015 年,Spring Integration 团队已在 JDK 1.8.0_112 上对该解决方案进行了测试。
以下示例展示了如何创建 FTPS 会话:
@Bean
public DefaultFtpsSessionFactory sf() {
DefaultFtpsSessionFactory sf = new DefaultFtpsSessionFactory() {
@Override
protected FTPSClient createClientInstance() {
return new SharedSSLFTPSClient();
}
};
sf.setHost("...");
sf.setPort(21);
sf.setUsername("...");
sf.setPassword("...");
sf.setNeedClientAuth(true);
return sf;
}
private static final class SharedSSLFTPSClient extends FTPSClient {
@Override
protected void _prepareDataSocket_(final Socket socket) throws IOException {
if (socket instanceof SSLSocket) {
// Control socket is SSL
final SSLSession session = ((SSLSocket) _socket_).getSession();
final SSLSessionContext context = session.getSessionContext();
context.setSessionCacheSize(0); // you might want to limit the cache
try {
final Field sessionHostPortCache = context.getClass()
.getDeclaredField("sessionHostPortCache");
sessionHostPortCache.setAccessible(true);
final Object cache = sessionHostPortCache.get(context);
final Method method = cache.getClass().getDeclaredMethod("put", Object.class,
Object.class);
method.setAccessible(true);
String key = String.format("%s:%s", socket.getInetAddress().getHostName(),
String.valueOf(socket.getPort())).toLowerCase(Locale.ROOT);
method.invoke(cache, key, session);
key = String.format("%s:%s", socket.getInetAddress().getHostAddress(),
String.valueOf(socket.getPort())).toLowerCase(Locale.ROOT);
method.invoke(cache, key, session);
}
catch (NoSuchFieldException e) {
// Not running in expected JRE
logger.warn("No field sessionHostPortCache in SSLSessionContext", e);
}
catch (Exception e) {
// Not running in expected JRE
logger.warn(e.getMessage());
}
}
}
}
委托会话工厂
版本 4.2 引入了 DelegatingSessionFactory,允许在运行时选择实际的会话工厂。
在调用 FTP 端点之前,请在该工厂上调用 setThreadKey(),将键与当前线程关联。
随后使用该键查找要使用的实际会话工厂。
使用完毕后,可通过调用 clearThreadKey() 来清除该键。
我们添加了便捷方法,以便您可以轻松地从消息流中使用委托会话工厂。
以下示例展示了如何声明一个委托会话工厂:
<bean id="dsf" class="org.springframework.integration.file.remote.session.DelegatingSessionFactory">
<constructor-arg>
<bean class="o.s.i.file.remote.session.DefaultSessionFactoryLocator">
<!-- delegate factories here -->
</bean>
</constructor-arg>
</bean>
<int:service-activator input-channel="in" output-channel="c1"
expression="@dsf.setThreadKey(#root, headers['factoryToUse'])" />
<int-ftp:outbound-gateway request-channel="c1" reply-channel="c2" ... />
<int:service-activator input-channel="c2" output-channel="out"
expression="@dsf.clearThreadKey(#root)" />
当您使用会话缓存(参见 FTP 会话缓存)时,每个委托对象都应当被缓存。
您无法缓存 DelegatingSessionFactory 本身。 |
从版本 5.0.7 开始,DelegatingSessionFactory 可与 RotatingServerAdvice 配合使用以轮询多个服务器;请参见 入站通道适配器:轮询多个服务器和目录。
FTP 入站通道适配器
FTP 入站通道适配器是一个特殊的监听器,它连接到 FTP 服务器并监听远程目录事件(例如新文件创建),此时它将启动文件传输。
以下示例展示了如何配置一个 inbound-channel-adapter:
<int-ftp:inbound-channel-adapter id="ftpInbound"
channel="ftpChannel"
session-factory="ftpSessionFactory"
auto-create-local-directory="true"
delete-remote-files="true"
filename-pattern="*.txt"
remote-directory="some/remote/path"
remote-file-separator="/"
preserve-timestamp="true"
local-filename-generator-expression="#this.toUpperCase() + '.a'"
scanner="myDirScanner"
local-filter="myFilter"
temporary-file-suffix=".writing"
max-fetch-size="-1"
local-directory=".">
<int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>
如前所述配置所示,您可以通过使用 inbound-channel-adapter 元素来配置 FTP 入站通道适配器,同时为各种属性提供值,例如 local-directory、filename-pattern(基于简单模式匹配,而非正则表达式),以及对 session-factory 的引用。
默认情况下,传输的文件会保留与原始文件相同的名称。
如果您想覆盖此行为,可以设置 local-filename-generator-expression 属性,该属性允许您提供一个 SpEL 表达式来生成本地文件的名称。
与出站网关和适配器不同(在这些场景中,SpEL 求值上下文的根对象是 Message),此入站适配器在求值时尚未拥有消息,因为它最终会使用传输的文件作为有效负载来生成消息。
因此,SpEL 求值上下文的根对象是远程文件的原始名称(一个 String)。
入站通道适配器首先为本地目录检索 File 对象,然后根据轮询配置发出每个文件。
从版本 5.0 开始,当需要获取新文件时,您可以限制从 FTP 服务器获取的文件数量。
当目标文件非常大或在具有持久化文件列表过滤器(稍后讨论)的集群系统中运行时,这非常有益。
请使用 max-fetch-size 实现此目的。
负值(默认值)表示没有限制,将检索所有匹配的文件。
有关更多信息,请参阅 入站通道适配器:控制远程文件获取。
自版本 5.0 起,您还可以通过设置 scanner 属性,向 inbound-channel-adapter 提供自定义的 DirectoryScanner 实现。
从 Spring Integration 3.0 开始,您可以指定 preserve-timestamp 属性(其默认值为 false)。
当设置为 true 时,本地文件的修改时间戳将被设置为从服务器检索到的值。
否则,它将设置为当前时间。
从版本 4.2 开始,您可以指定 remote-directory-expression 代替 remote-directory,从而在每次轮询时动态确定目录——例如 remote-directory-expression="@myBean.determineRemoteDir()"。
从 4.3 版本开始,您可以省略 remote-directory 和 remote-directory-expression 属性。
它们默认值为 null。
在这种情况下,根据 FTP 协议,客户端工作目录将用作默认远程目录。
有时,基于使用 filename-pattern 属性指定的简单模式进行的文件过滤可能不够用。
如果是这种情况,您可以使用 filename-regex 属性来指定正则表达式(例如 filename-regex=".*\.test$")。
此外,如果您需要完全的控制权,可以使用 filter 属性并提供对任何自定义实现的 o.s.i.file.filters.FileListFilter 的引用,这是一个用于过滤文件列表的策略接口。
此过滤器决定哪些远程文件会被检索。
您还可以通过使用 CompositeFileListFilter 将基于模式的过滤器与其他过滤器(例如用于避免同步先前已获取文件的 AcceptOnceFileListFilter)组合起来。
AcceptOnceFileListFilter将状态存储在内存中。
如果您希望状态在系统重启后仍然存在,请考虑使用FtpPersistentAcceptOnceFileListFilter代替。
此过滤器将接受的文件名存储在一个MetadataStore策略的实例中(请参阅元数据存储)。
此过滤器基于文件名和远程修改时间进行匹配。
自版本 4.0 起,此过滤器需要一个 ConcurrentMetadataStore。
当与共享数据存储(例如使用 RedisMetadataStore 的 Redis)配合使用时,它允许过滤器键在多个应用程序或服务器实例之间共享。
从版本 5.0 开始,FtpPersistentAcceptOnceFileListFilter 用于内存中的 SimpleMetadataStore 默认应用于 FtpInboundFileSynchronizer。
该过滤器也适用于 XML 配置中的 regex 或 pattern 选项,以及 Java DSL 中的 FtpInboundChannelAdapterSpec。
任何其他用例都可以通过 CompositeFileListFilter(或 ChainFileListFilter)进行管理。
前面的讨论涉及在检索文件之前进行过滤。
一旦文件被检索,将对文件系统上的文件应用额外的过滤器。
默认情况下,这是一个AcceptOnceFileListFilter,如前所述,它在内存中保留状态且不考虑文件的修改时间。
除非您的应用程序在处理后删除文件,否则适配器将在应用程序重启后默认重新处理磁盘上的文件。
此外,如果您配置filter使用FtpPersistentAcceptOnceFileListFilter,并且远程文件的时间戳发生变化(导致重新获取该文件),默认本地过滤器将不允许处理此新文件。
有关此筛选器的更多信息及其用法,请参阅远程持久文件列表筛选器。
您可以使用 local-filter 属性来配置本地文件系统过滤器的行为。
从版本 4.3.8 开始,默认配置为 FileSystemPersistentAcceptOnceFileListFilter。
该过滤器将接受的文件名和修改时间戳存储在 MetadataStore 策略的实例中(请参阅 元数据存储),并检测本地文件修改时间的变化。
默认的 MetadataStore 是 SimpleMetadataStore,它将状态存储在内存中。
自 4.1.5 版本起,这些过滤器新增了一个属性(flushOnUpdate),该属性会导致它们在每次更新时刷新元数据存储(如果该存储实现了 Flushable)。
此外,如果您使用分布式MetadataStore(例如Redis),您可以拥有多个相同的适配器或应用程序实例,并确保每个文件仅被处理一次。 |
实际的本地过滤器是一个包含所提供过滤器的CompositeFileListFilter,以及一个模式过滤器,用于防止处理正在下载的文件(基于temporary-file-suffix)。
文件使用此后缀进行下载(默认为.writing),当传输完成时,文件会被重命名为其最终名称,从而对过滤器变为‘可见’。
remote-file-separator 属性允许您配置一个文件分隔符字符,以便在默认值 '/' 不适用于您的特定环境时使用。
有关这些属性的更多详细信息,请参阅 schema。
您还应理解,FTP 入站通道适配器是一个轮询消费者。
因此,您必须配置一个轮询器(通过使用全局默认值或局部子元素)。
一旦文件传输完成,就会生成一条以其负载为 java.io.File 的消息,并发送到由 channel 属性标识的通道中。
有关文件过滤和未完成文件的更多信息
有时,刚刚出现在监控(远程)目录中的文件尚不完整。
通常,此类文件会先使用临时扩展名(例如 somefile.txt.writing)进行写入,待写入过程完成后才会重命名。
在大多数情况下,您只关心完整的文件,并希望仅筛选出已完成的文件。
为了处理这些场景,您可以使用由 filename-pattern、filename-regex 和 filter 属性提供的过滤支持。
以下示例使用了自定义过滤器实现:
<int-ftp:inbound-channel-adapter
channel="ftpChannel"
session-factory="ftpSessionFactory"
filter="customFilter"
local-directory="file:/my_transfers">
remote-directory="some/remote/path"
<int:poller fixed-rate="1000"/>
</int-ftp:inbound-channel-adapter>
<bean id="customFilter" class="org.example.CustomFilter"/>
Inbound FTP 适配器轮询配置说明
入站 FTP 适配器的职责包括两项任务:
-
与远程服务器通信,以便将文件从远程目录传输到本地目录。
-
对于每个传输的文件,生成一个以该文件为负载的消息,并将其发送到由 'channel' 属性标识的通道。 这就是它们被称为“通道适配器”(channel adapters)而不是仅仅称为“适配器”的原因。 此类适配器的主要工作是生成要发送到消息通道的消息。 本质上,第二个任务优先执行:如果本地目录中已经存在一个或多个文件,它会首先生成这些文件的消息。 只有当所有本地文件都处理完毕后,它才会发起远程通信以获取更多文件。
此外,在配置轮询器的触发器时,您应密切关注 max-messages-per-poll 属性。
其默认值为 1,适用于所有 SourcePollingChannelAdapter 实例(包括 FTP)。
这意味着,一旦处理完一个文件,它就会等待由您的触发器配置确定的下一次执行时间。
如果您恰好有在一个或多个文件驻留在 local-directory 中,那么它会在与远程 FTP 服务器建立通信之前先处理这些文件。
另外,如果 max-messages-per-poll 设置为 1(默认值),则它每次只处理一个文件,间隔时间由您的触发器定义,本质上相当于“一次轮询 === 一个文件”。
对于典型的文件传输用例,您最可能希望具有相反的行为:即针对每次轮询处理尽可能多的文件,然后才等待下一次轮询。
如果是这种情况,请将 max-messages-per-poll 设置为 -1。
这样,在每次轮询时,适配器将尝试生成尽可能多的消息。
换句话说,它会先处理本地目录中的所有文件,然后连接到远程目录,将那里可用的所有文件传输到本地进行处理。
只有完成这些操作后,轮询操作才被视为完成,轮询器才会等待下一次执行时间。
您也可以将'max-messages-per-poll'值设置为一个正数,该值表示每次轮询时从文件中创建的消息数量的上限。
例如,值为10意味着在每次轮询时,它尝试处理不超过十个文件。
从故障中恢复
理解适配器的架构非常重要。
存在一个文件同步器用于获取文件,以及一个FileReadingMessageSource为每个同步后的文件发送消息。
如前所述,涉及两个过滤器。
filter属性(及模式)引用远程(FTP)文件列表,以避免获取已获取的文件。
local-filter被FileReadingMessageSource用来确定哪些文件应作为消息发送。
同步器列出远程文件并查询其过滤器。
随后传输这些文件。
如果在文件传输过程中发生 IO 错误,则已添加到过滤器的任何文件都将被移除,以便在下一次轮询时能够重新获取。
这仅适用于实现了 ReversibleFileListFilter 的过滤器(例如 AcceptOnceFileListFilter)。
如果在同步文件后,下游流在处理文件时发生错误,则不会自动回滚过滤器,因此默认情况下失败的文件不会被重新处理。
如果您希望在失败后重新处理此类文件,可以使用类似于以下的配置,以便从过滤器中移除失败的文件:
<int-ftp:inbound-channel-adapter id="ftpAdapter"
session-factory="ftpSessionFactory"
channel="requestChannel"
remote-directory-expression="'/ftpSource'"
local-directory="file:myLocalDir"
auto-create-local-directory="true"
filename-pattern="*.txt">
<int:poller fixed-rate="1000">
<int:transactional synchronization-factory="syncFactory" />
</int:poller>
</int-ftp:inbound-channel-adapter>
<bean id="acceptOnceFilter"
class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />
<int:transaction-synchronization-factory id="syncFactory">
<int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>
<bean id="transactionManager"
class="org.springframework.integration.transaction.PseudoTransactionManager" />
上述配置适用于任何 ResettableFileListFilter。
从版本 5.0 开始,入站通道适配器可以在本地构建与生成的本地文件名相对应的子目录。
这也可以是一个远程子路径。
为了能够根据层次结构支持递归读取本地目录以进行修改,现在可以提供一个内部的 FileReadingMessageSource,并基于 Files.walk() 算法使用新的 RecursiveDirectoryScanner。
有关更多信息,请参见 AbstractInboundFileSynchronizingMessageSource.setScanner()。
此外,您现在可以使用 setUseWatchService() 选项将 AbstractInboundFileSynchronizingMessageSource 切换为基于 WatchService 的 DirectoryScanner。
它也已配置为所有 WatchEventType 实例,以便对本地目录中的任何修改做出反应。
前面显示的重新处理示例基于 FileReadingMessageSource.WatchServiceDirectoryScanner 的内置功能,当文件 (StandardWatchEventKinds.ENTRY_DELETE) 从本地目录中删除时执行 ResettableFileListFilter.remove()。
有关更多信息,请参见 WatchServiceDirectoryScanner。
使用 Java 配置进行配置
以下 Spring Boot 应用程序展示了如何使用 Java 配置来配置入站适配器:
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
sf.setTestSession(true);
return new CachingSessionFactory<FTPFile>(sf);
}
@Bean
public FtpInboundFileSynchronizer ftpInboundFileSynchronizer() {
FtpInboundFileSynchronizer fileSynchronizer = new FtpInboundFileSynchronizer(ftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory("foo");
fileSynchronizer.setFilter(new FtpSimplePatternFileListFilter("*.xml"));
return fileSynchronizer;
}
@Bean
@InboundChannelAdapter(channel = "ftpChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> ftpMessageSource() {
FtpInboundFileSynchronizingMessageSource source =
new FtpInboundFileSynchronizingMessageSource(ftpInboundFileSynchronizer());
source.setLocalDirectory(new File("ftp-inbound"));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
source.setMaxFetchSize(1);
return source;
}
@Bean
@ServiceActivator(inputChannel = "ftpChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
}
使用 Java DSL 进行配置
以下 Spring Boot 应用程序示例展示了如何使用 Java DSL 配置入站适配器:
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow ftpInboundFlow() {
return IntegrationFlow
.from(Ftp.inboundAdapter(this.ftpSessionFactory)
.preserveTimestamp(true)
.remoteDirectory("foo")
.regexFilter(".*\\.txt$")
.localFilename(f -> f.toUpperCase() + ".a")
.localDirectory(new File("d:\\ftp_files")),
e -> e.id("ftpInboundAdapter")
.autoStartup(true)
.poller(Pollers.fixedDelay(5000)))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
}
FTP 流式入站通道适配器
版本 4.3 引入了流式入站通道适配器。
此适配器生成负载类型为 InputStream 的消息,允许在无需写入本地文件系统的情况下获取文件。
由于会话保持打开状态,因此消费应用程序负责在文件被消费后关闭会话。
该会话通过 closeableResource 标头提供(IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE)。
标准框架组件,例如 FileSplitter 和 StreamTransformer,会自动关闭会话。
有关这些组件的更多信息,请参阅 文件拆分器 和 流转换器。
以下示例展示了如何配置 inbound-streaming-channel-adapter:
<int-ftp:inbound-streaming-channel-adapter id="ftpInbound"
channel="ftpChannel"
session-factory="sessionFactory"
filename-pattern="*.txt"
filename-regex=".*\.txt"
filter="filter"
filter-expression="@myFilterBean.check(#root)"
remote-file-separator="/"
comparator="comparator"
max-fetch-size="1"
remote-directory-expression="'foo/bar'">
<int:poller fixed-rate="1000" />
</int-ftp:inbound-streaming-channel-adapter>
只允许 filename-pattern、filename-regex、filter 或 filter-expression 中的一个。
从 5.0 版本开始,默认情况下,FtpStreamingMessageSource 适配器会基于内存中的 SimpleMetadataStore 防止远程文件出现重复(针对 FtpPersistentAcceptOnceFileListFilter)。
默认情况下,此过滤器也应用于文件名模式(或正则表达式)。
如果您需要允许重复,可以使用 AcceptAllFileListFilter。
其他任何用例均可通过 CompositeFileListFilter(或 ChainFileListFilter)处理。
Java 配置(本文档后文部分)展示了一种在处理后删除远程文件以避免重复的技术。 |
有关 FtpPersistentAcceptOnceFileListFilter 的更多信息及其用法,请参阅 远程持久文件列表过滤器。
使用 max-fetch-size 属性来限制在需要获取时每次轮询获取的文件数量。
将其设置为 1,并在集群环境中运行时使用持久化过滤器。
有关更多信息,请参阅 入站通道适配器:控制远程文件获取。
适配器将远程目录和文件名分别放入 FileHeaders.REMOTE_DIRECTORY 和 FileHeaders.REMOTE_FILE 头中。
从版本 5.0 开始,FileHeaders.REMOTE_FILE_INFO 头提供额外的远程文件信息(默认以 JSON 表示)。
如果您将 FtpStreamingMessageSource 上的 fileInfoJson 属性设置为 false,则该头包含一个 FtpFileInfo 对象。
可以通过使用 FtpFileInfo.getFileInfo() 方法访问底层 Apache Net 库提供的 FTPFile 对象。
当使用 XML 配置时,fileInfoJson 属性不可用,但您可以通过将 FtpStreamingMessageSource 注入到其中一个配置类中来设置它。
另请参阅 远程文件信息。
从 5.1 版本开始,comparator 的泛型类型为 FTPFile。
此前,它为 AbstractFileInfo<FTPFile>。
这是因为排序现在在处理的更早阶段进行,即在过滤和应用 maxFetch 之前完成。
使用 Java 配置进行配置
以下 Spring Boot 应用示例展示了如何使用 Java 配置来配置入站适配器:
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
@InboundChannelAdapter(channel = "stream")
public MessageSource<InputStream> ftpMessageSource() {
FtpStreamingMessageSource messageSource = new FtpStreamingMessageSource(template());
messageSource.setRemoteDirectory("ftpSource/");
messageSource.setFilter(new AcceptAllFileListFilter<>());
messageSource.setMaxFetchSize(1);
return messageSource;
}
@Bean
@Transformer(inputChannel = "stream", outputChannel = "data")
public org.springframework.integration.transformer.Transformer transformer() {
return new StreamTransformer("UTF-8");
}
@Bean
public FtpRemoteFileTemplate template() {
return new FtpRemoteFileTemplate(ftpSessionFactory());
}
@ServiceActivator(inputChannel = "data", adviceChain = "after")
@Bean
public MessageHandler handle() {
return System.out::println;
}
@Bean
public ExpressionEvaluatingRequestHandlerAdvice after() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setOnSuccessExpression(
"@template.remove(headers['file_remoteDirectory'] + headers['file_remoteFile'])");
advice.setPropagateEvaluationFailures(true);
return advice;
}
}
请注意,在此示例中,转换器下游的消息处理器具有一个 advice,用于在处理完成后删除远程文件。
入站通道适配器:轮询多个服务器和目录
从版本 5.0.7 开始,RotatingServerAdvice 可用;当配置为轮询建议时,入站适配器可以轮询多个服务器和目录。
按常规方式配置该建议并将其添加到轮询器的建议链中。
使用 DelegatingSessionFactory 来选择服务器,有关更多信息请参阅 委托会话工厂。
建议配置由一个 RotationPolicy.KeyDirectory 对象列表组成。
@Bean
public RotatingServerAdvice advice() {
List<RotationPolicy.KeyDirectory> keyDirectories = new ArrayList<>();
keyDirectories.add(new RotationPolicy.KeyDirectory("one", "foo"));
keyDirectories.add(new RotationPolicy.KeyDirectory("one", "bar"));
keyDirectories.add(new RotationPolicy.KeyDirectory("two", "baz"));
keyDirectories.add(new RotationPolicy.KeyDirectory("two", "qux"));
keyDirectories.add(new RotationPolicy.KeyDirectory("three", "fiz"));
keyDirectories.add(new RotationPolicy.KeyDirectory("three", "buz"));
return new RotatingServerAdvice(delegatingSf(), keyDirectories);
}
此建议将轮询服务器 foo 上的目录 foo,直到不存在新文件,然后移动到目录 bar,接着移动到服务器 two 上的目录 baz,依此类推。
此默认行为可以通过 fair 构造函数参数进行修改:
@Bean
public RotatingServerAdvice advice() {
...
return new RotatingServerAdvice(delegatingSf(), keyDirectories, true);
}
在这种情况下,无论前一次轮询是否返回文件,建议都会移动到下一个服务器/目录。
或者,您可以提供自己的 RotationPolicy 以根据需要重新配置消息源:
public interface RotationPolicy {
void beforeReceive(MessageSource<?> source);
void afterReceive(boolean messageReceived, MessageSource<?> source);
}
和
@Bean
public RotatingServerAdvice advice() {
return new RotatingServerAdvice(myRotationPolicy());
}
local-filename-generator-expression 属性(在同步器上为 localFilenameGeneratorExpression)现在可以包含 #remoteDirectory 变量。
这允许从不同目录检索的文件下载到本地类似的目录中:
@Bean
public IntegrationFlow flow() {
return IntegrationFlow.from(Ftp.inboundAdapter(sf())
.filter(new FtpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "rotate"))
.localDirectory(new File(tmpDir))
.localFilenameExpression("#remoteDirectory + T(java.io.File).separator + #root")
.remoteDirectory("."),
e -> e.poller(Pollers.fixedDelay(1).advice(advice())))
.channel(MessageChannels.queue("files"))
.get();
}
当使用此建议时,请勿在轮询器上配置TaskExecutor;有关更多信息,请参阅消息源的有条件轮询器。 |
入站通道适配器:控制远程文件获取
在配置入站通道适配器时,您应该考虑两个属性。
max-messages-per-poll,与所有轮询器一样,可用于限制每次轮询发出的消息数量(如果准备就绪的消息多于配置的值)。
max-fetch-size(自5.0版本起)可限制一次从远程服务器获取的文件数量。
以下场景假设起始状态是一个空的本地目录:
-
max-messages-per-poll=2和max-fetch-size=1:适配器获取一个文件,发出它,获取下一个文件,发出它,然后进入休眠状态直到下一次轮询。 -
max-messages-per-poll=2andmax-fetch-size=2): 适配器会获取这两个文件,然后依次发出每一个。 -
max-messages-per-poll=2和max-fetch-size=4:适配器最多获取四个文件(如果可用),并输出前两个(如果至少有两个)。 接下来的两个文件将在下一次轮询时输出。 -
max-messages-per-poll=2和max-fetch-size未指定:适配器会获取所有远程文件,并输出前两个(如果至少有两个)。 后续的文件将在后续的轮询中输出(每次两个)。 当所有文件都被消费后,将再次尝试获取远程文件,以捕获任何新文件。
当您部署应用程序的多个实例时,我们建议设置一个较小的 max-fetch-size,以避免某个实例“抢占”所有文件而导致其他实例资源匮乏。 |
另一个使用 max-fetch-size 的场景是,如果您希望停止获取远程文件,但继续处理已经获取的文件。
在 MessageSource 上设置 maxFetchSize 属性(通过编程方式、JMX 或 控制总线)将有效地阻止适配器获取更多文件,但允许轮询器继续为之前已获取的文件发出消息。
如果更改属性时轮询器处于活动状态,则该更改将在下一次轮询时生效。
从版本 5.1 开始,同步器可以接收一个 Comparator<FTPFile>。
当限制获取的文件数量为 maxFetchSize 时,这非常有用。
FTP 出站通道适配器
FTP 出站通道适配器依赖于一个 MessageHandler 实现,该实现连接到 FTP 服务器,并为每个接收到的消息负载中的文件发起 FTP 传输。
它还支持多种文件表示形式,因此您不仅限于使用 java.io.File 类型的负载。
FTP 出站通道适配器支持以下负载:
-
java.io.File: 实际的文件对象 -
byte[]: 表示文件内容的字节数组 -
java.lang.String: 表示文件内容的文本 -
java.io.InputStream: 要传输到远程文件的数据流 -
org.springframework.core.io.Resource: 用于向远程文件传输数据的一个资源
以下示例展示了如何配置一个 outbound-channel-adapter:
<int-ftp:outbound-channel-adapter id="ftpOutbound"
channel="ftpChannel"
session-factory="ftpSessionFactory"
charset="UTF-8"
remote-file-separator="/"
auto-create-directory="true"
remote-directory-expression="headers['remote_dir']"
temporary-remote-directory-expression="headers['temp_remote_dir']"
filename-generator="fileNameGenerator"
use-temporary-filename="true"
chmod="600"
mode="REPLACE"/>
上述配置展示了如何使用 outbound-channel-adapter 元素配置 FTP 出站通道适配器,同时为各种属性提供值,例如 filename-generator(o.s.i.file.FileNameGenerator 策略接口的一个实现)、对 session-factory 的引用以及其他属性。
您还可以看到一些 *expression 属性的示例,这些属性允许您使用 SpEL 来配置设置,如 remote-directory-expression、temporary-remote-directory-expression 和 remote-filename-generator-expression(filename-generator 的 SpEL 替代方案,如上例所示)。
与任何允许使用 SpEL 的组件一样,可以通过 'payload' 和 'headers' 变量访问负载和消息头。
有关可用属性的更多详细信息,请参阅 模式。
默认情况下,如果未指定文件名生成器,Spring Integration 将使用 o.s.i.file.DefaultFileNameGenerator。
DefaultFileNameGenerator 根据 MessageHeaders 中 file_name 头部的值(如果存在)来确定文件名;或者,如果消息的负载已经是 java.io.File,则使用该文件的原始名称。 |
定义某些值(例如 remote-directory)可能依赖于平台或 FTP 服务器。
例如,正如在 https://forum.spring.io/showthread.php?p=333478&posted=1#post333478 中报道的那样,在某些平台上,您必须在目录定义的末尾添加一个斜杠(例如,使用 remote-directory="/thing1/thing2/" 而不是 remote-directory="/thing1/thing2")。 |
从版本 4.1 开始,您可以在传输文件时指定 mode。
默认情况下,现有文件将被覆盖。
这些模式由 FileExistsMode 枚举定义,包含以下值:
-
REPLACE(默认) -
REPLACE_IF_MODIFIED -
APPEND -
APPEND_NO_FLUSH -
IGNORE -
FAIL
IGNORE 和 FAIL 不会传输文件。
FAIL 会导致抛出异常,而 IGNORE 会静默忽略传输(尽管会产生一条 DEBUG 日志条目)。
版本 5.2 引入了 chmod 属性,您可以在上传后使用它来更改远程文件的权限。
您可以使用传统的 Unix 八进制格式(例如,600 仅允许文件所有者具有读写权限)。
在使用 Java 配置适配器时,可以使用 setChmodOctal("600") 或 setChmod(0600)。
仅当您的 FTP 服务器支持 SITE CHMOD 子命令时才适用。
避免写入部分文件
在处理文件传输时,一个常见的问题是可能会处理到不完整的文件。 也就是说,文件可能在传输尚未真正完成时就出现在文件系统中。
为了解决这个问题,Spring Integration FTP 适配器使用了一种通用算法:文件在完全传输之前会先以临时名称进行传输,传输完成后再进行重命名。
默认情况下,每个正在传输的文件在文件系统中都会带有一个额外的后缀,默认值为 .writing。
您可以通过设置 temporary-file-suffix 属性来更改此后缀。
然而,可能存在您不想使用此技术的情况(例如,如果服务器不允许重命名文件)。
对于此类情况,您可以通过将 use-temporary-file-name 设置为 false 来禁用此功能(默认值为 true)。
当此属性为 false 时,文件将以最终名称写入,且消费应用程序需要其他机制来检测文件在访问前是否已完全上传。
使用 Java 配置进行配置
以下 Spring Boot 应用程序展示了如何使用 Java 配置来配置出站适配器:
@SpringBootApplication
@IntegrationComponentScan
public class FtpJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.sendToFtp(new File("/foo/bar.txt"));
}
@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
sf.setTestSession(true);
return new CachingSessionFactory<FTPFile>(sf);
}
@Bean
@ServiceActivator(inputChannel = "ftpChannel")
public MessageHandler handler() {
FtpMessageHandler handler = new FtpMessageHandler(ftpSessionFactory());
handler.setRemoteDirectoryExpressionString("headers['remote-target-dir']");
handler.setFileNameGenerator(new FileNameGenerator() {
@Override
public String generateFileName(Message<?> message) {
return "handlerContent.test";
}
});
return handler;
}
@MessagingGateway
public interface MyGateway {
@Gateway(requestChannel = "toFtpChannel")
void sendToFtp(File file);
}
}
使用 Java DSL 进行配置
以下 Spring Boot 应用程序展示了如何使用 Java DSL 配置出站适配器:
@SpringBootApplication
@IntegrationComponentScan
public class FtpJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.sendToFtp(new File("/foo/bar.txt"));
}
@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
sf.setTestSession(true);
return new CachingSessionFactory<FTPFile>(sf);
}
@Bean
public IntegrationFlow ftpOutboundFlow() {
return IntegrationFlow.from("toFtpChannel")
.handle(Ftp.outboundAdapter(ftpSessionFactory(), FileExistsMode.FAIL)
.useTemporaryFileName(false)
.fileNameExpression("headers['" + FileHeaders.FILENAME + "']")
.remoteDirectory(this.ftpServer.getTargetFtpDirectory().getName())
).get();
}
@MessagingGateway
public interface MyGateway {
@Gateway(requestChannel = "toFtpChannel")
void sendToFtp(File file);
}
}
FTP 出站网关
FTP 出站网关提供了一组有限的命令,用于与远程 FTP 或 FTPS 服务器进行交互。 支持的命令包括:
-
ls(列出文件) -
nlst(列出文件名) -
get(检索文件) -
mget(检索文件) -
rm(移除文件) -
mv(移动/重命名文件) -
put(发送文件) -
mput(发送多个文件)
使用ls命令
ls 列出远程文件并支持以下选项:
-
-1: 获取文件名列表。 默认情况下,将获取FileInfo对象的列表。 -
-a: 包含所有文件(包括以 '.' 开头的文件) -
-f: 不要对列表进行排序 -
-dirs: 包含目录(默认情况下会被排除) -
-links: 包含符号链接(默认情况下会被排除) -
-R: 递归列出远程目录
此外,还提供了文件名过滤功能,其方式与 inbound-channel-adapter 相同。
请参阅 FTP 入站通道适配器。
由 ls 操作生成的消息负载是文件名列表或 FileInfo 对象列表。
这些对象提供修改时间、权限及其他详细信息。
对ls命令操作的远程目录信息在file_remoteDirectory头中提供。
当使用递归选项(-R)时,fileName 包含任何子目录元素,表示相对于文件的相对路径(相对于远程目录)。
如果包含 -dirs 选项,则每个递归目录也将作为列表中的一个元素返回。
在这种情况下,建议您不要使用 -1 选项,因为您将无法区分文件和目录,而通过 FileInfo 对象可以做到这一点。
从 4.3 版本开始,FtpSession 支持为 list() 和 listNames() 方法使用 null。
因此,您可以省略 expression 属性。
为了便于使用,Java 配置提供了两个不带 expression 参数的构造函数。
对于 LS、NLST、PUT 和 MPUT 命令,null 被视为客户端工作目录,符合 FTP 协议规定。
所有其他命令都必须提供 expression,以便根据请求消息评估远程路径。
当您扩展 DefaultFtpSessionFactory 并实现 postProcessClientAfterConnect() 回调时,可以使用 FTPClient.changeWorkingDirectory() 函数来设置工作目录。
使用nlst命令
版本 5 引入对 nlst 命令的支持。
nlst 列出远程文件名,仅支持一个选项:
-
-f: 不要对列表进行排序
由 nlst 操作产生的消息负载是一个文件名列表。
对nlst命令操作的远程目录信息在file_remoteDirectory头中提供。
与使用LIST命令的ls命令的-1选项不同,nlst命令会向目标 FTP 服务器发送NLST命令。
当服务器不支持LIST(例如由于安全限制)时,此命令非常有用。
nlst操作的结果是不包含其他详细信息的名称列表。
因此,框架无法判断实体是否为目录,从而无法执行过滤或递归列出等操作。
使用get命令
get 检索远程文件。
它支持以下选项:
-
-P: 保留远程文件的时间戳。 -
-stream: 将远程文件作为流检索。 -
-D: 传输成功后删除远程文件。 如果传输被忽略,则不会删除远程文件,因为FileExistsMode为IGNORE且本地文件已存在。
file_remoteDirectory 标头提供远程目录名称,file_remoteFile 标头提供文件名。
由 get 操作生成的消息负载是一个 File 对象,它表示检索到的文件;如果您使用了 -stream 选项,则返回一个 InputStream。
-stream 选项允许将文件作为流进行检索。
对于文本文件,常见的用例是将此操作与 文件分割器 或 流转换器 结合使用。
在作为流消费远程文件时,您负责在流消费完毕后关闭 Session。
为了方便起见,Session 已提供在 closeableResource 头中,您可以通过 IntegrationMessageHeaderAccessor 上的便捷方法访问它。
以下示例展示了如何使用该便捷方法:
Closeable closeable = new IntegrationMessageHeaderAccessor(message).getCloseableResource();
if (closeable != null) {
closeable.close();
}
以下示例演示如何以流的形式消费文件:
<int-ftp:outbound-gateway session-factory="ftpSessionFactory"
request-channel="inboundGetStream"
command="get"
command-options="-stream"
expression="payload"
remote-directory="ftpTarget"
reply-channel="stream" />
<int-file:splitter input-channel="stream" output-channel="lines" />
如果您在自定义组件中消费输入流,则必须关闭 Session。
您可以通过以下方式完成此操作:要么在您的自定义代码中处理,要么将消息的副本路由到 service-activator 并使用 SpEL,如下例所示: |
<int:service-activator input-channel="closeSession"
expression="headers['closeableResource'].close()" />
使用mget命令
mget 根据模式检索多个远程文件,并支持以下选项:
-
-P: 保留远程文件的时间戳。 -
-R: 递归检索整个目录树。 -
-x: 如果没有文件匹配该模式则抛出异常(否则返回空列表)。 -
-D: 成功传输后删除每个远程文件。 如果传输被忽略,则不会删除远程文件,因为FileExistsMode为IGNORE且本地文件已存在。
由 mget 操作产生的消息负载是一个 List<File> 对象(即一个包含 File 个对象的 List,每个对象代表一个检索到的文件)。
从 5.0 版本开始,如果 FileExistsMode 为 IGNORE,输出消息的负载将不再包含因文件已存在而未获取的文件。
此前,列表包含所有文件,包括那些已存在的文件。 |
用于确定远程路径的表达式应产生以 - e.g. somedir/将获取其下完整的树结构somedir.
从 5.0 版本开始,递归 mget 结合新的 FileExistsMode.REPLACE_IF_MODIFIED 模式可用于定期将完整的远程目录树同步到本地。
此模式会替换本地文件的最后修改时间戳为远程时间戳,无论是否启用 -P(保留时间戳)选项。
|
使用递归 (
-R)该模式将被忽略,并假设为 如果某个子目录被过滤,则不会对该子目录进行额外的遍历。
通常,您会在 |
持久化文件列表过滤器现在有一个布尔属性 forRecursion。
将此属性设置为 true,也会设置 alwaysAcceptDirectories,这意味着对外部网关(ls 和 mget)的递归操作现在将每次遍历完整的目录树。
这是为了解决目录树深处更改未被检测到的问题。
此外,forRecursion=true 会导致使用文件的完整路径作为元数据存储键;这解决了如果同一名称的文件出现在不同目录中多次时过滤器无法正常工作的问题。
重要提示:这意味着持久化元数据存储中的现有键将无法在顶层目录下的文件中找到。
因此,该属性默认值为 false;此行为可能在未来的版本中发生变化。
从 5.0 版本开始,可以通过将 alwaysAcceptDirectories 属性设置为 true,配置 FtpSimplePatternFileListFilter 和 FtpRegexPatternFileListFilter 始终传递目录。
这样做允许对简单模式进行递归,如下面的示例所示:
<bean id="starDotTxtFilter"
class="org.springframework.integration.ftp.filters.FtpSimplePatternFileListFilter">
<constructor-arg value="*.txt" />
<property name="alwaysAcceptDirectories" value="true" />
</bean>
<bean id="dotStarDotTxtFilter"
class="org.springframework.integration.ftp.filters.FtpRegexPatternFileListFilter">
<constructor-arg value="^.*\.txt$" />
<property name="alwaysAcceptDirectories" value="true" />
</bean>
一旦您定义了如前例所示的过滤器,您可以通过在网关上设置 filter 属性来使用其中一个。
另请参阅 出站网关部分成功 (mget 和 mput)。
使用put命令
The put 命令用于将文件发送到远程服务器。
消息的负载可以是 java.io.File、byte[] 或 String。
remote-filename-generator(或表达式)用于命名远程文件。
其他可用属性包括 remote-directory、temporary-remote-directory 及其对应的 *-expression 等价物:use-temporary-file-name 和 auto-create-directory。
有关更多信息,请参阅 schema 文档。
由 put 操作生成的消息负载是一个 String,它表示文件在服务器上传输后的完整路径。
版本 5.2 引入了 chmod 属性,用于在上传后更改远程文件的权限。
您可以使用传统的 Unix 八进制格式(例如,600 仅允许文件所有者进行读写)。
在使用 Java 配置适配器时,可以使用 setChmod(0600)。
仅当您的 FTP 服务器支持 SITE CHMOD 子命令时才会生效。
使用mput命令
The mput 向服务器发送多个文件,仅支持一个选项:
-
-R: 递归。 发送目录及其子目录中的所有文件(可能经过过滤)。
消息负载必须是一个代表本地目录的 java.io.File(或 String)。
自 5.1 版本起,也支持 File 或 String 的集合。
此命令支持与 put 命令 相同的属性。
此外,本地目录中的文件可以使用 mput-pattern、mput-regex、mput-filter 或 mput-filter-expression 之一进行过滤。
该过滤器支持递归操作,只要子目录本身也通过过滤器即可。
未通过过滤器的子目录将不会被递归处理。
由 mput 操作产生的消息负载是一个 List<String> 对象(即,一组因传输而产生的远程文件路径的 List)。
另请参阅 出站网关部分成功 (mget 和 mput)。
版本 5.2 引入了 chmod 属性,允许您在上传后更改远程文件的权限。
您可以使用传统的 Unix 八进制格式(例如,600 仅允许文件所有者进行读写)。
在使用 Java 配置适配器时,可以使用 setChmodOctal("600") 或 setChmod(0600)。
仅当您的 FTP 服务器支持 SITE CHMOD 子命令时才适用。
使用rm命令
The rm 命令用于删除文件。
命令 rm 没有选项。
从 rm 操作生成的消息负载,如果移除成功则为 Boolean.TRUE,否则为 Boolean.FALSE。
file_remoteDirectory 标头提供远程目录,file_remoteFile 标头提供文件名。
使用mv命令
The mv 命令用于移动文件。
命令 mv 没有选项。
expression属性定义“源”路径,rename-expression属性定义“目标”路径。
默认情况下,rename-expression为headers['file_renameTo']。
该表达式求值结果不能为null或空的String。
如有必要,将创建所需的远程目录。
结果消息的负载为Boolean.TRUE。
file_remoteDirectory头提供原始远程目录,file_remoteFile头提供文件名。
新路径位于file_renameTo头中。
从版本 5.5.6 开始,remoteDirectoryExpression 可用于 mv 命令以方便使用。
如果“from”文件不是完整文件路径,则使用 remoteDirectoryExpression 的结果作为远程目录。
“to”文件也适用同样的规则,例如,如果任务只是重命名某个目录中的远程文件。
关于 FTP 出站网关命令的其他信息
get和mget命令支持local-filename-generator-expression属性。
它定义了一个 SpEL 表达式,用于在传输期间生成本地文件的名称。
求值上下文的根对象是请求消息。
remoteFileName变量同样可用,尤其适用于mget——例如:local-filename-generator-expression="#remoteFileName.toUpperCase() + headers.something"。
get和mget命令支持local-directory-expression属性。
它定义了一个 SpEL 表达式,用于在传输过程中生成本地目录的名称。
评估上下文的根对象是请求消息但(注:原文"but"疑似为笔误或截断,此处直译)。
remoteDirectory变量同样可用,这对mget特别有用,例如:local-directory-expression="'/tmp/local/' + #remoteDirectory.toUpperCase() + headers.something"。
此属性与local-directory属性互斥。
对于所有命令,网关的 'expression' 属性提供命令所作用的路径。
对于 mget 命令,表达式可能求值为 '',表示检索所有文件,或 'somedirectory/' 等。
以下示例展示了一个为 ls 命令配置的网关:
<int-ftp:outbound-gateway id="gateway1"
session-factory="ftpSessionFactory"
request-channel="inbound1"
command="ls"
command-options="-1"
expression="payload"
reply-channel="toSplitter"/>
发送到 toSplitter 通道的消息负载是一个包含文件名列表的 String 对象。如果省略了 command-options 属性,则默认包含 FileInfo 个对象。
它使用空格分隔的选项——例如:command-options="-1 -dirs -links"。
从 4.2 版本开始,GET、MGET、PUT和MPUT命令支持FileExistsMode属性(使用命名空间支持时为mode)。
这会影响本地文件存在时(GET和MGET)或远程文件存在时(PUT和MPUT)的行为。
支持的模式包括REPLACE、APPEND、FAIL和IGNORE。
为了向后兼容,PUT和MPUT操作的默认模式为REPLACE。
对于GET和MGET操作,默认值为FAIL。
从 5.0 版本开始,setWorkingDirExpression()(在 XML 中为working-dir-expression)选项已提供于FtpOutboundGateway(在 XML 中为<int-ftp:outbound-gateway>)。
它允许您在运行时更改客户端工作目录。
该表达式会针对请求消息进行求值。
每次网关操作后都会恢复之前的工作目录。
使用 Java 配置进行配置
以下 Spring Boot 应用程序展示了如何使用 Java 配置配置出站网关的示例:
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
sf.setTestSession(true);
return new CachingSessionFactory<FTPFile>(sf);
}
@Bean
@ServiceActivator(inputChannel = "ftpChannel")
public MessageHandler handler() {
FtpOutboundGateway ftpOutboundGateway =
new FtpOutboundGateway(ftpSessionFactory(), "ls", "'my_remote_dir/'");
ftpOutboundGateway.setOutputChannelName("lsReplyChannel");
return ftpOutboundGateway;
}
}
使用 Java DSL 进行配置
下面的 Spring Boot 应用程序展示了如何使用 Java DSL 配置出站网关的示例:
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public SessionFactory<FTPFile> ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
sf.setTestSession(true);
return new CachingSessionFactory<FTPFile>(sf);
}
@Bean
public FtpOutboundGatewaySpec ftpOutboundGateway() {
return Ftp.outboundGateway(ftpSessionFactory(),
AbstractRemoteFileOutboundGateway.Command.MGET, "payload")
.options(AbstractRemoteFileOutboundGateway.Option.RECURSIVE)
.regexFileNameFilter("(subFtpSource|.*1.txt)")
.localDirectoryExpression("'localDirectory/' + #remoteDirectory")
.localFilenameExpression("#remoteFileName.replaceFirst('ftpSource', 'localTarget')");
}
@Bean
public IntegrationFlow ftpMGetFlow(AbstractRemoteFileOutboundGateway<FTPFile> ftpOutboundGateway) {
return f -> f
.handle(ftpOutboundGateway)
.channel(c -> c.queue("remoteFileOutputChannel"));
}
}
出站网关部分成功 (mget和mput)
当您使用 mget 和 mput 对多个文件执行操作时,有时在传输一个或多个文件之后可能会发生异常。
在这种情况下(从版本 4.2 开始),会抛出 PartialSuccessException。
除了常规的 MessagingException 属性(failedMessage 和 cause)之外,此异常还有两个附加属性:
-
partialResults: 成功的转账结果。 -
derivedInput: 从请求消息生成的文件列表(例如,用于传输mput的本地文件)。
这些属性可帮助您确定哪些文件已成功传输,哪些未能成功传输。
在递归 mput 的情况下,PartialSuccessException 可能包含嵌套的 PartialSuccessException 次出现。
考虑以下目录结构:
root/
|- file1.txt
|- subdir/
| - file2.txt
| - file3.txt
|- zoo.txt
如果异常发生在 file3.txt,则网关抛出的 PartialSuccessException 具有 derivedInput 的 file1.txt、subdir 和 zoo.txt,以及 partialResults 的 file1.txt。
其 cause 是另一个带有 derivedInput 的 file2.txt 和 file3.txt,以及 partialResults 的 file2.txt 的 PartialSuccessException。
FTP 会话缓存
从 Spring Integration 3.0 开始,会话默认不再被缓存。
端点上不再支持 cache-sessions 属性。
如果您希望缓存会话,则必须使用 CachingSessionFactory(如下一个示例所示)。 |
在 3.0 版本之前,会话默认会自动缓存。
有一个 cache-sessions 属性可用于禁用自动缓存,但该方案无法配置其他会话缓存属性。
例如,您无法限制创建的会话数量。
为了支持该需求及其他配置选项,添加了 CachingSessionFactory。
它提供了 sessionCacheSize 和 sessionWaitTimeout 属性。
sessionCacheSize 属性控制工厂在其缓存中维护的活跃会话数量(默认无上限)。
如果已达到 sessionCacheSize 阈值,任何尝试获取新会话的操作将阻塞,直到其中一个缓存会话变为可用或会话等待时间到期(默认等待时间为 Integer.MAX_VALUE)。
sessionWaitTimeout 属性用于配置该值。
如果您希望缓存您的会话,请按照前述方式配置默认会话工厂,然后将其封装在 CachingSessionFactory 的实例中,您可以在其中提供这些附加属性。
以下示例展示了如何操作:
<bean id="ftpSessionFactory" class="o.s.i.ftp.session.DefaultFtpSessionFactory">
<property name="host" value="localhost"/>
</bean>
<bean id="cachingSessionFactory" class="o.s.i.file.remote.session.CachingSessionFactory">
<constructor-arg ref="ftpSessionFactory"/>
<constructor-arg value="10"/>
<property name="sessionWaitTimeout" value="1000"/>
</bean>
上述示例展示了一个使用 CachingSessionFactory 创建的实例,其中 sessionCacheSize 被设置为 10,且 sessionWaitTimeout 被设置为一秒(其值以毫秒为单位)。
从 Spring Integration 3.0 开始,CachingConnectionFactory 提供了一种 resetCache() 方法。
当被调用时,所有空闲会话将立即关闭,正在使用的会话将在它们返回缓存时被关闭。
新的会话请求将根据需要建立新会话。
从版本 5.1 开始,CachingSessionFactory 新增了一个属性 testSession。
当该属性为 true 时,会话将通过发送 NOOP 命令进行测试,以确保其仍处于活动状态;如果会话已失效,则将其从缓存中移除;若缓存中没有活动的会话,则会创建一个新的会话。
使用RemoteFileTemplate
从 Spring Integration 3.0 开始,为 FtpSession 对象提供了新的抽象。
该模板提供了发送、检索(作为 InputStream)、删除和重命名文件的方法。
此外,还提供了 execute 方法,允许调用者在会话中执行多个操作。
在所有情况下,模板都会负责可靠地关闭会话。
更多信息,请参见
RemoteFileTemplate 的 Javadoc。
还有一个 FTP 的子类:FtpRemoteFileTemplate。
版本 4.1 添加了额外方法,包括 getClientInstance(),它提供对底层 FTPClient 的访问,从而让您能够使用低级 API。
并非所有 FTP 服务器都正确实现了 STAT <path> 命令。
某些服务器会对不存在的路径返回成功结果。
NLST 命令在路径为文件且存在时,能可靠地返回文件名。
然而,这无法支持检查空目录是否存在,因为当路径为目录时,NLST 始终返回空列表。
由于模板无法判断路径是否代表目录,因此在路径看似不存在时必须执行额外检查(使用 NLST 时)。
这会增加开销,需要向服务器发送多个请求。
从版本 4.1.9 开始,FtpRemoteFileTemplate 提供了 FtpRemoteFileTemplate.ExistsMode 属性,该属性包含以下选项:
-
STAT: 执行STATFTP 命令(FTPClient.getStatus(path))以检查路径是否存在。 这是默认行为,要求您的 FTP 服务器正确支持带有路径的STAT命令。 -
NLST: 执行NLSTFTP 命令 —FTPClient.listName(path)。 仅当测试路径为文件的完整路径时使用此选项。 该方式不适用于空目录。 -
NLST_AND_DIRS: 首先执行NLST命令,如果该命令未返回任何文件,则回退到一种临时切换工作目录的技术,使用FTPClient.changeWorkingDirectory(path)。 有关更多信息,请参见FtpSession.exists()。
既然我们知道 FileExistsMode.FAIL 情况始终只查找文件(而非目录),我们可以安全地为 FtpMessageHandler 和 FtpOutboundGateway 组件使用 NLST 模式。
对于其他任何情况,FtpRemoteFileTemplate 可以被扩展以实现自定义逻辑,该逻辑在覆写的 exist() 方法中执行。
从 5.0 版本开始,提供了新的 RemoteFileOperations.invoke(OperationsCallback<F, T> action) 方法。
此方法允许在同一个线程绑定的 Session 作用域内调用多个 RemoteFileOperations 方法。
当您需要将 RemoteFileTemplate 的多个高层操作作为一个工作单元执行时,这非常有用。
例如,AbstractRemoteFileOutboundGateway 在使用 mput 命令实现时采用了该方法,其中对提供的目录中的每个文件执行 put 操作,并递归处理其子目录。
有关更多信息,请参阅 Javadoc。
使用MessageSessionCallback
从 Spring Integration 4.2 开始,您可以使用 MessageSessionCallback<F, T> 实现配合 <int-ftp:outbound-gateway/>(在 Java 中为 FtpOutboundGateway),以利用 requestMessage 上下文对 Session<FTPFile> 执行任何操作。
它可用于任何非标准或低级别的 FTP 操作,并允许从集成流定义和功能接口(Lambda)实现注入中访问,如下例所示:
@Bean
@ServiceActivator(inputChannel = "ftpChannel")
public MessageHandler ftpOutboundGateway(SessionFactory<FTPFile> sessionFactory) {
return new FtpOutboundGateway(sessionFactory,
(session, requestMessage) -> session.list(requestMessage.getPayload()));
}
另一个例子可能是对发送或检索的文件数据进行预处理或后处理。
当使用 XML 配置时,<int-ftp:outbound-gateway/> 提供了一个 session-callback 属性,允许您指定 MessageSessionCallback Bean 的名称。
The session-callback is mutually exclusive with the command and expression attributes.
When configuring with Java, different constructors are available in the FtpOutboundGateway class. |
Apache Mina FTP Server 事件
ApacheMinaFtplet(于 5.2 版本中引入)会监听某些 Apache Mina FTP 服务器事件,并将其发布为 ApplicationEvent,这些事件可由任何 ApplicationListener Bean、@EventListener Bean 方法或事件入站通道适配器接收。
目前支持的事件包括:
-
SessionOpenedEvent- 已打开客户端会话 -
DirectoryCreatedEvent- 已创建目录 -
FileWrittenEvent- 文件已写入 -
PathMovedEvent- 文件或目录已被重命名 -
PathRemovedEvent- 文件或目录已被删除 -
SessionClosedEvent- 客户端已断开连接
这些类都是 ApacheMinaFtpEvent 的子类;您可以配置一个单一的监听器来接收所有类型的事件。
每个事件的 source 属性是一个 FtpSession,您可以从中获取诸如客户端地址等信息;抽象事件上提供了一个便捷的 getSession() 方法。
除会话打开/关闭之外的事件具有另一个属性 FtpRequest,该属性包含命令和参数等属性。
To configure the server with the listener (which must be a Spring bean), add it to the server factory:
FtpServerFactory serverFactory = new FtpServerFactory();
...
ListenerFactory factory = new ListenerFactory();
...
serverFactory.addListener("default", factory.createListener());
serverFactory.setFtplets(new HashMap<>(Collections.singletonMap("springFtplet", apacheMinaFtpletBean)));
server = serverFactory.createServer();
server.start();
要使用 Spring Integration 事件适配器来消费这些事件:
@Bean
public ApplicationEventListeningMessageProducer eventsAdapter() {
ApplicationEventListeningMessageProducer producer =
new ApplicationEventListeningMessageProducer();
producer.setEventTypes(ApacheMinaFtpEvent.class);
producer.setOutputChannel(eventChannel());
return producer;
}
远程文件信息
从版本 5.2 开始,FtpStreamingMessageSource(FTP 流式入站通道适配器)、FtpInboundFileSynchronizingMessageSource(FTP 入站通道适配器)以及 FtpOutboundGateway(FTP 出站网关)的 "read" 命令会在生成的消息中提供额外的头信息,用于描述远程文件的信息:
-
FileHeaders.REMOTE_HOST_PORT- 远程会话在文件传输操作期间连接到的主机:端口对; -
FileHeaders.REMOTE_DIRECTORY- 操作已执行的远程目录; -
FileHeaders.REMOTE_FILE- 远程文件名;仅适用于单个文件操作。
由于 FtpInboundFileSynchronizingMessageSource 不会针对远程文件生成消息,而是使用本地副本,因此在同步操作期间,AbstractInboundFileSynchronizer 会以 URI 样式(protocol://host:port/remoteDirectory#remoteFileName)将关于远程文件的信息存储在 MetadataStore(可外部配置)中。
当轮询本地文件时,该元数据由 FtpInboundFileSynchronizingMessageSource 检索。
删除本地文件时,建议移除其对应的元数据条目。
为此,AbstractInboundFileSynchronizer 提供了一个 removeRemoteFileMetadata() 回调。
此外,还有一个 setMetadataStorePrefix() 可用于元数据键中。
建议在同一个 MetadataStore 实例在这些组件间共享时,使此前缀与基于 MetadataStore 的 FileListFilter 实现中使用的前缀不同,以避免条目被覆盖,因为过滤器和 AbstractInboundFileSynchronizer 都使用相同的本地文件名作为元数据条目的键。