AMQP 支持的消息通道
有两种消息通道实现可用。一种是点对点,另一种是发布-订阅。这两个通道都为底层提供了广泛的配置属性AmqpTemplate
和SimpleMessageListenerContainer
(如本章前面的通道适配器和网关所示)。但是,我们在这里显示的示例具有最少的配置。浏览 XML 模式以查看可用属性。
点对点通道可能类似于以下示例:
<int-amqp:channel id="p2pChannel"/>
在幕后,前面的示例会导致Queue
叫si.p2pChannel
要声明,该通道发送到该通道Queue
(从技术上讲,通过发送到无名直接交换,路由密钥与此名称匹配Queue
). 该通道还在该通道上注册一个使用者Queue
. 如果您希望通道是“可轮询”的,而不是消息驱动的,请提供message-driven
标志,值为false
,如以下示例所示:
<int-amqp:channel id="p2pPollableChannel" message-driven="false"/>
发布-订阅频道可能如下所示:
<int-amqp:publish-subscribe-channel id="pubSubChannel"/>
在幕后,前面的示例会导致名为si.fanout.pubSubChannel
,此通道发送到该扇出交换。此通道还声明一个服务器命名的独占、自动删除、非持久Queue
并将其绑定到扇出交换,同时在该交换上注册消费者Queue
接收消息。发布-订阅通道没有“可轮询”选项。它必须是消息驱动的。
从版本 4.1 开始,AMQP 支持的消息通道(结合channel-transacted
) 支持template-channel-transacted
分离transactional
配置AbstractMessageListenerContainer
和 对于RabbitTemplate
. 请注意,以前,channel-transacted
是true
默认情况下。现在,默认情况下,它是false
对于AbstractMessageListenerContainer
.
在 4.3 版之前,AMQP 支持的通道仅支持Serializable
有效负载和标头。整个消息被转换(序列化)并发送到 RabbitMQ。现在,您可以将extract-payload
属性(或setExtractPayload()
使用 Java 配置时)设置为true
. 当此标志为true
,则转换消息有效负载并映射标头,其方式类似于使用通道适配器时。这种安排允许将 AMQP 支持的通道与不可序列化的有效负载一起使用(可能与另一个消息转换器一起使用,例如Jackson2JsonMessageConverter
). 有关默认映射标头的更多信息,请参阅 AMQP 消息标头。您可以通过提供使用outbound-header-mapper
和inbound-header-mapper
属性。 现在还可以指定default-delivery-mode
,用于设置没有amqp_deliveryMode
页眉。 默认情况下,Spring AMQPMessageProperties
使用PERSISTENT
交付模式。
与其他持久性支持的通道一样,AMQP 支持的通道旨在提供消息持久性以避免消息丢失。它们不打算将工作分发到其他对等应用程序。为此,请改用通道适配器。 |
从 V5.0 开始,可轮询通道现在阻止指定的轮询器线程receiveTimeout (默认值为 1 秒)。以前,与其他PollableChannel 实现,如果没有可用的消息,线程会立即返回给调度程序,无论接收超时如何。阻塞比使用basicGet() 检索消息(没有超时),因为必须创建一个使用者才能接收每条消息。要恢复以前的行为,请将轮询器的receiveTimeout 设置为 0。 |
使用 Java 配置进行配置
以下示例显示如何使用 Java 配置配置通道:
@Bean
public AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean();
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("foo");
factoryBean.setPubSub(false);
return factoryBean;
}
@Bean
public AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("bar");
factoryBean.setPubSub(false);
return factoryBean;
}
@Bean
public AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("baz");
factoryBean.setPubSub(false);
return factoryBean;
}
使用 Java DSL 进行配置
以下示例显示如何使用 Java DSL 配置通道:
@Bean
public IntegrationFlow pollableInFlow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.pollableChannel(connectionFactory)
.queueName("foo"))
...
.get();
}
@Bean
public IntegrationFlow messageDrivenInFow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.channel(connectionFactory)
.queueName("bar"))
...
.get();
}
@Bean
public IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.publishSubscribeChannel(connectionFactory)
.queueName("baz"))
...
.get();
}