文件支持

文件支持

Spring Integration 的文件支持通过专用的词汇扩展了 Spring Integration 核心,用于处理文件的读取、写入和转换。spring-doc.cadn.net.cn

您需要将以下依赖项包含到您的项目中:spring-doc.cadn.net.cn

Maven
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-file</artifactId>
    <version>6.0.9</version>
</dependency>
Gradle
compile "org.springframework.integration:spring-integration-file:6.0.9"

它提供了一个命名空间,该命名空间支持定义专用于文件的通道适配器元素,并支持能够将文件内容读取为字符串或字节数组的转换器。spring-doc.cadn.net.cn

本节解释了FileReadingMessageSourceFileWritingMessageHandler的工作原理,以及如何将它们配置为 Bean。 它还讨论了通过Transformer的文件特定实现来处理文件的支持。 最后,它解释了文件特定的命名空间。spring-doc.cadn.net.cn

读取文件

可以使用 FileReadingMessageSource 从文件系统消费文件。 这是 MessageSource 的一个实现,用于从文件系统目录创建消息。 以下示例展示了如何配置 FileReadingMessageSourcespring-doc.cadn.net.cn

<bean id="pollableFileSource"
    class="org.springframework.integration.file.FileReadingMessageSource"
    p:directory="${input.directory}"/>

为防止为特定文件创建消息,您可以提供 FileListFilter。 默认情况下,我们使用以下过滤器:spring-doc.cadn.net.cn

IgnoreHiddenFileListFilter 确保不处理隐藏文件。 请注意,隐藏的确切定义取决于系统。 例如,在基于 UNIX 的系统中,以点号开头的文件被视为隐藏文件。 而 Microsoft Windows 则使用专用的文件属性来标识隐藏文件。spring-doc.cadn.net.cn

版本 4.2 引入了 IgnoreHiddenFileListFilter。 在之前的版本中,隐藏文件被包含在内。 使用默认配置时,IgnoreHiddenFileListFilter 会首先被触发,随后是 AcceptOnceFileListFilterspring-doc.cadn.net.cn

AcceptOnceFileListFilter 确保文件仅从目录中选取一次。spring-doc.cadn.net.cn

AcceptOnceFileListFilter将其状态存储在内存中。 如果您希望状态在系统重启后仍然保留,可以使用FileSystemPersistentAcceptOnceFileListFilter。 此过滤器将接受的文件名存储在一个MetadataStore实现中(请参阅元数据存储)。 此过滤器根据文件名和修改时间进行匹配。spring-doc.cadn.net.cn

自 4.0 版本起,此过滤器需要一个 ConcurrentMetadataStore。 当与共享数据存储(例如配合 RedisMetadataStore 使用的 Redis)一起使用时,它允许过滤器键在多个应用程序实例之间或在多台服务器使用的网络文件共享之间共享。spring-doc.cadn.net.cn

自 4.1.5 版本起,此过滤器新增了一个属性(flushOnUpdate),该属性会导致每次更新时刷新元数据存储(如果存储实现了 Flushable)。spring-doc.cadn.net.cn

持久化文件列表过滤器现在有一个布尔属性 forRecursion。 将此属性设置为 true,也会设置 alwaysAcceptDirectories,这意味着对外部网关(lsmget)的递归操作现在将每次遍历完整的目录树。 这是为了解决目录树深处更改未被检测到的问题。 此外,forRecursion=true 会导致使用文件的完整路径作为元数据存储键;这解决了如果同一名称的文件出现在不同目录中多次时过滤器无法正常工作的问题。 重要提示:这意味着持久化元数据存储中的现有键将无法在顶层目录下的文件中找到。 因此,该属性默认值为 false;此行为可能在未来的版本中发生变化。spring-doc.cadn.net.cn

以下示例使用过滤器配置了 FileReadingMessageSourcespring-doc.cadn.net.cn

<bean id="pollableFileSource"
    class="org.springframework.integration.file.FileReadingMessageSource"
    p:inputDirectory="${input.directory}"
    p:filter-ref="customFilterBean"/>

读取文件时的一个常见问题是,文件可能在尚未准备好时就被检测到(即其他进程可能仍在写入该文件)。 默认的 AcceptOnceFileListFilter 无法防止这种情况。 在大多数情况下,如果文件写入过程在文件准备好后立即重命名每个文件,则可以避免此问题。 组合使用默认值 AcceptOnceFileListFilter 与仅接受已就绪文件(例如基于已知后缀)的 filename-patternfilename-regex 过滤器,可以应对这种情形。 CompositeFileListFilter 可启用组合功能,如下示例所示:spring-doc.cadn.net.cn

<bean id="pollableFileSource"
    class="org.springframework.integration.file.FileReadingMessageSource"
    p:inputDirectory="${input.directory}"
    p:filter-ref="compositeFilter"/>

<bean id="compositeFilter"
    class="org.springframework.integration.file.filters.CompositeFileListFilter">
    <constructor-arg>
        <list>
            <bean class="o.s.i.file.filters.AcceptOnceFileListFilter"/>
            <bean class="o.s.i.file.filters.RegexPatternFileListFilter">
                <constructor-arg value="^test.*$"/>
            </bean>
        </list>
    </constructor-arg>
</bean>

如果无法使用临时名称创建文件并重命名为最终名称,Spring Integration 提供了另一种替代方案。 版本 4.2 添加了 LastModifiedFileListFilter。 可以通过 age 属性配置此过滤器,以便仅传递早于该值的文件。 默认年龄为 60 秒,但您应选择足够大的年龄值,以避免过早获取文件(例如由于网络故障)。 以下示例展示了如何配置 LastModifiedFileListFilterspring-doc.cadn.net.cn

<bean id="filter" class="org.springframework.integration.file.filters.LastModifiedFileListFilter">
    <property name="age" value="120" />
</bean>

从版本 4.3.7 开始,引入了 ChainFileListFilter(作为 CompositeFileListFilter 的扩展),以支持后续过滤器仅查看前一个过滤器结果的场景。 (使用 CompositeFileListFilter 时,所有过滤器都能看到所有文件,但只会传递通过所有过滤器的文件)。 需要新行为的一个示例是 LastModifiedFileListFilterAcceptOnceFileListFilter 的组合,即我们希望在经过一段时间后才接受文件。 使用 CompositeFileListFilter 时,由于 AcceptOnceFileListFilter 在首次遍历时会查看所有文件,因此当其他过滤器处理时它不会再传递该文件。 CompositeFileListFilter 方法适用于将模式过滤器与自定义过滤器结合使用的场景,后者用于查找辅助文件以指示文件传输已完成。 模式过滤器可能仅传递主文件(例如 something.txt),但“完成”过滤器需要检查(例如)something.done 是否存在。spring-doc.cadn.net.cn

假设我们拥有文件 a.txta.doneb.txtspring-doc.cadn.net.cn

模式过滤器仅传递 a.txtb.txt,而“完成”过滤器会查看所有三个文件并仅传递 a.txt。 复合过滤器的最终结果是仅释放 a.txtspring-doc.cadn.net.cn

使用 ChainFileListFilter,如果链中的任何过滤器返回空列表,则不会调用剩余的过滤器。

