此版本仍在开发中,目前尚不稳定。如需最新稳定版本,请使用 Spring AMQP 4.0.2spring-doc.cadn.net.cn

AmqpTemplate

与 Spring Framework 及相关项目提供的许多其他高级抽象一样,Spring AMQP 也提供了一个“模板”,其在其中起着核心作用。定义主要操作的接口称为 AmqpTemplate。这些操作涵盖了发送和接收消息的一般行为。换句话说,它们并非任何特定实现所独有的——因此名称中包含“AMQP”。另一方面,该接口存在一些实现,这些实现与 AMQP 协议的实现相关联。与JMS不同,JMS本身是一个接口级别的API,而AMQP是一种网络协议层面的协议。该协议的实现提供了其自身的客户端库,因此每个模板接口的实现都依赖于特定的客户端库。目前,仅有一个实现:RabbitTemplate。在下面的示例中,我们经常使用 AmqpTemplate。然而,当你查看配置示例或任何涉及模板实例化或调用setter方法的代码片段时,你可以看到实现类型(例如,RabbitTemplate)。spring-doc.cadn.net.cn

如前所述,AmqpTemplate 接口定义了所有基本的消息发送和接收操作。我们将在 发送消息接收消息 中分别探讨消息的发送与接收。spring-doc.cadn.net.cn

添加重试功能

从版本 1.3 开始,现在您可以将 RabbitTemplate 配置为使用 RetryTemplate,以帮助处理与代理(broker)连接相关的问题。有关更多信息,请参阅 Spring Framework 中的 核心重试(Core Retry) 支持。以下仅是一个示例,它使用指数退避策略和默认的 SimpleRetryPolicy,即在将异常抛给调用方之前尝试三次。spring-doc.cadn.net.cn

以下示例在 Java 中使用了 @Configuration 注解:spring-doc.cadn.net.cn

public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory());
    RetryPolicy retryPolicy = RetryPolicy.builder()
            .delay(Duration.ofMillis(500))
            .multiplier(2.0)
            .maxDelay(Duration.ofSeconds(10))
            .build();
    template.setRetryTemplate(new RetryTemplate(retryPolicy));
    return template;
}

从版本 1.4 开始,除了 retryTemplate 属性外,recoveryCallback 选项也已在 RabbitTemplate 上得到支持。它被用作 RetryTemplate.execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback) 的第二个参数。spring-doc.cadn.net.cn

代码 RecoveryCallback 的功能 somewhat 有限,因为重试上下文仅包含 lastThrowable 字段。对于更复杂的使用场景,建议使用外部的 RetryTemplate,以便通过上下文的属性向 RecoveryCallback 传递更多信息。以下示例展示了如何实现这一点:
retryTemplate.execute(
    new RetryCallback<Object, Exception>() {

        @Override
        public Object doWithRetry(RetryContext context) throws Exception {
            context.setAttribute("message", message);
            return rabbitTemplate.convertAndSend(exchange, routingKey, message);
        }

    }, new RecoveryCallback<Object>() {

        @Override
        public Object recover(RetryContext context) throws Exception {
            Object message = context.getAttribute("message");
            Throwable t = context.getLastThrowable();
            // Do something with message
            return null;
        }
    });
}

在这种情况下,您 不会RetryTemplate 注入到 RabbitTemplate 中。spring-doc.cadn.net.cn

发布是异步的——如何检测成功与失败

发布消息是一种异步机制,默认情况下,RabbitMQ 会丢弃无法路由的消息。为了确保成功发布,您可以接收异步确认,如 关联的发布者确认和返回 所述。请考虑以下两种故障场景:spring-doc.cadn.net.cn

第一种情况由出版商退货涵盖,如关联出版商确认和退货中所述。spring-doc.cadn.net.cn

对于第二种情况,消息会被丢弃,且不会生成返回结果。
底层通道会因异常而关闭。
默认情况下,该异常会被记录,但您可以在 ChannelListener 中注册一个 CachingConnectionFactory,以获取此类事件的通知。
以下示例展示了如何添加一个 ConnectionListenerspring-doc.cadn.net.cn

this.connectionFactory.addConnectionListener(new ConnectionListener() {

    @Override
    public void onCreate(Connection connection) {
    }

    @Override
    public void onShutDown(ShutdownSignalException signal) {
        ...
    }

});

您可以检查信号的 reason 属性以确定发生的问题。spring-doc.cadn.net.cn

为了在发送线程上检测异常,您可以在 setChannelTransacted(true)RabbitTemplate,异常将在 txCommit() 处被检测到。然而,事务会显著影响性能,因此在为这一单一用例启用事务之前,请慎重考虑。spring-doc.cadn.net.cn

