此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring Integration 6.5.1spring-doc.cadn.net.cn

SFTP 流式入站通道适配器

版本 4.3 引入了流式入站通道适配器。 此适配器生成一条消息,其中包含类型为InputStream,让您无需写入本地文件系统即可获取文件。 由于会话保持打开状态,因此使用应用程序负责在文件使用时关闭会话。 会话在closeableResource标头 (IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE). 标准框架组件,例如FileSplitterStreamTransformer,自动关闭会话。 有关这些组件的更多信息,请参阅文件拆分器和流转换器。 以下示例演示如何配置 SFTP 流式入站通道适配器:spring-doc.cadn.net.cn

<int-sftp:inbound-streaming-channel-adapter id="ftpInbound"
            channel="ftpChannel"
            session-factory="sessionFactory"
            filename-pattern="*.txt"
            filename-regex=".*\.txt"
            filter="filter"
            filter-expression="@myFilterBean.check(#root)"
            remote-file-separator="/"
            comparator="comparator"
            max-fetch-size="1"
            remote-directory-expression="'foo/bar'">
        <int:poller fixed-rate="1000" />
</int-sftp:inbound-streaming-channel-adapter>

您只能使用其中一种filename-pattern,filename-regex,filterfilter-expression.spring-doc.cadn.net.cn

从 5.0 版开始,默认情况下,SftpStreamingMessageSource适配器通过使用SftpPersistentAcceptOnceFileListFilter基于内存中的SimpleMetadataStore. 默认情况下,此过滤器也与文件名模式(或正则表达式)一起应用。 如果您需要允许重复,您可以使用AcceptAllFileListFilter. 您可以使用以下命令来处理任何其他用例CompositeFileListFilter(或ChainFileListFilter). 稍后显示的 Java 配置显示了一种在处理后删除远程文件的技术,避免重复。

有关SftpPersistentAcceptOnceFileListFilter及其使用方式,请参阅远程持久文件列表过滤器spring-doc.cadn.net.cn

您可以使用max-fetch-size属性来限制在需要获取时每次轮询时获取的文件数量。 将其设置为1并在集群环境中运行时使用持久过滤器。 有关更多信息,请参阅入站通道适配器:控制远程文件获取spring-doc.cadn.net.cn

适配器将远程目录和文件名放在标头 (FileHeaders.REMOTE_DIRECTORYFileHeaders.REMOTE_FILE,分别)。 从 5.0 版开始,FileHeaders.REMOTE_FILE_INFOheader 提供额外的远程文件信息(JSON 格式)。 如果您将fileInfoJson属性SftpStreamingMessageSourcefalse,标头包含一个SftpFileInfo对象。 您可以访问SftpClient.DirEntry由底层提供的对象SftpClient通过使用SftpFileInfo.getFileInfo()方法。 这fileInfoJson属性在使用 XML 配置时不可用,但您可以通过注入SftpStreamingMessageSource到您的配置类之一。 另请参阅远程文件信息spring-doc.cadn.net.cn

使用 Java 配置进行配置

以下 Spring Boot 应用程序显示了如何使用 Java 配置入站适配器的示例:spring-doc.cadn.net.cn

@SpringBootApplication
public class SftpJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(SftpJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    @InboundChannelAdapter(channel = "stream")
    public MessageSource<InputStream> ftpMessageSource() {
        SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template());
        messageSource.setRemoteDirectory("sftpSource/");
        messageSource.setFilter(new AcceptAllFileListFilter<>());
        messageSource.setMaxFetchSize(1);
        return messageSource;
    }

    @Bean
    @Transformer(inputChannel = "stream", outputChannel = "data")
    public org.springframework.integration.transformer.Transformer transformer() {
        return new StreamTransformer("UTF-8");
    }

    @Bean
    public SftpRemoteFileTemplate template() {
        return new SftpRemoteFileTemplate(sftpSessionFactory());
    }

    @ServiceActivator(inputChannel = "data", adviceChain = "after")
    @Bean
    public MessageHandler handle() {
        return System.out::println;
    }

    @Bean
    public ExpressionEvaluatingRequestHandlerAdvice after() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setOnSuccessExpression(
                "@template.remove(headers['file_remoteDirectory'] + '/' +  headers['file_remoteFile'])");
        advice.setPropagateEvaluationFailures(true);
        return advice;
    }

}

请注意,在此示例中,转换器下游的消息处理程序具有advice在处理后删除远程文件。spring-doc.cadn.net.cn