版本 5.0 引入了一个 ExpressionFileListFilter,用于将 SpEL 表达式针对文件执行,并将该文件作为上下文评估根对象。 为此,所有用于文件处理(本地和远程)的 XML 组件,连同现有的 filter 属性,都已提供 filter-expression 选项,如下示例所示:spring-doc.cadn.net.cn

<int-file:inbound-channel-adapter
        directory="${inputdir}"
        filter-expression="name matches '.text'"
        auto-startup="false"/>

版本 5.0.5 引入了对被拒绝文件感兴趣的 DiscardAwareFileListFilter 实现。 为此,应通过 addDiscardCallback(Consumer<File>) 向此类过滤器实现提供一个回调。 在框架中,此功能由 FileReadingMessageSource.WatchServiceDirectoryScanner 使用,并与 LastModifiedFileListFilter 结合使用。 与常规 DirectoryScanner 不同,WatchService 根据目标文件系统上的事件提供待处理的文件。 在轮询包含这些文件的内部队列时,LastModifiedFileListFilter 可能会丢弃它们,因为它们相对于其配置的 age 太新。 因此,我们失去了该文件以供未来可能的考虑。 丢弃回调钩子使我们能够将该文件保留在内部队列中,以便在后续轮询时针对 age 进行检查。 CompositeFileListFilter 还实现了 DiscardAwareFileListFilter,并向其所有 DiscardAwareFileListFilter 委托对象填充了丢弃回调。spring-doc.cadn.net.cn

由于 CompositeFileListFilter 会将文件与所有委托进行匹配,因此 discardCallback 可能会针对同一文件被多次调用。

从版本 5.1 开始,FileReadingMessageSource 不会检查目录是否存在,也不会创建它,直到其 start() 被调用(通常通过包装 SourcePollingChannelAdapter 实现)。 此前,在引用目录时(例如来自测试或稍后应用权限时),没有简单的方法可以防止操作系统权限错误。spring-doc.cadn.net.cn

消息头

从版本 5.0 开始,FileReadingMessageSource(除了作为轮询 Filepayload 之外)会填充以下标头到出站 Messagespring-doc.cadn.net.cn

  • FileHeaders.FILENAME: 要发送文件的 File.getName()。 可用于后续的重命名或复制逻辑。spring-doc.cadn.net.cn

  • FileHeaders.ORIGINAL_FILE: File 对象本身。 通常,此头由框架组件(例如 分割器转换器)在丢失原始 File 对象时自动填充。 然而,为了与任何其他自定义用例保持一致性和便利性,此头可用于访问原始文件。spring-doc.cadn.net.cn

  • FileHeaders.RELATIVE_PATH: 引入的新头部,用于表示相对于扫描根目录的文件路径部分。 当需要在其他位置恢复源目录层次结构时,此头部非常有用。 为此,可以配置 DefaultFileNameGenerator(参见"生成文件名")以使用此头部。spring-doc.cadn.net.cn

目录扫描与轮询

FileReadingMessageSource 不会立即为来自该目录的文件生成消息。它使用内部队列来处理由 scanner 返回的“符合条件的文件”。scanEachPoll 选项用于确保内部队列在每次轮询时都刷新为最新的输入目录内容。默认情况下(scanEachPoll = false),FileReadingMessageSource会在再次扫描目录之前清空其队列。这种默认行为特别有助于减少扫描目录中大量文件。然而,在需要自定义排序的情况下,重要的是要考虑将此标志设置为 true 所产生的影响。文件的处理顺序可能与预期不符。默认情况下,队列中的文件按其自然(path)顺序处理。即使队列中已有文件,扫描新增的文件也会插入到适当的位置,以维持自然顺序。要自定义顺序,FileReadingMessageSource 可以接受一个 Comparator<File> 作为构造函数参数。它被内部(PriorityBlockingQueue)用于根据业务需求重新排序其内容。因此,若要按特定顺序处理文件,您应向 FileReadingMessageSource 提供一个比较器,而不是对由自定义 DirectoryScanner 生成的列表进行排序。spring-doc.cadn.net.cn

版本 5.0 引入了 RecursiveDirectoryScanner 用于执行文件树遍历。 该实现基于 Files.walk(Path start, int maxDepth, FileVisitOption…​ options) 功能。 根目录(DirectoryScanner.listFiles(File))参数不包含在结果中。 所有其他子目录的包含与排除均基于目标 FileListFilter 实现。 例如,SimplePatternFileListFilter 默认会过滤掉目录。 有关更多信息,请参阅 AbstractDirectoryAwareFileListFilter 及其实现。spring-doc.cadn.net.cn

从版本 5.5 开始,Java DSL 的 FileInboundChannelAdapterSpec 提供了一个方便的 recursive(boolean) 选项,允许在目标 FileReadingMessageSource 中使用 RecursiveDirectoryScanner,而不是默认的那个。

命名空间支持

通过使用文件特定的命名空间,可以简化文件读取的配置。 为此,请使用以下模板:spring-doc.cadn.net.cn

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-file="http://www.springframework.org/schema/integration/file"
  xsi:schemaLocation="http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/file
    https://www.springframework.org/schema/integration/file/spring-integration-file.xsd">
</beans>

在此命名空间内,您可以将 FileReadingMessageSource 减少并将其封装在一个入站通道适配器中,如下所示:spring-doc.cadn.net.cn

<int-file:inbound-channel-adapter id="filesIn1"
    directory="file:${input.directory}" prevent-duplicates="true" ignore-hidden="true"/>

<int-file:inbound-channel-adapter id="filesIn2"
    directory="file:${input.directory}"
    filter="customFilterBean" />

<int-file:inbound-channel-adapter id="filesIn3"
    directory="file:${input.directory}"
    filename-pattern="test*" />

<int-file:inbound-channel-adapter id="filesIn4"
    directory="file:${input.directory}"
    filename-regex="test[0-9]+\.txt" />

第一个通道适配器示例依赖于默认的 FileListFilter 实现:spring-doc.cadn.net.cn

因此,您也可以省略 prevent-duplicatesignore-hidden 属性,因为它们默认是 truespring-doc.cadn.net.cn

Spring Integration 4.2 引入了 ignore-hidden 属性。 在之前的版本中,隐藏文件会被包含在内。spring-doc.cadn.net.cn

第二个通道适配器示例使用自定义过滤器,第三个使用 filename-pattern 属性添加基于 AntPathMatcher 的过滤器,第四个使用 filename-regex 属性向 FileReadingMessageSource 添加基于正则表达式模式的过滤器。 filename-patternfilename-regex 属性分别与普通的 filter 引用属性互斥。 然而,您可以使用 filter 属性来引用 CompositeFileListFilter 的一个实例,该实例可以组合任意数量的过滤器,包括一个或多个基于模式的过滤器,以满足您的特定需求。spring-doc.cadn.net.cn

当多个进程从同一目录读取时,您可能希望锁定文件以防止它们被并发拾取。 为此,您可以使用 FileLocker。 存在一个基于 java.nio 的实现,但您也可以自行实现锁定方案。 nio 锁可以通过以下方式注入:spring-doc.cadn.net.cn

<int-file:inbound-channel-adapter id="filesIn"
    directory="file:${input.directory}" prevent-duplicates="true">
    <int-file:nio-locker/>
</int-file:inbound-channel-adapter>

您可以按如下方式配置自定义锁:spring-doc.cadn.net.cn

