此版本仍在开发中,目前尚不稳定。如需最新稳定版本,请使用 Spring AMQP 4.0.2spring-doc.cadn.net.cn

通用 AMQP 1.0 支持

版本 4.1 引入了 spring-amqp-client 模块,用于支持 AMQP 1.0 协议。spring-doc.cadn.net.cn

该构件基于 Qpid ProtonJ2 客户端库,可与任何支持 AMQP 1.0 协议的对等方协同工作,包括 RabbitMQ 代理。spring-doc.cadn.net.cn

此依赖项必须添加到项目中,才能与 AMQP 1.0 支持进行交互:spring-doc.cadn.net.cn

<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-amqp-client</artifactId>
  <version>4.1.0-SNAPSHOT</version>
</dependency>
implementation 'org.springframework.amqp:spring-amqp-client:4.1.0-SNAPSHOT'

AMQP 1.0 环境

代码org.apache.qpid:protonj2-client具有极强的灵活性,并提供了一个便捷的API,即使在Spring应用中不使用任何专用的Spring实现,也能轻松集成使用。例如,org.apache.qpid.protonj2.client.Message类是构建器模式(builder pattern)的实现。无论采用哪种方式,spring-amqp-client都为连接生命周期管理以及AMQP 1.0协议交互提供了高级别的Spring模式实现,用于发送和接收消息操作。AMQP 1.0环境始于一个org.apache.qpid.protonj2.client.Client实例:spring-doc.cadn.net.cn

@Bean
Client protonClient() {
    return Client.create();
}

相同的 Client 可用于连接到不同的代理(broker),但具体的连接设置必须在特定的连接中提供。然而,该 Bean 并非必需,SingleAmqpConnectionFactory 会内部创建实例。请参见下方的 AMQP 1.0 连接工厂spring-doc.cadn.net.cn

AMQP 1.0 连接工厂

引入了org.springframework.amqp.client.AmqpConnectionFactory抽象,用于管理org.apache.qpid.protonj2.client.ConnectionSingleAmqpConnectionFactory实现用于管理一个连接及其设置。 同一个Connection可在多个生产者和消费者之间共享。 多路复用由链接抽象处理,该抽象在AMQP客户端库内部用于AMQP 1.0协议的实现。 Connection具备恢复能力。spring-doc.cadn.net.cn

在大多数情况下,只需将此 Bean 添加到项目中即可:spring-doc.cadn.net.cn

@Bean
AmqpConnectionFactory connectionFactory() {
    return new SingleAmqpConnectionFactory();
}

请参阅 SingleAmqpConnectionFactory 个用于所有连接特定选项的设置器,包括 hostportuser。更高级的配置可以通过 setConnectionOptions(ConnectionOptions) 完成,它来自 ProtonJ 库的构建器 API。所提及的 SingleAmqpConnectionFactory 实例在所有默认设置下,将连接到 localhost:5672 并支持无限次重连。spring-doc.cadn.net.cn

注入 Client 可以省略,而 SingleAmqpConnectionFactory 则由工厂内部创建相应实例。

AmqpClient

代码AmqpClient是一个用于在AMQP 1.0协议上执行消息的发送和接收操作的流畅式API。它需要一个AmqpConnectionFactory,并可通过AmqpClient.Builder API配置一些默认值。代码DefaultAmqpClientAmqpClient契约的内部实现。即使org.apache.qpid:protonj2-client库自带一个org.apache.qpid.protonj2.client.Message实现,AmqpClient仍然基于广为人知的org.springframework.amqp.core.Message提供了一个API,并包含诸如MessagePropertiesMessageConverter等所有支持类与抽象。在org.apache.qpid.protonj2.client.Message之间的转换由AmqpClient实现内部通过ProtonUtils支持类完成。所有发送和接收方法均返回一个CompletableFuture,以最终获取操作结果。与普通对象交互时需进行消息体转换,而SimpleMessageConverter则作为默认方式被使用。有关转换的更多信息,请参阅AmqpClient.Builder.messageConverter(MessageConverter)配置以及消息转换器spring-doc.cadn.net.cn

通常,仅一个这样的 Bean 就足以执行所有可能的发送和接收操作:spring-doc.cadn.net.cn

@Bean
AmqpClient amqpClient(AmqpConnectionFactory connectionFactory) {
    return AmqpClient.builder(connectionFactory)
            .defaultToAddress("/queues/some_queue_as_default")
            .messageConverter(new JacksonJsonMessageConverter())
            .build();
}
在上面的例子中,/queues/ 前缀用于 defaultToAddressRabbitMQ 中目标地址约定的一个示例。

以下是一些 AmqpClient 操作的示例:spring-doc.cadn.net.cn

