此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring AMQP 3.2.6! |
RabbitMQ AMQP 1.0 支持
4.0 版引入了spring-rabbitmq-client
模块,用于在 RabbitMQ 上支持 AMQP 1.0 协议。
此工件基于 com.rabbitmq.client:amqp-client 库,因此只能与 RabbitMQ 及其 AMQP 1.0 协议支持一起使用。 它不能用于任何任意 AMQP 1.0 代理。 为此,到目前为止,建议使用 JMS 桥接器和相应的 Spring JMS 集成。
必须将此依赖项添加到项目中,以便能够与 RabbitMQ AMQP 1.0 支持进行交互:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbitmq-client</artifactId>
<version>4.0.0-M4</version>
</dependency>
compile 'org.springframework.amqp:spring-rabbitmq-client:4.0.0-M4'
这spring-rabbit
(对于 AMQP 0.9.1 协议)作为传递依赖项,用于在此新客户端中重用一些常见 API,例如异常、@RabbitListener
支持。
没有必要在目标项目中使用这两个功能,但 RabbitMQ 允许 AMQP 0.9.1 和 1.0 共存。
有关 RabbitMQ AMQP 1.0 Java 客户端的更多信息,请参阅其文档。
RabbitMQ AMQP 1.0 环境
这com.rabbitmq.client.amqp.Environment
是必须添加到项目中以进行连接管理和其他常见设置的第一件事。
它是节点或节点集群的入口点。
该环境允许创建连接。
它可以包含连接之间共享的与基础设施相关的配置设置,例如线程池、指标和/或观察:
@Bean
Environment environment() {
return new AmqpEnvironmentBuilder()
.connectionSettings()
.port(5672)
.environmentBuilder()
.build();
}
一样Environment
实例可用于连接不同的 RabbitMQ 代理,则必须在特定连接上提供连接设置。
见下文。
AMQP 连接工厂
这org.springframework.amqp.rabbitmq.client.AmqpConnectionFactory
引入抽象来管理com.rabbitmq.client.amqp.Connection
.
不要将其与org.springframework.amqp.rabbit.connection.ConnectionFactory
这仅适用于 AMQP 0.9.1 协议。
这SingleAmqpConnectionFactory
实现用于管理一个连接及其设置。
一样Connection
可以在许多生产者、消费者和管理层之间共享。
多路复用由 AMQP 客户端库内部 AMQP 1.0 协议实现的链路抽象处理。
这Connection
具有恢复功能,还可以处理拓扑。
在大多数情况下,将此 bean 添加到项目中仅够:
@Bean
AmqpConnectionFactory connectionFactory(Environment environment) {
return new SingleAmqpConnectionFactory(environment);
}
看SingleAmqpConnectionFactory
所有特定于连接的设置的 setter。
RabbitMQ 拓扑管理
对于从应用程序角度来看的拓扑管理(交换、队列和绑定),该RabbitAmqpAdmin
存在,这是现有AmqpAdmin
接口:
@Bean
RabbitAmqpAdmin admin(AmqpConnectionFactory connectionFactory) {
return new RabbitAmqpAdmin(connectionFactory);
}
相同的 bean 定义Exchange
,Queue
,Binding
和Declarables
必须使用配置代理中所述的实例来管理拓扑。
这RabbitAdmin
从spring-rabbit
也可以这样做,但它发生在 AMQP 0.9.1 连接时,并且由于RabbitAmqpAdmin
基于 AMQP 1.0 连接,拓扑恢复从那里顺利处理,以及发布者和消费者恢复。
这RabbitAmqpAdmin
在其start()
生命周期回调。
这initialize()
,以及所有其他 RabbitMQ 实体管理方法都可以在运行时手动调用。
在内部RabbitAmqpAdmin
使用com.rabbitmq.client.amqp.Connection.management()
API 来执行相应的拓扑作。
RabbitAmqpTemplate
这RabbitAmqpTemplate
是AsyncAmqpTemplate
并使用 AMQP 1.0 协议执行各种发送/接收作。
需要AmqpConnectionFactory
并且可以使用一些默认值进行配置。
便com.rabbitmq.client:amqp-client
库附带一个com.rabbitmq.client.amqp.Message
这RabbitAmqpTemplate
仍然公开一个基于众所周知的 APIorg.springframework.amqp.core.Message
与所有支持类,例如MessageProperties
和MessageConverter
抽象化。
与/从转换com.rabbitmq.client.amqp.Message
在内部完成RabbitAmqpTemplate
.
所有方法都返回一个CompletableFuture
最终获得作结果。
使用普通对象的作需要消息正文转换和SimpleMessageConverter
默认使用。
有关转换的更多信息,请参阅消息转换器。
通常,像这样的一个 bean 就足以执行所有可能的模板模式作:
@Bean
RabbitAmqpTemplate rabbitTemplate(AmqpConnectionFactory connectionFactory) {
return new RabbitAmqpTemplate(connectionFactory);
}
它可以配置为某些默认交换和路由密钥或仅排队。
这RabbitAmqpTemplate
有一个用于接收作的默认队列和另一个用于请求-回复作的默认队列,其中客户端为请求创建临时队列(如果不存在)。
以下是一些示例RabbitAmqpTemplate
操作:
@Bean
DirectExchange e1() {
return new DirectExchange("e1");
}
@Bean
Queue q1() {
return QueueBuilder.durable("q1").deadLetterExchange("dlx1").build();
}
@Bean
Binding b1() {
return BindingBuilder.bind(q1()).to(e1()).with("k1");
}
...
@Test
void defaultExchangeAndRoutingKey() {
this.rabbitAmqpTemplate.setExchange("e1");
this.rabbitAmqpTemplate.setRoutingKey("k1");
this.rabbitAmqpTemplate.setReceiveQueue("q1");
assertThat(this.rabbitAmqpTemplate.convertAndSend("test1"))
.succeedsWithin(Duration.ofSeconds(10));
assertThat(this.rabbitAmqpTemplate.receiveAndConvert())
.succeedsWithin(Duration.ofSeconds(10))
.isEqualTo("test1");
}
在这里,我们声明了一个e1
交换q1
队列并将其绑定到该交换中,并使用k1
路由键。
然后我们使用RabbitAmqpTemplate
使用相应的路由密钥将消息发布到上述交换,并使用q1
作为接收作的默认队列。
这些方法存在重载变体,要发送到特定交换或队列(用于发送和接收)。
这receiveAndConvert()
作与ParameterizedTypeReference<T>
需要一个SmartMessageConverter
注入RabbitAmqpTemplate
.
下一个示例演示和 RPC 实现RabbitAmqpTemplate
(假设与上一个示例中的 RabbitMQ 对象相同):
@Test
void verifyRpc() {
String testRequest = "rpc-request";
String testReply = "rpc-reply";
CompletableFuture<Object> rpcClientResult = this.template.convertSendAndReceive("e1", "k1", testRequest);
AtomicReference<String> receivedRequest = new AtomicReference<>();
CompletableFuture<Boolean> rpcServerResult =
this.rabbitAmqpTemplate.<String, String>receiveAndReply("q1",
payload -> {
receivedRequest.set(payload);
return testReply;
});
assertThat(rpcServerResult).succeedsWithin(Duration.ofSeconds(10)).isEqualTo(true);
assertThat(rpcClientResult).succeedsWithin(Duration.ofSeconds(10)).isEqualTo(testReply);
assertThat(receivedRequest.get()).isEqualTo(testRequest);
}
相关性和replyTo
队列在内部进行管理。
服务器端可以使用@RabbitListener
POJO 方法。
RabbitMQ AMQP 1.0 消费者
与消费者端的许多其他消息传递实现一样,spring-rabbitmq-client
模块附带RabbitAmqpListenerContainer
这本质上是众所周知的实现MessageListenerContainer
.
它的作用与DirectMessageListenerContainer
,但支持 RabbitMQ AMQP 1.0。
需要AmqpConnectionFactory
以及至少一个要使用的队列。
此外,MessageListener
(或特定于 AMQP 1.0RabbitAmqpMessageListener
) 必须提供。
可以配置autoSettle = false
,含义为AcknowledgeMode.MANUAL
.
在这种情况下,Message
提供给MessageListener
在其中MessageProperties
一AmqpAcknowledgment
回调,以考虑目标逻辑。
这RabbitAmqpMessageListener
有合同com.rabbitmq.client:amqp-client
抽象:
/**
* Process an AMQP message.
* @param message the message to process.
* @param context the consumer context to settle message.
* Null if container is configured for {@code autoSettle}.
*/
void onAmqpMessage(Message message, Consumer.Context context);
其中第一个参数是本机接收的com.rabbitmq.client.amqp.Message
和context
是消息结算的原生回调,类似于上面提到的AmqpAcknowledgment
抽象化。
这RabbitAmqpMessageListener
可以在以下情况下批量处理和结算消息batchSize
选项。
为此,该MessageListener.onMessageBatch()
必须执行合同。
这batchReceiveDuration
选项用于为非完整批次安排强制释放,以避免耗尽内存和消费者积分。
通常,RabbitAmqpMessageListener
class 不直接在目标项目中使用,并且通过 POJO 方法注释配置@RabbitListener
选择用于声明式使用者配置。
这RabbitAmqpListenerContainerFactory
必须在RabbitListenerAnnotationBeanPostProcessor.DEFAULT_RABBIT_LISTENER_CONTAINER_FACTORY_BEAN_NAME
和@RabbitListener
注释过程将注册RabbitAmqpMessageListener
实例转换为RabbitListenerEndpointRegistry
.
目标 POJO 方法调用由特定的RabbitAmqpMessageListenerAdapter
实现,它扩展了MessagingMessageListenerAdapter
并重用其许多功能,包括请求-回复场景(异步或非异步)。
因此,注释驱动的侦听器端点中描述的所有概念都与此一起应用RabbitAmqpMessageListener
也。
除了传统的消息传递payload
和headers
这@RabbitListener
POJO 方法合约可以包含以下参数:
-
com.rabbitmq.client.amqp.Message
- 没有任何转换的原生 AMQP 1.0 消息; -
org.springframework.amqp.core.Message
- Spring AMQP 消息抽象作为原生 AMQP 1.0 消息的转换结果; -
org.springframework.messaging.Message
- Spring Messaging 抽象作为 Spring AMQP 消息的转换结果; -
Consumer.Context
- RabbitMQ AMQP 客户端消费者结算 API; -
org.springframework.amqp.core.AmqpAcknowledgment
- Spring AMQP 确认抽象:委托到Consumer.Context
.
以下示例演示了一个简单的@RabbitListener
对于 RabbitMQ AMQP 1.0 与手动结算的交互:
@Bean(RabbitListenerAnnotationBeanPostProcessor.DEFAULT_RABBIT_LISTENER_CONTAINER_FACTORY_BEAN_NAME)
RabbitAmqpListenerContainerFactory rabbitAmqpListenerContainerFactory(AmqpConnectionFactory connectionFactory) {
return new RabbitAmqpListenerContainerFactory(connectionFactory);
}
final List<String> received = Collections.synchronizedList(new ArrayList<>());
CountDownLatch consumeIsDone = new CountDownLatch(11);
@RabbitListener(queues = {"q1", "q2"},
ackMode = "#{T(org.springframework.amqp.core.AcknowledgeMode).MANUAL}",
concurrency = "2",
id = "testAmqpListener")
void processQ1AndQ2Data(String data, AmqpAcknowledgment acknowledgment, Consumer.Context context) {
try {
if ("discard".equals(data)) {
if (!this.received.contains(data)) {
context.discard();
}
else {
throw new MessageConversionException("Test message is rejected");
}
}
else if ("requeue".equals(data) && !this.received.contains(data)) {
acknowledgment.acknowledge(AmqpAcknowledgment.Status.REQUEUE);
}
else {
acknowledgment.acknowledge();
}
this.received.add(data);
}
finally {
this.consumeIsDone.countDown();
}
}