<int-file:inbound-channel-adapter id="filesIn"
    directory="file:${input.directory}" prevent-duplicates="true">
    <int-file:locker ref="customLocker"/>
</int-file:inbound-channel-adapter>
当文件传入适配器配置了锁时,它负责在允许接收文件之前获取锁。 它不承担解锁文件的责任。 如果您已处理文件但让锁一直挂起,就会导致内存泄漏。 如果这是一个问题,您应该在适当的时候自行调用 FileLocker.unlock(File file)

当过滤和锁定文件不足以满足需求时,您可能需要完全控制文件的列出方式。 要实现此类需求,您可以使用 DirectoryScanner 的实现类。 该扫描器使您能够精确确定每次轮询中列出的文件。 这也是 Spring Integration 内部用于将 FileListFilter 实例和 FileLocker 连接到 FileReadingMessageSource 的接口。 您可以像以下示例所示,将自定义的 DirectoryScanner 注入到 <int-file:inbound-channel-adapter/>scanner 属性中:spring-doc.cadn.net.cn

<int-file:inbound-channel-adapter id="filesIn" directory="file:${input.directory}"
     scanner="customDirectoryScanner"/>

这样做可以让您完全自由地选择排序、列表和锁定策略。spring-doc.cadn.net.cn

同样重要的是要理解,过滤器(包括 patternsregexprevent-duplicates 等)以及 locker 实例实际上是由 scanner 使用的。 在适配器上设置的任何这些属性随后都会被注入到内部的 scanner 中。 对于外部 scanner 的情况,所有过滤器和锁属性都禁止在 FileReadingMessageSource 上设置。 它们必须(如果需要)在该自定义 DirectoryScanner 上进行指定。 换句话说,如果您将 scanner 注入到 FileReadingMessageSource 中,则应在该 scanner 上提供 filterlocker,而不是在 FileReadingMessageSource 上。spring-doc.cadn.net.cn

默认情况下,DefaultDirectoryScanner 使用 IgnoreHiddenFileListFilterAcceptOnceFileListFilter。 若要阻止其使用,您可以配置自己的过滤器(例如 AcceptAllFileListFilter),甚至将其设置为 null

WatchServiceDirectoryScanner

FileReadingMessageSource.WatchServiceDirectoryScanner在将新文件添加到目录时依赖文件系统事件。 初始化期间,会注册该目录以生成事件。 初始文件列表也是在初始化期间构建的。 遍历目录树时,遇到的任何子目录也会被注册以生成事件。 在第一次轮询时,返回从遍历目录获得的初始文件列表。 在随后的轮询中,返回来自新创建事件的文件。 如果添加了新的子目录,则使用其创建事件来遍历新的子树以查找现有文件,并注册发现的任何新子目录。spring-doc.cadn.net.cn

WatchKey 的内部事件 queue 未被程序以与目录修改事件发生速度相匹配的速度处理时,会出现问题。 如果队列大小被超出,将发出 StandardWatchEventKinds.OVERFLOW 以指示某些文件系统事件可能已丢失。 在这种情况下,将完全重新扫描根目录。 为避免重复,请考虑使用适当的 FileListFilter(例如 AcceptOnceFileListFilter),或在处理完成后删除文件。

WatchServiceDirectoryScanner 可以通过 FileReadingMessageSource.use-watch-service 选项启用,该选项与 scanner 选项互斥。 将为提供的 directory 填充一个内部的 FileReadingMessageSource.WatchServiceDirectoryScanner 实例。spring-doc.cadn.net.cn

此外,现在 WatchService 轮询逻辑可以跟踪 StandardWatchEventKinds.ENTRY_MODIFYStandardWatchEventKinds.ENTRY_DELETEspring-doc.cadn.net.cn

如果您需要跟踪现有文件和新建文件的修改,您应该在 FileListFilter 中实现 ENTRY_MODIFY 事件的逻辑。 否则,这些事件中的文件将被以相同的方式处理。spring-doc.cadn.net.cn

The ResettableFileListFilter 实现会接收 ENTRY_DELETE 事件。 因此,它们的文件会被提供给 remove() 操作使用。 当启用此事件时,诸如 AcceptOnceFileListFilter 之类的过滤器会将这些文件移除。 结果,如果出现同名文件,它将通过过滤器并被作为消息发送。spring-doc.cadn.net.cn

为此,引入了 watch-events 属性(FileReadingMessageSource.setWatchEvents(WatchEventType…​ watchEvents))。 (WatchEventTypeFileReadingMessageSource 中的一个公共内部枚举。) 通过这种选项,我们可以对新建文件使用一套下游流程逻辑,而对修改后的文件使用另一套逻辑。 以下示例展示了如何在同一目录中为创建和修改事件配置不同的逻辑:spring-doc.cadn.net.cn

值得一提的是,ENTRY_DELETE事件涉及被监视目录的子目录的重命名操作。 更具体地说,与先前目录名相关的ENTRY_DELETE事件发生在通知新(已重命名)目录的ENTRY_CREATE事件之前。 在某些操作系统(如Windows)上,必须注册ENTRY_DELETE事件来处理这种情况。 否则,在文件资源管理器中重命名被监视的子目录可能导致该子目录中的新文件无法被检测到。spring-doc.cadn.net.cn

<int-file:inbound-channel-adapter id="newFiles"
     directory="${input.directory}"
     use-watch-service="true"/>

<int-file:inbound-channel-adapter id="modifiedFiles"
     directory="${input.directory}"
     use-watch-service="true"
     filter="acceptAllFilter"
     watch-events="MODIFY"/> <!-- The default is CREATE. -->

限制内存消耗

您可以使用 HeadDirectoryScanner 来限制内存中保留的文件数量。 这在扫描大型目录时非常有用。 通过 XML 配置,可以通过在入站通道适配器上设置 queue-size 属性来启用此功能。spring-doc.cadn.net.cn

在 4.2 版本之前,此设置与使用任何其他过滤器不兼容。 任何其他过滤器(包括 prevent-duplicates="true")都会覆盖用于限制大小的过滤器。spring-doc.cadn.net.cn

使用 HeadDirectoryScannerAcceptOnceFileListFilter 不兼容。 由于在轮询决策过程中会检查所有过滤器,因此 AcceptOnceFileListFilter 无法得知其他过滤器可能暂时过滤了文件。 即使之前被 HeadDirectoryScanner.HeadFilter 过滤的文件现在已可用,AcceptOnceFileListFilter 仍会过滤它们。spring-doc.cadn.net.cn

通常,在这种情况下,您不应使用AcceptOnceFileListFilter,而应删除已处理的文件,以便之前过滤的文件在下次轮询时可用。spring-doc.cadn.net.cn

使用 Java 配置进行配置

以下 Spring Boot 应用程序展示了如何使用 Java 配置来配置出站适配器:spring-doc.cadn.net.cn

@SpringBootApplication
public class FileReadingJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FileReadingJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public MessageChannel fileInputChannel() {
        return new DirectChannel();
    }

    @Bean
    @InboundChannelAdapter(value = "fileInputChannel", poller = @Poller(fixedDelay = "1000"))
    public MessageSource<File> fileReadingMessageSource() {
         FileReadingMessageSource source = new FileReadingMessageSource();
         source.setDirectory(new File(INBOUND_PATH));
         source.setFilter(new SimplePatternFileListFilter("*.txt"));
         return source;
    }

    @Bean
    @Transformer(inputChannel = "fileInputChannel", outputChannel = "processFileChannel")
    public FileToStringTransformer fileToStringTransformer() {
        return new FileToStringTransformer();
    }

}

