此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring AMQP 3.2.6spring-doc.cadn.net.cn

侦听器并发

SimpleMessageListenerContainer

默认情况下,侦听器容器启动一个从队列接收消息的单个使用者。spring-doc.cadn.net.cn

在检查上一节中的表时,您可以看到许多控制并发的属性和属性。 最简单的是concurrentConsumers,这将创建并发处理消息的(固定)数量的使用者。spring-doc.cadn.net.cn

在 1.3.0 版之前,这是唯一可用的设置,必须停止容器并重新启动才能更改设置。spring-doc.cadn.net.cn

从 1.3.0 版本开始,您现在可以动态调整concurrentConsumers财产。 如果在容器运行时更改了它,则会根据需要添加或删除使用者以适应新设置。spring-doc.cadn.net.cn

此外,一个名为maxConcurrentConsumers,容器会根据工作负载动态调整并发。 这与四个附加属性结合使用:consecutiveActiveTrigger,startConsumerMinInterval,consecutiveIdleTriggerstopConsumerMinInterval. 在默认设置下,增加消费者的算法工作原理如下:spring-doc.cadn.net.cn

如果maxConcurrentConsumers未达到,并且现有使用者连续十个周期处于活动状态,并且自上次使用者启动以来至少已过去 10 秒,则启动新的使用者。 如果使用者在batchSize * receiveTimeout毫秒。spring-doc.cadn.net.cn

使用默认设置,减少使用者的算法工作原理如下:spring-doc.cadn.net.cn

如果超过concurrentConsumers正在运行,并且使用者检测到连续十次超时(空闲),并且最后一个使用者至少在 60 秒前停止,则使用者停止。 超时取决于receiveTimeoutbatchSize性能。 如果使用者在batchSize * receiveTimeout毫秒。 因此,使用默认超时(一秒)和batchSize在 4 个空闲时间中,考虑在 40 秒的空闲时间后停止使用者(四个超时对应于一个空闲检测)。spring-doc.cadn.net.cn

实际上,只有当整个容器闲置一段时间时,消费者才能停止。 这是因为经纪人在所有活跃消费者之间共享其工作。

每个使用者使用单个通道,而不考虑配置的队列数量。spring-doc.cadn.net.cn

从 2.0 版开始,concurrentConsumersmaxConcurrentConsumers属性可以使用concurrency属性 — 例如,2-4.spring-doc.cadn.net.cn

DirectMessageListenerContainer

使用此容器,并发基于配置的队列和consumersPerQueue. 每个队列的每个使用者都使用单独的通道,并发由 rabbit 客户端库控制。 默认情况下,在撰写本文时,它使用DEFAULT_NUM_THREADS = Runtime.getRuntime().availableProcessors() * 2线程。spring-doc.cadn.net.cn

您可以配置taskExecutor以提供所需的最大并发。spring-doc.cadn.net.cn