对于最新的稳定版本,请使用 Spring Integration 6.5.1! |
SFTP 流式入站通道适配器
版本 4.3 引入了流式入站通道适配器。
此适配器生成有效负载类型为InputStream
,让您无需写入本地文件系统即可获取文件。
由于会话保持打开状态,因此使用应用程序负责在文件使用时关闭会话。
会话在closeableResource
标头 (IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE
).
标准框架组件,例如FileSplitter
和StreamTransformer
,自动关闭会话。
有关这些组件的更多信息,请参阅文件拆分器和流转换器。
以下示例演示如何配置 SFTP 流式入站通道适配器:
<int-sftp:inbound-streaming-channel-adapter id="ftpInbound"
channel="ftpChannel"
session-factory="sessionFactory"
filename-pattern="*.txt"
filename-regex=".*\.txt"
filter="filter"
filter-expression="@myFilterBean.check(#root)"
remote-file-separator="/"
comparator="comparator"
max-fetch-size="1"
remote-directory-expression="'foo/bar'">
<int:poller fixed-rate="1000" />
</int-sftp:inbound-streaming-channel-adapter>
您只能使用其中一种filename-pattern
,filename-regex
,filter
或filter-expression
.
从 5.0 版开始,默认情况下,SftpStreamingMessageSource 适配器通过使用SftpPersistentAcceptOnceFileListFilter 基于内存中的SimpleMetadataStore .
默认情况下,此过滤器也与文件名模式(或正则表达式)一起应用。
如果您需要允许重复,您可以使用AcceptAllFileListFilter .
您可以使用以下命令来处理任何其他用例CompositeFileListFilter (或ChainFileListFilter ).
稍后显示的 Java 配置显示了一种在处理后删除远程文件的技术,避免重复。 |
有关SftpPersistentAcceptOnceFileListFilter
及其使用方式,请参阅远程持久文件列表过滤器。
您可以使用max-fetch-size
属性来限制在需要获取时每次轮询时获取的文件数量。
将其设置为1
并在集群环境中运行时使用持久过滤器。
有关更多信息,请参阅入站通道适配器:控制远程文件获取。
适配器将远程目录和文件名放在标头 (FileHeaders.REMOTE_DIRECTORY
和FileHeaders.REMOTE_FILE
,分别)。
从 5.0 版开始,FileHeaders.REMOTE_FILE_INFO
header 提供额外的远程文件信息(JSON 格式)。
如果您将fileInfoJson
属性SftpStreamingMessageSource
自false
,标头包含一个SftpFileInfo
对象。
您可以访问SftpClient.DirEntry
由底层提供的对象SftpClient
通过使用SftpFileInfo.getFileInfo()
方法。
这fileInfoJson
属性在使用 XML 配置时不可用,但您可以通过注入SftpStreamingMessageSource
到您的配置类之一。
另请参阅远程文件信息。
使用 Java 配置进行配置
以下 Spring Boot 应用程序显示了如何使用 Java 配置入站适配器的示例:
@SpringBootApplication
public class SftpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
@InboundChannelAdapter(channel = "stream")
public MessageSource<InputStream> ftpMessageSource() {
SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template());
messageSource.setRemoteDirectory("sftpSource/");
messageSource.setFilter(new AcceptAllFileListFilter<>());
messageSource.setMaxFetchSize(1);
return messageSource;
}
@Bean
@Transformer(inputChannel = "stream", outputChannel = "data")
public org.springframework.integration.transformer.Transformer transformer() {
return new StreamTransformer("UTF-8");
}
@Bean
public SftpRemoteFileTemplate template() {
return new SftpRemoteFileTemplate(sftpSessionFactory());
}
@ServiceActivator(inputChannel = "data", adviceChain = "after")
@Bean
public MessageHandler handle() {
return System.out::println;
}
@Bean
public ExpressionEvaluatingRequestHandlerAdvice after() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setOnSuccessExpression(
"@template.remove(headers['file_remoteDirectory'] + '/' + headers['file_remoteFile'])");
advice.setPropagateEvaluationFailures(true);
return advice;
}
}
请注意,在此示例中,转换器下游的消息处理程序具有advice
在处理后删除远程文件。