此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring AMQP 3.2.6spring-doc.cadn.net.cn

@RabbitListener批处理

当接收到一批消息时,通常由容器执行去批处理,并且一次调用一条消息的侦听器。 从 2.2 版本开始,您可以将侦听器容器工厂和侦听器配置为在一次调用中接收整个批次,只需将工厂的batchListener属性,并将方法有效负载参数设置为ListCollection:spring-doc.cadn.net.cn

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setBatchListener(true);
    return factory;
}

@RabbitListener(queues = "batch.1")
public void listen1(List<Thing> in) {
    ...
}

// or

@RabbitListener(queues = "batch.2")
public void listen2(List<Message<Thing>> in) {
    ...
}

设置batchListener属性设置为 true 会自动关闭deBatchingEnabledcontainer 属性(除非consumerBatchEnabledtrue- 见下文)。实际上,取消批处理从容器移动到侦听器适配器,适配器创建传递给侦听器的列表。spring-doc.cadn.net.cn

启用批处理的工厂不能与多方法侦听器一起使用。spring-doc.cadn.net.cn

同样从 2.2 版本开始。一次接收一条批处理消息时,最后一条消息包含设置为true. 可以通过添加@Header(AmqpHeaders.LAST_IN_BATCH)boolean last' 参数添加到您的侦听器方法。 标头映射自MessageProperties.isLastInBatch(). 另外AmqpHeaders.BATCH_SIZE填充了每个消息片段中的批处理大小。spring-doc.cadn.net.cn

此外,还有一个新房产consumerBatchEnabled已添加到SimpleMessageListenerContainer. 当此值为 true 时,容器将创建一批消息,最多batchSize;如果出现以下情况,则交付部分批次receiveTimeout已过,没有新消息到达。 如果收到生产者创建的批次,则将其取消批处理并添加到使用者端批次中;因此,实际传递的消息数可能会超过batchSize,表示从代理接收的消息数。deBatchingEnabledconsumerBatchEnabled是真的;集装箱工厂将强制执行此要求。spring-doc.cadn.net.cn

@Bean
public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(rabbitConnectionFactory());
    factory.setConsumerTagStrategy(consumerTagStrategy());
    factory.setBatchListener(true); // configures a BatchMessageListenerAdapter
    factory.setBatchSize(2);
    factory.setConsumerBatchEnabled(true);
    return factory;
}

使用时consumerBatchEnabled@RabbitListener:spring-doc.cadn.net.cn

@RabbitListener(queues = "batch.1", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch1(List<Message> amqpMessages) {
    ...
}

@RabbitListener(queues = "batch.2", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch2(List<org.springframework.messaging.Message<Invoice>> messages) {
    ...
}

@RabbitListener(queues = "batch.3", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch3(List<Invoice> strings) {
    ...
}
  • 第一个是用原始的、未转换的org.springframework.amqp.core.Messages 收到。spring-doc.cadn.net.cn

  • 第二个是使用org.springframework.messaging.Message<?>s 具有转换后的有效负载和映射的标头/属性。spring-doc.cadn.net.cn

  • 第三个是使用转换后的有效负载调用的,无法访问标头/属性。spring-doc.cadn.net.cn

您还可以添加Channel参数,在使用MANUALack 模式。 这对于第三个示例不是很有用,因为您无权访问delivery_tag财产。spring-doc.cadn.net.cn

Spring Boot 提供了一个配置属性consumerBatchEnabledbatchSize,但不是batchListener. 从 3.0 版开始,将consumerBatchEnabledtrue在集装箱工厂也设置batchListenertrue. 什么时候consumerBatchEnabledtrue,则侦听器必须是批处理侦听器。spring-doc.cadn.net.cn

从 3.0 版开始,侦听器方法可以使用Collection<?>List<?>.spring-doc.cadn.net.cn

批处理模式下的侦听器不支持回复,因为批处理中的消息与生成的单个回复之间可能没有相关性。 批处理侦听器仍支持异步返回类型