关联的发布者确认和返回

《0》实现的《1》支持发布者确认和返回。spring-doc.cadn.net.cn

对于返回的消息,模板的 mandatory 属性必须设置为 truemandatory-expression 必须计算为 true,才能使特定消息生效。此功能需要一个 CachingConnectionFactory,其 publisherReturns 属性必须设置为 true(参见 发布者确认与返回)。客户端通过注册一个 RabbitTemplate.ReturnsCallback(即调用 setReturnsCallback(ReturnsCallback callback))来接收返回消息。回调方法必须实现以下方法:spring-doc.cadn.net.cn

void returnedMessage(ReturnedMessage returned);

代码 ReturnedMessage 具有以下属性:spring-doc.cadn.net.cn

每个 RabbitTemplate 仅支持一个 ReturnsCallback
另请参阅 回复超时spring-doc.cadn.net.cn

对于发布者确认(也称为发布者确认),模板需要一个 CachingConnectionFactory,其 publisherConfirm 属性被设置为 ConfirmType.CORRELATED。确认信息由客户端通过调用 setConfirmCallback(ConfirmCallback callback) 注册一个 RabbitTemplate.ConfirmCallback 来发送给客户端。回调方法必须实现以下方法:spring-doc.cadn.net.cn

void confirm(CorrelationData correlationData, boolean ack, String cause);

客户端在发送原始消息时提供的对象是 CorrelationDataack 表示为 ack(即“失败”)时为真,而为 nack(即“成功”)时为假。 对于 nack 实例,如果在生成 nack 时可获取关闭原因,则 nack 中可能包含导致 nack 的原因。例如,在向不存在的交换机发送消息时,代理会关闭通道。关闭的原因将包含在 cause 中。cause 是在 1.4 版本中新增的。spring-doc.cadn.net.cn

一个 ConfirmCallback 仅由一个 RabbitTemplate 支持。spring-doc.cadn.net.cn

当兔子模板发送操作完成时,通道将被关闭。

spring-doc.cadn.net.cn

这会阻止在连接工厂缓存已满时接收确认或返回(当缓存中有空间时,通道不会被物理关闭,确认和返回可正常进行)。spring-doc.cadn.net.cn

当缓存已满时,框架最多会延迟关闭5秒,以确保有足够时间接收确认和返回。spring-doc.cadn.net.cn

在使用确认机制时,通道会在收到最后一个确认时关闭。spring-doc.cadn.net.cn

仅使用返回机制时,通道将保持打开状态长达完整的5秒。spring-doc.cadn.net.cn

我们通常建议将连接工厂的 channelCacheSize 设置为一个足够大的值,以确保发布消息所用的通道能被返回到缓存中,而不是被关闭。spring-doc.cadn.net.cn

您可以通过使用 RabbitMQ 管理插件来监控通道使用情况。spring-doc.cadn.net.cn

如果您观察到通道被快速地打开和关闭,应考虑增大缓存大小,以减少对服务器的开销。spring-doc.cadn.net.cn

在 2.1 版本之前,启用发布者确认的通道会在收到确认前被放回缓存中。其他某个进程可能在此时获取该通道并执行某些操作,从而导致通道关闭——例如向一个不存在的交换机发送消息。这可能导致确认丢失。

spring-doc.cadn.net.cn

2.1 版本及更高版本不再在确认未完成时将通道放回缓存。spring-doc.cadn.net.cn

RabbitTemplate 在每次操作后对通道执行一次逻辑 close() 操作。spring-doc.cadn.net.cn

通常情况下,这意味着一个通道上同时仅有一个确认处于待处理状态。spring-doc.cadn.net.cn

从版本 2.2 开始,回调将在连接工厂的 executor 线程之一上调用。此举旨在避免潜在的死锁:如果你在回调内部执行 Rabbit 操作,就可能出现死锁。在早期版本中,回调直接在 amqp-client 连接 I/O 线程上调用;如果此时执行某些 RPC 操作(例如打开新通道),则会发生死锁,因为 I/O 线程会阻塞等待结果,而该结果又需要由 I/O 线程自身进行处理。在那些旧版本中,必须在回调内将工作(如发送消息)移交至其他线程来完成。如今,由于框架已将回调调用交由执行器(executor)处理,这种操作已不再必要。
只要返回回调在60秒或更短时间内执行,仍会保持在确认(ack)之前收到返回消息的保证。</p><p>确认(confirm)将在返回回调执行完毕后或60秒后(以较早者为准)被调度发送。

