此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring Integration 6.5.1spring-doc.cadn.net.cn

AMQP 支持的消息通道

有两种消息通道实现可用。 一种是点对点,另一种是发布-订阅。 这两个通道都为底层AmqpTemplateSimpleMessageListenerContainer(如本章前面的通道适配器和网关所示)。 但是,我们在这里展示的示例具有最少的配置。 浏览 XML 模式以查看可用属性。spring-doc.cadn.net.cn

点对点通道可能类似于以下示例:spring-doc.cadn.net.cn

<int-amqp:channel id="p2pChannel"/>

在幕后,前面的示例会导致Queuesi.p2pChannel要声明,该通道发送到该通道Queue(从技术上讲,通过发送到无名直接交换,路由密钥与此名称匹配Queue). 该通道还在该通道上注册一个使用者Queue. 如果您希望通道是“可轮询”的,而不是消息驱动的,请提供message-driven标志,值为false,如以下示例所示:spring-doc.cadn.net.cn

<int-amqp:channel id="p2pPollableChannel"  message-driven="false"/>

发布-订阅频道可能如下所示:spring-doc.cadn.net.cn

<int-amqp:publish-subscribe-channel id="pubSubChannel"/>

在幕后,前面的示例会导致名为si.fanout.pubSubChannel,该通道发送到该扇出交换。 此通道还声明一个服务器命名的独占、自动删除、非持久Queue并将其绑定到扇出交换,同时在该交换上注册消费者Queue接收消息。 发布-订阅-频道没有“可轮询”选项。 它必须是消息驱动的。spring-doc.cadn.net.cn

从版本 4.1 开始,AMQP 支持的消息通道(结合channel-transacted) 支持template-channel-transacted分离transactional配置AbstractMessageListenerContainer和 对于RabbitTemplate. 请注意,以前,channel-transactedtrue默认情况下。 现在,默认情况下,它是false对于AbstractMessageListenerContainer.spring-doc.cadn.net.cn

在 4.3 版之前,AMQP 支持的通道仅支持Serializable有效负载和标头。 整个消息被转换(序列化)并发送到 RabbitMQ。 现在,您可以将extract-payload属性(或setExtractPayload()使用 Java 配置时)设置为true. 当此标志为true,则转换消息有效负载并映射标头,其方式类似于使用通道适配器时。 这种安排允许 AMQP 支持的通道与不可序列化的有效负载一起使用(可能与另一个消息转换器一起使用,例如Jackson2JsonMessageConverter). 有关默认映射标头的详细信息,请参阅 AMQP 消息标头。 您可以通过提供使用outbound-header-mapperinbound-header-mapper属性。 现在还可以指定default-delivery-mode,用于设置没有amqp_deliveryMode页眉。 默认情况下,Spring AMQPMessageProperties使用PERSISTENT交付模式。spring-doc.cadn.net.cn

与其他持久性支持的通道一样,AMQP 支持的通道旨在提供消息持久性以避免消息丢失。 它们不打算将工作分发到其他对等应用程序。 为此,请改用通道适配器。
从 V5.0 开始,可轮询通道现在阻止指定的轮询器线程receiveTimeout(默认值为 1 秒)。 以前,与其他PollableChannel实现时,如果没有可用的消息,则线程会立即返回到调度程序,而不管接收超时如何。 阻止比使用basicGet()检索消息(没有超时),因为必须创建一个使用者才能接收每条消息。 要恢复以前的行为,请将轮询器的receiveTimeout设置为 0。

使用 Java 配置进行配置

以下示例显示如何使用 Java 配置配置通道:spring-doc.cadn.net.cn

@Bean
public AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean();
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("foo");
    factoryBean.setPubSub(false);
    return factoryBean;
}

@Bean
public AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("bar");
    factoryBean.setPubSub(false);
    return factoryBean;
}

@Bean
public AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) {
    AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
    factoryBean.setConnectionFactory(connectionFactory);
    factoryBean.setQueueName("baz");
    factoryBean.setPubSub(false);
    return factoryBean;
}

使用 Java DSL 进行配置

以下示例显示如何使用 Java DSL 配置通道:spring-doc.cadn.net.cn

@Bean
public IntegrationFlow pollableInFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.pollableChannel(connectionFactory)
                    .queueName("foo"))
            ...
            .get();
}

@Bean
public IntegrationFlow messageDrivenInFow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.channel(connectionFactory)
                    .queueName("bar"))
            ...
            .get();
}

@Bean
public IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) {
    return IntegrationFlow.from(...)
            ...
            .channel(Amqp.publishSubscribeChannel(connectionFactory)
                    .queueName("baz"))
            ...
            .get();
}