此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring Integration 6.5.1spring-doc.cadn.net.cn

入站通道适配器

以下列表显示了 AMQP 入站通道适配器的可能配置选项:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow amqpInbound(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(Amqp.inboundAdapter(connectionFactory, "aName"))
            .handle(m -> System.out.println(m.getPayload()))
            .get();
}
@Bean
public MessageChannel amqpInputChannel() {
    return new DirectChannel();
}

@Bean
public AmqpInboundChannelAdapter inbound(SimpleMessageListenerContainer listenerContainer,
        @Qualifier("amqpInputChannel") MessageChannel channel) {
    AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
    adapter.setOutputChannel(channel);
    return adapter;
}

@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
    SimpleMessageListenerContainer container =
                               new SimpleMessageListenerContainer(connectionFactory);
    container.setQueueNames("aName");
    container.setConcurrentConsumers(2);
    // ...
    return container;
}

@Bean
@ServiceActivator(inputChannel = "amqpInputChannel")
public MessageHandler handler() {
    return new MessageHandler() {

        @Override
        public void handleMessage(Message<?> message) throws MessagingException {
            System.out.println(message.getPayload());
        }

    };
}
<int-amqp:inbound-channel-adapter
                                  id="inboundAmqp"                (1)
                                  channel="inboundChannel"        (2)
                                  queue-names="si.test.queue"     (3)
                                  acknowledge-mode="AUTO"         (4)
                                  advice-chain=""                 (5)
                                  channel-transacted=""           (6)
                                  concurrent-consumers=""         (7)
                                  connection-factory=""           (8)
                                  error-channel=""                (9)
                                  expose-listener-channel=""      (10)
                                  header-mapper=""                (11)
                                  mapped-request-headers=""       (12)
                                  listener-container=""           (13)
                                  message-converter=""            (14)
                                  message-properties-converter="" (15)
                                  phase=""                        (16)
                                  prefetch-count=""               (17)
                                  receive-timeout=""              (18)
                                  recovery-interval=""            (19)
                                  missing-queues-fatal=""         (20)
                                  shutdown-timeout=""             (21)
                                  task-executor=""                (22)
                                  transaction-attribute=""        (23)
                                  transaction-manager=""          (24)
                                  batch-size=""                   (25)
                                  consumers-per-queue             (26)
                                  batch-mode="MESSAGES"/>         (27)