CompletableFuture<Boolean> sendFuture = this.amqpClient.send(Message.create("test_data"));
CompletableFuture<Boolean> sendFuture =
    this.amqpClient
            .to("/queues/test_queue")
            .message(new org.springframework.amqp.core.Message("test_data2".getBytes()))
            .send();
CompletableFuture<Boolean> sendFuture =
        this.amqpClient
                .to("/queues/test_queue")
                .body("convert")
                .priority(7)
                .header("test_header", "test_value")
                .messageId("some_id")
                .userId("guest")
                .send();

AmqpClient 还提供了一种流畅的 API,用于按需从地址接收消息。为此,应调用一个 AmqpClient.from(String fromAddress) 方法,以返回一个 ReceiveSpec,用于各种接收操作的行为。所有这些方法均会生成一个 CompletableFuture,其中包含原生 ProtonJ 消息、Spring AMQP 消息,或仅是将消息体转换后的有效载荷。对于带有非 Object.class 泛型参数的 receiveAndConvert(),需要在 AmqpClient 中提供一个 messageConverter,使其作为 SmartMessageConverter。例如,当为 AmqpClient 提供 JacksonJsonMessageConverter 时,以下示例将实现适当的数据模型转换:spring-doc.cadn.net.cn

record TestData(String data) { }

 this.amqpClient
        .to("/queues/test_queue")
        .body(new TestData("convert"))
        .send();

CompletableFuture<TestData> receiveFuture =
        this.amqpClient.from("/queues/test_queue")
                    .receiveAndConvert();

参见这些方法的 Javadoc,以获取有关 AmqpClient 所暴露的流畅 API 的更多信息。spring-doc.cadn.net.cn

AMQP 1.0 消费者

AMQP 1.0 协议的事件驱动消费者实现为一个 AmqpMessageListenerContainer,类似于该框架中所有其他 监听器容器实现AmqpMessageListenerContainer 需要一个 AmqpConnectionFactory,并内部使用 org.apache.qpid.protonj2.client.Receiver 实例从所提供的 queueNames(本质上是 AMQP 1.0 地址)中消费消息。100 初始信用值默认用于接收者链接。每次已处理的交付都会为后续新消息补充一定数量的信用值。所消费的消息由提供的 MessageListener 处理;若 autoAccept 设置为 false,则会填充一个 AmqpAcknowledgment 实现,以将相应 Spring AMQP 消息属性用于目标消息监听器逻辑中的手动交付确认。spring-doc.cadn.net.cn

选项 consumersPerQueue(默认为 1)为每个提供的地址实现了一种并发行为以进行消息消费。spring-doc.cadn.net.cn

选项 Duration receiveTimeout(默认为 1 second)控制阻塞 Receiver.receive() 操作,该操作会在循环中反复调用,直到消费者被停止。spring-doc.cadn.net.cn

每个消费者(一个内部 AmqpMessageListenerContainer.AmqpConsumer 实例)通过 Executor 进行调度以运行。默认情况下,AmqpMessageListenerContainer.taskExecutor 属性被设置为 SimpleAsyncTaskExecutorspring-doc.cadn.net.cn

消息处理错误可以通过 ErrorHandler 配置进行处理。spring-doc.cadn.net.cn

如果提供了一些 AOP 拦截器(例如 TransactionInterceptor),则 MessageListener 可以在 AmqpMessageListenerContainer 中被代理。spring-doc.cadn.net.cn

AmqpMessageListenerContainer 还提供了 pause()resume() API,用于设置 AMQP 链路信用额度为 0 并进行补充。spring-doc.cadn.net.cn

以下示例演示了 AmqpMessageListenerContainer 的简单配置:spring-doc.cadn.net.cn

BlockingQueue<Message> receivedMessages = new LinkedBlockingQueue<>();

@Bean
AmqpMessageListenerContainer amqpMessageListenerContainer(AmqpConnectionFactory connectionFactory) {
    var amqpMessageListenerContainer = new AmqpMessageListenerContainer(connectionFactory);
    amqpMessageListenerContainer.setQueueNames("address1", "address2");
    amqpMessageListenerContainer.setConsumersPerQueue(3);
    amqpMessageListenerContainer.setAutoAccept(false);
    amqpMessageListenerContainer.setReceiveTimeout(Duration.ofMillis(100));
    amqpMessageListenerContainer.setupMessageListener(this.receivedMessages::add);
    return amqpMessageListenerContainer;
}

ProtonJDelivery消费

