TCP 和 UDP 支持
TCP和UDP支持
Spring Integration 提供了用于通过互联网协议接收和发送消息的通道适配器。 同时提供了 UDP(用户数据报协议)和 TCP(传输控制协议)适配器。 每个适配器都支持在底层协议上进行单向通信。 此外,Spring Integration 还提供了简单的入站和出站 TCP 网关。 这些网关在需要双向通信时使用。
您需要将以下依赖项包含到您的项目中:
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-ip</artifactId>
<version>6.1.9</version>
</dependency>
compile "org.springframework.integration:spring-integration-ip:6.1.9"
介绍
提供了两种 UDP 入站和出站通道适配器:
-
UnicastSendingMessageHandler向单个目标发送数据报数据包。 -
UnicastReceivingChannelAdapter接收传入的数据报数据包。 -
MulticastSendingMessageHandler向组播地址发送(广播)数据报数据包。 -
MulticastReceivingChannelAdapter通过加入组播地址来接收传入的数据包。
提供 TCP 入站和出站通道适配器:
-
TcpSendingMessageHandler通过 TCP 发送消息。 -
TcpReceivingChannelAdapter通过 TCP 接收消息。
提供了一个入站 TCP 网关。 它支持简单的请求 - 响应处理。 虽然该网关可以支持任意数量的连接,但每个连接只能串行处理。 从套接字读取的线程在再次读取之前会等待并发送响应。 如果连接工厂配置为单用连接,则在套接字超时后关闭连接。
提供了一个出站 TCP 网关。 它支持简单的请求 - 响应处理。 如果关联的连接工厂配置为使用一次性连接,则每个新请求都会立即创建一个新的连接。 否则,如果连接正在使用中,调用线程将在该连接上阻塞,直到收到响应或发生超时/I/O 错误。
TCP 和 UDP 入站通道适配器以及 TCP 入站网关支持 error-channel 属性。
这提供了与 进入 GatewayProxyFactoryBean 中描述相同的基本功能。
UDP 适配器
本节介绍如何配置和使用 UDP 适配器。
出站 UDP 适配器(XML 配置)
以下示例配置了一个 UDP 出站通道适配器:
<int-ip:udp-outbound-channel-adapter id="udpOut"
host="somehost"
port="11111"
multicast="false"
socket-customizer="udpCustomizer"
channel="exampleChannel"/>
当将 multicast 设置为 true 时,您还应在 host 属性中提供组播地址。 |
UDP 是一种高效但不可靠的协议。
Spring Integration 添加了两个属性以提高可靠性:check-length 和 acknowledge。
当 check-length 设置为 true 时,适配器会在消息数据前添加一个长度字段(四个字节,采用网络字节序)。
这使得接收方能够验证接收到的数据包长度。
如果接收系统使用的缓冲区不足以容纳该数据包,数据包可能会被截断。
length 头提供了一个检测此情况的机制。
从版本 4.3 开始,您可以将 port 设置为 0,在这种情况下由操作系统选择端口。
适配器启动后,可以通过调用 getPort() 并在 isListening() 返回 true 时获取所选的端口。
从版本 5.3.3 开始,您可以添加一个 SocketCustomizer Bean 来在创建后修改 DatagramSocket(例如,调用 setTrafficClass(0x10))。
以下示例展示了一个出站通道适配器,该适配器为数据报包添加了长度检查:
<int-ip:udp-outbound-channel-adapter id="udpOut"
host="somehost"
port="11111"
multicast="false"
check-length="true"
channel="exampleChannel"/>
数据包接收方也必须配置为预期在数据之前存在长度信息。
对于 Spring Integration UDP 入站通道适配器,请设置其 check-length 属性。 |
第二次可靠性改进允许使用应用层确认协议。 接收方必须在指定时间内向发送方发送确认。
以下示例展示了一个出站通道适配器,它为数据报包添加长度检查并等待确认:
<int-ip:udp-outbound-channel-adapter id="udpOut"
host="somehost"
port="11111"
multicast="false"
check-length="true"
acknowledge="true"
ack-host="thishost"
ack-port="22222"
ack-timeout="10000"
channel="exampleChannel"/>
将 acknowledge 设置为 true 意味着数据包的接收方能够解释添加到包含确认数据(主机和端口)的数据包上的头部信息。
最可能的情况是,接收方是一个 Spring Integration 入站通道适配器。 |
当 multicast 为 true 时,一个额外的属性(min-acks-for-success)指定在 ack-timeout 内必须接收到多少个确认响应。 |
从 4.3 版本开始,您可以将 ackPort 设置为 0,在这种情况下,操作系统将选择端口。
出站 UDP 适配器(Java 配置)
以下示例展示了如何使用 Java 配置出站 UDP 适配器:
@Bean
@ServiceActivator(inputChannel = "udpOut")
public UnicastSendingMessageHandler handler() {
return new UnicastSendingMessageHandler("localhost", 11111);
}
(或 MulticastSendingChannelAdapter 用于组播)。
出站 UDP 适配器(Java DSL 配置)
以下示例展示了如何使用 Java DSL 配置出站 UDP 适配器:
@Bean
public IntegrationFlow udpOutFlow() {
return f -> f.handle(Udp.outboundAdapter("localhost", 1234)
.configureSocket(socket -> socket.setTrafficClass(0x10)))
.get();
}
入站 UDP 适配器(XML 配置)
以下示例展示了如何配置基本的单播入站 UDP 通道适配器。
<int-ip:udp-inbound-channel-adapter id="udpReceiver"
channel="udpOutChannel"
port="11111"
receive-buffer-size="500"
multicast="false"
socket-customizer="udpCustomizer"
check-length="true"/>
下面的示例展示了如何配置一个基本的多播入站 UDP 通道适配器:
<int-ip:udp-inbound-channel-adapter id="udpReceiver"
channel="udpOutChannel"
port="11111"
receive-buffer-size="500"
multicast="true"
multicast-address="225.6.7.8"
check-length="true"/>
默认情况下,不会对入站数据包执行反向 DNS 查找:在 DNS 未配置的环境中(例如 Docker 容器),这可能会导致连接延迟。
若要将 IP 地址转换为用于消息头的主机名,可通过将 lookup-host 属性设置为 true 来覆盖默认行为。
从版本 5.3.3 开始,您可以添加一个 SocketCustomizer bean 以在创建后修改 DatagramSocket。
它用于接收套接字以及为发送确认而创建的任意套接字。
入站 UDP 适配器(Java 配置)
以下示例展示了如何使用 Java 配置入站 UDP 适配器:
@Bean
public UnicastReceivingChannelAdapter udpIn() {
UnicastReceivingChannelAdapter adapter = new UnicastReceivingChannelAdapter(11111);
adapter.setOutputChannelName("udpChannel");
return adapter;
}
以下示例展示了如何使用 Java DSL 配置入站 UDP 适配器:
入站 UDP 适配器(Java DSL 配置)
@Bean
public IntegrationFlow udpIn() {
return IntegrationFlow.from(Udp.inboundAdapter(11111))
.channel("udpChannel")
.get();
}
服务器监听事件
从版本 5.0.2 开始,当入站适配器启动并开始监听时,会发出一个UdpServerListeningEvent。
当适配器配置为在端口0上监听(即由操作系统选择端口)时,这非常有用。
如果您需要在启动其他将连接到该套接字的进程之前等待,也可以使用它来替代轮询isListening()。
高级出站配置
The <int-ip:udp-outbound-channel-adapter> (UnicastSendingMessageHandler) 具有 destination-expression 和 socket-expression 选项。
您可以使用 destination-expression 作为硬编码的 host-port 对的运行时替代方案,以确定发往 requestMessage(评估上下文的根对象)的传出数据报包的目标地址。
该表达式必须求值为一个 URI、URI 风格的 String(参见 RFC-2396),或一个 SocketAddress。
您也可以为此表达式使用入站 IpHeaders.PACKET_ADDRESS 头。
在框架中,当我们在 UnicastReceivingChannelAdapter 中接收数据报并将其转换为消息时,DatagramPacketMessageMapper 会填充此头。
该头的值正是入站数据报 DatagramPacket.getSocketAddress() 的结果。
使用 socket-expression,出站通道适配器可以使用(例如)入站通道适配器套接字,通过接收数据报的同一端口发送数据报。
这在我们的应用程序作为 UDP 服务器且客户端位于网络地址转换 (NAT) 之后的场景中非常有用。
此表达式必须求值为 DatagramSocket。
requestMessage 用作求值上下文的根对象。
您不能将 socket-expression 参数与 multicast 和 acknowledge 参数一起使用。
以下示例展示了如何配置一个 UDP 入站通道适配器,该适配器包含一个转换器,用于转换为大写并使用套接字:
<int-ip:udp-inbound-channel-adapter id="inbound" port="0" channel="in" />
<int:channel id="in" />
<int:transformer expression="new String(payload).toUpperCase()"
input-channel="in" output-channel="out"/>
<int:channel id="out" />
<int-ip:udp-outbound-channel-adapter id="outbound"
socket-expression="@inbound.socket"
destination-expression="headers['ip_packetAddress']"
channel="out" />
下面的示例展示了使用 Java DSL 的等效配置:
@Bean
public IntegrationFlow udpEchoUpcaseServer() {
return IntegrationFlow.from(Udp.inboundAdapter(11111).id("udpIn"))
.<byte[], String>transform(p -> new String(p).toUpperCase())
.handle(Udp.outboundAdapter("headers['ip_packetAddress']")
.socketExpression("@udpIn.socket"))
.get();
}
TCP 连接工厂
概述
对于 TCP,底层连接的配置通过使用连接工厂提供。 提供了两种类型的连接工厂:客户端连接工厂和服务器端连接工厂。 客户端连接工厂建立出站连接。 服务器端连接工厂监听入站连接。
出站通道适配器使用客户端连接工厂,但您也可以将客户端连接工厂的引用提供给入站通道适配器。 该适配器接收通过出站适配器创建的连接所收到的任何传入消息。
入站通道适配器或网关使用服务器连接工厂。 (实际上,没有它连接工厂无法工作)。 您也可以将服务器连接工厂的引用提供给出站适配器。 然后,您可以使用该适配器在同一连接上发送对传入消息的回复。
回复消息仅当包含由连接工厂插入到原始消息中的 ip_connectionId 标头时,才会路由到该连接。 |
| 这是当入站和出站适配器之间共享连接工厂时执行的消息关联范围。 这种共享允许通过 TCP 进行异步双向通信。 默认情况下,仅使用 TCP 传输负载信息。 因此,任何消息关联都必须由下游组件(如聚合器或其他端点)执行。 从 3.0 版本开始支持传输选定的头部信息。 更多信息,请参阅 TCP 消息关联。 |
您可以将连接工厂的引用最多提供给每种类型的适配器一个。
Spring Integration 提供了使用 java.net.Socket 和 java.nio.channel.SocketChannel 的连接工厂。
下面的示例展示了一个简单的服务器连接工厂,它使用 java.net.Socket 个连接:
<int-ip:tcp-connection-factory id="server"
type="server"
port="1234"/>
下面的示例展示了一个简单的服务器连接工厂,它使用 java.nio.channel.SocketChannel 个连接:
<int-ip:tcp-connection-factory id="server"
type="server"
port="1234"
using-nio="true"/>
从 Spring Integration 4.2 版本开始,如果服务器配置为监听随机端口(通过将端口设置为 0),则可以使用 getPort() 获取操作系统实际选择的端口。
此外,getServerSocketAddress() 允许您获取完整的 SocketAddress。
有关更多信息,请参阅 TcpServerConnectionFactory 接口的 Javadoc。 |
<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="1234"
single-use="true"
so-timeout="10000"/>
下面的示例展示了一个客户端连接工厂,它使用 java.net.Socket 个连接,并为每条消息创建一个新的连接:
<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="1234"
single-use="true"
so-timeout="10000"
using-nio=true/>
从 5.2 版本开始,客户端连接工厂支持以秒为单位的 connectTimeout 属性,默认值为 60。
消息界定(序列化和反序列化)
TCP 是一种流式协议。 这意味着必须为通过 TCP 传输的数据提供某种结构,以便接收方能够将数据分割成离散的消息。 连接工厂被配置为使用序列化和反序列化器,以在消息负载和通过 TCP 发送的位之间进行转换。 这是通过分别为入站和出站消息提供反序列化器和序列化器来实现的。 Spring Integration 提供了多种标准序列化和反序列化器。
ByteArrayCrlfSerializer* 将字节数组转换为后跟回车符和换行符的字节流 (\r\n)。
这是默认的序列化器(和解序列化器),可用于(例如)作为客户端的 telnet。
The ByteArraySingleTerminatorSerializer* 将字节数组转换为字节流,后跟单个终止字符(默认为 0x00)。
The ByteArrayLfSerializer*将字节数组转换为字节流,后跟一个单行换行符 (0x0a)。
The ByteArrayStxEtxSerializer* 将字节数组转换为以 STX(0x02)开头、ETX(0x03)结尾的字节流。
The ByteArrayLengthHeaderSerializer 将字节数组转换为以网络字节序(大端)的二进制长度开头的字节流。这是一个高效的反序列化器,因为它无需解析每个字节来查找终止字符序列。它也可用于包含二进制数据的有效载荷。前面的序列化程序仅支持负载中的文本。长度头的默认大小为四个字节(一个 Integer),允许消息最大为 (2^31 - 1) 字节。然而,length 头部对于长度不超过 255 字节的消息可以是一个单字节(无符号),而对于长度不超过 (2^16 - 1) 字节的消息则可以是无符号短整型(2 字节)。如果你需要其他格式的头部,可以子类化 ByteArrayLengthHeaderSerializer 并为 readHeader 和 writeHeader 方法提供实现。绝对最大数据大小为 (2^31 - 1) 字节。从版本 5 开始。2、标头值除了有效载荷外还可以包含标头的长度。将 inclusive 属性设置为启用该机制(生产者和消费者必须将其设置为相同的值)。
The ByteArrayRawSerializer*将字节数组转换为字节流,且不添加任何额外的消息定界数据。使用此序列化器(和解序列化器),消息的结束由客户端以有序方式关闭套接字来指示。当使用此序列化器时,消息接收将挂起,直到客户端关闭套接字或发生超时。超时不会导致消息生成。当使用该序列化器且客户端为 Spring Integration 应用程序时,客户端必须使用配置了 single-use="true" 的连接工厂。这样做会导致适配器在发送消息后关闭套接字。序列化器本身不会关闭连接。您应仅在通道适配器(而非网关)所使用的连接工厂中使用此序列化器,且这些连接工厂应由入站或出站适配器之一使用,而不能同时被两者使用。另请参阅本节后面的ByteArrayElasticRawDeserializer。然而,自 5.0 版本起。2,出站网关有一个新属性 closeStreamAfterSend;这允许使用原始序列器/反序列器,因为 EOF 信号已发送给服务器,同时保持连接打开以接收回复。
在4.2.2版本之前,当使用非阻塞I/O(NIO)时,该序列化器会将读取过程中的超时视为文件结束,并将已读取的数据作为一条消息发出。
这种行为不可靠,不应用于消息定界。
现在,此类情况将被视为异常处理。
如果您确实以这种方式使用了它,可以通过将treatTimeoutAsEndOfMessage构造函数参数设置为true来恢复之前的行为。 |
这些类都是 AbstractByteArraySerializer 的子类,同时实现了 org.springframework.core.serializer.Serializer 和 org.springframework.core.serializer.Deserializer。
为了向后兼容,使用任何 AbstractByteArraySerializer 子类进行序列化的连接也会接受一个首先转换为字节数组的 String。
每个序列化和反序列化器都将包含相应格式的数据流转换为字节数组负载。
为避免因行为不当的客户端(即不遵守配置序列化器协议的客户端)导致内存耗尽,这些序列化器会限制最大消息大小。
如果传入的消息超过此大小,将抛出异常。
默认最大消息大小为 2048 字节。
您可以通过设置 maxMessageSize 属性来增加该值。
如果您使用默认的序列化器或反序列化器并希望增加最大消息大小,则必须将最大消息大小声明为一个显式 Bean,并设置其 maxMessageSize 属性,同时配置连接工厂以使用该 Bean。
本节前文中标记为 * 的类会使用一个中间缓冲区,并将解码后的数据复制到正确大小的最终缓冲区中。
从 4.3 版本开始,您可以通过设置 poolSize 属性来配置这些缓冲区,使这些原始缓冲区可被复用,而不是为每条消息分配并丢弃(这是默认行为)。
将属性设置为负值会创建一个无界池。
如果池是有界的,您还可以设置 poolWaitTimeout 属性(单位为毫秒),之后如果没有可用的缓冲区则会抛出异常。
其默认值为无穷大。
此类异常会导致套接字被关闭。
如果您希望在自定义反序列化器中使用相同的机制,您可以扩展 AbstractPooledBufferByteArraySerializer(而不是其超类 AbstractByteArraySerializer),并实现 doDeserialize() 而不是 deserialize()。
缓冲区会自动返回到池中。
AbstractPooledBufferByteArraySerializer 还提供了一个便捷的实用方法:copyToSizedArray()。
版本 5.0 添加了 ByteArrayElasticRawDeserializer。
这与上面的 ByteArrayRawSerializer 的反序列化侧类似,不同之处在于无需设置 maxMessageSize。
内部使用了一个 ByteArrayOutputStream,允许缓冲区根据需要增长。
客户端必须以有序方式关闭套接字以指示消息结束。
| 此反序列化器仅在对等方可信时才应使用;由于内存不足条件,它容易受到拒绝服务(DoS)攻击。 |
The MapJsonSerializer 使用 Jackson ObjectMapper 在 Map 和 JSON 之间进行转换。
您可以将此序列化器与 MessageConvertingTcpMessageMapper 和 MapMessageConverter 配合使用,以传输选定的 HTTP 头和以 JSON 格式表示的有效负载。
Jackson ObjectMapper 无法在流中界定消息。
因此,MapJsonSerializer 需要委托给另一个序列化器或反序列化器来处理消息界定。
默认情况下,使用 ByteArrayLfSerializer,导致在线路上生成的消息格式为 <json><LF>,但您可以配置它使用其他格式。(下一个示例展示了如何操作。) |
最终的默认序列化为 org.springframework.core.serializer.DefaultSerializer,可用于使用 Java 序列化转换可序列化对象。
org.springframework.core.serializer.DefaultDeserializer 用于对包含可序列化对象的流进行入站反序列化。
如果您不想使用默认的序列化和反序列化器(ByteArrayCrLfSerializer),则必须在连接工厂上设置 serializer 和 deserializer 属性。
以下示例展示了如何操作:
<bean id="javaSerializer"
class="org.springframework.core.serializer.DefaultSerializer" />
<bean id="javaDeserializer"
class="org.springframework.core.serializer.DefaultDeserializer" />
<int-ip:tcp-connection-factory id="server"
type="server"
port="1234"
deserializer="javaDeserializer"
serializer="javaSerializer"/>
一个使用 java.net.Socket 个连接并在传输层使用 Java 序列化的服务器连接工厂。
有关连接工厂上可用属性的完整详细信息,请参阅本节末尾的 参考文档。
默认情况下,不会对入站数据包执行反向 DNS 查找:在 DNS 未配置的环境中(例如 Docker 容器),这可能会导致连接延迟。
若要将 IP 地址转换为用于消息头的主机名,可通过将 lookup-host 属性设置为 true 来覆盖默认行为。
| 您还可以修改套接字和套接字工厂的属性。 有关更多信息,请参阅 SSL/TLS 支持。 正如该处所述,无论是否使用 SSL,都可以进行此类修改。 |
自定义序列化器和反序列化器
如果您的数据不是由标准反序列化器支持的格式,您可以实现自己的反序列化器;您还可以实现自定义的序列化器。
要实现自定义的序列化和反序列化对,请实现 org.springframework.core.serializer.Deserializer 和 org.springframework.core.serializer.Serializer 接口。
当反序列化器在消息之间检测到输入流已关闭时,必须抛出 SoftEndOfStreamException;这是向框架发出的信号,表明该关闭是“正常”的。
如果在解码消息时流被关闭,则应抛出其他异常。
从 5.2 版本开始,SoftEndOfStreamException 现在是一个 RuntimeException,而不是扩展 IOException。
TCP 缓存客户端连接工厂
正如前文所述,TCP 套接字可以是‘单次使用’的(一个请求或响应),也可以是共享的。 在高并发环境中,共享套接字与出站网关配合不佳,因为套接字一次只能处理一个请求或响应。
为了提高性能,您可以使用协作的通道适配器代替网关,但这需要应用级别的消息关联。 有关更多信息,请参阅 TCP 消息关联。
Spring Integration 2.2 引入了一个缓存客户端连接工厂,该工厂使用共享套接字池,使网关能够通过共享连接池处理多个并发请求。
TCP 故障转移客户端连接工厂
您可以配置一个 TCP 连接工厂,该工厂支持故障转移至一个或多个其他服务器。 发送消息时,该工厂会遍历其所有配置的工厂,直到消息成功发送或找不到可用连接为止。 初始状态下,将使用配置列表中的第一个工厂。 如果后续连接失败,则下一个工厂将成为当前工厂。 以下示例展示了如何配置故障转移客户端连接工厂:
<bean id="failCF" class="o.s.i.ip.tcp.connection.FailoverClientConnectionFactory">
<constructor-arg>
<list>
<ref bean="clientFactory1"/>
<ref bean="clientFactory2"/>
</list>
</constructor-arg>
</bean>
当使用故障转移连接工厂时,singleUse 属性必须在工厂本身及其配置使用的工厂列表之间保持一致。 |
连接工厂有两个与故障转移相关的属性,当与共享连接(singleUse=false)一起使用时:
-
refreshSharedInterval -
closeOnRefresh
考虑以下基于上述配置的场景:
假设 clientFactory1 无法建立连接,但 clientFactory2 可以。
当 refreshSharedInterval 过期后调用 failCF getConnection() 方法时,我们将再次尝试使用 clientFactory1 进行连接;如果成功,则关闭到 clientFactory2 的连接。
如果 closeOnRefresh 为 false,则“旧”连接将保持打开状态,并在首次工厂再次失败时可能被重用。
设置 refreshSharedInterval 表示仅在该时间过期后尝试使用第一个工厂重新连接;设置为 Long.MAX_VALUE(默认值)表示仅在当前连接失败时才回退到第一个工厂。
设置 closeOnRefresh 可在刷新实际创建新连接后关闭“旧”连接。
如果任何委托工厂是 CachingClientConnectionFactory,则这些属性不适用,因为连接缓存由该处处理;在这种情况下,将始终通过连接工厂列表来获取连接。 |
从版本 5.3 开始,这些默认值分别为 Long.MAX_VALUE 和 true,因此工厂仅在当前连接失败时才尝试回退。
若要恢复为之前版本的默认行为,请将它们设置为 0 和 false。
另请参阅 测试连接。
TCP 线程亲和连接工厂
Spring Integration 5.0 版本引入了此连接工厂。
它将连接绑定到调用线程,并且该线程每次发送消息时都会重用同一个连接。
只要连接未被关闭(由服务器或网络关闭),或者线程未调用 releaseConnection() 方法,这种重用就会持续进行。
连接本身由另一个客户端工厂实现提供,该实现必须配置为提供非共享(单次使用)的连接,以确保每个线程都能获得一个独立的连接。
以下示例展示了如何配置 TCP 线程亲和性连接工厂:
@Bean
public TcpNetClientConnectionFactory cf() {
TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory("localhost",
Integer.parseInt(System.getProperty(PORT)));
cf.setSingleUse(true);
return cf;
}
@Bean
public ThreadAffinityClientConnectionFactory tacf() {
return new ThreadAffinityClientConnectionFactory(cf());
}
@Bean
@ServiceActivator(inputChannel = "out")
public TcpOutboundGateway outGate() {
TcpOutboundGateway outGate = new TcpOutboundGateway();
outGate.setConnectionFactory(tacf());
outGate.setReplyChannelName("toString");
return outGate;
}
测试连接
在某些场景中,当连接首次打开时发送某种健康检查请求可能非常有用。 其中一个场景可能是使用 TCP 故障转移客户端连接工厂,以便在选定的服务器允许建立连接但报告其不健康时进行故障转移。
为了支持此功能,请向客户端连接工厂添加一个 connectionTest。
/**
* Set a {@link Predicate} that will be invoked to test a new connection; return true
* to accept the connection, false the reject.
* @param connectionTest the predicate.
* @since 5.3
*/
public void setConnectionTest(@Nullable Predicate<TcpConnectionSupport> connectionTest) {
this.connectionTest = connectionTest;
}
为了测试连接,请在测试中为连接附加一个临时监听器。 如果测试失败,连接将被关闭并抛出异常。 当与 TCP 故障转移客户端连接工厂 配合使用时,这将触发尝试下一个服务器。
| 只有服务器的第一条回复会发送到测试监听器。 |
在以下示例中,如果我们发送 PING,服务器回复 PONG 时,则认为服务器是健康的。
Message<String> ping = new GenericMessage<>("PING");
byte[] pong = "PONG".getBytes();
clientFactory.setConnectionTest(conn -> {
CountDownLatch latch = new CountDownLatch(1);
AtomicBoolean result = new AtomicBoolean();
conn.registerTestListener(msg -> {
if (Arrays.equals(pong, (byte[]) msg.getPayload())) {
result.set(true);
}
latch.countDown();
return false;
});
conn.send(ping);
try {
latch.await(10, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return result.get();
});
TCP 连接拦截器
您可以使用对 TcpConnectionInterceptorFactoryChain 的引用来配置连接工厂。
您可以使用拦截器为连接添加行为,例如协商、安全性以及其他选项。
框架当前未提供拦截器,但请参阅源仓库中的 InterceptedSharedConnectionTests 作为示例。
测试用例中使用的HelloWorldInterceptor的工作原理如下:
拦截器首先配置一个客户端连接工厂。 当第一条消息通过被拦截的连接发送时,拦截器会通过该连接发送'Hello'并期望收到'world!'。 发生这种情况后,协商即完成,原始消息将被发送。 此外,使用同一连接发送的后续消息将无需任何额外的协商。
当配置了服务器连接工厂时,拦截器要求第一条消息为'Hello',如果是,则返回'world!'。 否则,它将抛出一个导致连接关闭的异常。
所有 TcpConnection 方法都会被拦截。
拦截器实例由拦截器工厂为每个连接创建。
如果拦截器是有状态的,工厂应为每个连接创建一个新实例。
如果没有状态,同一个拦截器可以包装每个连接。
拦截器工厂被添加到拦截器工厂链的配置中,您可以通过设置 interceptor-factory 属性将其提供给连接工厂。
拦截器必须扩展 TcpConnectionInterceptorSupport。
工厂必须实现 TcpConnectionInterceptorFactory 接口。
TcpConnectionInterceptorSupport 具有透传方法。
通过扩展此类,您只需实现希望拦截的方法即可。
以下示例展示了如何配置连接拦截器工厂链:
<bean id="helloWorldInterceptorFactory"
class="o.s.i.ip.tcp.connection.TcpConnectionInterceptorFactoryChain">
<property name="interceptors">
<array>
<bean class="o.s.i.ip.tcp.connection.HelloWorldInterceptorFactory"/>
</array>
</property>
</bean>
<int-ip:tcp-connection-factory id="server"
type="server"
port="12345"
using-nio="true"
single-use="true"
interceptor-factory-chain="helloWorldInterceptorFactory"/>
<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="12345"
single-use="true"
so-timeout="100000"
using-nio="true"
interceptor-factory-chain="helloWorldInterceptorFactory"/>
TCP 连接事件
从 3.0 版本开始,对 TcpConnection 实例的更改由 TcpConnectionEvent 实例报告。
TcpConnectionEvent 是 ApplicationEvent 的子类,因此可以被任何在 ApplicationContext 中定义的 ApplicationListener 接收——例如 事件入站通道适配器。
TcpConnectionEvents 具有以下属性:
-
connectionId: 连接标识符,您可以在消息头中使用它向该连接发送数据。 -
connectionFactoryName: 连接所属的连接工厂的 Bean 名称。 -
throwable: 仅适用于TcpConnectionExceptionEvent事件的Throwable。 -
source:TcpConnection。 例如,您可以使用此方法配合getHostAddress()(需要类型转换)来确定远程 IP 地址。
此外,自 4.0 版本起,在TCP 连接工厂中讨论的标准反序列化器在解码数据流时遇到问题,现在将发出TcpDeserializationExceptionEvent实例。
这些事件包含异常、正在构建的缓冲区,以及发生异常时缓冲区中的偏移量(如果可用)。
应用程序可以使用普通的ApplicationListener或ApplicationEventListeningMessageProducer(参见接收 Spring 应用程序事件)来捕获这些事件,从而分析问题。
从版本 4.0.7 和 4.1.3 开始,每当服务器套接字上发生意外异常时(例如当服务器套接字正在使用时出现 BindException),都会发布 TcpConnectionServerExceptionEvent 个实例。
这些事件包含对连接工厂和原因的引用。
从 4.2 版本开始,每当端点(入站网关或协作的出站通道适配器)收到一条无法路由到连接的消息时(因为 TcpConnectionFailedCorrelationEvent 头无效),就会发布 TcpConnectionFailedCorrelationEvent 个实例。
当收到延迟回复时(发送者线程超时),出站网关也会发布此事件。
该事件包含连接 ID 以及 cause 属性中的异常,其中包含失败的消息。
从版本 4.3 开始,当服务器连接工厂启动时会发出一个 TcpConnectionServerListeningEvent。
当工厂配置为在端口 0 上监听时(即由操作系统选择端口),这一特性非常有用。
此外,如果您需要在启动连接到该套接字的其他进程之前等待,也可以使用它来替代轮询 isListening()。
| 为了避免延迟监听线程接受连接,该事件在单独的线程上发布。 |
从版本 4.3.2 开始,每当无法创建客户端连接时,就会发出一个TcpConnectionFailedEvent。
该事件的来源是连接工厂,您可以利用它来确定无法建立连接的宿主机和端口号。
TCP 适配器
提供了使用前述连接工厂的 TCP 入站和出站通道适配器。
这些适配器具有两个相关属性:connection-factory 和 channel。
connection-factory 属性指示用于管理适配器连接的连接工厂。
channel 属性指定消息到达出站适配器的通道,以及入站适配器放置消息的通道。
虽然入站和出站适配器可以共享一个连接工厂,但服务器连接工厂始终由入站适配器“拥有”;客户端连接工厂始终由出站适配器“拥有”。
每种类型的适配器只能有一个引用连接工厂。
以下示例展示了如何定义客户端和服务器 TCP 连接工厂:
<bean id="javaSerializer"
class="org.springframework.core.serializer.DefaultSerializer"/>
<bean id="javaDeserializer"
class="org.springframework.core.serializer.DefaultDeserializer"/>
<int-ip:tcp-connection-factory id="server"
type="server"
port="1234"
deserializer="javaDeserializer"
serializer="javaSerializer"
using-nio="true"
single-use="true"/>
<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="#{server.port}"
single-use="true"
so-timeout="10000"
deserializer="javaDeserializer"
serializer="javaSerializer"/>
<int:channel id="input" />
<int:channel id="replies">
<int:queue/>
</int:channel>
<int-ip:tcp-outbound-channel-adapter id="outboundClient"
channel="input"
connection-factory="client"/>
<int-ip:tcp-inbound-channel-adapter id="inboundClient"
channel="replies"
connection-factory="client"/>
<int-ip:tcp-inbound-channel-adapter id="inboundServer"
channel="loop"
connection-factory="server"/>
<int-ip:tcp-outbound-channel-adapter id="outboundServer"
channel="loop"
connection-factory="server"/>
<int:channel id="loop"/>
在前面的配置中,到达input通道的消息通过由client连接工厂创建的连接进行序列化,在服务器端接收后,被放置到loop通道上。
由于loop是outboundServer的输入通道,该消息会沿同一连接循环返回,由inboundClient接收并存入replies通道。
传输过程中使用的是 Java 序列化。
通常,入站适配器使用一个type="server"连接工厂来监听传入的连接请求。
在某些情况下,您可能希望以相反的方式建立连接,即入站适配器连接到外部服务器,然后在该连接上等待传入消息。
通过在内向适配器上设置 client-mode="true" 可支持此拓扑结构。
在此情况下,连接工厂的类型必须为 client,且必须将 single-use 设置为 false。
两个额外的属性支持此机制。
retry-interval指定(以毫秒为单位)在连接失败后框架尝试重新连接的频率。
scheduler提供一个TaskScheduler来调度连接尝试并测试连接是否仍然活跃。
如果您未提供调度器,框架将使用默认的 taskScheduler Bean。
对于出站适配器,连接通常在发送第一条消息时建立。
出站适配器上的 client-mode="true" 会导致在适配器启动时建立连接。
默认情况下,适配器会自动启动。
同样,连接工厂必须是类型为 client 且具备 single-use="false" 的工厂。
也支持 retry-interval 和 scheduler。
如果连接失败,将由调度器或在下一次发送消息时重新建立。
对于入站和出站,如果适配器已启动,您可以通过发送<control-bus />命令来强制适配器建立连接:@adapter_id.retryConnection()。
然后您可以使用@adapter_id.isClientModeConnected()检查当前状态。
TCP 网关
入站 TCP 网关 TcpInboundGateway 和出站 TCP 网关 TcpOutboundGateway 分别使用服务器和客户端连接工厂。
每个连接一次只能处理一个请求或响应。
入站网关在根据传入的有效负载构建消息并将其发送到requestChannel后,会等待响应,并通过将响应消息中的有效负载写入连接来发送该有效负载。
对于入站网关,您必须保留或填充ip_connectionId头,因为它用于将消息与连接关联起来。
源自网关的消息会自动设置该头。
如果回复是作为新消息构建的,则需要设置该头。
头的值可以从传入的消息中捕获。 |
与入站适配器一样,入站网关通常使用一个type="server"连接工厂来监听传入的连接请求。
在某些情况下,您可能希望以相反的方式建立连接,即入站网关连接到外部服务器,然后在该连接上等待并响应传入的消息。
此拓扑通过在使用入站网关时配置 client-mode="true" 来支持。
在此情况下,连接工厂必须是 client 类型,并且必须将 single-use 设置为 false。
两个额外的属性支持此机制。
retry-interval 指定(以毫秒为单位)框架在连接失败后尝试重新连接的频率。
scheduler 提供一个 TaskScheduler 来调度连接尝试并测试连接是否仍然活跃。
如果网关已启动,您可以通过发送 <control-bus/> 命令强制网关建立连接:@adapter_id.retryConnection(),并使用 @adapter_id.isClientModeConnected() 检查当前状态。
出站网关在通过连接发送消息后,会等待响应、构造响应消息并将其放入回复通道。 连接上的通信是单线程的。 同一时间只能处理一条消息。 如果另一个线程在当前响应接收完成前尝试发送消息,它将阻塞,直到所有先前的请求完成(或超时)。 然而,如果客户端连接工厂配置为使用单次连接,则每个新请求都会获得自己的连接并立即被处理。 以下示例配置了一个入站 TCP 网关:
<int-ip:tcp-inbound-gateway id="inGateway"
request-channel="tcpChannel"
reply-channel="replyChannel"
connection-factory="cfServer"
reply-timeout="10000"/>
如果使用了使用默认序列化器或反序列化器配置的连接工厂,则消息为\r\n分隔的数据,并且网关可以通过简单的客户端(如 telnet)使用。
以下示例展示了一个出站 TCP 网关:
<int-ip:tcp-outbound-gateway id="outGateway"
request-channel="tcpChannel"
reply-channel="replyChannel"
connection-factory="cfClient"
request-timeout="10000"
remote-timeout="10000"/> <!-- or e.g. remote-timeout-expression="headers['timeout']" -->
client-mode 当前不支持出站网关。
从 5.2 版本开始,出站网关可以使用属性 closeStreamAfterSend 进行配置。
如果连接工厂被配置为 single-use(每个请求/回复建立新连接),则网关将关闭输出流;这会向服务器发送 EOF 信号。
这在服务器使用 EOF 来确定消息结束(而不是在流中使用某些分隔符)时非常有用,同时保持连接打开以接收回复。
通常情况下,调用线程会在网关处阻塞,等待回复(或超时)。
从 5.3 版本开始,您可以在网关上设置 async 属性,发送线程将被释放以执行其他工作。
回复(或错误)将在接收线程上发送。
这仅在使用 TcpNetClientConnectionFactory 时适用;使用 NIO 时会忽略此设置,因为存在竞态条件:在接收到回复后发生的套接字错误可能会在回复之前被传递给网关。
当使用共享连接(singleUse=false)时,如果另一个请求正在处理中,新请求将被阻塞,直到收到当前响应。
如果您希望在长连接池中支持并发请求,请考虑使用 CachingClientConnectionFactory。 |
从 5.4 版本开始,入站通道可以使用 unsolicitedMessageChannel 进行配置。
未请求的入站消息将发送到该通道,包括超时后的延迟回复(即客户端超时的情况)。
为了在服务端支持此功能,您现在可以在连接工厂中注册多个 TcpSender。
网关和通道适配器会自动注册自身。
当从服务端发送未请求的消息时,必须为发送的消息添加适当的 IpHeaders.CONNECTION_ID。
TCP 消息关联
IP 端点的一个目标是提供与 Spring Integration 应用之外的系统进行通信。 因此,默认情况下只发送和接收消息负载。 从 3.0 版本开始,您可以使用 JSON、Java 序列化或自定义序列化和反序列化器来传输头部信息。 有关更多信息,请参阅 传输头部。 框架本身(服务器端的网关除外)或协作通道适配器不提供消息关联功能。 本文档稍后部分将讨论应用程序可用的各种关联技术。 在大多数情况下,这需要特定于应用级别的消息关联,即使消息负载中包含一些自然的关联数据(如订单号)。
网关
网关会自动关联消息。 然而,对于相对低流量的应用,您应该使用出站网关。 当您配置连接工厂以让所有消息对使用单个共享连接('single-use="false"')时,同一时间只能处理一条消息。 新消息必须等待收到前一条消息的回复后才能继续处理。 当连接工厂被配置为每条新消息都使用新连接('single-use="true"')时,此限制将不再适用。 虽然此设置可能比共享连接环境提供更高的吞吐量,但它会带来为每对消息打开和关闭新连接的开销。
因此,对于高容量的消息,请考虑使用一对协作的通道适配器。 然而,要实现这一点,您需要提供协作逻辑。
另一种解决方案是在 Spring Integration 2.2 中引入的,它使用 CachingClientConnectionFactory,从而允许使用共享连接池。
协作外部和内部通道适配器
为了实现高吞吐量(避免使用网关,正如前面提到的),您可以配置一对协作的出站和入站通道适配器。 您也可以使用协作适配器(服务器端或客户端)进行完全异步通信(而不是请求 - 响应语义)。 在服务器端,消息关联由适配器自动处理,因为入站适配器会添加一个头信息,使出站适配器能够确定发送回复消息时使用的连接。
在服务器端,您必须填充ip_connectionId头部,因为它用于将消息与连接关联起来。
源自入站适配器的消息会自动设置该头部。
如果您希望构建其他要发送的消息,则需要设置该头部。
您可以从传入的消息中获取该头部的值。 |
在客户端,应用程序必须提供其自身的关联逻辑(如果需要)。 您可以通过多种方式实现这一点。
如果消息负载包含某些自然关联数据(例如事务 ID 或订单号),并且您无需保留原始出站消息中的任何信息(例如回复通道标头),则关联非常简单,无论如何都将在应用层完成。
如果消息负载包含某些自然关联数据(如事务 ID 或订单号),但您需要从原始出站消息中保留一些信息(如回复通道头),则可以保留原始出站消息的副本(例如通过使用发布 - 订阅通道),并使用聚合器重新组合必要的数据。
对于上述两种场景中的任何一种,如果负载没有天然的相关性数据,您可以在出站通道适配器之前提供一个转换器,以通过此类数据增强负载。 这样的转换器可以将原始负载转换为一个新对象,该对象既包含原始负载,又包含消息头的一个子集。 当然,来自消息头的实时对象(例如回复通道)不能包含在转换后的负载中。
如果您选择此类策略,则需要确保连接工厂具有适当的序列化器 - 反序列化器对来处理此类负载(例如 DefaultSerializer 和 DefaultDeserializer,它们使用 Java 序列化,或者自定义的序列化器和反序列化器)。
TCP 连接工厂中提到的 ByteArray*Serializer 个选项,包括默认的 ByteArrayCrLfSerializer,不支持此类负载,除非转换后的负载是 String 或 byte[]。
|
在 2.2 版本发布之前,当协作通道适配器使用客户端连接工厂时, 这种默认行为在真正的异步环境中并不合适,因此现在默认为无限超时。
您可以通过将客户端连接工厂的 |
从 5.4 版本开始,多个出站通道适配器和一个 TcpInboundChannelAdapter 可以共享同一个连接工厂。
这使得应用程序能够同时支持请求/响应和任意服务器 → 客户端消息传递。
有关更多信息,请参阅 TCP 网关。
传递请求头
TCP 是一种流式协议。
Serializers 和 Deserializers 用于界定流中的消息。
在 3.0 版本之前,只有消息负载(String 或 byte[])可以通过 TCP 传输。
从 3.0 版本开始,您不仅可以传输负载,还可以传输选定的头部信息。
然而,像 replyChannel 这样的“实时”对象无法被序列化。
通过 TCP 发送头部信息需要一些额外的配置。
第一步是提供一个使用mapper属性的MessageConvertingTcpMessageMapper来为ConnectionFactory赋值。
此映射器委托给任何MessageConverter实现,以便将消息转换为可由配置的serializer和deserializer进行序列化和反序列化的对象。
Spring Integration 提供了一个 MapMessageConverter,允许指定一组添加到 Map 对象中的头部信息,同时包含有效负载。
生成的 Map 包含两个条目:payload 和 headers。
headers 条目本身是一个 Map,并包含所选的头部信息。
第二步是提供一个序列化和反序列化器,用于在Map和某种 wire 格式之间进行转换。
这可以是一个自定义的Serializer或Deserializer,通常当对端系统不是 Spring Integration 应用程序时需要这样做。
Spring Integration 提供了一个 MapJsonSerializer,用于在 JSON 和 Map 之间进行转换。
它使用了一个 Spring Integration JsonObjectMapper。
如有需要,您可以提供一个自定义的 JsonObjectMapper。
默认情况下,序列化器会在对象之间插入一个换行符(0x0a)。
有关更多信息,请参阅 Javadoc。
The JsonObjectMapper uses whichever version of Jackson is on the classpath. |
您也可以通过使用 DefaultSerializer 和 DefaultDeserializer,对 Map 使用标准的 Java 序列化。
下面的示例展示了如何使用 JSON 配置一个连接工厂,以传递 correlationId、sequenceNumber 和 sequenceSize 标头:
<int-ip:tcp-connection-factory id="client"
type="client"
host="localhost"
port="12345"
mapper="mapper"
serializer="jsonSerializer"
deserializer="jsonSerializer"/>
<bean id="mapper"
class="o.sf.integration.ip.tcp.connection.MessageConvertingTcpMessageMapper">
<constructor-arg name="messageConverter">
<bean class="o.sf.integration.support.converter.MapMessageConverter">
<property name="headerNames">
<list>
<value>correlationId</value>
<value>sequenceNumber</value>
<value>sequenceSize</value>
</list>
</property>
</bean>
</constructor-arg>
</bean>
<bean id="jsonSerializer" class="o.sf.integration.ip.tcp.serializer.MapJsonSerializer" />
使用上述配置发送的、负载为 'something' 的消息,在传输线路上将显示如下:
{"headers":{"correlationId":"things","sequenceSize":5,"sequenceNumber":1},"payload":"something"}
关于非阻塞 I/O (NIO)
使用 NIO(见 using-nio 中的 IP 配置属性)可避免为每个套接字分配一个线程进行读取。
对于少量套接字,您很可能会发现,不使用 NIO 并结合异步转交(例如转交给 QueueChannel),其性能表现与使用 NIO 相当甚至更优。
在处理大量连接时,您应该考虑使用 NIO。 然而,使用 NIO 会带来一些其他影响。 线程池(在任务执行器中)在所有套接字之间共享。 每个传入消息都会被组装,并作为独立的工作单元发送到配置的通道,该工作由从池中选择的线程处理。 同一套接字上连续到达的两个消息可能会由不同的线程处理。 这意味着发送到通道的消息顺序是不确定的。 不保证套接字上到达的消息的严格顺序。
对于某些应用程序,这不是问题。
对于其他应用程序,这是一个问题。
如果您需要严格的顺序,请考虑将 using-nio 设置为 false 并使用异步交接。
或者,您可以在入站端点之后插入一个重排序器,将消息恢复为正确的顺序。
如果在连接工厂上将 apply-sequence 设置为 true,则到达 TCP 连接的消息将设置 sequenceNumber 和 correlationId 标头。
重排序器使用这些标头将消息恢复为正确的顺序。
从 5.1.4 版本开始,优先接受新连接而非读取现有连接。
通常情况下,除非您有非常高的新入站连接速率,否则这应该几乎没有影响。
如果您希望恢复到之前优先读取的行为,请将 multiAccept 属性设置为 false(位于 TcpNioServerConnectionFactory 上)。 |
连接池大小
池大小属性不再被使用。
此前,它指定了未指定任务执行器时的默认线程池大小。
它也用于设置服务器套接字的连接积压数。
第一个功能已不再需要(见下一段)。
第二个功能由 backlog 属性替代。
此前,在使用固定线程池任务执行器(这是默认配置)配合 NIO 时,可能会出现死锁,导致处理停止。 当缓冲区已满、从套接字读取数据的线程试图向缓冲区添加更多数据,且没有可用线程来腾出缓冲区空间时,就会发生此问题。 这种情况仅在线程池大小非常小时出现,但在极端条件下也可能发生。 自 2.2 版本起,两项更改已消除此问题。 首先,默认的任务执行器改为缓存线程池执行器。 其次,添加了死锁检测逻辑:如果发生线程饥饿,系统将不再死锁,而是抛出异常,从而释放被死锁占用的资源。
| 现在默认的任务执行器是无限制的,如果消息处理耗时较长,在高频率的消息流入情况下,可能会发生内存溢出条件。 如果您的应用程序表现出这种行为,您应该使用具有适当池大小的池化任务执行器,但请参见下一节。 |
线程池任务执行器与CALLER_RUNSpolicy
使用固定线程池时,您应当牢记一些重要的注意事项:当 CallerRunsPolicy(在使用 <task/> 命名空间时为 CALLER_RUNS)且队列容量较小时。
如果您不使用固定线程池,则以下规则不适用。
使用 NIO 连接时,存在三种不同的任务类型。 I/O 选择器的处理由一个专用线程执行(检测事件、接受新连接,并通过任务执行器将 I/O 读操作分派给其他线程)。 当 I/O 读取线程(读操作被分派到该线程)读取数据时,它将把数据交给另一个线程来组装传入的消息。 大消息可能需要多次读取才能完成。 这些“组装”线程在等待数据时可能会阻塞。 当发生新的读取事件时,读取器会检查该套接字是否已有组装线程,如果没有,则启动一个新的组装线程。 组装过程完成后,组装线程将被返回到线程池中。
当连接池耗尽、使用CALLER_RUNS拒绝策略且任务队列已满时,这可能导致死锁。
当连接池为空且队列中没有空间时,IO选择器线程会收到OP_READ事件,并使用执行器分发读取操作。
由于队列已满,选择器线程本身开始启动读取过程。
此时它检测到该套接字没有对应的组装器,并在执行读取之前触发一个组装器。
再次出现队列已满的情况,选择器线程变成了组装器。
现在组装器被阻塞,等待数据被读取,但数据永远不会到来。
连接工厂因此陷入死锁,因为选择器线程无法处理新的事件。
为了避免这种死锁,我们必须避免选择器(或读取)线程执行组装任务。 我们希望为 IO 操作和组装操作使用独立的线程池。
该框架提供了一个CompositeExecutor,允许配置两个独立的执行器:一个用于执行IO操作,另一个用于消息组装。
在此环境中,IO线程永远无法成为组装线程,因此不会发生死锁。
此外,任务执行器应配置为使用 AbortPolicy(使用 <task> 时为 ABORT)。
当 I/O 任务无法完成时,会暂时推迟并持续重试,直到能够完成并分配一个组装器。
默认延迟时间为 100ms,但您可以通过在连接工厂上设置 readDelay 属性来更改它(在使用 XML 命名空间配置时为 read-delay)。
以下三个示例展示了如何配置组合执行器:
@Bean
private CompositeExecutor compositeExecutor() {
ThreadPoolTaskExecutor ioExec = new ThreadPoolTaskExecutor();
ioExec.setCorePoolSize(4);
ioExec.setMaxPoolSize(10);
ioExec.setQueueCapacity(0);
ioExec.setThreadNamePrefix("io-");
ioExec.setRejectedExecutionHandler(new AbortPolicy());
ioExec.initialize();
ThreadPoolTaskExecutor assemblerExec = new ThreadPoolTaskExecutor();
assemblerExec.setCorePoolSize(4);
assemblerExec.setMaxPoolSize(10);
assemblerExec.setQueueCapacity(0);
assemblerExec.setThreadNamePrefix("assembler-");
assemblerExec.setRejectedExecutionHandler(new AbortPolicy());
assemblerExec.initialize();
return new CompositeExecutor(ioExec, assemblerExec);
}
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
<constructor-arg ref="io"/>
<constructor-arg ref="assembler"/>
</bean>
<task:executor id="io" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<task:executor id="assembler" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
<constructor-arg>
<bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="threadNamePrefix" value="io-" />
<property name="corePoolSize" value="4" />
<property name="maxPoolSize" value="8" />
<property name="queueCapacity" value="0" />
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
</property>
</bean>
</constructor-arg>
<constructor-arg>
<bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="threadNamePrefix" value="assembler-" />
<property name="corePoolSize" value="4" />
<property name="maxPoolSize" value="10" />
<property name="queueCapacity" value="0" />
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
</property>
</bean>
</constructor-arg>
</bean>
SSL/TLS 支持
支持安全套接字层/传输层安全协议。
使用 NIO 时,JDK 5+ SSLEngine 功能用于在连接建立后处理握手过程。
不使用 NIO 时,使用标准的 SSLSocketFactory 和 SSLServerSocketFactory 对象来创建连接。
提供了多种策略接口以允许进行大量自定义。
这些接口的默认实现提供了开始安全通信的最简单方式。
快速开始
无论您是否使用 NIO,都需要在连接工厂上配置 ssl-context-support 属性。
此属性引用一个 <bean/> 定义,该定义描述了所需密钥库的位置和密码。
SSL/TLS 对等方各自需要两个密钥库:
-
包含用于标识对等方的私钥和公钥对的密钥库
-
包含受信任对等方公钥的密钥库。 请查看随 JDK 提供的
keytool工具的文档。 基本步骤如下-
创建一个新的密钥对并将其存储在密钥库中。
-
导出公钥。
-
将公钥导入对等方的信任库。
-
为其他对等节点重复此操作。
-
| 在测试用例中,双方使用相同的密钥库是常见的做法,但在生产环境中应避免这样做。 |
在建立密钥库之后,下一步是向 TcpSSLContextSupport bean 指示它们的位置,并向连接工厂提供对该 bean 的引用。
以下示例配置了一个 SSL 连接:
<bean id="sslContextSupport"
class="o.sf.integration.ip.tcp.connection.support.DefaultTcpSSLContextSupport">
<constructor-arg value="client.ks"/>
<constructor-arg value="client.truststore.ks"/>
<constructor-arg value="secret"/>
<constructor-arg value="secret"/>
</bean>
<ip:tcp-connection-factory id="clientFactory"
type="client"
host="localhost"
port="1234"
ssl-context-support="sslContextSupport" />
The DefaultTcpSSLContextSupport 类还有一个可选的 protocol 属性,它可以是 SSL 或 TLS(默认值)。
密钥库文件名(前两个构造函数参数)使用了 Spring Resource 抽象。
默认情况下,这些文件位于类路径上,但您可以使用 file: 前缀来覆盖此行为(以便在文件系统上查找文件)。
从 4.3.6 版本开始,当您使用 NIO 时,可以在连接工厂中指定一个 ssl-handshake-timeout(以秒为单位)。
此超时时间(默认值为 30 秒)在 SSL 握手期间等待数据时使用。
如果超过超时时间,进程将停止并关闭套接字。
主机验证
从 5.0.8 版本开始,您可以配置是否启用主机验证。 从 5.1 版本开始,默认启用;禁用该机制取决于您是否使用 NIO。
主机验证用于确保您连接的服务器与证书中的信息匹配,即使该证书是受信任的。
使用 NIO 时,配置 DefaultTcpNioSSLConnectionSupport,例如。
@Bean
public DefaultTcpNioSSLConnectionSupport connectionSupport() {
DefaultTcpSSLContextSupport sslContextSupport = new DefaultTcpSSLContextSupport("test.ks",
"test.truststore.ks", "secret", "secret");
sslContextSupport.setProtocol("SSL");
DefaultTcpNioSSLConnectionSupport tcpNioConnectionSupport =
new DefaultTcpNioSSLConnectionSupport(sslContextSupport, false);
return tcpNioConnectionSupport;
}
第二个构造函数参数会禁用主机验证。
随后将 connectionSupport bean 注入到 NIO 连接工厂中。
当不使用 NIO 时,配置位于 TcpSocketSupport 中:
connectionFactory.setTcpSocketSupport(new DefaultTcpSocketSupport(false));
同样,构造函数参数会禁用主机验证。
高级技术
本节介绍在某些情况下可能对您有帮助的高级技术。
策略接口
在许多情况下,前述配置已足以启用基于 TCP/IP 的安全通信。 然而,Spring Integration 提供了多种策略接口,允许对 Socket 工厂和 Socket 进行自定义和修改:
-
TcpSSLContextSupport -
TcpSocketFactorySupport -
TcpSocketSupport -
TcpNetConnectionSupport -
TcpNioConnectionSupport
这TcpSSLContextSupport策略接口
以下清单展示了 TcpSSLContextSupport 策略接口:
public interface TcpSSLContextSupport {
SSLContext getSSLContext() throws Exception;
}
实现 TcpSSLContextSupport 接口的类负责创建 SSLContext 对象。
框架提供的实现是 DefaultTcpSSLContextSupport,前文已描述。
如果您需要不同的行为,请实现此接口,并将连接工厂引用指向您类实现的 Bean。
这TcpSocketFactorySupport策略接口
以下清单展示了 TcpSocketFactorySupport 策略接口的定义:
public interface TcpSocketFactorySupport {
ServerSocketFactory getServerSocketFactory();
SocketFactory getSocketFactory();
}
此接口的实现负责获取对 ServerSocketFactory 和 SocketFactory 的引用。
提供了两种实现方式。
第一种是 DefaultTcpNetSocketFactorySupport,用于非 SSL 套接字(当未定义 ssl-context-support 属性时)。
它使用 JDK 的默认工厂。
第二种实现是 DefaultTcpNetSSLSocketFactorySupport。
默认情况下,当定义了 ssl-context-support 属性时使用该实现。
它使用该 Bean 创建的 SSLContext 来创建套接字工厂。
此接口仅在 using-nio 为 false 时适用。
NIO 不使用套接字工厂。 |
这TcpSocketSupport策略接口
以下清单展示了 TcpSocketSupport 策略接口的定义:
public interface TcpSocketSupport {
void postProcessServerSocket(ServerSocket serverSocket);
void postProcessSocket(Socket socket);
}
此接口的实现可以在套接字创建后且所有配置的属性已应用但尚未使用时修改套接字。
无论是否使用 NIO,此规则均适用。
例如,您可以使用该接口的实现来修改 SSL 套接字上支持的加密套件,或者添加一个监听器,以便在 SSL 握手完成后收到通知。
框架提供的唯一实现是 DefaultTcpSocketSupport,它不会以任何方式修改套接字。
要提供您自己的TcpSocketFactorySupport或TcpSocketSupport实现,请通过分别设置socket-factory-support和socket-support属性,向连接工厂提供对您自定义类型 Bean 的引用。
这TcpNetConnectionSupport策略接口
以下清单展示了 TcpNetConnectionSupport 策略接口的定义:
public interface TcpNetConnectionSupport {
TcpNetConnection createNewConnection(Socket socket,
boolean server, boolean lookupHost,
ApplicationEventPublisher applicationEventPublisher,
String connectionFactoryName) throws Exception;
}
此接口用于创建类型为 TcpNetConnection(或其子类)的对象。
该框架提供了一个单一的实现 (DefaultTcpNetConnectionSupport),默认情况下会创建简单的 TcpNetConnection 对象。
它有两个属性:pushbackCapable 和 pushbackBufferSize。
当启用回退(push back)时,该实现返回一个包装连接 InputStream 的子类,并将其封装在 PushbackInputStream 中。
与 PushbackInputStream 的默认值保持一致,缓冲区大小默认为 1。
这使得反序列化器能够将字节“回读”(推回)到流中。
以下简单示例展示了如何在委托反序列化器中使用它,该反序列化器会“窥视”第一个字节以确定调用哪个反序列化器:
public class CompositeDeserializer implements Deserializer<byte[]> {
private final ByteArrayStxEtxSerializer stxEtx = new ByteArrayStxEtxSerializer();
private final ByteArrayCrLfSerializer crlf = new ByteArrayCrLfSerializer();
@Override
public byte[] deserialize(InputStream inputStream) throws IOException {
PushbackInputStream pbis = (PushbackInputStream) inputStream;
int first = pbis.read();
if (first < 0) {
throw new SoftEndOfStreamException();
}
pbis.unread(first);
if (first == ByteArrayStxEtxSerializer.STX) {
this.receivedStxEtx = true;
return this.stxEtx.deserialize(pbis);
}
else {
this.receivedCrLf = true;
return this.crlf.deserialize(pbis);
}
}
}
这TcpNioConnectionSupport策略接口
以下清单展示了 TcpNioConnectionSupport 策略接口的定义:
public interface TcpNioConnectionSupport {
TcpNioConnection createNewConnection(SocketChannel socketChannel,
boolean server, boolean lookupHost,
ApplicationEventPublisher applicationEventPublisher,
String connectionFactoryName) throws Exception;
}
此接口用于创建TcpNioConnection个对象(或来自子类的对象)。
Spring Integration 提供了两种实现:DefaultTcpNioSSLConnectionSupport和DefaultTcpNioConnectionSupport。
具体使用哪一种取决于是否启用了 SSL。
一个常见的用法是继承DefaultTcpNioSSLConnectionSupport并重写postProcessSSLEngine。
请参阅SSL 客户端身份验证示例。
与DefaultTcpNetConnectionSupport类似,这些实现也支持反向推送(push back)。
示例:启用 SSL 客户端身份验证
当使用 SSL 启用客户端证书认证时,具体技术取决于您是否使用了 NIO。
如果您未使用 NIO,请提供一个自定义的 TcpSocketSupport 实现来对服务器套接字进行后处理:
serverFactory.setTcpSocketSupport(new DefaultTcpSocketSupport() {
@Override
public void postProcessServerSocket(ServerSocket serverSocket) {
((SSLServerSocket) serverSocket).setNeedClientAuth(true);
}
});
(当您使用 XML 配置时,通过设置 socket-support 属性来引用您的 bean)。
当您使用 NIO 时,请提供自定义的 TcpNioSslConnectionSupport 实现以处理 SSLEngine 的后处理,如下例所示:
@Bean
public DefaultTcpNioSSLConnectionSupport tcpNioConnectionSupport() {
return new DefaultTcpNioSSLConnectionSupport(serverSslContextSupport) {
@Override
protected void postProcessSSLEngine(SSLEngine sslEngine) {
sslEngine.setNeedClientAuth(true);
}
}
}
@Bean
public TcpNioServerConnectionFactory server() {
...
serverFactory.setTcpNioConnectionSupport(tcpNioConnectionSupport());
...
}
(当您使用 XML 配置时,从 4.3.7 版本开始,通过设置 nio-connection-support 属性来引用您的 Bean)。
IP 配置属性
下表描述了可用于配置 IP 连接的属性:
| 属性名称 | 客户端? | 服务器? | 允许的值 | 属性描述 |
|---|---|---|---|---|
|
Y |
Y |
客户端,服务器 |
确定连接工厂是客户端还是服务器。 |
|
Y |
N |
目的地的主机名或IP地址。 |
|
|
Y |
Y |
端口。 |
|
|
Y |
Y |
用于序列化负载的 |
|
|
Y |
Y |
用于反序列化负载的 |
|
|
Y |
Y |
|
连接是否使用 NIO。
有关更多信息,请参阅 |
|
Y |
N |
|
使用 NIO 时,连接是否使用直接缓冲区。
有关更多信息,请参阅 |
|
Y |
Y |
|
当您使用 NIO 时,可能需要对消息进行重新排序。
当此属性设置为 |
|
Y |
Y |
默认为 |
|
|
Y |
Y |
查看 |
|
|
Y |
Y |
查看 |
|
|
Y |
Y |
|
参见 |
|
Y |
Y |
将 |
|
|
Y |
Y |
|
参见 |
|
Y |
Y |
查看 |
|
|
N |
Y |
在多宿主系统上,指定套接字绑定的接口IP地址。 |
|
|
Y |
Y |
指定用于套接字处理的特定执行器。
如果未提供,则使用内部缓存线程执行器。
在某些需要特定任务执行器的平台上是必需的,例如 |
|
|
Y |
Y |
|
指定连接是否可用于多个消息。
如果为 |
|
N |
N |
此属性不再使用。
为了向后兼容,它设置 backlog,但您应该在服务器工厂中使用 |
|
|
N |
Y |
设置服务器工厂的连接 backlog。 |
|
|
Y |
Y |
|
指定是否对 IP 地址执行反向查找,将其转换为主机名以用于消息头。
如果为 false,则直接使用 IP 地址。
默认值: |
|
Y |
Y |
查看 TCP 连接拦截器。 |
|
|
Y |
Y |
参见 |
|
|
Y |
Y |
参见 |
|
|
Y |
Y |
查看 SSL/TLS 支持。 |
|
|
Y |
Y |
查看 高级技术。 |
|
|
Y |
Y |
长 > 0 |
在因线程不足导致前一次读取尝试失败后,重试读取之前的延迟(以毫秒为单位)。
默认值:100。
仅当 |
下表描述了您可以设置的属性,用于配置 UDP 入站通道适配器:
| 属性名称 | 允许的值 | 属性描述 |
|---|---|---|
|
适配器监听的端口。 |
|
|
|
无论UDP适配器是否使用多播。 |
|
当 multicast 为 true 时,适配器加入的多播地址。 |
|
|
指定可同时处理的数据包数量。 仅当未配置 task-executor 时生效。 默认值:5。 |
|
task-executor |
指定用于套接字处理的特定执行器。
如果未提供,则使用内部池化执行器。
在某些需要特定任务执行器的平台上是必需的,例如 |
|
|
用于接收 |
|
|
|
UDP 适配器是否期望在接收到的数据包中包含数据长度字段。 用于检测数据包截断。 |
|
查看 |
|
|
用于 UDP 确认数据包。
有关更多信息,请参阅 |
|
|
有关更多信息,请参阅 |
|
|
在多宿主系统上,指定套接字绑定的接口IP地址。 |
|
|
如果下游组件抛出异常,则包含该异常和失败消息的 |
|
|
|
指定是否对 IP 地址执行反向查找,将其转换为主机名以用于消息头。
如果设置为 |
下表描述了您可以设置的属性,用于配置 UDP 出站通道适配器:
| 属性名称 | 允许的值 | 属性描述 |
|---|---|---|
|
目标的主机名或IP地址。 对于多播UDP适配器,即多播地址。 |
|
|
目标端口。 |
|
|
|
无论UDP适配器是否使用多播。 |
|
|
UDP 适配器是否需要来自目标端的确认。
启用时,需要设置以下四个属性: |
|
当 |
|
|
当 |
|
|
当 |
|
|
默认为 1。 对于多播适配器,您可以将其设置为更大的值,这需要来自多个目的地的确认。 |
|
|
|
无论 UDP 适配器是否在发往目标的数据包中包含数据长度字段。 |
|
对于多播适配器,指定 |
|
|
查看 |
|
|
查看 |
|
|
用于 UDP 确认数据包。
有关更多信息,请参阅 |
|
local-address |
在多宿主系统上,对于 UDP 适配器,指定用于回复消息的套接字所绑定的接口的 IP 地址。 对于多播适配器,它还确定多播数据包通过哪个接口发送。 |
|
|
指定用于确认处理的特定执行器。
如果未提供,则使用内部单线程执行器。
在某些需要特定任务执行器的平台上是必需的,例如 |
|
|
SpEL 表达式 |
一个将被评估的 SpEL 表达式,用于确定应使用哪个 |
|
SpEL 表达式 |
一个 SpEL 表达式,用于评估以确定发送出站 UDP 数据包时使用哪个数据报套接字。 |
下表描述了您可以设置的属性,用于配置 TCP 入站通道适配器:
| 属性名称 | 允许的值 | 属性描述 |
|---|---|---|
|
发送入站消息的通道。 |
|
|
如果连接工厂的类型为 |
|
|
如果下游组件抛出异常,则包含该异常和失败消息的 |
|
|
|
当设置为 |
|
当值为 |
|
|
|
指定一个 |
下表描述了可用于配置 TCP 出站通道适配器的属性:
| 属性名称 | 允许的值 | 属性描述 |
|---|---|---|
|
到达出站消息的通道。 |
|
|
如果连接工厂的类型为 |
|
|
|
当值为 |
|
当值为 |
|
|
|
指定一个 |
下表描述了您可以设置的属性,用于配置 TCP 入站网关:
| 属性名称 | 允许的值 | 属性描述 |
|---|---|---|
|
连接工厂必须是服务器类型。 |
|
|
接收传入消息的通道。 |
|
|
用于接收回复消息的通道。 通常,回复会到达添加到入站消息头部的临时回复通道。 |
|
|
网关等待回复的毫秒数。 默认值:1000(1 秒)。 |
|
|
如果下游组件抛出异常,则包含该异常和失败消息的 |
|
|
|
当为 |
|
当值为 |
|
|
|
指定一个 |
下表描述了可用于配置 TCP 出站网关的属性:
| 属性名称 | 允许的值 | 属性描述 |
|---|---|---|
|
连接工厂的类型必须为 |
|
|
传出消息到达的通道。 |
|
|
可选。 发送回复消息的通道。 |
|
|
网关等待远程系统回复的毫秒数。
与 |
|
|
针对消息求值的 SpEL 表达式,用于确定网关等待远程系统回复的毫秒数。
与 |
|
|
如果不使用一次性连接工厂,则网关等待获取共享连接的毫秒数。 |
|
|
网关在将回复发送到回复通道时等待的毫秒时间。 仅当回复通道可能阻塞时适用(例如,当前已满的有界 QueueChannel)。 |
|
|
发送后释放发送线程;回复(或错误)将在接收线程上发送。 |
|
|
一个用于发送非请求消息和迟回复的通道。 |
IP 消息头
此模块使用以下 MessageHeader 个实例:
| Header Name | IpHeaders 常量 | 描述 |
|---|---|---|
|
|
接收到的 TCP 消息或 UDP 数据包的主机名。
如果 |
|
|
接收到的 TCP 消息或 UDP 数据包来源的 IP 地址。 |
|
|
UDP 数据包的远程端口。 |
ip_localInetAddress |
|
与套接字连接的本地 |
|
|
发送给UDP应用程序层确认的远程IP地址。 该框架在数据包中包含确认信息。 |
|
|
用于 UDP 应用层确认的相关 ID。 该框架将确认信息包含在数据包中。 |
|
|
TCP 连接的远程端口。 |
|
|
TCP 连接的唯一标识符。 由框架为入站消息设置。 在向服务器端入站通道适配器发送或响应入站网关时,需要此标头,以便端点能够确定要向其发送消息的连接。 |
|
|
仅供信息参考。 当使用缓存或故障转移客户端连接工厂时,它将包含实际的基础连接 ID。 |
|
|
用于入站消息的可选内容类型
在此表格后描述。
请注意,与其他标头常量不同,此常量位于 |
对于入站消息,ip_hostname、ip_address、ip_tcp_remotePort和ip_connectionId默认由TcpHeaderMapper进行映射。
如果您将映射器的addContentTypeHeader属性设置为true,则映射器会设置contentType头(默认为application/octet-stream;charset="UTF-8")。
您可以通过设置contentType属性来更改默认值。
您可以通过继承TcpHeaderMapper并重写supplyCustomHeaders方法来添加额外的头信息。
例如,当您使用SSL时,可以通过从TcpConnection对象获取会话对象来添加SSLSession的属性,该对象作为参数提供给supplyCustomHeaders方法。
对于出站消息,String 负载在默认 (UTF-8) 字符集下被转换为 byte[]。
设置 charset 属性以更改默认值。
在自定义映射器属性或进行子类化时,请将映射器声明为 Bean,并通过使用 mapper 属性将其实例提供给连接工厂。
基于注解的配置
以下示例来自 samples 仓库,展示了当您使用注解而非 XML 时可用的部分配置选项:
@EnableIntegration (1)
@IntegrationComponentScan (2)
@Configuration
public static class Config {
@Value(${some.port})
private int port;
@MessagingGateway(defaultRequestChannel="toTcp") (3)
public interface Gateway {
String viaTcp(String in);
}
@Bean
@ServiceActivator(inputChannel="toTcp") (4)
public MessageHandler tcpOutGate(AbstractClientConnectionFactory connectionFactory) {
TcpOutboundGateway gate = new TcpOutboundGateway();
gate.setConnectionFactory(connectionFactory);
gate.setOutputChannelName("resultToString");
return gate;
}
@Bean (5)
public TcpInboundGateway tcpInGate(AbstractServerConnectionFactory connectionFactory) {
TcpInboundGateway inGate = new TcpInboundGateway();
inGate.setConnectionFactory(connectionFactory);
inGate.setRequestChannel(fromTcp());
return inGate;
}
@Bean
public MessageChannel fromTcp() {
return new DirectChannel();
}
@MessageEndpoint
public static class Echo { (6)
@Transformer(inputChannel="fromTcp", outputChannel="toEcho")
public String convert(byte[] bytes) {
return new String(bytes);
}
@ServiceActivator(inputChannel="toEcho")
public String upCase(String in) {
return in.toUpperCase();
}
@Transformer(inputChannel="resultToString")
public String convertResult(byte[] bytes) {
return new String(bytes);
}
}
@Bean
public AbstractClientConnectionFactory clientCF() { (7)
return new TcpNetClientConnectionFactory("localhost", this.port);
}
@Bean
public AbstractServerConnectionFactory serverCF() { (8)
return new TcpNetServerConnectionFactory(this.port);
}
}
| 1 | 启用集成应用基础架构的标准 Spring 集成注解。 |
| 2 | 搜索到 @MessagingGateway 个接口。 |
| 3 | 流程的客户端入口点。
调用应用程序可以使用 @Autowired 作为此 Gateway bean 并调用其方法。 |
| 4 | 出站端点由一个MessageHandler和一个包装它的消费者组成。
在此场景中,@ServiceActivator根据通道类型配置该端点。 |
| 5 | 入站端点(在 TCP/UDP 模块中)都是基于消息驱动的,因此只需将其声明为简单的 @Bean 实例即可。 |
| 6 | 此类提供了一些 POJO 方法,供本示例流程使用(服务器端为 @Transformer 和 @ServiceActivator,客户端为 @Transformer)。 |
| 7 | 客户端连接工厂。 |
| 8 | 服务器端连接工厂。 |
使用 TCP 组件的 Java DSL
TCP 组件的 DSL 支持包括适配器和网关的规范、具有创建连接工厂 bean 工厂方法的 Tcp 类,以及具有创建序列化和反序列化器工厂方法的 TcpCodecs 类。
有关更多信息,请参阅它们的 Javadocs。
以下是一些使用 DSL 配置流程的示例。
@Bean
public IntegrationFlow server() {
return IntegrationFlow.from(Tcp.inboundAdapter(Tcp.netServer(1234)
.deserializer(TcpCodecs.lengthHeader1())
.backlog(30))
.errorChannel("tcpIn.errorChannel")
.id("tcpIn"))
.transform(Transformers.objectToString())
.channel("tcpInbound")
.get();
}
@Bean
public IntegrationFlow client() {
return f -> f.handle(Tcp.outboundAdapter(Tcp.nioClient("localhost", 1234)
.serializer(TcpCodecs.lengthHeader1())));
}
@Bean
public IntegrationFlow server() {
return IntegrationFlow.from(Tcp.inboundGateway(Tcp.netServer(1234)
.deserializer(TcpCodecs.lengthHeader1())
.serializer(TcpCodecs.lengthHeader1())
.backlog(30))
.errorChannel("tcpIn.errorChannel")
.id("tcpIn"))
.transform(Transformers.objectToString())
.channel("tcpInbound")
.get();
}
@Bean
public IntegrationFlow client() {
return f -> f.handle(Tcp.outboundGateway(Tcp.nioClient("localhost", 1234)
.deserializer(TcpCodecs.lengthHeader1())
.serializer(TcpCodecs.lengthHeader1())));
}