此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring Integration 6.5.1! |
关于无阻塞 I/O (NIO)
使用蔚来(参见using-nio
在 IP 配置属性中)避免专用于从每个套接字读取线程。
对于少数套接字,您可能会发现不使用 NIO,以及异步切换(例如QueueChannel
),性能与使用蔚来一样好或更好。
在处理大量连接时,您应该考虑使用蔚来汽车。 然而,蔚来汽车的使用还有一些其他影响。 线程池(在任务执行器中)在所有套接字之间共享。 每个传入消息都被组装并作为从该池中选择的线程上的单独工作单元发送到配置的通道。 到达同一套接字的两条顺序消息可能由不同的线程处理。 这意味着消息发送到通道的顺序是不确定的。 不维护到达套接字的消息的严格排序。
对于某些应用程序,这不是问题。
对于其他人来说,这是一个问题。
如果您需要严格排序,请考虑将using-nio
自false
以及使用异步切换。
或者,您可以在入站端点的下游插入重排序器,以将消息返回到正确的顺序。
如果您将apply-sequence
自true
在连接工厂上,到达 TCP 连接的消息具有sequenceNumber
和correlationId
headers 设置。
重排序器使用这些标头将消息返回到正确的顺序。
从版本 5.1.4 开始,接受新连接优先于从现有连接读取。
通常,除非新传入连接的速率非常高,否则这应该影响不大。
如果您希望恢复到之前赋予读取优先级的行为,请将multiAccept 属性TcpNioServerConnectionFactory 自false . |
池大小
不再使用池大小属性。
以前,当未指定任务执行器时,它会指定默认线程池的大小。
它还用于在服务器套接字上设置连接积压。
不再需要第一个功能(见下一段)。
第二个函数被backlog
属性。
以前,当将固定线程池任务执行器(这是默认值)与 NIO 一起使用时,可能会出现死锁并且处理将停止。 当缓冲区已满、从套接字读取的线程尝试向缓冲区添加更多数据,并且没有线程可用于在缓冲区中腾出空间时,会出现此问题。 这只发生在池子面积非常小的情况下,但在极端条件下可能是可能的。 从 2.2 开始,有两个变化消除了这个问题。 首先,默认任务执行器是缓存的线程池执行器。 其次,添加了死锁检测逻辑,如果发生线程匮乏,则抛出异常而不是死锁,从而释放死锁资源。
现在,默认任务执行器是无界的,如果消息处理需要较长时间,则可能会在传入消息速率较高的情况下出现内存不足情况。 如果应用程序表现出此类行为,则应使用具有适当池大小的池化任务执行器,但请参阅下一节。 |
线程池任务执行器CALLER_RUNS
政策
当您将固定线程池与CallerRunsPolicy
(CALLER_RUNS
使用<task/>
命名空间),队列容量较小。
如果不使用固定线程池,则以下内容不适用。
对于蔚来连接,有三种不同的任务类型。 I/O 选择器处理在一个专用线程上执行(检测事件、接受新连接以及使用任务执行器将 I/O 读取作分派到其他线程)。 当 I/O 读取器线程 (读取作被分派到) 读取数据时,它会移交给另一个线程来组合传入消息。 大型消息可能需要多次读取才能完成。 这些“汇编器”线程可能会在等待数据时阻塞。 当发生新的读取事件时,读取器会确定此套接字是否已经具有汇编程序,如果没有,则运行一个新的汇编程序。 汇编过程完成后,汇编程序线程将返回到池中。
当池耗尽时,这可能会导致死锁,CALLER_RUNS
拒绝策略正在使用中,并且任务队列已满。
当池为空且队列中没有空间时,IO 选择器线程会收到OP_READ
事件并使用 executor 调度读取。
队列已满,因此选择器线程本身启动读取过程。
现在,它检测到此套接字没有汇编程序,并在读取之前触发汇编程序。
同样,队列已满,选择器线程成为汇编器。
汇编程序现在被阻止,等待读取数据,这永远不会发生。
连接工厂现在死锁,因为选择器线程无法处理新事件。
为了避免这种死锁,我们必须避免执行汇编任务的选择器(或读取器)线程。 我们希望将单独的池用于 IO 和汇编作。
该框架提供了一个CompositeExecutor
,它允许配置两个不同的执行器:一个用于执行 IO作,一个用于消息汇编。
在此环境中,IO 线程永远无法成为汇编器线程,并且不会发生死锁。
此外,任务执行器应配置为使用AbortPolicy
(ABORT
使用时<task>
).
当 I/O 任务无法完成时,它会延迟一小段时间,并不断重试,直到可以完成并分配汇编程序。
默认情况下,延迟为 100 毫秒,但您可以通过设置readDelay
连接工厂 (read-delay
使用 XML 命名空间进行配置时)。
以下三个示例展示了如何配置复合执行器:
@Bean
private CompositeExecutor compositeExecutor() {
ThreadPoolTaskExecutor ioExec = new ThreadPoolTaskExecutor();
ioExec.setCorePoolSize(4);
ioExec.setMaxPoolSize(10);
ioExec.setQueueCapacity(0);
ioExec.setThreadNamePrefix("io-");
ioExec.setRejectedExecutionHandler(new AbortPolicy());
ioExec.initialize();
ThreadPoolTaskExecutor assemblerExec = new ThreadPoolTaskExecutor();
assemblerExec.setCorePoolSize(4);
assemblerExec.setMaxPoolSize(10);
assemblerExec.setQueueCapacity(0);
assemblerExec.setThreadNamePrefix("assembler-");
assemblerExec.setRejectedExecutionHandler(new AbortPolicy());
assemblerExec.initialize();
return new CompositeExecutor(ioExec, assemblerExec);
}
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
<constructor-arg ref="io"/>
<constructor-arg ref="assembler"/>
</bean>
<task:executor id="io" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<task:executor id="assembler" pool-size="4-10" queue-capacity="0" rejection-policy="ABORT" />
<bean id="myTaskExecutor" class="org.springframework.integration.util.CompositeExecutor">
<constructor-arg>
<bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="threadNamePrefix" value="io-" />
<property name="corePoolSize" value="4" />
<property name="maxPoolSize" value="8" />
<property name="queueCapacity" value="0" />
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
</property>
</bean>
</constructor-arg>
<constructor-arg>
<bean class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="threadNamePrefix" value="assembler-" />
<property name="corePoolSize" value="4" />
<property name="maxPoolSize" value="10" />
<property name="queueCapacity" value="0" />
<property name="rejectedExecutionHandler">
<bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
</property>
</bean>
</constructor-arg>
</bean>