使用 Java DSL 进行配置

以下 Spring Boot 应用程序展示了如何使用 Java DSL 配置出站适配器:spring-doc.cadn.net.cn

@SpringBootApplication
public class FileReadingJavaApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FileReadingJavaApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow fileReadingFlow() {
         return IntegrationFlow
                  .from(Files.inboundAdapter(new File(INBOUND_PATH))
                              .patternFilter("*.txt"),
                          e -> e.poller(Pollers.fixedDelay(1000)))
                  .transform(Files.toStringTransformer())
                  .channel("processFileChannel")
                  .get();
    }

}

'尾随'文件

另一个常用的用例是从文件末尾(或尾部)获取“行”,并在添加新行时捕获它们。 提供了两种实现方式。 第一种实现,OSDelegatingFileTailingMessageProducer,使用原生的 tail 命令(在支持该命令的操作系统上)。 在那些平台上,这通常是最高效的实现方式。 对于没有 tail 命令的操作系统,第二种实现,ApacheCommonsFileTailingMessageProducer,则使用 Apache commons-io Tailer 类。spring-doc.cadn.net.cn

在这两种情况下,文件系统事件(例如文件不可用)以及其他事件均通过正常的 Spring 事件发布机制作为 ApplicationEvent 实例发布。 此类事件的示例包括以下:spring-doc.cadn.net.cn

[message=tail: cannot open '/tmp/somefile' for reading:
               No such file or directory, file=/tmp/somefile]

[message=tail: '/tmp/somefile' has become accessible, file=/tmp/somefile]

[message=tail: '/tmp/somefile' has become inaccessible:
               No such file or directory, file=/tmp/somefile]

[message=tail: '/tmp/somefile' has appeared;
               following end of new file, file=/tmp/somefile]

前例中所示的事件序列可能会发生,例如在文件轮转时。spring-doc.cadn.net.cn

从版本 5.0 开始,在 idleEventInterval 期间,如果文件中没有数据,则会发出 FileTailingIdleEvent。 以下示例展示了此类事件的外观:spring-doc.cadn.net.cn

[message=Idle timeout, file=/tmp/somefile] [idle time=5438]
并非所有支持 tail 命令的平台都提供这些状态消息。

从这些端点发出的消息具有以下标头:spring-doc.cadn.net.cn

在 5.0 版本之前的版本中,FileHeaders.FILENAME 头包含文件绝对路径的字符串表示。 现在,您可以通过调用原始文件头中的 getAbsolutePath() 来获取该字符串表示。

以下示例使用默认选项('-F -n 0',表示从当前末尾开始跟随文件名)创建一个原生适配器。spring-doc.cadn.net.cn

<int-file:tail-inbound-channel-adapter id="native"
	channel="input"
	task-executor="exec"
	file="/tmp/foo"/>

以下示例创建一个带有'-F -n +0'选项的原生适配器(意味着跟随文件名,输出所有现有行)。spring-doc.cadn.net.cn

<int-file:tail-inbound-channel-adapter id="native"
	channel="input"
	native-options="-F -n +0"
	task-executor="exec"
	file-delay=10000
	file="/tmp/foo"/>

如果 tail 命令失败(在某些平台上,缺少文件会导致即使指定了 -Ftail 也会失败),该命令将每 10 秒重试一次。spring-doc.cadn.net.cn

默认情况下,原生适配器会从标准输出捕获内容并将其作为消息发送。 它们也会从标准错误捕获以触发事件。 从版本 4.3.6 开始,您可以通过将 enable-status-reader 设置为 false 来丢弃标准错误事件,如下例所示:spring-doc.cadn.net.cn

<int-file:tail-inbound-channel-adapter id="native"
	channel="input"
	enable-status-reader="false"
	task-executor="exec"
	file="/tmp/foo"/>

在以下示例中,IdleEventInterval被设置为5000,这意味着如果五秒钟内没有写入任何行,则每五秒触发一次FileTailingIdleEventspring-doc.cadn.net.cn

<int-file:tail-inbound-channel-adapter id="native"
	channel="input"
	idle-event-interval="5000"
	task-executor="exec"
	file="/tmp/somefile"/>

这在需要停止适配器时可能很有用。spring-doc.cadn.net.cn

以下示例创建一个 Apache commons-io Tailer 适配器,该适配器每两秒检查文件中的新行,并每十秒检查缺失文件是否存在:spring-doc.cadn.net.cn

<int-file:tail-inbound-channel-adapter id="apache"
	channel="input"
	task-executor="exec"
	file="/tmp/bar"
	delay="2000"
	end="false"             (1)
	reopen="true"           (2)
	file-delay="10000"/>
1 该文件是从开头(end="false")开始尾随的,而不是从末尾(这是默认行为)开始。
2 每个块都会重新打开文件(默认行为是保持文件打开)。
指定 delayendreopen 属性会强制使用 Apache commons-io 适配器,并使 native-options 属性不可用。

处理不完整数据

在文件传输场景中,一个常见问题是如何确定传输已完成,从而避免读取不完整的文件。 解决此问题的常用技术是:先使用临时名称写入文件,然后原子性地将其重命名为最终名称。 该技术配合一个过滤器(用于防止消费者捕获临时文件),提供了一种稳健的解决方案。 Spring Integration 中负责写入文件(本地或远程)的组件均采用此技术。 默认情况下,它们会在文件名后追加.writing,并在传输完成后移除该后缀。spring-doc.cadn.net.cn

另一种常用技术是编写第二个“标记”文件,以指示文件传输已完成。 在此场景中,您不应认为 somefile.txt(例如)可用,除非 somefile.txt.complete 也已存在。 Spring Integration 5.0 版本引入了新过滤器以支持此机制。 文件系统 (FileSystemMarkerFilePresentFileListFilter)、FTPSFTP 均提供了相应实现。 这些过滤器可配置为允许标记文件使用任意名称,尽管它通常与正在传输的文件相关。 有关更多信息,请参阅 Javadocspring-doc.cadn.net.cn

写入文件

要将消息写入文件系统,您可以使用 FileWritingMessageHandler。 此类可处理以下有效负载类型:spring-doc.cadn.net.cn

对于字符串负载,您可以配置编码和字符集。spring-doc.cadn.net.cn

为了简化操作,您可以使用 XML 命名空间将 FileWritingMessageHandler 配置为出站通道适配器或出站网关的一部分。spring-doc.cadn.net.cn

从版本 4.3 开始,您可以指定写入文件时使用的缓冲区大小。spring-doc.cadn.net.cn

从版本 5.1 开始,您可以提供一个 BiConsumer<File, Message<?>> newFileCallback,当使用 FileExistsMode.APPENDFileExistsMode.APPEND_NO_FLUSH 且需要创建新文件时触发该回调。 此回调会接收一个新创建的文件以及触发它的消息。 例如,可以使用此回调将消息头中定义的 CSV 表头写入文件。spring-doc.cadn.net.cn

生成文件名

在最简单的形式下,FileWritingMessageHandler只需要一个用于写入文件的目的地目录。 要写入的文件名由处理程序的FileNameGenerator决定。 默认实现会查找其键与定义为FileHeaders.FILENAME的常量相匹配的消息头。spring-doc.cadn.net.cn