对象 CorrelationData 具有一个 CompletableFuture,您可以使用它来获取结果,而无需在模板上使用 ConfirmCallback。以下示例展示了如何配置一个 CorrelationData 实例:spring-doc.cadn.net.cn

CorrelationData cd1 = new CorrelationData();
this.templateWithConfirmsEnabled.convertAndSend("exchange", queue.getName(), "foo", cd1);
assertTrue(cd1.getFuture().get(10, TimeUnit.SECONDS).isAck());
ReturnedMessage = cd1.getReturn();
...

由于这是一个 CompletableFuture<Confirm>,您可以在准备好时 get() 结果,或使用 whenComplete() 进行异步回调。
Confirm 对象是一个简单的 JavaBean,包含两个属性:ackreason(用于 nack 实例)。
对于由代理生成的 nack 实例,该原因字段不会被填充。
但对于由框架生成的 nack 实例(例如,在存在未完成的 ack 实例时关闭连接),该字段会被填充。spring-doc.cadn.net.cn

此外,当同时启用确认和返回功能时,如果消息无法路由到任何队列,则 CorrelationData return 属性将填充返回的消息。可以保证在 Future 被设置为 ack 之前,已设置返回消息属性。CorrelationData.getReturn() 返回一个 ReturnMessage,其包含以下属性:spring-doc.cadn.net.cn

参见作用域操作,以获取一种更简单的机制来等待发布者确认。spring-doc.cadn.net.cn

作用域操作

通常,使用模板时,会从缓存中检出一个 Channel(或创建它),用于该操作,然后将其返回到缓存中以供重用。在多线程环境中,无法保证下一次操作会使用相同的通道。然而,有时您可能希望对通道的使用拥有更多控制权,并确保若干操作均在同一通道上执行。spring-doc.cadn.net.cn

从版本 2.0 开始,提供了一种名为 invoke 的新方法,该方法带有一个 OperationsCallback。在回调作用域内执行的所有操作以及对提供的 RabbitOperations 参数执行的操作均使用相同的专用 Channel,该通道将在最后被关闭(不会返回到缓存中)。如果通道是 PublisherCallbackChannel,则在所有确认均已收到后将其返回到缓存中(参见 关联的发布者确认与返回)。spring-doc.cadn.net.cn

@FunctionalInterface
public interface OperationsCallback<T> {

    T doInRabbit(RabbitOperations operations);

}

一个可能需要此功能的例子是,如果您希望在底层的 waitForConfirms() 方法上进行操作。该方法此前未通过 Spring API 暴露,因为通道通常被缓存并共享(如前所述)。现在,RabbitTemplate 提供了 waitForConfirms(long timeout)waitForConfirmsOrDie(long timeout),它们会将调用委托给在 OperationsCallback 的作用域内使用的专用通道。这些方法仅限于该作用域内使用,原因显而易见。spring-doc.cadn.net.cn

请注意,更高层次的抽象(允许您将确认与请求关联)在其他地方提供(参见 关联发布者确认和返回)。
如果您只想等待直到代理确认已交付消息,您可以使用以下示例中所示的技术:spring-doc.cadn.net.cn

Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
    messages.forEach(m -> t.convertAndSend(ROUTE, m));
    t.waitForConfirmsOrDie(10_000);
    return true;
});

如果您希望在 RabbitAdmin 的作用域内,同一通道上不调用任何操作,则管理员必须使用与 invoke 操作相同的 RabbitTemplate 构建。spring-doc.cadn.net.cn

如果模板操作已经在现有事务的作用域内执行(例如,在已启用事务的监听容器线程上运行,并在已启用事务的模板上执行操作),那么前面的讨论就不再适用。在这种情况下,操作将在该通道上执行,并在该线程返回到容器时提交。在这种场景下,无需使用 invoke

以这种方式使用确认时,用于将确认与请求关联的大部分基础设施实际上并不需要(除非也启用了返回)。
从版本 2.2 开始,连接工厂支持一个名为 publisherConfirmType 的新属性。
当将其设置为 ConfirmType.SIMPLE 时,可避免相关基础设施,从而提升确认处理的效率。spring-doc.cadn.net.cn

此外,RabbitTemplate 会设置发送消息中的 publisherSequenceNumber 属性 MessageProperties。如果您希望检查(或记录或以其他方式使用)特定的确认信息,可以使用重载的 invoke 方法,如下例所示:spring-doc.cadn.net.cn

public <T> T invoke(OperationsCallback<T> action, com.rabbitmq.client.ConfirmCallback acks,
        com.rabbitmq.client.ConfirmCallback nacks);
