此版本仍在开发中,目前尚不稳定。如需最新稳定版本,请使用 Spring AMQP 4.0.2spring-doc.cadn.net.cn

监听器并发性

SimpleMessageListenerContainer

默认情况下,监听器容器会启动一个单一的消费者,从队列中接收消息。spring-doc.cadn.net.cn

在查看前一节中的表格时,您可以看到许多用于控制并发性的属性和特性。最简单的是 concurrentConsumers,它会创建固定数量的消费者,这些消费者可并行处理消息。spring-doc.cadn.net.cn

在 1.3.0 版本之前,这是唯一可用的设置,且必须停止并重新启动容器才能更改该设置。spring-doc.cadn.net.cn

自版本 1.3.0 起,您现在可以动态调整 concurrentConsumers 属性。如果在容器运行期间对其进行更改,则会根据需要添加或移除消费者,以适应新的设置。spring-doc.cadn.net.cn

此外,还新增了一个名为 maxConcurrentConsumers 的属性,容器会根据工作负载动态调整并发度。这与另外四个属性协同工作:consecutiveActiveTriggerstartConsumerMinIntervalconsecutiveIdleTriggerstopConsumerMinInterval。在默认设置下,增加消费者数量的算法如下:spring-doc.cadn.net.cn

如果 maxConcurrentConsumers 尚未达到,并且在连续十个周期内已有活跃的消费者,且自上一次启动消费者以来已过去至少 10 秒,则会启动一个新的消费者。若消费者在 batchSize * receiveTimeout 毫秒的时间内至少接收了一条消息,则认为该消费者处于活跃状态。spring-doc.cadn.net.cn

在默认设置下,减少消费者(consumers)的算法工作方式如下:spring-doc.cadn.net.cn

如果运行中的消费者数量超过 concurrentConsumers,且一个消费者检测到连续十次超时(空闲)状态,并且最后停止的消费者至少在 60 秒前已停止,则会停止该消费者。超时时间取决于 receiveTimeoutbatchSize 这两个属性。若消费者在 batchSize * receiveTimeout 毫秒内未收到任何消息,则视为处于空闲状态。因此,以默认超时时间(1 秒)和 batchSize 值为 4 的情况为例,当消费者空闲 40 秒后(即四次超时对应一次空闲检测)将被停止。spring-doc.cadn.net.cn

实际上,只有在整个容器空闲一段时间后,消费者才能被停止。这是因为代理服务器在其所有活跃消费者之间分配其工作。

每个消费者都使用一个通道,无论配置了多少个队列。spring-doc.cadn.net.cn

从版本 2.0 开始,concurrentConsumersmaxConcurrentConsumers 属性可以通过 concurrency 属性进行设置——例如,2-4spring-doc.cadn.net.cn

使用DirectMessageListenerContainer

使用此容器时,并发性基于配置的队列和 consumersPerQueue。每个队列的每个消费者均使用独立的通道,而并发性由 Rabbit 客户端库进行控制。默认情况下,在撰写本文时,它使用一个大小为 DEFAULT_NUM_THREADS = Runtime.getRuntime().availableProcessors() * 2 的线程池。spring-doc.cadn.net.cn

您可以配置一个 taskExecutor 来提供所需的最高并发数。spring-doc.cadn.net.cn