或者,您可以指定一个表达式,该表达式将针对消息进行求值以生成文件名——例如,headers['myCustomHeader'] + '.something'。 该表达式的结果必须为 String。 为方便起见,DefaultFileNameGenerator 还提供了 setHeaderName 方法,允许您显式指定要用作文件名的消息头。spring-doc.cadn.net.cn

一旦设置完成,DefaultFileNameGenerator将采用以下解析步骤来确定给定消息负载的文件名:spring-doc.cadn.net.cn

  1. 评估该表达式相对于消息的结果,如果结果为非空的 String,则将其用作文件名。spring-doc.cadn.net.cn

  2. 否则,如果负载是 java.io.File,则使用 File 对象的文件名。spring-doc.cadn.net.cn

  3. 否则,使用附加了 .msg 的消息 ID 作为文件名。spring-doc.cadn.net.cn

当您使用 XML 命名空间支持时,文件出站通道适配器(file outbound channel adapter)和文件出站网关(file outbound gateway)均支持以下互斥的配置属性:spring-doc.cadn.net.cn

写入文件时,会使用临时文件后缀(默认为.writing)。 在文件写入过程中,该后缀会附加到文件名之后。 若要自定义该后缀,您可以在文件出站通道适配器(file outbound channel adapter)和文件出站网关(file outbound gateway)上设置temporary-file-suffix属性。spring-doc.cadn.net.cn

当使用 APPEND 文件 mode 时,temporary-file-suffix 属性会被忽略,因为数据是直接追加到文件中的。

从版本 4.2.5 开始,生成的文件名(作为filename-generatorfilename-generator-expression求值的结果)可以表示子路径与目标文件名的组合。 它继续作为File(File parent, String child)的第二个构造函数参数使用。 然而,过去我们并未为子路径创建mkdirs()目录,而仅假设了文件名。 这种方法在需要恢复文件系统树以匹配源目录的情况下非常有用——例如,在解压归档文件并将所有文件按原始顺序保存到目标目录时。spring-doc.cadn.net.cn

指定输出目录

文件出站通道适配器与文件出站网关均提供两个互斥的配置属性,用于指定输出目录:spring-doc.cadn.net.cn

Spring Integration 2.2 引入了 directory-expression 属性。
使用directory属性

当您使用 directory 属性时,输出目录被设置为一个固定值,该值在 FileWritingMessageHandler 初始化时设定。 如果您未指定此属性,则必须使用 directory-expression 属性。spring-doc.cadn.net.cn

使用directory-expression属性

如果您想要完整的 SpEL 支持,可以使用 directory-expression 属性。 该属性接受一个 SpEL 表达式,该表达式会在处理每条消息时进行求值。 因此,当动态指定输出文件目录时,您可以完全访问消息的负载(payload)及其头部信息。spring-doc.cadn.net.cn

SpEL 表达式必须解析为 Stringjava.io.Fileorg.springframework.core.io.Resource。 (后者无论如何都会被评估为 File。) 此外,生成的 StringFile 必须指向一个目录。 如果您未指定 directory-expression 属性,则必须设置 directory 属性。spring-doc.cadn.net.cn

使用auto-create-directory属性

默认情况下,如果目标目录不存在,则会自动创建相应的目标目录以及任何不存在的父目录。 若要阻止此行为,您可以将 auto-create-directory 属性设置为 false。 此属性同时适用于 directorydirectory-expression 属性。spring-doc.cadn.net.cn

当使用 directory 属性且 auto-create-directoryfalse 时,自 Spring Integration 2.2 起进行了以下更改:spring-doc.cadn.net.cn

不再在适配器初始化时检查目标目录是否存在,而是改为在处理每条消息时进行检查。spring-doc.cadn.net.cn

此外,如果 auto-create-directorytrue 且在消息处理之间目录被删除,则每个正在处理的消息都会重新创建该目录。spring-doc.cadn.net.cn

处理现有目标文件

当您写入文件且目标文件已存在时,默认行为是覆盖该目标文件。 您可以通过在相关的文件出站组件上设置 mode 属性来更改此行为。 存在以下选项:spring-doc.cadn.net.cn

Spring Integration 2.2 引入了 mode 属性以及 APPENDFAILIGNORE 选项。
REPLACE

如果目标文件已存在,则会被覆盖。 如果未指定 mode 属性,则在写入文件时这是默认行为。spring-doc.cadn.net.cn

REPLACE_IF_MODIFIED

如果目标文件已存在,仅当最后修改时间戳与源文件不同时才会覆盖它。 对于 File 负载,将 lastModified 次的时间与现有文件进行比较。 对于其他负载,将 FileHeaders.SET_MODIFIED (file_setModified) 头与现有文件进行比较。 如果该头缺失或值不是 Number,则始终替换该文件。spring-doc.cadn.net.cn

APPEND

此模式允许您将消息内容追加到现有文件中,而不是每次都创建新文件。 请注意,此属性与 temporary-file-suffix 属性互斥,因为当它将内容追加到现有文件时,适配器不再使用临时文件。 每条消息处理完毕后,文件将被关闭。spring-doc.cadn.net.cn

APPEND_NO_FLUSH

此选项具有与APPEND相同的语义,但在每条消息后不会刷新数据且不会关闭文件。 这可以在发生故障导致数据丢失的风险下提供显著的性能提升。 有关更多信息,请参阅使用APPEND_NO_FLUSH时刷新文件spring-doc.cadn.net.cn

FAIL

如果目标文件存在,将抛出 MessageHandlingExceptionspring-doc.cadn.net.cn

IGNORE

如果目标文件已存在,消息负载将被静默忽略。spring-doc.cadn.net.cn

当使用临时文件后缀(默认为.writing)时,如果最终文件名或临时文件名已存在,则IGNORE选项适用。

使用 Spring 框架时刷新文件APPEND_NO_FLUSH

APPEND_NO_FLUSH模式是在版本4.3中引入的。 使用它可以提高性能,因为文件在每条消息后不会被关闭。 然而,这可能会导致在发生故障时数据丢失。spring-doc.cadn.net.cn

Spring Integration 提供了多种刷新策略以减轻数据丢失:spring-doc.cadn.net.cn

  • 使用 flushInterval。 如果在此时间段内未写入文件,则会自动刷新。 这只是一个近似值,最多可能达到 1.33x 次(平均为 1.167x)。spring-doc.cadn.net.cn

  • 向消息处理器的trigger方法发送包含正则表达式的消息。 匹配该模式的绝对路径文件将被刷新。spring-doc.cadn.net.cn

  • 提供处理程序的一个自定义 MessageFlushPredicate 实现,以修改当消息发送到 trigger 方法时所采取的操作。spring-doc.cadn.net.cn

  • 通过传入自定义的 flushIfNeededFileWritingMessageHandler.MessageFlushPredicate 实现来调用处理器的 FileWritingMessageHandler.FlushPredicate 方法之一。spring-doc.cadn.net.cn

谓词会对每个打开的文件进行调用。 有关这些接口的更多信息,请参阅 Javadoc。 请注意,自 5.0 版本起,谓词方法提供了另一个参数:如果是新文件或之前已关闭的文件,则为其首次写入的时间。spring-doc.cadn.net.cn

