对于最新的稳定版本,请使用 Spring Integration 6.5.1! |
流支持
在许多情况下,应用程序数据是从流中获取的。 不建议将对流的引用作为消息有效负载发送给使用者。 相反,消息是根据从输入流读取的数据创建的,消息有效负载将逐个写入输出流。
您需要将此依赖项包含在您的项目中:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-stream</artifactId>
<version>6.3.11</version>
</dependency>
compile "org.springframework.integration:spring-integration-stream:6.3.11"
从流读取
Spring Integration 为流提供了两个适配器。
双ByteStreamReadingMessageSource
和CharacterStreamReadingMessageSource
实现MessageSource
.
通过在通道适配器元素中配置其中一个,可以配置轮询周期,消息总线可以自动检测和调度它们。
字节流版本需要InputStream
,并且字符流版本需要Reader
作为单个构造函数参数。
这ByteStreamReadingMessageSource
还接受“bytesPerMessage”属性来确定它尝试读取每个字节的字节数Message
.
默认值为1024
.
以下示例创建一个输入流,该输入流创建每个包含 2048 字节的消息:
<bean class="org.springframework.integration.stream.ByteStreamReadingMessageSource">
<constructor-arg ref="someInputStream"/>
<property name="bytesPerMessage" value="2048"/>
</bean>
<bean class="org.springframework.integration.stream.CharacterStreamReadingMessageSource">
<constructor-arg ref="someReader"/>
</bean>
这CharacterStreamReadingMessageSource
将阅读器包装在BufferedReader
(如果还没有)。
您可以在第二个构造函数参数中设置缓冲读取器使用的缓冲区大小。
从版本 5.0 开始,第三个构造函数参数 (blockToDetectEOF
) 控制CharacterStreamReadingMessageSource
.
什么时候false
(默认值)、receive()
方法检查读取器是否ready()
如果不是,则返回 null。
在这种情况下,未检测到 EOF(文件末尾)。
什么时候true
这receive()
方法会阻塞,直到数据可用或在基础流上检测到 EOF。
检测到 EOF 时,一个StreamClosedEvent
(应用程序事件)已发布。
您可以将此事件与实现ApplicationListener<StreamClosedEvent>
.
为了便于EOF检测,轮询器线程在receive() 方法,直到数据到达或检测到 EOF 为止。 |
检测到 EOF 后,轮询程序将继续在每次轮询上发布事件。
应用程序侦听器可以停止适配器以防止这种情况。
该事件发布在轮询器线程上。
停止适配器会导致线程中断。
如果打算在停止适配器后执行某些可中断任务,则必须执行stop() 在不同的线程上,或者对这些下游活动使用不同的线程。
请注意,发送到QueueChannel 是可中断的,因此,如果您希望从侦听器发送消息,请在停止适配器之前执行此作。 |
这有助于将数据“管道”或重定向到stdin
,如以下两个示例所示:
cat myfile.txt | java -jar my.jar
java -jar my.jar < foo.txt
这种方法允许应用程序在管道关闭时停止。
有四种方便的工厂方法可供选择:
public static final CharacterStreamReadingMessageSource stdin() { ... }
public static final CharacterStreamReadingMessageSource stdin(String charsetName) { ... }
public static final CharacterStreamReadingMessageSource stdinPipe() { ... }
public static final CharacterStreamReadingMessageSource stdinPipe(String charsetName) { ... }
写入流
对于目标流,可以使用以下两种实现之一:ByteStreamWritingMessageHandler
或CharacterStreamWritingMessageHandler
.
每个都需要一个构造函数参数 (OutputStream
对于字节流或Writer
对于字符流),并且每个都提供了第二个构造函数,该构造函数添加了可选的“bufferSize”。
由于这两者最终都实现了MessageHandler
接口,您可以从channel-adapter
配置,如通道适配器中所述。
<bean class="org.springframework.integration.stream.ByteStreamWritingMessageHandler">
<constructor-arg ref="someOutputStream"/>
<constructor-arg value="1024"/>
</bean>
<bean class="org.springframework.integration.stream.CharacterStreamWritingMessageHandler">
<constructor-arg ref="someWriter"/>
</bean>
流命名空间支持
Spring Integration定义了一个命名空间,以减少与流相关的通道适配器所需的配置。 使用它需要以下架构位置:
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans"
xsi:schemaLocation="http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration/stream
https://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd">
以下代码片段显示了支持配置入站通道适配器的不同配置选项:
<int-stream:stdin-channel-adapter id="adapterWithDefaultCharset"/>
<int-stream:stdin-channel-adapter id="adapterWithProvidedCharset" charset="UTF-8"/>
从 5.0 版开始,您可以将detect-eof
属性,该属性将blockToDetectEOF
财产。
有关详细信息,请参阅从流读取。
若要配置出站通道适配器,也可以使用命名空间支持。 以下示例显示了出站通道适配器的不同配置:
<int-stream:stdout-channel-adapter id="stdoutAdapterWithDefaultCharset"
channel="testChannel"/>
<int-stream:stdout-channel-adapter id="stdoutAdapterWithProvidedCharset" charset="UTF-8"
channel="testChannel"/>
<int-stream:stderr-channel-adapter id="stderrAdapter" channel="testChannel"/>
<int-stream:stdout-channel-adapter id="newlineAdapter" append-newline="true"
channel="testChannel"/>