文件拆分器
这FileSplitter在版本 4.1.2 中添加,其命名空间支持在版本 4.2 中添加。
这FileSplitter将文本文件拆分为单独的行,基于BufferedReader.readLine().
默认情况下,拆分器使用Iterator在从文件中读取行时一次发出一行。
设置iteratorproperty 设置为false使它在将所有行作为消息发出之前将其读入内存。
一个用例可能是,您希望在发送任何包含行的消息之前检测文件上的 I/O 错误。
但是,它仅适用于相对较短的文件。
入站有效负载可以是File,String(一个Filepath)、InputStream或Reader.
其他负载类型保持不变。
以下清单显示了配置FileSplitter:
- 
Java DSL 
- 
Kotlin DSL 
- 
Java 
- 
XML 
@SpringBootApplication
public class FileSplitterApplication {
    public static void main(String[] args) {
        new SpringApplicationBuilder(FileSplitterApplication.class)
            .web(false)
            .run(args);
    }
    @Bean
    public IntegrationFlow fileSplitterFlow() {
        return IntegrationFlow
            .from(Files.inboundAdapter(tmpDir.getRoot())
                 .filter(new ChainFileListFilter<File>()
                        .addFilter(new AcceptOnceFileListFilter<>())
                        .addFilter(new ExpressionFileListFilter<>(
                             new FunctionExpression<File>(f -> "foo.tmp".equals(f.getName()))))))
            .split(Files.splitter()
                     .markers()
                     .charset(StandardCharsets.US_ASCII)
                     .firstLineAsHeader("fileHeader")
                     .applySequence(true))
            .channel(c -> c.queue("fileSplittingResultChannel"))
            .get();
    }
}@Bean
fun fileSplitterFlow() =
    integrationFlow(
        Files.inboundAdapter(tmpDir.getRoot())
            .filter(
                ChainFileListFilter<File?>()
                    .addFilter(AcceptOnceFileListFilter())
                    .addFilter(ExpressionFileListFilter(FunctionExpression { f: File? -> "foo.tmp" == f!!.name }))
            )
    ) {
        split(
            Files.splitter()
                .markers()
                .charset(StandardCharsets.US_ASCII)
                .firstLineAsHeader("fileHeader")
                .applySequence(true)
        )
        channel { queue("fileSplittingResultChannel") }
    }@Splitter(inputChannel="toSplitter")
@Bean
public MessageHandler fileSplitter() {
    FileSplitter splitter = new FileSplitter(true, true);
    splitter.setApplySequence(true);
    splitter.setOutputChannel(outputChannel);
    return splitter;
}<int-file:splitter id="splitter" (1)
    iterator=""                  (2)
    markers=""                   (3)
    markers-json=""              (4)
    apply-sequence=""            (5)
    requires-reply=""            (6)
    charset=""                   (7)
    first-line-as-header=""      (8)
    input-channel=""             (9)
    output-channel=""            (10)
    send-timeout=""              (11)
    auto-startup=""              (12)
    order=""                     (13)
    phase="" />                  (14)| 1 | splitter 的 bean 名称。 | 