1 此适配器的唯一 ID。 自选。
2 应将转换后的消息发送到的消息通道。 必填。
3 应从中使用消息的 AMQP 队列(逗号分隔列表)的名称。 必填。
4 确认模式MessageListenerContainer. 当设置为MANUAL,则在邮件头中提供传递标记和通道amqp_deliveryTagamqp_channel分别。 用户应用程序负责确认。NONE表示没有确认 (autoAck).AUTO表示适配器的容器在下游流完成时确认。 可选(默认为 AUTO)。 请参阅入站终端确认模式
5 额外的 AOP 建议,用于处理与此入站通道适配器关联的横切行为。 自选。
6 指示此组件创建的通道是事务性的标志。 如果为 true,它告诉框架使用事务通道,并根据结果通过提交或回滚结束所有作(发送或接收),但有一个表示回滚的例外。 可选(默认为 false)。
7 指定要创建的并发使用者数。 默认值为1. 建议增加并发使用者数,以缩放从队列传入的消息的消耗。 但是,请注意,一旦注册了多个消费者,任何订购保证都会丢失。 通常,对于低容量队列,使用一个使用者。 设置了“consumers-per-queue”时不允许。 自选。
8 对 RabbitMQ 的 Bean 引用ConnectionFactory. 可选(默认为connectionFactory).
9 应将错误消息发送到的消息通道。 自选。
10 侦听器通道(com.rabbitmq.client.Channel)是否向已注册的ChannelAwareMessageListener. 可选(默认为 true)。
11 AmqpHeaderMapper在接收 AMQP 消息时使用。 自选。 默认情况下,只有标准 AMQP 属性(例如contentType)被复制到 Spring IntegrationMessageHeaders. AMQP 中的任何用户定义的标头MessageProperties默认情况下不会复制到消息中DefaultAmqpHeaderMapper. 如果出现以下情况,则不允许request-header-names被提供。
12 要从 AMQP 请求映射到MessageHeaders. 只有在未提供“header-mapper”引用时才能提供此设置。 此列表中的值也可以是与标头名称匹配的简单模式(例如“*”或“thing1*、thing2”或“*something”)。
13 引用AbstractMessageListenerContainer用于接收 AMQP 消息。 如果提供了此属性,则不应提供与侦听器容器配置相关的其他属性。 换句话说,通过设置此引用,您必须对侦听器容器配置承担全部责任。 唯一的例外是MessageListener本身。 由于这实际上是此通道适配器实现的核心职责,因此引用的侦听器容器必须还没有自己的MessageListener. 自选。
14 MessageConverter在接收 AMQP 消息时使用。 自选。
15 MessagePropertiesConverter在接收 AMQP 消息时使用。 自选。
16 指定基础AbstractMessageListenerContainer应启动和停止。 启动顺序从最低到最高,关闭顺序与此相反。 默认情况下,此值为Integer.MAX_VALUE,这意味着该容器尽可能晚地启动并尽快停止。 自选。
17 告诉 AMQP 代理在单个请求中要向每个使用者发送多少条消息。 通常,可以将此值设置得较高以提高吞吐量。 它应该大于或等于事务大小(请参阅batch-size属性,在此列表后面)。 可选(默认为1).
18 接收超时(以毫秒为单位)。 可选(默认为1000).
19 指定基础AbstractMessageListenerContainer(以毫秒为单位)。可选(默认为5000).
20 如果 'true' 并且代理上没有可用的队列,则容器会在启动期间抛出致命异常,并在容器运行时删除队列时停止(在尝试三次被动声明队列之后)。 如果false,则容器不会抛出异常并进入恢复模式,尝试根据recovery-interval. 可选(默认为true).
21 在基础之后等待工作线程的时间(以毫秒为单位)AbstractMessageListenerContainer已停止,并且在强制关闭 AMQP 连接之前。如果在关闭信号发出时任何工作线程处于活动状态,则允许它们完成处理,只要它们能够在此超时内完成。否则,连接将关闭,并且消息将保持未确认状态(如果通道是事务性的)。可选(默认为5000).
22 默认情况下,基础AbstractMessageListenerContainer使用SimpleAsyncTaskExecutor实现,为每个任务启动一个新线程,异步运行它。默认情况下,并发线程数是无限的。请注意,此实现不会重用线程。考虑使用线程池TaskExecutor实现作为替代方案。可选(默认为SimpleAsyncTaskExecutor).
23 默认情况下,基础AbstractMessageListenerContainer创建DefaultTransactionAttribute(它采用 EJB 方法在运行时回滚,但不检查异常)。可选(默认为DefaultTransactionAttribute).
24 将 bean 引用设置为外部PlatformTransactionManager在基础上AbstractMessageListenerContainer. 事务管理器与channel-transacted属性。 如果框架在发送或接收消息时已经有正在进行的事务,并且channelTransactedflag 是true,则消息传递事务的提交或回滚将推迟到当前事务结束。如果channelTransactedflag 是false,没有事务语义适用于消息传递作(它是自动确认的)。有关更多信息,请参阅使用 Spring AMQP 进行事务。 自选。
25 告诉SimpleMessageListenerContainer在单个请求中要处理的消息数。为获得最佳结果,它应小于或等于prefetch-count. 设置了“consumers-per-queue”时不允许。可选(默认为1).
26 指示基础侦听器容器应为DirectMessageListenerContainer而不是默认的SimpleMessageListenerContainer. 有关更多信息,请参阅 Spring AMQP 参考手册
27 当容器的consumerBatchEnabledtrue,确定适配器如何在消息有效负载中显示消息批次。当设置为MESSAGES(默认),有效负载是List<Message<?>>其中每封邮件都有从传入 AMQP 映射的标头Message有效载荷是转换后的body. 当设置为EXTRACT_PAYLOADS,则有效负载是List<?>其中元素是从 AMQP 转换而来的Message身体。EXTRACT_PAYLOADS_WITH_HEADERS类似于EXTRACT_PAYLOADS但是,此外,每条消息的标头都是从MessageProperties变成一个List<Map<String, Object>在相应的索引中;标头名称为AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS.
容器