这些 ConfirmCallback 对象(针对 acknack 实例)是 Rabbit 客户端回调,而非模板回调。

以下示例记录了 acknack 个实例:spring-doc.cadn.net.cn

Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
    messages.forEach(m -> t.convertAndSend(ROUTE, m));
    t.waitForConfirmsOrDie(10_000);
    return true;
}, (tag, multiple) -> {
        log.info("Ack: " + tag + ":" + multiple);
}, (tag, multiple) -> {
        log.info("Nack: " + tag + ":" + multiple);
}));
作用域操作与线程绑定。多线程环境中的严格消息排序 一文讨论了多线程环境中的严格排序问题。

多线程环境中的严格消息排序

作用域操作中的讨论仅适用于在同一线程上执行的操作。spring-doc.cadn.net.cn

考虑以下情况:spring-doc.cadn.net.cn

由于 RabbitMQ 的异步特性和缓存通道的使用,无法保证始终使用同一通道,因此消息到达队列的顺序无法得到保障。(在大多数情况下,消息会按顺序到达,但出现乱序传递的概率并非为零)。为解决此用例,可使用大小为 1 的有界通道缓存(配合 channelCheckoutTimeout)来确保所有消息始终通过同一通道发布,从而保证消息顺序。若您的连接工厂还有其他用途(如消费者),则应为模板单独配置一个连接工厂,或配置模板以使用主连接工厂中嵌入的发布者连接工厂(详见 使用独立连接)。spring-doc.cadn.net.cn

这最好通过一个简单的 Spring Boot 应用程序来说明:spring-doc.cadn.net.cn

@SpringBootApplication
public class Application {

	private static final Logger log = LoggerFactory.getLogger(Application.class);

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@Bean
	TaskExecutor exec() {
		ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
		exec.setCorePoolSize(10);
		return exec;
	}

	@Bean
	CachingConnectionFactory ccf() {
		CachingConnectionFactory ccf = new CachingConnectionFactory("localhost");
		CachingConnectionFactory publisherCF = (CachingConnectionFactory) ccf.getPublisherConnectionFactory();
		publisherCF.setChannelCacheSize(1);
		publisherCF.setChannelCheckoutTimeout(1000L);
		return ccf;
	}

	@RabbitListener(queues = "queue")
	void listen(String in) {
		log.info(in);
	}

	@Bean
	Queue queue() {
		return new Queue("queue");
	}


	@Bean
	public ApplicationRunner runner(Service service, TaskExecutor exec) {
		return args -> {
			exec.execute(() -> service.mainService("test"));
		};
	}

}

@Component
class Service {

	private static final Logger LOG = LoggerFactory.getLogger(Service.class);

	private final RabbitTemplate template;

	private final TaskExecutor exec;

	Service(RabbitTemplate template, TaskExecutor exec) {
		template.setUsePublisherConnection(true);
		this.template = template;
		this.exec = exec;
	}

	void mainService(String toSend) {
		LOG.info("Publishing from main service");
		this.template.convertAndSend("queue", toSend);
		this.exec.execute(() -> secondaryService(toSend.toUpperCase()));
	}

	void secondaryService(String toSend) {
		LOG.info("Publishing from secondary service");
		this.template.convertAndSend("queue", toSend);
	}

}

尽管发布是在两个不同的线程上执行的,但它们都将使用同一个通道,因为缓存最多只允许一个通道。spring-doc.cadn.net.cn

从版本 2.3.7 开始,ThreadChannelConnectionFactory 支持将线程的通道(channel)转移给另一线程,方法是使用 prepareContextSwitchswitchContext 方法。
第一种方法返回一个上下文,该上下文传递给第二个线程,由其调用第二种方法。
一个线程可以绑定非事务性通道或事务性通道(或两者兼有);除非您使用两个连接工厂,否则无法单独转移它们。
示例如下:spring-doc.cadn.net.cn

@SpringBootApplication
public class Application {

	private static final Logger log = LoggerFactory.getLogger(Application.class);

	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}

	@Bean
	TaskExecutor exec() {
		ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
		exec.setCorePoolSize(10);
		return exec;
	}

	@Bean
	ThreadChannelConnectionFactory tccf() {
		ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
		rabbitConnectionFactory.setHost("localhost");
		return new ThreadChannelConnectionFactory(rabbitConnectionFactory);
	}

	@RabbitListener(queues = "queue")
	void listen(String in) {
		log.info(in);
	}

	@Bean
	Queue queue() {
		return new Queue("queue");
	}


	@Bean
	public ApplicationRunner runner(Service service, TaskExecutor exec) {
		return args -> {
			exec.execute(() -> service.mainService("test"));
		};
	}

}