当使用 flushInterval 时,间隔从最后一次写入开始计算。 仅当文件在间隔时间内处于空闲状态时才会进行刷新。 从版本 4.3.7 开始,可以设置一个额外的属性 (flushWhenIdle) 为 false,表示间隔从对之前已刷新(或新建)文件的**首次**写入开始计算。spring-doc.cadn.net.cn

文件时间戳

默认情况下,目标文件的 lastModified 时间戳为文件创建的时间(就地重命名除外,此时保留当前时间戳)。 从 4.3 版本开始,您可以配置 preserve-timestamp(在使用 Java 配置时则为 setPreserveTimestamp(true))。 对于 File 类型的负载,这会将时间戳从入站文件传输到出站文件(无论是否需要复制)。 对于其他类型的负载,如果存在 FileHeaders.SET_MODIFIED 头信息(file_setModified),则使用该头信息来设置目标文件的 lastModified 时间戳,前提是该头信息的值为 Numberspring-doc.cadn.net.cn

文件权限

从版本 5.0 开始,当向支持 POSIX 权限的文件系统写入文件时,您可以在出站通道适配器或网关中指定这些权限。 该属性是一个整数,通常以熟悉的八进制格式提供——例如,0640,表示所有者具有读/写权限,组具有只读权限,其他人没有访问权限。spring-doc.cadn.net.cn

文件出站通道适配器

以下示例配置了一个文件出站通道适配器:spring-doc.cadn.net.cn

<int-file:outbound-channel-adapter id="filesOut" directory="${input.directory.property}"/>

基于命名空间的配置还支持 delete-source-files 属性。 如果设置为 true,它将在写入目标后触发删除原始源文件。 该标志的默认值为 false。 以下示例展示了如何将其设置为 truespring-doc.cadn.net.cn

<int-file:outbound-channel-adapter id="filesOut"
    directory="${output.directory}"
    delete-source-files="true"/>
delete-source-files属性仅在入站消息包含File负载,或者FileHeaders.ORIGINAL_FILE头值包含源File实例或代表原始文件路径的String时才会生效。

从版本 4.2 开始,FileWritingMessageHandler支持一个append-new-line选项。 如果设置为true,则在写入消息后会在文件末尾追加一个新行。 默认属性值为false。 以下示例展示了如何使用append-new-line选项:spring-doc.cadn.net.cn

<int-file:outbound-channel-adapter id="newlineAdapter"
	append-new-line="true"
    directory="${output.directory}"/>

出站网关

在需要根据已写入的文件继续处理消息的情况下,您可以使用outbound-gateway代替。 它起到与outbound-channel-adapter类似的作用。 不过,在写入文件后,它还会将文件作为消息的负载发送到回复通道。spring-doc.cadn.net.cn

以下示例配置了一个出站网关:spring-doc.cadn.net.cn

<int-file:outbound-gateway id="mover" request-channel="moveInput"
    reply-channel="output"
    directory="${output.directory}"
    mode="REPLACE" delete-source-files="true"/>

如前所述,您还可以指定 mode 属性,该属性定义了当目标文件已存在时如何处理这种情况。 有关更多详细信息,请参阅 处理已存在的目标文件。 通常,在使用文件出站网关时,结果文件会作为回复通道上的消息负载返回。spring-doc.cadn.net.cn

这也适用于指定 IGNORE 模式的情况。 在这种情况下,将返回预先存在的目标文件。 如果请求消息的有效载荷是一个文件,您仍然可以通过消息头访问该原始文件。 请参阅 FileHeaders.ORIGINAL_FILEspring-doc.cadn.net.cn

'outbound-gateway'在您需要先移动文件,然后将其通过处理管道的情况下工作良好。 在这种情况下,您可以将文件命名空间的inbound-channel-adapter元素连接到outbound-gateway,然后将该网关的reply-channel连接到管道的开头。

如果您有更复杂的需求,或者需要支持额外的负载类型作为输入以转换为文件内容,您可以扩展 FileWritingMessageHandler,但更好的选择是依赖一个 Transformerspring-doc.cadn.net.cn

使用 Java 配置进行配置

以下 Spring Boot 应用示例展示了如何使用 Java 配置来配置入站适配器:spring-doc.cadn.net.cn

@SpringBootApplication
@IntegrationComponentScan
public class FileWritingJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                      new SpringApplicationBuilder(FileWritingJavaApplication.class)
                              .web(false)
                              .run(args);
             MyGateway gateway = context.getBean(MyGateway.class);
             gateway.writeToFile("foo.txt", new File(tmpDir.getRoot(), "fileWritingFlow"), "foo");
    }

    @Bean
    @ServiceActivator(inputChannel = "writeToFileChannel")
    public MessageHandler fileWritingMessageHandler() {
         Expression directoryExpression = new SpelExpressionParser().parseExpression("headers.directory");
         FileWritingMessageHandler handler = new FileWritingMessageHandler(directoryExpression);
         handler.setFileExistsMode(FileExistsMode.APPEND);
         return handler;
    }

    @MessagingGateway(defaultRequestChannel = "writeToFileChannel")
    public interface MyGateway {

        void writeToFile(@Header(FileHeaders.FILENAME) String fileName,
                       @Header(FileHeaders.FILENAME) File directory, String data);

    }
}

使用 Java DSL 进行配置

以下 Spring Boot 应用程序示例展示了如何使用 Java DSL 配置入站适配器:spring-doc.cadn.net.cn

@SpringBootApplication
public class FileWritingJavaApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext context =
                 new SpringApplicationBuilder(FileWritingJavaApplication.class)
                         .web(false)
                         .run(args);
        MessageChannel fileWritingInput = context.getBean("fileWritingInput", MessageChannel.class);
        fileWritingInput.send(new GenericMessage<>("foo"));
    }

    @Bean
   	public IntegrationFlow fileWritingFlow() {
   	    return IntegrationFlow.from("fileWritingInput")
   		        .enrichHeaders(h -> h.header(FileHeaders.FILENAME, "foo.txt")
   		                  .header("directory", new File(tmpDir.getRoot(), "fileWritingFlow")))
   	            .handle(Files.outboundGateway(m -> m.getHeaders().get("directory")))
   	            .channel(MessageChannels.queue("fileWritingResultChannel"))
   	            .get();
    }

}

文件转换器

To transform data read from the file system to objects and the other way around, you need to do some work. Unlike FileReadingMessageSource and to a lesser extent FileWritingMessageHandler, you probably need your own mechanism to get the job done. For this, you can implement the Transformer interface. Alternatively, you can extend the AbstractFilePayloadTransformer for inbound messages. Spring Integration provides some obvious implementations.spring-doc.cadn.net.cn

查看 Javadoc for the Transformer interface 以了解哪些 Spring Integration 类实现了它。 同样,您可以检查 Javadoc for the AbstractFilePayloadTransformer class 以了解哪些 Spring Integration 类扩展了它。spring-doc.cadn.net.cn

FileToByteArrayTransformer 继承自 AbstractFilePayloadTransformer,并通过 Spring 的 FileCopyUtilsFile 对象转换为 byte[]。 通常使用一系列转换器比将所有转换逻辑放在单个类中更好。 在这种情况下,从 Filebyte[] 的转换可能是一个逻辑上的第一步。spring-doc.cadn.net.cn