请注意,使用 XML 配置外部容器时,不能使用 Spring AMQP 命名空间来定义容器。 这是因为命名空间至少需要一个<listener/>元素。 在此环境中,侦听器位于适配器内部。 因此,您必须使用普通的 Spring 来定义容器<bean/>定义,如以下示例所示:spring-doc.cadn.net.cn

<bean id="container"
 class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="queueNames" value="aName.queue" />
    <property name="defaultRequeueRejected" value="false"/>
</bean>
尽管 Spring Integration JMS 和 AMQP 支持相似,但存在重要差异。 JMS 入站通道适配器正在使用JmsDestinationPollingSource在幕后,并期望配置一个轮询器。 AMQP 入站通道适配器使用AbstractMessageListenerContainer并且是消息驱动的。 在这方面,它更类似于 JMS 消息驱动的通道适配器。

从 5.5 版本开始,AmqpInboundChannelAdapter可以配置为org.springframework.amqp.rabbit.retry.MessageRecoverer策略,用于RecoveryCallback当在内部调用重试作时。 看setMessageRecoverer()JavaDocs 了解更多信息。spring-doc.cadn.net.cn

@Publisher注释也可以与@RabbitListener:spring-doc.cadn.net.cn

@Configuration
@EnableIntegration
@EnableRabbit
@EnablePublisher
public static class ContextConfiguration {

    @Bean
    QueueChannel fromRabbitViaPublisher() {
        return new QueueChannel();
    }

    @RabbitListener(queuesToDeclare = @Queue("publisherQueue"))
    @Publisher("fromRabbitViaPublisher")
    @Payload("#args.payload.toUpperCase()")
    public void consumeForPublisher(String payload) {

    }

}

默认情况下,@PublisherAOP 拦截器处理方法调用的返回值。 但是,来自@RabbitListener方法被视为 AMQP 回复消息。 因此,这种方法不能与@Publisher,所以一个@Payload针对方法参数使用相应的 SpEL 表达式进行注释是此组合的推荐方法。 查看有关@Publisher在“注释驱动配置”(Annotation-driven Configuration) 部分中。spring-doc.cadn.net.cn

在监听器容器中使用独占或单活动消费者时,建议将容器属性forceStoptrue. 这将防止出现竞争条件,即在停止容器后,另一个使用者可能会在此实例完全停止之前开始使用消息。

批处理消息

有关批处理消息的更多信息,请参阅 Spring AMQP 文档spring-doc.cadn.net.cn

要使用 Spring Integration 生成批量消息,只需使用BatchingRabbitTemplate.spring-doc.cadn.net.cn

在接收批处理消息时,默认情况下,监听器容器会提取每个片段消息,适配器将生成一个Message<?>对于每个片段。 从 5.2 版开始,如果容器的deBatchingEnabled属性设置为false,取消批处理由适配器执行,并且单个Message<List<?>>生成有效负载是片段有效负载的列表(如果适用,在转换后)。spring-doc.cadn.net.cn

默认值BatchingStrategySimpleBatchingStrategy,但这可以在适配器上覆盖。spring-doc.cadn.net.cn

org.springframework.amqp.rabbit.retry.MessageBatchRecoverer当重试作需要恢复时,必须与批处理一起使用。