为了方便起见,提供了一个 ProtonDeliveryListener 协议来处理原生 ProtonJ Delivery 对象,而非 Spring AMQP 消息。这在需要对 Delivery 实例进行完全控制的场景中可能很有用。例如,添加某些拒绝条件、将接收到的数据作为 InputStream 处理、根据目标应用逻辑动态补充链接信用额度等。另一种原生 Delivery 处理场景,即生产者和消费者均使用原生 ProtonJ Message 协议,包括 ProtonJ 内部的正文编码/解码机制(与 Spring AMQP 消息仅接受 byte[] 类型正文不同)。spring-doc.cadn.net.cn

手动结算仍可跳过,如果将 AmqpMessageListenerContainer 配置为 autoAccept = true(默认值)。
AmqpMessageListenerContainer 会自动进行信用额度补充。

应将 ProtonDeliveryListener 实现作为常规 MessageListener 注入到 AmqpMessageListenerContainer 中:spring-doc.cadn.net.cn

BlockingQueue<Delivery> receivedDeliveries = new LinkedBlockingQueue<>();

@Bean
AmqpMessageListenerContainer protonDeliveryListenerContainer(AmqpConnectionFactory connectionFactory) {
    var amqpMessageListenerContainer = new AmqpMessageListenerContainer(connectionFactory);
    amqpMessageListenerContainer.setQueueNames(TEST_QUEUE_FOR_NATIVE_PROTON);
    amqpMessageListenerContainer.setAutoAccept(false);
    amqpMessageListenerContainer.setupMessageListener((ProtonDeliveryListener) this.receivedDeliveries::add);
    return amqpMessageListenerContainer;
}

AMQP 1.0 注解配置

spring-amqp-client 提供了一种便捷的方式,通过注解来配置 AMQP 1.0 基础设施。@EnableAmqp 注解可设置在目标项目中的 @Configuration 类上,以触发该基础设施的注册。由 @EnableAmqp 注解导入的 AmqpDefaultConfiguration 提供了支持 @AmqpListener 的基础设施 Bean(详见下文):spring-doc.cadn.net.cn

  • The AmqpListenerEndpointRegistry - the global component to register AMQP listener containers programmatically.spring-doc.cadn.net.cn

  • The AmqpListenerAnnotationBeanPostProcessor (under the AmqpDefaultConfiguration.AMQP_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME) - a BeanPostProcessor to parse POJO methods with the @AmqpListener and register listener containers via the AmqpListenerEndpointRegistry.spring-doc.cadn.net.cn

@AmqpListener

