此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring Integration 6.5.1! |
严格的消息排序
本节介绍入站和出站消息的消息排序。
入境
如果需要对入站消息进行严格排序,则必须配置入站侦听器容器的prefetchCount
属性设置为1
.
这是因为,如果消息失败并重新传递,则该消息会在现有预取消息之后到达。
从 Spring AMQP 2.0 版开始,prefetchCount
默认为250
以提高性能。
严格的订购要求是以性能下降为代价的。
出境
请考虑以下集成流程:
@Bean
public IntegrationFlow flow(RabbitTemplate template) {
return IntegrationFlow.from(Gateway.class)
.splitWith(s -> s.delimiters(","))
.<String, String>transform(String::toUpperCase)
.handle(Amqp.outboundAdapter(template).routingKey("rk"))
.get();
}
假设我们发送消息A
,B
和C
到网关。
虽然很可能消息A
,B
,C
均按顺序发送,不作保证。
这是因为模板为每个发送作从缓存中“借用”一个通道,并且不能保证每个消息都使用相同的通道。
一种解决方案是在拆分器之前启动事务,但在 RabbitMQ 中事务成本高昂,并且会降低数百倍的性能。
为了更有效地解决这个问题,从 5.1 版本开始,Spring Integration 提供了BoundRabbitChannelAdvice
这是一个HandleMessageAdvice
.
请参阅处理消息通知。
在拆分器之前应用时,它可确保在同一通道上执行所有下游作,并且可以选择等待,直到收到所有已发送消息的发布者确认(如果连接工厂配置为确认)。
以下示例演示如何使用BoundRabbitChannelAdvice
:
@Bean
public IntegrationFlow flow(RabbitTemplate template) {
return IntegrationFlow.from(Gateway.class)
.splitWith(s -> s.delimiters(",")
.advice(new BoundRabbitChannelAdvice(template, Duration.ofSeconds(10))))
.<String, String>transform(String::toUpperCase)
.handle(Amqp.outboundAdapter(template).routingKey("rk"))
.get();
}
请注意,同样RabbitTemplate
(实现RabbitOperations
) 用于通知和出站适配器。
建议在模板的invoke
方法,以便所有作在同一通道上运行。
如果提供了可选的超时,则当流完成时,通知会调用waitForConfirmsOrDie
方法,如果在指定时间内未收到确认,则会抛出异常。
下游流中不得有螺纹放手(QueueChannel ,ExecutorChannel 等)。 |