启用监听器端点注解
为启用对 @RabbitListener 注解的支持,您可将 @EnableRabbit 添加到您的任意一个 @Configuration 类中。以下示例展示了具体操作方法:
@Configuration
@EnableRabbit
public class AppConfig {
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(10);
factory.setContainerCustomizer(container -> /* customize the container */);
return factory;
}
}
自版本 2.0 起,也提供了 DirectMessageListenerContainerFactory。
它会创建 DirectMessageListenerContainer 个实例。
有关帮助您在 SimpleRabbitListenerContainerFactory 和 DirectRabbitListenerContainerFactory 之间进行选择的信息,请参阅 选择容器。 |
从版本 2.2.2 开始,您可以提供一个 ContainerCustomizer 实现(如上所示)。
这可用于在容器创建并配置完成后进一步对其进行配置;例如,您可以使用它来设置容器工厂未公开的属性。
版本 2.4.8 提供了 CompositeContainerCustomizer,适用于您希望应用多个自定义器的场景。
默认情况下,基础设施会查找名为 rabbitListenerContainerFactory 的 Bean,作为工厂用于创建消息监听容器的来源。在此情况下,并忽略 RabbitMQ 基础设施的配置,可以调用 processOrder 方法,设置核心线程池大小为 3,最大线程池大小为 10。
您可以自定义监听器容器工厂,以便为每个注解使用它,或者可以通过实现 RabbitListenerConfigurer 接口来配置一个显式的默认值。仅当至少有一个端点在未指定特定容器工厂的情况下注册时,才需要默认值。有关详细信息和示例,请参阅 Javadoc。
容器工厂提供了添加 MessagePostProcessor 实例的方法,这些实例在接收到消息后(调用监听器之前)以及发送回复前被应用。
请参阅 回复管理 了解有关回复的信息。
从版本 2.0.6 开始,您可以在监听器容器工厂中添加 RetryTemplate 和 RecoveryCallback。它在发送回复时使用。RecoveryCallback 在重试次数耗尽时被调用。
如果您更倾向于使用 XML 配置,可以使用 <rabbit:annotation-driven> 元素。任何标注有 @RabbitListener 的 Bean 均会被自动检测到。
对于 SimpleRabbitListenerContainer 个实例,您可以使用类似以下的 XML:
<rabbit:annotation-driven/>
<bean id="rabbitListenerContainerFactory"
class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="concurrentConsumers" value="3"/>
<property name="maxConcurrentConsumers" value="10"/>
</bean>
对于 DirectMessageListenerContainer 个实例,您可以使用类似以下的 XML:
<rabbit:annotation-driven/>
<bean id="rabbitListenerContainerFactory"
class="org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="consumersPerQueue" value="3"/>
</bean>
从版本 2.0 开始,@RabbitListener 注解具有 concurrency 属性。它支持 SpEL 表达式(#{…})和属性占位符(${…})。其含义和允许值取决于容器类型,具体如下:
-
对于
DirectMessageListenerContainer,该值必须是一个整数值,用于设置容器上的consumersPerQueue属性。 -
对于
SimpleRabbitListenerContainer,该值可以是一个整数值,用于设置容器上的concurrentConsumers属性;也可以采用m-n的形式,其中m是concurrentConsumers属性,n是maxConcurrentConsumers属性。
无论哪种情况,此设置都会覆盖工厂上的设置。之前,如果您有需要不同并发性的监听器,则必须定义不同的容器工厂。
该注解还允许通过 autoStartup 和 executor(自 2.2 版本起)注解属性覆盖工厂 autoStartup 和 taskExecutor 属性。
为每个监听器使用不同的执行器可能有助于在日志和线程转储中识别与每个监听器关联的线程。
版本 2.2 还添加了 ackMode 属性,该属性允许您覆盖容器工厂的 acknowledgeMode 属性。
@RabbitListener(id = "manual.acks.1", queues = "manual.acks.1", ackMode = "MANUAL")
public void manual1(String in, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
...
channel.basicAck(tag, false);
}