对于最新的稳定版本,请使用 Spring AMQP 3.2.6! |
@RabbitListener批处理
当收到一批消息时,通常由容器执行取消批处理,并且一次调用一个消息。
从 2.2 版本开始,您可以将侦听器容器工厂和侦听器配置为在一次调用中接收整个批次,只需将工厂的batchListener
属性,并将方法有效负载参数设置为List
或Collection
:
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setBatchListener(true);
return factory;
}
@RabbitListener(queues = "batch.1")
public void listen1(List<Thing> in) {
...
}
// or
@RabbitListener(queues = "batch.2")
public void listen2(List<Message<Thing>> in) {
...
}
设置batchListener
属性设置为 true 会自动关闭deBatchingEnabled
container 属性(除非consumerBatchEnabled
是true
- 见下文)。实际上,取消批处理从容器移动到侦听器适配器,适配器创建传递给侦听器的列表。
启用批处理的工厂不能与多方法侦听器一起使用。
同样从 2.2 版本开始。一次接收一条批处理消息时,最后一条消息包含设置为true
.
可以通过添加@Header(AmqpHeaders.LAST_IN_BATCH)
boolean last' 参数添加到您的侦听器方法。
标头映射自MessageProperties.isLastInBatch()
.
另外AmqpHeaders.BATCH_SIZE
填充了每个消息片段中的批处理大小。
此外,还有一个新房产consumerBatchEnabled
已添加到SimpleMessageListenerContainer
.
当此值为 true 时,容器将创建一批消息,最多batchSize
;如果出现以下情况,则交付部分批次receiveTimeout
已过,没有新消息到达。
如果收到生产者创建的批次,则将其取消批处理并添加到使用者端批次中;因此,实际传递的消息数可能会超过batchSize
,表示从代理接收的消息数。deBatchingEnabled
当consumerBatchEnabled
是真的;集装箱工厂将强制执行此要求。
@Bean
public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(rabbitConnectionFactory());
factory.setConsumerTagStrategy(consumerTagStrategy());
factory.setBatchListener(true); // configures a BatchMessageListenerAdapter
factory.setBatchSize(2);
factory.setConsumerBatchEnabled(true);
return factory;
}
使用时consumerBatchEnabled
跟@RabbitListener
:
@RabbitListener(queues = "batch.1", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch1(List<Message> amqpMessages) {
...
}
@RabbitListener(queues = "batch.2", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch2(List<org.springframework.messaging.Message<Invoice>> messages) {
...
}
@RabbitListener(queues = "batch.3", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch3(List<Invoice> strings) {
...
}
-
第一个是用原始的、未转换的
org.springframework.amqp.core.Message
s 收到。 -
第二个是使用
org.springframework.messaging.Message<?>
s 具有转换后的有效负载和映射的标头/属性。 -
第三个是使用转换后的有效负载调用的,无法访问标头/属性。
您还可以添加Channel
参数,在使用MANUAL
ack 模式。
这对于第三个示例不是很有用,因为您无权访问delivery_tag
财产。
Spring Boot 提供了一个配置属性consumerBatchEnabled
和batchSize
,但不是batchListener
.
从 3.0 版开始,将consumerBatchEnabled
自true
在集装箱工厂也设置batchListener
自true
.
什么时候consumerBatchEnabled
是true
,则侦听器必须是批处理侦听器。
从 3.0 版开始,侦听器方法可以使用Collection<?>
或List<?>
.