此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring Integration 6.5.1! |
入站通道适配器
以下列表显示了 AMQP 入站通道适配器的可能配置选项:
-
Java DSL
-
Java
-
XML
@Bean
public IntegrationFlow amqpInbound(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(Amqp.inboundAdapter(connectionFactory, "aName"))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
@Bean
public MessageChannel amqpInputChannel() {
return new DirectChannel();
}
@Bean
public AmqpInboundChannelAdapter inbound(SimpleMessageListenerContainer listenerContainer,
@Qualifier("amqpInputChannel") MessageChannel channel) {
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
adapter.setOutputChannel(channel);
return adapter;
}
@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container =
new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames("aName");
container.setConcurrentConsumers(2);
// ...
return container;
}
@Bean
@ServiceActivator(inputChannel = "amqpInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
<int-amqp:inbound-channel-adapter
id="inboundAmqp" (1)
channel="inboundChannel" (2)
queue-names="si.test.queue" (3)
acknowledge-mode="AUTO" (4)
advice-chain="" (5)
channel-transacted="" (6)
concurrent-consumers="" (7)
connection-factory="" (8)
error-channel="" (9)
expose-listener-channel="" (10)
header-mapper="" (11)
mapped-request-headers="" (12)
listener-container="" (13)
message-converter="" (14)
message-properties-converter="" (15)
phase="" (16)
prefetch-count="" (17)
receive-timeout="" (18)
recovery-interval="" (19)
missing-queues-fatal="" (20)
shutdown-timeout="" (21)
task-executor="" (22)
transaction-attribute="" (23)
transaction-manager="" (24)
batch-size="" (25)
consumers-per-queue (26)
batch-mode="MESSAGES"/> (27)
1 | 此适配器的唯一 ID。 自选。 |
2 | 应将转换后的消息发送到的消息通道。 必填。 |
3 | 应从中使用消息的 AMQP 队列(逗号分隔列表)的名称。 必填。 |
4 | 确认模式MessageListenerContainer .
当设置为MANUAL ,则在邮件头中提供传递标记和通道amqp_deliveryTag 和amqp_channel 分别。
用户应用程序负责确认。NONE 表示没有确认 (autoAck ).AUTO 表示适配器的容器在下游流完成时确认。
可选(默认为 AUTO)。
请参阅入站终端确认模式。 |
5 | 额外的 AOP 建议,用于处理与此入站通道适配器关联的横切行为。 自选。 |
6 | 指示此组件创建的通道是事务性的标志。 如果为 true,它告诉框架使用事务通道,并根据结果通过提交或回滚结束所有作(发送或接收),但有一个表示回滚的例外。 可选(默认为 false)。 |
7 | 指定要创建的并发使用者数。
默认值为1 .
建议增加并发使用者数,以缩放从队列传入的消息的消耗。
但是,请注意,一旦注册了多个消费者,任何订购保证都会丢失。
通常,对于低容量队列,使用一个使用者。
设置了“consumers-per-queue”时不允许。
自选。 |
8 | 对 RabbitMQ 的 Bean 引用ConnectionFactory .
可选(默认为connectionFactory ). |
9 | 应将错误消息发送到的消息通道。 自选。 |
10 | 侦听器通道(com.rabbitmq.client.Channel)是否向已注册的ChannelAwareMessageListener .
可选(默认为 true)。 |
11 | 对AmqpHeaderMapper 在接收 AMQP 消息时使用。
自选。
默认情况下,只有标准 AMQP 属性(例如contentType )被复制到 Spring IntegrationMessageHeaders .
AMQP 中的任何用户定义的标头MessageProperties 默认情况下不会复制到消息中DefaultAmqpHeaderMapper .
如果出现以下情况,则不允许request-header-names 被提供。 |
12 | 要从 AMQP 请求映射到MessageHeaders .
只有在未提供“header-mapper”引用时才能提供此设置。
此列表中的值也可以是与标头名称匹配的简单模式(例如“*”或“thing1*、thing2”或“*something”)。 |
13 | 引用AbstractMessageListenerContainer 用于接收 AMQP 消息。
如果提供了此属性,则不应提供与侦听器容器配置相关的其他属性。
换句话说,通过设置此引用,您必须对侦听器容器配置承担全部责任。
唯一的例外是MessageListener 本身。
由于这实际上是此通道适配器实现的核心职责,因此引用的侦听器容器必须还没有自己的MessageListener .
自选。 |
14 | 这MessageConverter 在接收 AMQP 消息时使用。
自选。 |
15 | 这MessagePropertiesConverter 在接收 AMQP 消息时使用。
自选。 |
16 | 指定基础AbstractMessageListenerContainer 应启动和停止。
启动顺序从最低到最高,关闭顺序与此相反。
默认情况下,此值为Integer.MAX_VALUE ,这意味着该容器尽可能晚地启动并尽快停止。
自选。 |
17 | 告诉 AMQP 代理在单个请求中要向每个使用者发送多少条消息。
通常,可以将此值设置得较高以提高吞吐量。
它应该大于或等于事务大小(请参阅batch-size 属性,在此列表后面)。
可选(默认为1 ). |
18 | 接收超时(以毫秒为单位)。
可选(默认为1000 ). |
19 | 指定基础AbstractMessageListenerContainer (以毫秒为单位)。可选(默认为5000 ). |
20 | 如果 'true' 并且代理上没有可用的队列,则容器会在启动期间抛出致命异常,并在容器运行时删除队列时停止(在尝试三次被动声明队列之后)。 如果false ,则容器不会抛出异常并进入恢复模式,尝试根据recovery-interval .
可选(默认为true ). |
21 | 在基础之后等待工作线程的时间(以毫秒为单位)AbstractMessageListenerContainer 已停止,并且在强制关闭 AMQP 连接之前。如果在关闭信号发出时任何工作线程处于活动状态,则允许它们完成处理,只要它们能够在此超时内完成。否则,连接将关闭,并且消息将保持未确认状态(如果通道是事务性的)。可选(默认为5000 ). |
22 | 默认情况下,基础AbstractMessageListenerContainer 使用SimpleAsyncTaskExecutor 实现,为每个任务启动一个新线程,异步运行它。默认情况下,并发线程数是无限的。请注意,此实现不会重用线程。考虑使用线程池TaskExecutor 实现作为替代方案。可选(默认为SimpleAsyncTaskExecutor ). |
23 | 默认情况下,基础AbstractMessageListenerContainer 创建DefaultTransactionAttribute (它采用 EJB 方法在运行时回滚,但不检查异常)。可选(默认为DefaultTransactionAttribute ). |
24 | 将 bean 引用设置为外部PlatformTransactionManager 在基础上AbstractMessageListenerContainer . 事务管理器与channel-transacted 属性。 如果框架在发送或接收消息时已经有正在进行的事务,并且channelTransacted flag 是true ,则消息传递事务的提交或回滚将推迟到当前事务结束。如果channelTransacted flag 是false ,没有事务语义适用于消息传递作(它是自动确认的)。有关更多信息,请参阅使用 Spring AMQP 进行事务。 自选。 |
25 | 告诉SimpleMessageListenerContainer 在单个请求中要处理的消息数。为获得最佳结果,它应小于或等于prefetch-count . 设置了“consumers-per-queue”时不允许。可选(默认为1 ). |
26 | 指示基础侦听器容器应为DirectMessageListenerContainer 而不是默认的SimpleMessageListenerContainer . 有关更多信息,请参阅 Spring AMQP 参考手册。 |
27 | 当容器的consumerBatchEnabled 是true ,确定适配器如何在消息有效负载中显示消息批次。当设置为MESSAGES (默认),有效负载是List<Message<?>> 其中每封邮件都有从传入 AMQP 映射的标头Message 有效载荷是转换后的body .
当设置为EXTRACT_PAYLOADS ,则有效负载是List<?> 其中元素是从 AMQP 转换而来的Message 身体。EXTRACT_PAYLOADS_WITH_HEADERS 类似于EXTRACT_PAYLOADS 但是,此外,每条消息的标头都是从MessageProperties 变成一个List<Map<String, Object> 在相应的索引中;标头名称为AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS . |
容器
请注意,使用 XML 配置外部容器时,不能使用 Spring AMQP 命名空间来定义容器。
这是因为命名空间至少需要一个
|
尽管 Spring Integration JMS 和 AMQP 支持相似,但存在重要差异。
JMS 入站通道适配器正在使用JmsDestinationPollingSource 在幕后,并期望配置一个轮询器。
AMQP 入站通道适配器使用AbstractMessageListenerContainer 并且是消息驱动的。
在这方面,它更类似于 JMS 消息驱动的通道适配器。 |
从 5.5 版本开始,AmqpInboundChannelAdapter
可以配置为org.springframework.amqp.rabbit.retry.MessageRecoverer
策略,用于RecoveryCallback
当在内部调用重试作时。
看setMessageRecoverer()
JavaDocs 了解更多信息。
这@Publisher
注释也可以与@RabbitListener
:
@Configuration
@EnableIntegration
@EnableRabbit
@EnablePublisher
public static class ContextConfiguration {
@Bean
QueueChannel fromRabbitViaPublisher() {
return new QueueChannel();
}
@RabbitListener(queuesToDeclare = @Queue("publisherQueue"))
@Publisher("fromRabbitViaPublisher")
@Payload("#args.payload.toUpperCase()")
public void consumeForPublisher(String payload) {
}
}
默认情况下,@Publisher
AOP 拦截器处理方法调用的返回值。
但是,来自@RabbitListener
方法被视为 AMQP 回复消息。
因此,这种方法不能与@Publisher
,所以一个@Payload
针对方法参数使用相应的 SpEL 表达式进行注释是此组合的推荐方法。
查看有关@Publisher
在“注释驱动配置”(Annotation-driven Configuration) 部分中。
在监听器容器中使用独占或单活动消费者时,建议将容器属性forceStop 自true .
这将防止出现竞争条件,即在停止容器后,另一个使用者可能会在此实例完全停止之前开始使用消息。 |
批处理消息
有关批处理消息的更多信息,请参阅 Spring AMQP 文档。
要使用 Spring Integration 生成批量消息,只需使用BatchingRabbitTemplate
.
在接收批处理消息时,默认情况下,监听器容器会提取每个片段消息,适配器将生成一个Message<?>
对于每个片段。
从 5.2 版开始,如果容器的deBatchingEnabled
属性设置为false
,取消批处理由适配器执行,并且单个Message<List<?>>
生成有效负载是片段有效负载的列表(如果适用,在转换后)。
默认值BatchingStrategy
是SimpleBatchingStrategy
,但这可以在适配器上覆盖。
这org.springframework.amqp.rabbit.retry.MessageBatchRecoverer 当重试作需要恢复时,必须与批处理一起使用。 |