此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring Integration 6.5.1! |
文件拆分器
这FileSplitter
在 4.1.2 版中添加了,并且在 4.2 版中添加了其命名空间支持。
这FileSplitter
将文本文件拆分为单独的行,基于BufferedReader.readLine()
.
默认情况下,拆分器使用Iterator
在从文件中读取一行时一次发出一行。
设置iterator
属性设置为false
导致它在将所有行作为消息发出之前将其读取到内存中。
这方面的一个用例可能是,如果要在发送任何包含行的消息之前检测文件上的 I/O 错误。
但是,它仅适用于相对较短的文件。
入站有效负载可以是File
,String
(一个File
路径)、InputStream
或Reader
.
其他有效负载类型将保持不变。
以下列表显示了配置FileSplitter
:
-
Java DSL
-
Kotlin DSL
-
Java
-
XML
@SpringBootApplication
public class FileSplitterApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FileSplitterApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow fileSplitterFlow() {
return IntegrationFlow
.from(Files.inboundAdapter(tmpDir.getRoot())
.filter(new ChainFileListFilter<File>()
.addFilter(new AcceptOnceFileListFilter<>())
.addFilter(new ExpressionFileListFilter<>(
new FunctionExpression<File>(f -> "foo.tmp".equals(f.getName()))))))
.split(Files.splitter()
.markers()
.charset(StandardCharsets.US_ASCII)
.firstLineAsHeader("fileHeader")
.applySequence(true))
.channel(c -> c.queue("fileSplittingResultChannel"))
.get();
}
}
@Bean
fun fileSplitterFlow() =
integrationFlow(
Files.inboundAdapter(tmpDir.getRoot())
.filter(
ChainFileListFilter<File?>()
.addFilter(AcceptOnceFileListFilter())
.addFilter(ExpressionFileListFilter(FunctionExpression { f: File? -> "foo.tmp" == f!!.name }))
)
) {
split(
Files.splitter()
.markers()
.charset(StandardCharsets.US_ASCII)
.firstLineAsHeader("fileHeader")
.applySequence(true)
)
channel { queue("fileSplittingResultChannel") }
}
@Splitter(inputChannel="toSplitter")
@Bean
public MessageHandler fileSplitter() {
FileSplitter splitter = new FileSplitter(true, true);
splitter.setApplySequence(true);
splitter.setOutputChannel(outputChannel);
return splitter;
}
<int-file:splitter id="splitter" (1)
iterator="" (2)
markers="" (3)
markers-json="" (4)
apply-sequence="" (5)
requires-reply="" (6)
charset="" (7)
first-line-as-header="" (8)
input-channel="" (9)
output-channel="" (10)
send-timeout="" (11)
auto-startup="" (12)
order="" (13)
phase="" /> (14)
1 | 拆分器的 bean 名称。 |
2 | 设置为true (默认)使用迭代器或false 在发送行之前将文件加载到内存中。 |
3 | 设置为true 在文件数据之前和之后发出文件开始和文件结束标记消息。
标记是带有FileSplitter.FileMarker 有效负载(使用START 和END 值mark 属性)。
在过滤某些行的下游流中按顺序处理文件时,可以使用标记。
它们使下游处理能够知道文件何时已完全处理。
此外,一个file_marker 包含START 或END 添加到这些消息中。
这END 标记包括行数。
如果文件为空,则仅START 和END 标记会以0 作为lineCount .
默认值为false .
什么时候true ,apply-sequence 是false 默认情况下。
也可以看看markers-json (下一个属性)。 |
4 | 什么时候markers 为 true,则将此设置为true 要拥有FileMarker 对象转换为 JSON 字符串。
(使用SimpleJsonSerializer 下面)。 |
5 | 设置为false 禁用包含sequenceSize 和sequenceNumber 报头。
默认值为true 除非markers 是true .
什么时候true 和markers 是true ,标记包含在测序中。
什么时候true 和iterator 是true 这sequenceSize header 设置为0 ,因为尺寸未知。 |
6 | 设置为true 导致RequiresReplyException 如果文件中没有行,则引发。
默认值为false . |
7 | 设置将文本数据读入String 负载。
默认值是平台字符集。 |
8 | 要作为其余行发出的邮件中的标头携带的第一行的标头名称。 从 5.0 版本开始。 |
9 | 设置用于向拆分器发送消息的输入通道。 |
10 | 设置将消息发送到的输出通道。 |
11 | 设置发送超时。
仅适用于output-channel 可以阻止 — 例如 fullQueueChannel . |
12 | 设置为false 以禁用在刷新上下文时自动启动拆分器。
默认值为true . |
13 | 设置此端点的顺序,如果input-channel 是一个<publish-subscribe-channel/> . |
14 | 设置分路器的启动阶段(在以下情况下使用auto-startup 是true ). |
这FileSplitter
还拆分任何基于文本的InputStream
成行。
从版本 4.3 开始,当与 FTP 或 SFTP 流式入站通道适配器或使用stream
选项检索文件时,拆分器会在文件完全使用时自动关闭支持流的会话
有关这些设施的详细信息,请参阅 FTP 流式处理入站通道适配器和 SFTP 流式处理入站通道适配器以及 FTP 出站网关和 SFTP 出站网关。
使用 Java 配置时,可以使用其他构造函数,如以下示例所示:
public FileSplitter(boolean iterator, boolean markers, boolean markersJson)
什么时候markersJson
为 true,则标记表示为 JSON 字符串(使用SimpleJsonSerializer
).
5.0 版本引入了firstLineAsHeader
选项来指定内容的第一行是标题(例如 CSV 文件中的列名)。
传递给此属性的参数是标头名称,在该标头名称下,第一行作为其余行发出的消息中的标头。
此行不包含在序列标头中(如果applySequence
为 true),也不在lineCount
关联FileMarker.END
.
注意:从版本 5.5 开始,lineCount' 也包含在FileHeaders.LINE_COUNT
到FileMarker.END
message,因为FileMarker
可以序列化为 JSON。
如果文件仅包含标题行,则该文件被视为空,因此仅FileMarker
在拆分期间发出实例(如果启用了标记,否则不会发出任何消息)。
默认情况下(如果未设置标头名称),则第一行被视为数据,并成为第一个发出的消息的有效负载。
如果您需要有关从文件内容中提取标头的更复杂的逻辑(不是第一行,不是行的全部内容,不是一个特定标头等),请考虑在FileSplitter
.
请注意,已移动到标题的行可能会从正常内容进程的下游进行过滤。
幂等下游处理拆分文件
什么时候apply-sequence
为 true,则拆分器在SEQUENCE_NUMBER
标头(当markers
为 true,则标记计为行)。
行号可以与幂等接收器一起使用,以避免在重新启动后重新处理行。
例如:
@Bean
public ConcurrentMetadataStore store() {
return new ZookeeperMetadataStore();
}
@Bean
public MetadataStoreSelector selector() {
return new MetadataStoreSelector(
message -> message.getHeaders().get(FileHeaders.ORIGINAL_FILE, File.class)
.getAbsolutePath(),
message -> message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER)
.toString(),
store())
.compareValues(
(oldVal, newVal) -> Integer.parseInt(oldVal) < Integer.parseInt(newVal));
}
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
return new IdempotentReceiverInterceptor(selector());
}
@Bean
public IntegrationFlow flow() {
...
.split(new FileSplitter())
...
.handle("lineHandler", e -> e.advice(idempotentReceiverInterceptor()))
...
}