@Component
class Service {

	private static final Logger LOG = LoggerFactory.getLogger(Service.class);

	private final RabbitTemplate template;

	private final TaskExecutor exec;

	private final ThreadChannelConnectionFactory connFactory;

	Service(RabbitTemplate template, TaskExecutor exec,
			ThreadChannelConnectionFactory tccf) {

		this.template = template;
		this.exec = exec;
		this.connFactory = tccf;
	}

	void mainService(String toSend) {
		LOG.info("Publishing from main service");
		this.template.convertAndSend("queue", toSend);
		Object context = this.connFactory.prepareSwitchContext();
		this.exec.execute(() -> secondaryService(toSend.toUpperCase(), context));
	}

	void secondaryService(String toSend, Object threadContext) {
		LOG.info("Publishing from secondary service");
		this.connFactory.switchContext(threadContext);
		this.template.convertAndSend("queue", toSend);
		this.connFactory.closeThreadChannel();
	}

}
一旦调用 prepareSwitchContext,如果当前线程执行任何其他操作,它们都将在一个新的通道上执行。

spring-doc.cadn.net.cn

当不再需要时,务必关闭与线程绑定的通道。spring-doc.cadn.net.cn

消息集成

从版本 1.4 开始,RabbitMessagingTemplate(基于 RabbitTemplate 构建)提供了与 Spring Framework 消息抽象的集成——即,org.springframework.messaging.Message。这使您能够通过 spring-messaging Message<?> 抽象发送和接收消息。该抽象被其他 Spring 项目所使用,例如 Spring Integration 和 Spring 对 STOMP 的支持。涉及两种消息转换器:一种用于在 spring-messaging 的 Message<?> 与 Spring AMQP 的 Message 抽象之间进行转换;另一种用于在 Spring AMQP 的 Message 抽象与底层 RabbitMQ 客户端库所需的格式之间进行转换。默认情况下,消息负载由提供的 RabbitTemplate 实例的消息转换器进行转换。或者,您可以注入一个自定义的 MessagingMessageConverter 及其对应的其他负载转换器,如下例所示:spring-doc.cadn.net.cn

MessagingMessageConverter amqpMessageConverter = new MessagingMessageConverter();
amqpMessageConverter.setPayloadConverter(myPayloadConverter);
rabbitMessagingTemplate.setAmqpMessageConverter(amqpMessageConverter);

已验证用户ID

从版本 1.6 开始,模板现在支持 user-id-expression(在使用 Java 配置时为 userIdExpression)。
如果发送了一条消息,则在评估此表达式后,会设置用户 ID 属性(若尚未设置)。
评估的根对象是待发送的消息。spring-doc.cadn.net.cn

以下示例展示了如何使用 user-id-expression 属性:spring-doc.cadn.net.cn

<rabbit:template ... user-id-expression="'guest'" />

<rabbit:template ... user-id-expression="@myConnectionFactory.username" />

第一个示例是字面量表达式。
第二个示例从应用上下文中的连接工厂bean中获取username属性。spring-doc.cadn.net.cn

使用独立的连接

从版本 2.0.2 开始,您可以将 usePublisherConnection 属性设置为 true,以在可能的情况下使用与监听器容器所用连接不同的连接。此举旨在避免当生产者因任何原因被阻塞时,消费者也被阻塞。连接工厂为此目的维护一个内部的第二个连接工厂;默认情况下,它与主工厂类型相同,但如果您希望为发布使用不同的工厂类型,也可以显式进行设置。如果 Rabbit 模板在由监听器容器启动的事务中运行,则无论此设置如何,均使用容器的通道。spring-doc.cadn.net.cn

通常情况下,您不应将 RabbitAdmin 与一个将此设置为 true 的模板一起使用。

spring-doc.cadn.net.cn

请使用接受连接工厂参数的 RabbitAdmin 构造函数。spring-doc.cadn.net.cn

如果您使用了另一个接受模板参数的构造函数,请确保该模板的属性设置为 falsespring-doc.cadn.net.cn

这是因为,通常管理员会用于为监听器容器声明队列。spring-doc.cadn.net.cn

如果使用了一个将该属性设置为 true 的模板,则意味着独占队列(例如 AnonymousQueue)将在与监听器容器所使用的连接不同的连接上被声明。spring-doc.cadn.net.cn

在这种情况下,这些队列将无法被容器使用。spring-doc.cadn.net.cn