此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring Integration 6.5.1! |
AMQP 支持的消息通道
有两种消息通道实现可用。
一种是点对点,另一种是发布-订阅。
这两个通道都为底层AmqpTemplate
和SimpleMessageListenerContainer
(如本章前面的通道适配器和网关所示)。
但是,我们在这里展示的示例具有最少的配置。
浏览 XML 模式以查看可用属性。
点对点通道可能类似于以下示例:
<int-amqp:channel id="p2pChannel"/>
在幕后,前面的示例会导致Queue
叫si.p2pChannel
要声明,该通道发送到该通道Queue
(从技术上讲,通过发送到无名直接交换,路由密钥与此名称匹配Queue
).
该通道还在该通道上注册一个使用者Queue
.
如果您希望通道是“可轮询”的,而不是消息驱动的,请提供message-driven
标志,值为false
,如以下示例所示:
<int-amqp:channel id="p2pPollableChannel" message-driven="false"/>
发布-订阅频道可能如下所示:
<int-amqp:publish-subscribe-channel id="pubSubChannel"/>
在幕后,前面的示例会导致名为si.fanout.pubSubChannel
,该通道发送到该扇出交换。
此通道还声明一个服务器命名的独占、自动删除、非持久Queue
并将其绑定到扇出交换,同时在该交换上注册消费者Queue
接收消息。
发布-订阅-频道没有“可轮询”选项。
它必须是消息驱动的。
从版本 4.1 开始,AMQP 支持的消息通道(结合channel-transacted
) 支持template-channel-transacted
分离transactional
配置AbstractMessageListenerContainer
和
对于RabbitTemplate
.
请注意,以前,channel-transacted
是true
默认情况下。
现在,默认情况下,它是false
对于AbstractMessageListenerContainer
.
在 4.3 版之前,AMQP 支持的通道仅支持Serializable
有效负载和标头。
整个消息被转换(序列化)并发送到 RabbitMQ。
现在,您可以将extract-payload
属性(或setExtractPayload()
使用 Java 配置时)设置为true
.
当此标志为true
,则转换消息有效负载并映射标头,其方式类似于使用通道适配器时。
这种安排允许 AMQP 支持的通道与不可序列化的有效负载一起使用(可能与另一个消息转换器一起使用,例如Jackson2JsonMessageConverter
).
有关默认映射标头的详细信息,请参阅 AMQP 消息标头。
您可以通过提供使用outbound-header-mapper
和inbound-header-mapper
属性。
现在还可以指定default-delivery-mode
,用于设置没有amqp_deliveryMode
页眉。
默认情况下,Spring AMQPMessageProperties
使用PERSISTENT
交付模式。
与其他持久性支持的通道一样,AMQP 支持的通道旨在提供消息持久性以避免消息丢失。 它们不打算将工作分发到其他对等应用程序。 为此,请改用通道适配器。 |
从 V5.0 开始,可轮询通道现在阻止指定的轮询器线程receiveTimeout (默认值为 1 秒)。
以前,与其他PollableChannel 实现时,如果没有可用的消息,则线程会立即返回到调度程序,而不管接收超时如何。
阻止比使用basicGet() 检索消息(没有超时),因为必须创建一个使用者才能接收每条消息。
要恢复以前的行为,请将轮询器的receiveTimeout 设置为 0。 |
使用 Java 配置进行配置
以下示例显示如何使用 Java 配置配置通道:
@Bean
public AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean();
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("foo");
factoryBean.setPubSub(false);
return factoryBean;
}
@Bean
public AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("bar");
factoryBean.setPubSub(false);
return factoryBean;
}
@Bean
public AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("baz");
factoryBean.setPubSub(false);
return factoryBean;
}
使用 Java DSL 进行配置
以下示例显示如何使用 Java DSL 配置通道:
@Bean
public IntegrationFlow pollableInFlow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.pollableChannel(connectionFactory)
.queueName("foo"))
...
.get();
}
@Bean
public IntegrationFlow messageDrivenInFow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.channel(connectionFactory)
.queueName("bar"))
...
.get();
}
@Bean
public IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.publishSubscribeChannel(connectionFactory)
.queueName("baz"))
...
.get();
}