FileToStringTransformer 扩展 AbstractFilePayloadTransformer,将 File 对象转换为 String。 如果没有其他用途,这在调试时可能很有用(建议与 线束拦截器 配合使用)。spring-doc.cadn.net.cn

要配置特定文件的转换器,您可以使用来自 file 命名空间的相应元素,如下例所示:spring-doc.cadn.net.cn

<int-file:file-to-bytes-transformer  input-channel="input" output-channel="output"
    delete-files="true"/>

<int-file:file-to-string-transformer input-channel="input" output-channel="output"
    delete-files="true" charset="UTF-8"/>

delete-files选项向转换器发出信号,指示它在转换完成后删除传入文件。 这绝不能替代在FileReadingMessageSource被用于多线程环境(例如当您通常使用Spring Integration时)时使用AcceptOnceFileListFilterspring-doc.cadn.net.cn

文件分割器

FileSplitter 在版本 4.1.2 中引入,其命名空间支持在版本 4.2 中添加。 FileSplitter 基于 BufferedReader.readLine() 将文本文件拆分为单独的行。 默认情况下,拆分器使用 Iterator 按顺序逐行发射从文件中读取的行。 将 iterator 属性设置为 false 会导致它在发射消息之前将所有行读入内存。 这种用法的场景可能是:如果您希望在发送包含行的任何消息之前检测文件上的 I/O 错误。 然而,它仅适用于相对较短的文件。spring-doc.cadn.net.cn

入站负载可以是FileString(一个File路径)、InputStreamReader。 其他类型的负载将按原样发出。spring-doc.cadn.net.cn

以下列表展示了配置 FileSplitter 的可能方式:spring-doc.cadn.net.cn

Java DSL
@SpringBootApplication
public class FileSplitterApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(FileSplitterApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public IntegrationFlow fileSplitterFlow() {
        return IntegrationFlow
            .from(Files.inboundAdapter(tmpDir.getRoot())
                 .filter(new ChainFileListFilter<File>()
                        .addFilter(new AcceptOnceFileListFilter<>())
                        .addFilter(new ExpressionFileListFilter<>(
                             new FunctionExpression<File>(f -> "foo.tmp".equals(f.getName()))))))
            .split(Files.splitter()
                     .markers()
                     .charset(StandardCharsets.US_ASCII)
                     .firstLineAsHeader("fileHeader")
                     .applySequence(true))
            .channel(c -> c.queue("fileSplittingResultChannel"))
            .get();
    }

}
Kotlin DSL
@Bean
fun fileSplitterFlow() =
    integrationFlow(
        Files.inboundAdapter(tmpDir.getRoot())
            .filter(
                ChainFileListFilter<File?>()
                    .addFilter(AcceptOnceFileListFilter())
                    .addFilter(ExpressionFileListFilter(FunctionExpression { f: File? -> "foo.tmp" == f!!.name }))
            )
    ) {
        split(
            Files.splitter()
                .markers()
                .charset(StandardCharsets.US_ASCII)
                .firstLineAsHeader("fileHeader")
                .applySequence(true)
        )
        channel { queue("fileSplittingResultChannel") }
    }
Java
@Splitter(inputChannel="toSplitter")
@Bean
public MessageHandler fileSplitter() {
    FileSplitter splitter = new FileSplitter(true, true);
    splitter.setApplySequence(true);
    splitter.setOutputChannel(outputChannel);
    return splitter;
}
XML
<int-file:splitter id="splitter" (1)
    iterator=""                  (2)
    markers=""                   (3)
    markers-json=""              (4)
    apply-sequence=""            (5)
    requires-reply=""            (6)
    charset=""                   (7)
    first-line-as-header=""      (8)
    input-channel=""             (9)
    output-channel=""            (10)
    send-timeout=""              (11)
    auto-startup=""              (12)
    order=""                     (13)
    phase="" />                  (14)
1 分裂器的 Bean 名称。
2 设置为 true(默认值)以使用迭代器,或设置为 false 在发送行之前将文件加载到内存中。
3 设置为 true 可在文件数据之前和之后发出文件开始和结束标记消息。 标记是负载为 FileSplitter.FileMarker 的消息(其中 mark 属性包含 STARTEND 值)。 当在下游流程中顺序处理文件且某些行被过滤时,您可能需要使用这些标记。 它们使下游处理能够知道文件是否已完全处理。 此外,还会向这些消息添加一个包含 STARTENDfile_marker 头信息。 END 标记包含行数计数。 如果文件为空,则仅发出 STARTEND 标记,并将 0 作为 lineCount。 默认值为 false。 当 true 时,apply-sequence 默认为 false。 另请参阅 markers-json(下一个属性)。
4 markers为true时,将此设置为true,以便将FileMarker对象转换为JSON字符串。 (底层使用SimpleJsonSerializer)。
5 设置为 false 可禁用在消息中包含 sequenceSizesequenceNumber 头信息。 默认值为 true,除非 markerstrue。 当 truemarkerstrue 时,标记将包含在序列中。 当 trueiteratortrue 时,sequenceSize 头信息将被设置为 0,因为大小未知。
6 设置为 true 会在文件中没有行时抛出 RequiresReplyException。 默认值为 false
7 设置用于将文本数据读取到String有效载荷时的字符集名称。 默认值为平台字符集。
8 作为消息中剩余行携带的标头的第一个行的标头名称。 自 5.0 版本起。
9 设置用于向拆分器发送消息的输入通道。
10 设置发送消息的输出通道。
11 设置发送超时时间。 仅当 output-channel 可能阻塞时适用,例如 QueueChannel 已满时。
12 设置为 false 可禁用在上下文刷新时自动启动分割器。 默认值为 true
13 如果 input-channel<publish-subscribe-channel/>,请设置此端点的顺序。
14 设置分裂器的启动阶段(当 auto-startuptrue 时使用)。

The FileSplitter 还会将任何基于文本的 InputStream 拆分为行。 从 4.3 版本开始,当与使用 stream 选项检索文件的 FTP 或 SFTP 流式传入通道适配器,或使用该选项的 FTP 或 SFTP 传出网关配合使用时,拆分器会在文件完全消耗后自动关闭支持该流的会话。 有关这些功能的更多信息,请参阅 FTP 流式传入通道适配器SFTP 流式传入通道适配器,以及 FTP 传出网关SFTP 传出网关spring-doc.cadn.net.cn

使用 Java 配置时,可以使用额外的构造函数,如下例所示:spring-doc.cadn.net.cn

public FileSplitter(boolean iterator, boolean markers, boolean markersJson)

markersJson 为 true 时,标记将表示为 JSON 字符串(使用 SimpleJsonSerializer)。spring-doc.cadn.net.cn

版本 5.0 引入了 firstLineAsHeader 选项,用于指定内容的第一行为标题(例如 CSV 文件中的列名)。 传递给此属性的参数是:在发出的消息中,第一行将作为标题被携带的标题名称,该标题适用于剩余的行。 该行不会包含在序列标题中(如果 applySequence 为 true),也不会包含在与 FileMarker.END 关联的 lineCount 中。 注意:从版本 5.5 开始,lineCount 也会作为 FileHeaders.LINE_COUNT 包含在 FileMarker.END 消息的标题中,因为 FileMarker 可能会被序列化为 JSON。 如果文件仅包含标题行,则该文件被视为空,因此在拆分时只会发出 FileMarker 个实例(如果启用了标记符;否则,不会发出任何消息)。 默认情况下(如果未设置标题名称),第一行被视为数据,并成为第一个发出消息的有效负载(payload)。spring-doc.cadn.net.cn