为实现高层级且便捷的配置,该框架提供了一个 @AmqpListener 注解,用于将 POJO 方法标记为监听器调用者,以便由框架创建的 AmqpMessageListenerContainer 实例使用。该注解与 @RabbitListener@KafkaListener@JmsListener 具有许多相似之处。它需要指定要从中消费消息的 AMQP 地址,并且还可配置许多其他监听器容器属性。所有 @AmqpListener 属性均可作为属性占位符(${})和/或 SpEL 表达式(#{})进行配置。请求-回复方法签名也受到支持,其中若请求消息中未提供 replyTo,则可将 @SendTo 用作回退回复地址。这些方法还可具有异步返回类型:Project Reactor 的 MonoCompletableFuture 或 Kotlin 的 suspend 函数。spring-doc.cadn.net.cn

@Bean
AmqpMessageListenerContainerFactory customContainerFactory(AmqpConnectionFactory connectionFactory) {
	var containerFactory = new MethodAmqpMessageListenerContainerFactory(connectionFactory);
	containerFactory.setAutoStartup(false);
	containerFactory.setAutoAccept(true);
	containerFactory.setConcurrency(5);
	containerFactory.setReceiveTimeout(Duration.ofSeconds(5));
	containerFactory.setGracefulShutdownPeriod(Duration.ofSeconds(36));
	containerFactory.setInitialCredits(55);
	containerFactory.setTaskExecutor(this.taskExecutor);
	containerFactory.setAdviceChain(new DebugInterceptor());
	containerFactory.setListenerErrorHandler(mock());
	containerFactory.setDefaultRequeueRejected(false);
	containerFactory.setHeaderMapper(this.headerMapper);
	containerFactory.setMessageConverter(this.messageConverter);
	return containerFactory;
}

@Bean
TaskExecutor taskExecutor() {
	ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
	threadPoolTaskExecutor.setCorePoolSize(10);
	return threadPoolTaskExecutor;
}

@Bean
AmqpHeaderMapper headerMapper() {
	return new SimpleAmqpHeaderMapper();
}

@Bean
DebugInterceptor debugInterceptor() {
	return new DebugInterceptor();
}

@AmqpListener(
        id = "monoListener",
        addresses = TEST_QUEUE2 + "," + TEST_QUEUE3,
        concurrency = "2",
        autoStartup = "#{true}",
        autoAccept = "true",
        containerFactory = "customContainerFactory",
        receiveTimeout = "${receive.timeout:22000}",
        gracefulShutdownPeriod = "pt2m",
        initialCredits = "101",
        defaultRequeueRejected = "true",
        executor = "#{taskExecutor}",
        messageConverter = "jsonMessageConverter",
        headerMapper = "headerMapper",
        adviceChain = "debugInterceptor")
Mono<DataOut> requestReplyWithJsonAndOtherAnnotationAttributes(DataIn dataIn) {
       DataOut dataOut = new DataOut(dataIn.data + "_out");
       return Mono.just(dataOut);
}

属性 id 不是必需的,目标监听器容器的 Bean 名称将基于类名、方法名加上 .listenerContainer 后缀。
属性 addresses 可以是一个名称数组,或每个项目可以是用逗号分隔的名称集合。
当通过外部配置属性提供 AMQP 地址列表时,这会很有用。
属性 containerFactory(见下文)、executormessageConverterheaderMapper 是可选的,可配置为 Bean 名称或 Bean 引用,例如示例中 executor 所使用的 SpEL 表达式。
属性 adviceChainAdvice 实现的 Bean 名称列表。spring-doc.cadn.net.cn

@Bean
fun jsonMessageConverter() = JacksonJsonMessageConverter()

@Bean
fun amqpConnectionFactory(protonClient: Client) =
    SingleAmqpConnectionFactory(protonClient)
        .setPort(amqpPort())

@Bean
fun amqpClient(connectionFactory: AmqpConnectionFactory, jsonMessageConverter: MessageConverter) =
    AmqpClient.builder(connectionFactory)
        .messageConverter(jsonMessageConverter)
        .build()

@Bean(AmqpDefaultConfiguration.DEFAULT_AMQP_LISTENER_CONTAINER_FACTORY_BEAN_NAME)
fun containerFactory(connectionFactory: AmqpConnectionFactory, jsonMessageConverter: MessageConverter) =
    MethodAmqpMessageListenerContainerFactory(connectionFactory)
        .also { it.setMessageConverter(jsonMessageConverter) }

@AmqpListener(addresses = ["TEST_QUEUE"])
@SendTo("TEST_REPLY_TO")
suspend fun requestReplyAsyncListener(payload: DataIn) = DataOut(payload.data + "_out")

data class DataIn(val data: String)

data class DataOut(val data: String)

The @AmqpListener is @Repeatable, so several listener containers will be registered for each annotation invoking the same service method.spring-doc.cadn.net.cn

AmqpMessageListenerContainerFactory

AmqpMessageListenerContainerFactory 抽象可用于基于提供的 AmqpListenerEndpoint 创建一个 AmqpMessageListenerContainer。如果未被 AmqpListenerEndpoint 实例覆盖,工厂将提供默认的监听容器属性。该框架基于 MessageListener 注入提供一个 SimpleAmqpListenerEndpoint 实现,并基于特定的 POJO 方法调用提供一个 MethodAmqpListenerEndpoint 实现。两种端点实现均需要 AMQP 地址以进行消息消费。AmqpMessageListenerContainerFactoryMethodAmqpMessageListenerContainerFactory 扩展专门用于基于 MethodAmqpListenerEndpoint(即 AmqpMessagingListenerAdapter 在内部用作消息监听器)的 AmqpMessageListenerContainer 实例。当通过 AmqpListenerAnnotationBeanPostProcessor 解析 @AmqpListener 注解时,该框架依赖于一个 Bean 来获取此工厂。若 POJO 方法上未提供 @AmqpListener(containerFactory) 属性,则框架期望存在一个全局 Bean,用于 AmqpMessageListenerContainerFactory(其具有 AmqpDefaultConfiguration.DEFAULT_AMQP_LISTENER_CONTAINER_FACTORY_BEAN_NAME)。spring-doc.cadn.net.cn

@RabbitListenerRabbitListenerEndpointRegistry 的基础设施不同,AmqpListenerEndpointRegistry 使用一个 AmqpMessageListenerContainerFactoryAmqpListenerEndpoint 来创建一个 AmqpMessageListenerContainer 实例,并将其直接注册为单例 Bean 到应用上下文中。此 AmqpMessageListenerContainer 实例的 Bean 名称是来自 AmqpListenerEndpoint.getId() 的值,或根据 MessageListener 类名和 .listenerContainer 后缀生成的名称;若为 MethodAmqpListenerEndpoint(即服务类名加方法名及指定后缀)的情况,则 Bean 名称为服务类名加方法名及指定后缀。spring-doc.cadn.net.cn

参见 AmqpListenerEndpointAmqpMessageListenerContainerFactoryAmqpListenerEndpointRegistry 的 Java 文档以获取更多信息。spring-doc.cadn.net.cn