| 2 | 设置为 true(默认值)来使用迭代器,或者false在发送行之前将文件加载到内存中。 | 
| 3 | 设置为 true在文件数据之前和之后发出文件开始和文件结束标记消息。
标记是带有FileSplitter.FileMarkerpayloads(使用START和END值中的mark属性)。
在筛选某些行的下游流中按顺序处理文件时,可以使用标记。
它们使下游处理能够知道文件何时被完全处理。
此外,一个file_marker标头,其中包含START或END已添加到这些消息中。
这ENDmarker 包括行数。
如果文件为空,则仅START和END标记使用0作为lineCount.
默认值为false.
什么时候true,apply-sequence是false默认情况下。
另请参阅markers-json(next 属性)。 | 
| 4 | 什么时候 markers为 true,请将此项设置为true以将FileMarker对象转换为 JSON 字符串。
(使用SimpleJsonSerializer下面)。 | 
| 5 | 设置为 false要禁用sequenceSize和sequenceNumber消息中的标头。
默认值为true除非markers是true.
什么时候true和markers是true,则标记将包含在排序中。
什么时候true和iterator是true这sequenceSizeheader 设置为0,因为大小未知。 | 
| 6 | 设置为 true以引起RequiresReplyException如果文件中没有行,则引发。
默认值为false. | 
| 7 | 设置将文本数据读入时使用的字符集名称 String负载。
默认值为 platform charset。 | 
| 8 | 在为其余行发出的消息中作为标题携带的第一行的标题名称。 从 5.0 版本开始。 | 
| 9 | 设置用于将消息发送到拆分器的输入通道。 | 
| 10 | 设置将消息发送到的输出通道。 | 
| 11 | 设置发送超时。
仅当 output-channelcan 阻止 — 例如完整的QueueChannel. | 
| 12 | 设置为 false以禁用在刷新上下文时自动启动 Splitter。
默认值为true. | 
| 13 | 如果 input-channel是一个<publish-subscribe-channel/>. | 
| 14 | 设置拆分器的启动阶段(在 auto-startup是true). | 
这FileSplitter还会拆分任何基于文本的InputStream转换为行。
从版本 4.3 开始,当与 FTP 或 SFTP 流入站通道适配器或者使用stream选项来检索文件,则当文件被完全消耗时,拆分器会自动关闭支持流的会话
有关这些工具的更多信息,请参阅 FTP Streaming Inbound Channel Adapter 和 SFTP Streaming Inbound Channel Adapter 以及 FTP Outbound Gateway 和 SFTP Outbound Gateway。
使用 Java 配置时,可以使用其他构造函数,如下例所示:
public FileSplitter(boolean iterator, boolean markers, boolean markersJson)什么时候markersJson为 true,则标记表示为 JSON 字符串(使用SimpleJsonSerializer).
版本 5.0 引入了firstLineAsHeader选项以指定内容的第一行是标题(如 CSV 文件中的列名)。
传递给此属性的参数是标头名称,在该名称下,第一行作为其余行发出的消息中的标头。
此行不包含在序列标头中(如果applySequence为 true),也不在lineCount关联FileMarker.END.
注意:从版本 5.5 开始,lineCount' 也作为FileHeaders.LINE_COUNT到FileMarker.END消息,由于FileMarker可以序列化为 JSON。
如果文件仅包含标题行,则该文件将被视为空,因此仅FileMarker在拆分期间发出实例(如果启用了标记 — 否则,不会发出任何消息)。
默认情况下(如果未设置标头名称),第一行被视为 data 并成为第一个发出的消息的有效负载。
如果您需要有关从文件内容中提取 Headers 的更复杂的逻辑(不是第一行,不是行的整个内容,也不是一个特定的 Header,等等),请考虑在FileSplitter.
请注意,已移动到标题的行可能会从正常内容流程的下游进行筛选。
幂等下游处理拆分文件
什么时候apply-sequence为 true,则拆分器会将SEQUENCE_NUMBER标头(当markers为 true,则标记计为行)。
该行号可与幂等接收器一起使用,以避免在重新启动后重新处理行。
例如:
@Bean
public ConcurrentMetadataStore store() {
    return new ZookeeperMetadataStore();
}
@Bean
public MetadataStoreSelector selector() {
    return new MetadataStoreSelector(
            message -> message.getHeaders().get(FileHeaders.ORIGINAL_FILE, File.class)
                    .getAbsolutePath(),
            message -> message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)
                    .toString(),
            store())
                    .compareValues(
                            (oldVal, newVal) -> Integer.parseInt(oldVal) < Integer.parseInt(newVal));
}
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
    return new IdempotentReceiverInterceptor(selector());
}
@Bean
public IntegrationFlow flow() {
    ...
    .split(new FileSplitter())
    ...
    .handle("lineHandler", e -> e.advice(idempotentReceiverInterceptor()))
    ...
}