如果您需要从文件内容中提取更复杂的逻辑(不是第一行,不是整行内容,也不是某个特定头信息,等等),请在 FileSplitter 之前考虑使用 header enricher。 请注意,已移至头信息的行可能会在正常内容处理流程的下游被过滤掉。spring-doc.cadn.net.cn

拆分文件的幂等下游处理

apply-sequence 为 true 时,拆分器会在 SEQUENCE_NUMBER 标题中添加行号(当 markers 为 true 时,标记被计为行)。 该行号可与 幂等接收器 配合使用,以避免在重启后重新处理行。spring-doc.cadn.net.cn

@Bean
public ConcurrentMetadataStore store() {
    return new ZookeeperMetadataStore();
}

@Bean
public MetadataStoreSelector selector() {
    return new MetadataStoreSelector(
            message -> message.getHeaders().get(FileHeaders.ORIGINAL_FILE, File.class)
                    .getAbsolutePath(),
            message -> message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)
                    .toString(),
            store())
                    .compareValues(
                            (oldVal, newVal) -> Integer.parseInt(oldVal) < Integer.parseInt(newVal));
}

@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
    return new IdempotentReceiverInterceptor(selector());
}

@Bean
public IntegrationFlow flow() {
    ...
    .split(new FileSplitter())
    ...
    .handle("lineHandler", e -> e.advice(idempotentReceiverInterceptor()))
    ...
}

文件聚合器

从版本 5.5 开始,引入了FileAggregator以覆盖当启用 START/END 标记时FileSplitter用例的另一侧。 为方便起见,FileAggregator实现了所有三种序列详情策略:spring-doc.cadn.net.cn

  • 带有 HeaderAttributeCorrelationStrategy 属性的 FileHeaders.FILENAME 用于关联键计算。 当在 FileSplitter 上启用标记时,它不会填充序列详细信息头,因为 START/END 标记消息也包含在序列大小中。 对于每一行输出(包括 START/END 标记消息),FileHeaders.FILENAME 仍会被填充。spring-doc.cadn.net.cn

  • FileMarkerReleaseStrategy - 检查组中是否存在 FileSplitter.FileMarker.Mark.END 消息,然后将 FileHeaders.LINE_COUNT 头部的值与组大小减去 2 - FileSplitter.FileMarker 个实例进行比较。 它还实现了一个便捷的 GroupConditionProvider 联系人,用于在 AbstractCorrelatingMessageHandler 中调用 conditionSupplier 函数。 有关更多信息,请参阅 消息组条件spring-doc.cadn.net.cn

  • FileAggregatingMessageGroupProcessor 仅会从组中移除 FileSplitter.FileMarker 条消息,并将其余消息收集到列表负载中以进行生产。spring-doc.cadn.net.cn

以下列表展示了配置 FileAggregator 的可能方式:spring-doc.cadn.net.cn

Java DSL
@Bean
public IntegrationFlow fileSplitterAggregatorFlow(TaskExecutor taskExecutor) {
    return f -> f
            .split(Files.splitter()
                    .markers()
                    .firstLineAsHeader("firstLine"))
            .channel(c -> c.executor(taskExecutor))
            .filter(payload -> !(payload instanceof FileSplitter.FileMarker),
                    e -> e.discardChannel("aggregatorChannel"))
            .<String, String>transform(String::toUpperCase)
            .channel("aggregatorChannel")
            .aggregate(new FileAggregator())
            .channel(c -> c.queue("resultChannel"));
}
Kotlin DSL
@Bean
fun fileSplitterAggregatorFlow(taskExecutor: TaskExecutor?) =
    integrationFlow {
        split(Files.splitter().markers().firstLineAsHeader("firstLine"))
        channel { executor(taskExecutor) }
        filter<Any>({ it !is FileMarker }) { discardChannel("aggregatorChannel") }
        transform(String::toUpperCase)
        channel("aggregatorChannel")
        aggregate(FileAggregator())
        channel { queue("resultChannel") }
    }
Java
@serviceActivator(inputChannel="toAggregateFile")
@Bean
public AggregatorFactoryBean fileAggregator() {
    AggregatorFactoryBean aggregator = new AggregatorFactoryBean();
    aggregator.setProcessorBean(new FileAggregator());
    aggregator.setOutputChannel(outputChannel);
    return aggregator;
}
XML
<int:chain input-channel="input" output-channel="output">
    <int-file:splitter markers="true"/>
    <int:aggregator>
        <bean class="org.springframework.integration.file.aggregator.FileAggregator"/>
    </int:aggregator>
</int:chain>

如果 FileAggregator 的默认行为不满足目标逻辑,建议配置一个聚合器端点并指定独立的策略。 有关更多信息,请参阅 FileAggregator JavaDocs。spring-doc.cadn.net.cn

远程持久化文件列表过滤器

入站和流式传输入站远程文件通道适配器(FTPSFTP及其他技术)默认使用相应的AbstractPersistentFileListFilter实现进行配置,并配备内存中的MetadataStore。 要在集群中运行,可以使用共享的MetadataStore替换这些过滤器(有关更多信息,请参见元数据存储)。 这些过滤器用于防止多次获取同一文件(除非其修改时间发生变化)。 从版本 5.2 开始,在获取文件之前立即将文件添加到过滤器中(如果获取失败则撤销该操作)。spring-doc.cadn.net.cn

在发生灾难性故障(例如断电)的情况下,当前正在获取的文件可能会保留在过滤器中,并且在重启应用程序时不会被重新获取。 在这种情况下,您需要手动从MetadataStore中移除该文件。

在之前的版本中,文件是在获取任何文件之前进行过滤的,这意味着在发生灾难性故障后,可能会有多个文件处于这种状态。spring-doc.cadn.net.cn

为了便于实现这一新行为,已在 FileListFilter 中添加了两个新方法。spring-doc.cadn.net.cn

boolean accept(F file);

boolean supportsSingleFileFiltering();

如果过滤器在 supportsSingleFileFiltering 中返回 true,则它必须实现 accept()spring-doc.cadn.net.cn

如果远程过滤器不支持单文件过滤(例如 AbstractMarkerFilePresentFileListFilter),则适配器将回退到之前的行为。spring-doc.cadn.net.cn

如果使用了多个过滤器(使用 CompositeFileListFilterChainFileListFilter),那么所有委托过滤器都必须支持单文件过滤,复合过滤器才能支持该功能。spring-doc.cadn.net.cn

持久化文件列表过滤器现在有一个布尔属性 forRecursion。 将此属性设置为 true,也会设置 alwaysAcceptDirectories,这意味着对外部网关(lsmget)的递归操作现在将每次遍历完整的目录树。 这是为了解决目录树深处更改未被检测到的问题。 此外,forRecursion=true 会导致使用文件的完整路径作为元数据存储键;这解决了如果同一名称的文件出现在不同目录中多次时过滤器无法正常工作的问题。 重要提示:这意味着持久化元数据存储中的现有键将无法在顶层目录下的文件中找到。 因此,该属性默认值为 false;此行为可能在未来的版本中发生变化。spring-doc.cadn.net.cn