|
此版本仍在开发中,目前尚不稳定。如需最新稳定版本,请使用 Spring AMQP 4.0.2! |
通用 AMQP 1.0 支持
版本 4.1 引入了 spring-amqp-client 模块,用于支持 AMQP 1.0 协议。
该构件基于 Qpid ProtonJ2 客户端库,可与任何支持 AMQP 1.0 协议的对等方协同工作,包括 RabbitMQ 代理。
此依赖项必须添加到项目中,才能与 AMQP 1.0 支持进行交互:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-amqp-client</artifactId>
<version>4.1.0-M3</version>
</dependency>
implementation 'org.springframework.amqp:spring-amqp-client:4.1.0-M3'
AMQP 1.0 环境
代码org.apache.qpid:protonj2-client具有极强的灵活性,并提供了一个便捷的API,即使在Spring应用中不使用任何专用的Spring实现,也能轻松集成使用。例如,org.apache.qpid.protonj2.client.Message类是构建器模式(builder pattern)的实现。无论采用哪种方式,spring-amqp-client都为连接生命周期管理以及AMQP 1.0协议交互提供了高级别的Spring模式实现,用于发送和接收消息操作。AMQP 1.0环境始于一个org.apache.qpid.protonj2.client.Client实例:
@Bean
Client protonClient() {
return Client.create();
}
相同的 Client 可用于连接到不同的代理(broker),但具体的连接设置必须在特定的连接中提供。
另请参阅下方的 AMQP 1.0 连接工厂 和 AMQP 1.0 注解配置。
AMQP 1.0 连接工厂
引入了org.springframework.amqp.client.AmqpConnectionFactory抽象,用于管理org.apache.qpid.protonj2.client.Connection。 SingleAmqpConnectionFactory实现用于管理一个连接及其设置。 同一个Connection可在多个生产者和消费者之间共享。 多路复用由链接抽象处理,该抽象在AMQP客户端库内部用于AMQP 1.0协议的实现。 Connection具备恢复能力。
在大多数情况下,只需将此 Bean 添加到项目中即可:
@Bean
AmqpConnectionFactory connectionFactory(Client protonClient) {
return new SingleAmqpConnectionFactory(protonClient);
}
请参阅 SingleAmqpConnectionFactory 个用于所有连接特定选项的设置器,包括 host、port、user。更高级的配置可以通过 setConnectionOptions(ConnectionOptions) 完成,它来自 ProtonJ 库的构建器 API。所提及的 SingleAmqpConnectionFactory 实例在所有默认设置下,将连接到 localhost:5672 并支持无限次重连。
对于 Client 注入,可以省略掉 SingleAmqpConnectionFactory,而相应的单个 Bean 将在内部按需从 BeanFactory 中解析,当 getConnection() 被调用时。 |
AmqpClient
代码AmqpClient是一个用于在AMQP 1.0协议上执行消息的发送和接收操作的流畅式API。它需要一个AmqpConnectionFactory,并可通过AmqpClient.Builder API配置一些默认值。代码DefaultAmqpClient是AmqpClient契约的内部实现。即使org.apache.qpid:protonj2-client库自带一个org.apache.qpid.protonj2.client.Message实现,AmqpClient仍然基于广为人知的org.springframework.amqp.core.Message提供了一个API,并包含诸如MessageProperties和MessageConverter等所有支持类与抽象。在org.apache.qpid.protonj2.client.Message之间的转换由AmqpClient实现内部通过ProtonUtils支持类完成。所有发送和接收方法均返回一个CompletableFuture,以最终获取操作结果。与普通对象交互时需进行消息体转换,而SimpleMessageConverter则作为默认方式被使用。有关转换的更多信息,请参阅AmqpClient.Builder.messageConverter(MessageConverter)配置以及消息转换器。
通常,仅一个这样的 Bean 就足以执行所有可能的发送和接收操作:
@Bean
AmqpClient amqpClient(AmqpConnectionFactory connectionFactory) {
return AmqpClient.builder(connectionFactory)
.defaultToAddress("/queues/some_queue_as_default")
.messageConverter(new JacksonJsonMessageConverter())
.build();
}
在上面的例子中,/queues/ 前缀用于 defaultToAddress 是 RabbitMQ 中目标地址约定的一个示例。 |
以下是一些 AmqpClient 操作的示例:
CompletableFuture<Boolean> sendFuture = this.amqpClient.send(Message.create("test_data"));
CompletableFuture<Boolean> sendFuture =
this.amqpClient
.to("/queues/test_queue")
.message(new org.springframework.amqp.core.Message("test_data2".getBytes()))
.send();
CompletableFuture<Boolean> sendFuture =
this.amqpClient
.to("/queues/test_queue")
.body("convert")
.priority(7)
.header("test_header", "test_value")
.messageId("some_id")
.userId("guest")
.send();
该 AmqpClient 还提供了一种流畅的 API,用于按需从地址接收消息。为此,应调用一个 AmqpClient.from(String fromAddress) 方法,以返回一个 ReceiveSpec,用于各种接收操作的行为。所有这些方法均会生成一个 CompletableFuture,其中包含原生 ProtonJ 消息、Spring AMQP 消息,或仅是将消息体转换后的有效载荷。对于带有非 Object.class 泛型参数的 receiveAndConvert(),需要在 AmqpClient 中提供一个 messageConverter,使其作为 SmartMessageConverter。例如,当为 AmqpClient 提供 JacksonJsonMessageConverter 时,以下示例将实现适当的数据模型转换:
record TestData(String data) { }
this.amqpClient
.to("/queues/test_queue")
.body(new TestData("convert"))
.send();
CompletableFuture<TestData> receiveFuture =
this.amqpClient.from("/queues/test_queue")
.receiveAndConvert();
参见这些方法的 Javadoc,以获取有关 AmqpClient 所暴露的流畅 API 的更多信息。
AMQP 1.0 消费者
AMQP 1.0 协议的事件驱动消费者实现为一个 AmqpMessageListenerContainer,类似于该框架中所有其他 监听器容器实现。AmqpMessageListenerContainer 需要一个 AmqpConnectionFactory,并内部使用 org.apache.qpid.protonj2.client.Receiver 实例从所提供的 queueNames(本质上是 AMQP 1.0 地址)中消费消息。100 初始信用值默认用于接收者链接。每次已处理的交付都会为后续新消息补充一定数量的信用值。所消费的消息由提供的 MessageListener 处理;若 autoAccept 设置为 false,则会填充一个 AmqpAcknowledgment 实现,以将相应 Spring AMQP 消息属性用于目标消息监听器逻辑中的手动交付确认。
选项 consumersPerQueue(默认为 1)为每个提供的地址实现了一种并发行为以进行消息消费。
选项 Duration receiveTimeout(默认为 1 second)控制阻塞 Receiver.receive() 操作,该操作会在循环中反复调用,直到消费者被停止。
每个消费者(一个内部 AmqpMessageListenerContainer.AmqpConsumer 实例)通过 Executor 进行调度以运行。默认情况下,AmqpMessageListenerContainer.taskExecutor 属性被设置为 SimpleAsyncTaskExecutor。
消息处理错误可以通过 ErrorHandler 配置进行处理。
如果提供了一些 AOP 拦截器(例如 TransactionInterceptor),则 MessageListener 可以在 AmqpMessageListenerContainer 中被代理。
该 AmqpMessageListenerContainer 还提供了 pause() 和 resume() API,用于设置 AMQP 链路信用额度为 0 并进行补充。
以下示例演示了 AmqpMessageListenerContainer 的简单配置:
BlockingQueue<Message> receivedMessages = new LinkedBlockingQueue<>();
@Bean
AmqpMessageListenerContainer amqpMessageListenerContainer(AmqpConnectionFactory connectionFactory) {
var amqpMessageListenerContainer = new AmqpMessageListenerContainer(connectionFactory);
amqpMessageListenerContainer.setQueueNames("address1", "address2");
amqpMessageListenerContainer.setConsumersPerQueue(3);
amqpMessageListenerContainer.setAutoAccept(false);
amqpMessageListenerContainer.setReceiveTimeout(Duration.ofMillis(100));
amqpMessageListenerContainer.setupMessageListener(this.receivedMessages::add);
return amqpMessageListenerContainer;
}
ProtonJDelivery消费
为了方便起见,提供了一个 ProtonDeliveryListener 协议来处理原生 ProtonJ Delivery 对象,而非 Spring AMQP 消息。这在需要对 Delivery 实例进行完全控制的场景中可能很有用。例如,添加某些拒绝条件、将接收到的数据作为 InputStream 处理、根据目标应用逻辑动态补充链接信用额度等。另一种原生 Delivery 处理场景,即生产者和消费者均使用原生 ProtonJ Message 协议,包括 ProtonJ 内部的正文编码/解码机制(与 Spring AMQP 消息仅接受 byte[] 类型正文不同)。
手动结算仍可跳过,如果将 AmqpMessageListenerContainer 配置为 autoAccept = true(默认值)。而 AmqpMessageListenerContainer 会自动进行信用额度补充。 |
应将 ProtonDeliveryListener 实现作为常规 MessageListener 注入到 AmqpMessageListenerContainer 中:
BlockingQueue<Delivery> receivedDeliveries = new LinkedBlockingQueue<>();
@Bean
AmqpMessageListenerContainer protonDeliveryListenerContainer(AmqpConnectionFactory connectionFactory) {
var amqpMessageListenerContainer = new AmqpMessageListenerContainer(connectionFactory);
amqpMessageListenerContainer.setQueueNames(TEST_QUEUE_FOR_NATIVE_PROTON);
amqpMessageListenerContainer.setAutoAccept(false);
amqpMessageListenerContainer.setupMessageListener((ProtonDeliveryListener) this.receivedDeliveries::add);
return amqpMessageListenerContainer;
}
AMQP 1.0 注解配置
该 spring-amqp-client 提供了一种便捷的方式,通过注解来配置 AMQP 1.0 基础设施。@EnableAmqp 注解可设置在目标项目中的 @Configuration 类上,以触发该基础设施的注册。由 @EnableAmqp 注解导入的 AmqpDefaultConfiguration 提供了条件性 Bean(可在目标项目中手动配置):
-
该
org.apache.qpid.protonj2.client.Client的org.apache.qpid.protonj2.client.ClientOptions属性基于从@EnableAmqp导入的元数据的相应属性。 -
The
AmqpListenerEndpointRegistry- the global component to register AMQP listener containers programmatically. -
该
AmqpListenerAnnotationBeanPostProcessor是一个用于解析 POJO 方法并使用@AmqpListener注册监听器容器的BeanPostProcessor,并通过AmqpListenerEndpointRegistry完成此操作。
@AmqpListener
为实现高层级且便捷的配置,该框架提供了一个 @AmqpListener 注解,用于将 POJO 方法标记为监听器调用者,以便由框架创建的 AmqpMessageListenerContainer 实例使用。该注解与 @RabbitListener、@KafkaListener、@JmsListener 具有许多相似之处。它需要指定要从中消费消息的 AMQP 地址,并且还可配置许多其他监听器容器属性。所有 @AmqpListener 属性均可作为属性占位符(${})和/或 SpEL 表达式(#{})进行配置。请求-回复方法签名也受到支持,其中若请求消息中未提供 replyTo,则可将 @SendTo 用作回退回复地址。这些方法还可具有异步返回类型:Project Reactor 的 Mono、CompletableFuture 或 Kotlin 的 suspend 函数。
-
Mono-based request-reply example with possible attributes:
@Bean
AmqpMessageListenerContainerFactory customContainerFactory(AmqpConnectionFactory connectionFactory) {
var containerFactory = new MethodAmqpMessageListenerContainerFactory(connectionFactory);
containerFactory.setAutoStartup(false);
containerFactory.setAutoAccept(true);
containerFactory.setConcurrency(5);
containerFactory.setReceiveTimeout(Duration.ofSeconds(5));
containerFactory.setGracefulShutdownPeriod(Duration.ofSeconds(36));
containerFactory.setInitialCredits(55);
containerFactory.setTaskExecutor(this.taskExecutor);
containerFactory.setAdviceChain(new DebugInterceptor());
containerFactory.setListenerErrorHandler(mock());
containerFactory.setDefaultRequeueRejected(false);
containerFactory.setHeaderMapper(this.headerMapper);
containerFactory.setMessageConverter(this.messageConverter);
return containerFactory;
}
@Bean
TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(10);
return threadPoolTaskExecutor;
}
@Bean
AmqpHeaderMapper headerMapper() {
return new SimpleAmqpHeaderMapper();
}
@Bean
DebugInterceptor debugInterceptor() {
return new DebugInterceptor();
}
@AmqpListener(
id = "monoListener",
addresses = TEST_QUEUE2 + "," + TEST_QUEUE3,
concurrency = "2",
autoStartup = "#{true}",
autoAccept = "true",
containerFactory = "customContainerFactory",
receiveTimeout = "${receive.timeout:22000}",
gracefulShutdownPeriod = "pt2m",
initialCredits = "101",
defaultRequeueRejected = "true",
executor = "#{taskExecutor}",
messageConverter = "jsonMessageConverter",
headerMapper = "headerMapper",
adviceChain = "debugInterceptor")
Mono<DataOut> requestReplyWithJsonAndOtherAnnotationAttributes(DataIn dataIn) {
DataOut dataOut = new DataOut(dataIn.data + "_out");
return Mono.just(dataOut);
}
属性 id 不是必需的,目标监听器容器的 Bean 名称将基于类名、方法名加上 .listenerContainer 后缀。
属性 addresses 可以是一个名称数组,或每个项目可以是用逗号分隔的名称集合。
当通过外部配置属性提供 AMQP 地址列表时,这会很有用。
属性 containerFactory(见下文)、executor、messageConverter 和 headerMapper 是可选的,可配置为 Bean 名称或 Bean 引用,例如示例中 executor 所使用的 SpEL 表达式。
属性 adviceChain 是 Advice 实现的 Bean 名称列表。
-
Kotlin
suspend函数示例,其中显式指定@SendTo:
@Bean
fun jsonMessageConverter() = JacksonJsonMessageConverter()
@Bean
fun amqpConnectionFactory(protonClient: Client) =
SingleAmqpConnectionFactory(protonClient)
.setPort(amqpPort())
@Bean
fun amqpClient(connectionFactory: AmqpConnectionFactory, jsonMessageConverter: MessageConverter) =
AmqpClient.builder(connectionFactory)
.messageConverter(jsonMessageConverter)
.build()
@Bean(AmqpDefaultConfiguration.DEFAULT_AMQP_LISTENER_CONTAINER_FACTORY_BEAN_NAME)
fun containerFactory(connectionFactory: AmqpConnectionFactory, jsonMessageConverter: MessageConverter) =
MethodAmqpMessageListenerContainerFactory(connectionFactory)
.also { it.setMessageConverter(jsonMessageConverter) }
@AmqpListener(addresses = ["TEST_QUEUE"])
@SendTo("TEST_REPLY_TO")
suspend fun requestReplyAsyncListener(payload: DataIn) = DataOut(payload.data + "_out")
data class DataIn(val data: String)
data class DataOut(val data: String)
The @AmqpListener is @Repeatable, so several listener containers will be registered for each annotation invoking the same service method.
AmqpMessageListenerContainerFactory
该 AmqpMessageListenerContainerFactory 抽象可用于基于提供的 AmqpListenerEndpoint 创建一个 AmqpMessageListenerContainer。如果未被 AmqpListenerEndpoint 实例覆盖,工厂将提供默认的监听容器属性。该框架基于 MessageListener 注入提供一个 SimpleAmqpListenerEndpoint 实现,并基于特定的 POJO 方法调用提供一个 MethodAmqpListenerEndpoint 实现。两种端点实现均需要 AMQP 地址以进行消息消费。AmqpMessageListenerContainerFactory 的 MethodAmqpMessageListenerContainerFactory 扩展专门用于基于 MethodAmqpListenerEndpoint(即 AmqpMessagingListenerAdapter 在内部用作消息监听器)的 AmqpMessageListenerContainer 实例。当通过 AmqpListenerAnnotationBeanPostProcessor 解析 @AmqpListener 注解时,该框架依赖于一个 Bean 来获取此工厂。若 POJO 方法上未提供 @AmqpListener(containerFactory) 属性,则框架期望存在一个全局 Bean,用于 AmqpMessageListenerContainerFactory(其具有 AmqpDefaultConfiguration.DEFAULT_AMQP_LISTENER_CONTAINER_FACTORY_BEAN_NAME)。
与 @RabbitListener 和 RabbitListenerEndpointRegistry 的基础设施不同,AmqpListenerEndpointRegistry 使用一个 AmqpMessageListenerContainerFactory 和 AmqpListenerEndpoint 来创建一个 AmqpMessageListenerContainer 实例,并将其直接注册为单例 Bean 到应用上下文中。此 AmqpMessageListenerContainer 实例的 Bean 名称是来自 AmqpListenerEndpoint.getId() 的值,或根据 MessageListener 类名和 .listenerContainer 后缀生成的名称;若为 MethodAmqpListenerEndpoint(即服务类名加方法名及指定后缀)的情况,则 Bean 名称为服务类名加方法名及指定后缀。
参见 AmqpListenerEndpoint、AmqpMessageListenerContainerFactory 和 AmqpListenerEndpointRegistry 的 Java 文档以获取更多信息。