此版本仍在开发中,目前尚不稳定。如需最新稳定版本,请使用 Spring AMQP 4.0.2spring-doc.cadn.net.cn

@RabbitListener with Batching

当接收 一批 消息时,解批处理通常由容器完成,监听器会逐条消息调用一次。从版本 2.2 开始,您可以配置监听器容器工厂和监听器,以在一次调用中接收整个批次,只需设置工厂的 batchListener 属性,并将方法的参数负载(payload)设为 ListCollectionspring-doc.cadn.net.cn

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory());
    factory.setBatchListener(true);
    return factory;
}

@RabbitListener(queues = "batch.1")
public void listen1(List<Thing> in) {
    ...
}

// or

@RabbitListener(queues = "batch.2")
public void listen2(List<Message<Thing>> in) {
    ...
}

batchListener 属性设置为 true 会自动关闭工厂所创建容器中的 deBatchingEnabled 容器属性(除非 consumerBatchEnabledtrue —— 详见下文)。实际上,去批处理操作从容器转移到了监听器适配器,而适配器则负责创建传递给监听器的列表。spring-doc.cadn.net.cn

批处理启用的工厂不能与 多方法监听器 一起使用。spring-doc.cadn.net.cn

此外,从版本 2.2 开始,当以一次一条的方式接收批处理消息时,最后一则消息会包含一个布尔类型头,其值设为 true。您可以通过在监听器方法中添加 @Header(AmqpHeaders.LAST_IN_BATCH) 参数(即 boolean last)来获取该头信息。该头信息由 MessageProperties.isLastInBatch() 映射而来。此外,AmqpHeaders.BATCH_SIZE 会在每条消息分片中填充批处理的大小。spring-doc.cadn.net.cn

此外,已在 SimpleMessageListenerContainer 中添加了一个新属性 consumerBatchEnabled。当该属性为 true 时,容器将创建一批消息,最多可达 batchSize;若在 receiveTimeout 时间内无新消息到达,则会发送部分批次。如果收到由生产者创建的批次,该批次将被解包并加入到消费者端的批次中;因此实际发送的消息数量可能超过 batchSize(即从代理服务器接收到的消息数量)。当 consumerBatchEnabled 为 true 时,deBatchingEnabled 必须为 true;容器工厂将强制执行此要求。spring-doc.cadn.net.cn

@Bean
public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(rabbitConnectionFactory());
    factory.setConsumerTagStrategy(consumerTagStrategy());
    factory.setBatchListener(true); // configures a BatchMessageListenerAdapter
    factory.setBatchSize(2);
    factory.setConsumerBatchEnabled(true);
    return factory;
}

当使用 consumerBatchEnabled@RabbitListener 时:spring-doc.cadn.net.cn

@RabbitListener(queues = "batch.1", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch1(List<Message> amqpMessages) {
    ...
}

@RabbitListener(queues = "batch.2", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch2(List<org.springframework.messaging.Message<Invoice>> messages) {
    ...
}

@RabbitListener(queues = "batch.3", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch3(List<Invoice> strings) {
    ...
}
  • 第一个是使用接收到的原始未转换的 org.springframework.amqp.core.Message 进行调用。spring-doc.cadn.net.cn

  • 第二个调用时,会使用转换后的有效载荷和映射的头/属性传递 org.springframework.messaging.Message<?>spring-doc.cadn.net.cn

  • 第三个调用时使用转换后的有效载荷,但无法访问头信息/属性。spring-doc.cadn.net.cn

您还可以添加一个 Channel 参数,通常在使用 MANUAL 确认模式时使用。由于第三个示例中您无法访问 delivery_tag 属性,因此该方法用处不大。spring-doc.cadn.net.cn

Spring Boot 为 consumerBatchEnabledbatchSize 提供了配置属性,但不为 batchListener 提供。
从 3.0 版本开始,在容器工厂上将 consumerBatchEnabled 设置为 true 也会将 batchListener 设置为 true
consumerBatchEnabledtrue 时,监听器 必须 是批处理监听器。spring-doc.cadn.net.cn

从版本 3.0 开始,监听器方法可以接收 Collection<?>List<?> 个参数。spring-doc.cadn.net.cn

批处理模式下的监听器不支持回复,因为批处理中的消息与单个回复之间可能不存在关联关系。尽管如此,异步返回类型仍可与批处理监听器一起使用。