此版本仍在开发中,目前尚不稳定。如需最新稳定版本,请使用 Spring AMQP 4.0.2spring-doc.cadn.net.cn

线程与异步消费者

许多不同的线程参与异步消费者。spring-doc.cadn.net.cn

TaskExecutor 配置的线程池中的线程在 SimpleMessageListenerContainer 通过 RabbitMQ Client 接收新消息时,用于调用 MessageListener。如果未进行配置,则使用 SimpleAsyncTaskExecutor。如果您使用了线程池执行器,则需确保线程池大小足以处理所配置的并发量。在使用 DirectMessageListenerContainer 的情况下,MessageListener 会直接在 RabbitMQ Client 线程上被调用。在此情况下,taskExecutor 将用于执行监控消费者的任务。spring-doc.cadn.net.cn

当使用默认值 SimpleAsyncTaskExecutor 时,监听器在调用的线程上,由监听器容器 beanNamethreadNamePrefix 中使用。这有利于日志分析。我们通常建议在日志追加器配置中始终包含线程名称。当通过容器上的 taskExecutor 属性明确提供一个 TaskExecutor 时,会原样使用,不做任何修改。建议您采用类似的方法为自定义 TaskExecutor Bean 定义所创建的线程命名,以帮助在日志消息中识别线程。

CachingConnectionFactory 中配置的 Executor 会在创建连接时传递给 RabbitMQ Client,并由其线程用于将新消息分发至监听器容器。
如果未进行此配置,客户端将使用一个内部线程池执行器(在撰写本文时)为每个连接分配大小为 Runtime.getRuntime().availableProcessors() * 2 的线程池。spring-doc.cadn.net.cn

如果您拥有大量工厂或正在使用 CacheMode.CONNECTION,您可能希望考虑使用一个共享的 ThreadPoolTaskExecutor,并确保其线程数量足以满足您的工作负载需求。spring-doc.cadn.net.cn

使用 DirectMessageListenerContainer 时,您需要确保连接工厂配置了一个任务执行器,该执行器具备足够的线程数,以支持所有使用该工厂的监听容器所期望的并发量。默认池大小(在撰写本文时)为 Runtime.getRuntime().availableProcessors() * 2

RabbitMQ client 使用 ThreadFactory 为底层 I/O(套接字)操作创建线程。要修改此工厂,您需要配置底层的 RabbitMQ ConnectionFactory,如 配置底层客户端连接工厂 中所述。spring-doc.cadn.net.cn