| 
         此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring AMQP 3.2.6!  | 
    
AmqpTemplate
与 Spring Framework 和相关项目提供的许多其他高级抽象一样,Spring AMQP 提供了一个起着核心作用的“模板”。
定义主要作的接口称为AmqpTemplate.
这些作涵盖了发送和接收消息的一般行为。
换句话说,它们对于任何实现都不是唯一的,因此名称中的“AMQP”。
另一方面,该接口的实现与 AMQP 协议的实现相关联。
与 JMS 本身是接口级 API 不同,AMQP 是线级协议。
该协议的实现提供自己的客户端库,因此模板接口的每个实现都依赖于特定的客户端库。
目前,只有一个实现:RabbitTemplate.
在下面的示例中,我们经常使用AmqpTemplate.
但是,当您查看实例化模板或调用 setter 的配置示例或任何代码摘录时,您可以看到实现类型(例如,RabbitTemplate).
另请参阅异步兔子模板。
添加重试功能
从 1.3 版开始,您现在可以配置RabbitTemplate使用RetryTemplate以帮助处理代理连接问题。
有关完整信息,请参阅 spring-retry 项目。
下面只是使用指数退避策略和默认SimpleRetryPolicy,这会在向调用方抛出异常之前进行三次尝试。
以下示例使用 XML 命名空间:
<rabbit:template id="template" connection-factory="connectionFactory" retry-template="retryTemplate"/>
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
    <property name="backOffPolicy">
        <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
            <property name="initialInterval" value="500" />
            <property name="multiplier" value="10.0" />
            <property name="maxInterval" value="10000" />
        </bean>
    </property>
</bean>
以下示例使用@ConfigurationJava 中的注释:
@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory());
    RetryTemplate retryTemplate = new RetryTemplate();
    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(500);
    backOffPolicy.setMultiplier(10.0);
    backOffPolicy.setMaxInterval(10000);
    retryTemplate.setBackOffPolicy(backOffPolicy);
    template.setRetryTemplate(retryTemplate);
    return template;
}
从 1.4 版本开始,除了retryTemplate属性,则recoveryCallback选项支持RabbitTemplate.
它用作RetryTemplate.execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback).
这RecoveryCallback在某种程度上受到限制,因为重试上下文仅包含lastThrowable田。
对于更复杂的用例,您应该使用RetryTemplate以便您可以向RecoveryCallback通过上下文的属性。
以下示例显示了如何执行此作: | 
retryTemplate.execute(
    new RetryCallback<Object, Exception>() {
        @Override
        public Object doWithRetry(RetryContext context) throws Exception {
            context.setAttribute("message", message);
            return rabbitTemplate.convertAndSend(exchange, routingKey, message);
        }
    }, new RecoveryCallback<Object>() {
        @Override
        public Object recover(RetryContext context) throws Exception {
            Object message = context.getAttribute("message");
            Throwable t = context.getLastThrowable();
            // Do something with message
            return null;
        }
    });
}
在这种情况下,您不会注入RetryTemplate进入RabbitTemplate.
发布是异步的 — 如何检测成功和失败
发布消息是一种异步机制,默认情况下,无法路由的消息会被 RabbitMQ 丢弃。 为了成功发布,您可以收到异步确认,如相关发布者确认和返回中所述。 考虑两种故障方案:
- 
发布到交换,但没有匹配的目标队列。
 - 
发布到不存在的交易所。
 
第一种情况由发布者退货涵盖,如相关发布者确认和退货中所述。
对于第二种情况,消息被丢弃并且不会生成返回。
基础通道关闭,但出现异常。
默认情况下,会记录此异常,但您可以注册一个ChannelListener使用CachingConnectionFactory以获取此类事件的通知。
以下示例演示如何添加ConnectionListener:
this.connectionFactory.addConnectionListener(new ConnectionListener() {
    @Override
    public void onCreate(Connection connection) {
    }
    @Override
    public void onShutDown(ShutdownSignalException signal) {
        ...
    }
});
您可以检查信号的reason属性来确定发生的问题。
要检测发送线程上的异常,您可以setChannelTransacted(true)在RabbitTemplate并且在txCommit().
但是,事务会严重影响性能,因此在仅为这一用例启用事务之前请仔细考虑这一点。
相关发布者确认并返回
这RabbitTemplate实现AmqpTemplate支持发布者确认和退货。
对于返回的消息,模板的mandatory属性必须设置为true或mandatory-expression必须评估为true对于特定消息。
此功能需要CachingConnectionFactory它有它的publisherReturns属性设置为true(请参阅出版商确认和退货)。
返回通过注册RabbitTemplate.ReturnsCallback通过调用setReturnsCallback(ReturnsCallback callback).
回调必须实现以下方法:
void returnedMessage(ReturnedMessage returned);
这ReturnedMessage具有以下属性:
- 
message- 返回的消息本身 - 
replyCode- 指示退货原因的代码 - 
replyText- 返回的文本原因 - 例如NO_ROUTE - 
exchange- 发送消息的交换 - 
routingKey- 使用的路由密钥 
只有一个ReturnsCallback由每个RabbitTemplate.
另请参阅回复超时。
对于发布者确认(也称为发布者确认),模板需要CachingConnectionFactory它有它的publisherConfirm属性设置为ConfirmType.CORRELATED.
通过注册RabbitTemplate.ConfirmCallback通过调用setConfirmCallback(ConfirmCallback callback).
回调必须实现以下方法:
void confirm(CorrelationData correlationData, boolean ack, String cause);
这CorrelationData是客户端在发送原始消息时提供的对象。
这ack对于ack和 false 表示nack.
为nack实例中,原因可能包含nack,如果当nack生成。
例如,向不存在的交换发送消息时。
在这种情况下,代理关闭通道。
关闭的原因包含在cause. 这cause在 1.4 版本中添加。
只有一个ConfirmCallback由RabbitTemplate.
兔子模板发送作完成后,通道将关闭。
当连接工厂缓存已满时,这会阻止接收确认或返回(当缓存中有空间时,通道不会物理关闭,并且返回和确认正常进行)。
当缓存已满时,框架会将关闭时间推迟最多五秒钟,以便有时间接收确认和返回。
使用确认时,在收到最后一次确认时关闭通道。
仅使用返回时,通道将保持打开状态整整五秒钟。
我们通常建议将连接工厂的channelCacheSize设置为足够大的值,以便发布消息的通道返回到缓存而不是关闭。
您可以使用 RabbitMQ 管理插件监控通道使用情况。
如果您看到通道快速打开和关闭,则应考虑增加缓存大小以减少服务器上的开销。 | 
在 2.1 版之前,为发布者确认启用的通道在收到确认之前会返回到缓存中。
其他一些进程可以签出通道并执行一些导致通道关闭的作,例如将消息发布到不存在的交换。
这可能会导致确认丢失。
版本 2.1 及更高版本在确认未完成时不再将通道返回到缓存。
这RabbitTemplate执行逻辑close()每次作后在通道上。
通常,这意味着一次在一个通道上只有一个确认未完成。 | 
从 2.2 版开始,回调是在连接工厂的executor线程。
这是为了避免在回调中执行 Rabbit作时出现潜在的死锁。
在以前的版本中,回调是直接在amqp-client连接I/O线程;如果执行某些 RPC作(例如打开新通道),则会死锁,因为 I/O 线程会阻止等待结果,但结果需要由 I/O 线程本身处理。
对于这些版本,有必要将工作(例如发送消息)移交给回调中的另一个线程。
这不再需要,因为框架现在将回调调用交给执行器。 | 
| 只要返回回调在 60 秒或更短的时间内执行,就仍会保留在 ack 之前接收返回消息的保证。 确认计划在返回回调退出后或 60 秒后(以先到者为准)传递。 | 
这CorrelationData对象有一个CompletableFuture您可以使用它来获取结果,而不是使用ConfirmCallback在模板上。
以下示例演示如何配置CorrelationData实例:
CorrelationData cd1 = new CorrelationData();
this.templateWithConfirmsEnabled.convertAndSend("exchange", queue.getName(), "foo", cd1);
assertTrue(cd1.getFuture().get(10, TimeUnit.SECONDS).isAck());
ReturnedMessage = cd1.getReturn();
...
由于它是CompletableFuture<Confirm>,您可以get()准备就绪或使用时的结果whenComplete()用于异步回调。
这Confirmobject 是一个具有 2 个属性的简单 bean:ack和reason(对于nack实例)。
未为代理生成的原因填充nack实例。
它填充了nack框架生成的实例(例如,关闭连接,同时ack实例是杰出的)。
此外,当同时启用确认和返回时,CorrelationData return属性将填充返回的消息,如果无法路由到任何队列。
保证在使用ack.CorrelationData.getReturn()返回一个ReturnMessage具有属性:
- 
message(返回的消息)
 - 
回复代码
 - 
回复文本
 - 
交换
 - 
路由键
 
另请参阅作用域作,了解等待发布者确认的更简单机制。
作用域作
通常,在使用模板时,一个Channel从缓存中检出(或创建),用于作,并返回到缓存中以供重用。
在多线程环境中,不能保证下一个作使用相同的通道。
但是,有时您可能希望更好地控制通道的使用,并确保多个作都在同一通道上执行。
从 2.0 版开始,一个名为invoke提供,带有OperationsCallback.
在回调范围内执行的任何作,并在提供的RabbitOperations参数使用相同的专用Channel,这将在最后关闭(不返回到缓存中)。
如果通道是PublisherCallbackChannel,则在收到所有确认后将其返回到缓存中(请参阅相关发布者确认和返回)。
@FunctionalInterface
public interface OperationsCallback<T> {
    T doInRabbit(RabbitOperations operations);
}
为什么您可能需要它的一个示例是,如果您希望使用waitForConfirms()基础上的方法Channel.
Spring API 以前没有公开此方法,因为如前所述,通道通常是缓存和共享的。
这RabbitTemplate现在提供waitForConfirms(long timeout)和waitForConfirmsOrDie(long timeout),它委托给在OperationsCallback.
出于显而易见的原因,这些方法不能在该范围之外使用。
请注意,其他地方提供了允许您将确认与请求相关联的更高级别的抽象(请参阅相关的发布者确认和返回)。 如果您只想等到代理确认交付,则可以使用以下示例中所示的技术:
Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
    messages.forEach(m -> t.convertAndSend(ROUTE, m));
    t.waitForConfirmsOrDie(10_000);
    return true;
});
如果您愿意RabbitAdmin作,在OperationsCallback,则必须使用相同的RabbitTemplate用于invoke操作。
如果模板作已在现有事务的范围内执行,则上述讨论是无意义的,例如,在事务侦听器容器线程上运行并在事务处理的模板上执行作时。
在这种情况下,作将在该通道上执行,并在线程返回到容器时提交。
没有必要使用invoke在这种情况下。 | 
以这种方式使用确认时,实际上并不需要为将确认与请求相关联而设置的许多基础设施(除非还启用了返回)。
从 2.2 版开始,连接工厂支持一个名为publisherConfirmType. 当将其设置为ConfirmType.SIMPLE,避免了基础设施,并且可以更有效地进行确认处理。
此外,RabbitTemplate将publisherSequenceNumber已发送消息中的属性MessageProperties.
如果您希望检查(或记录或以其他方式使用)特定的确认,您可以使用重载的invoke方法,如以下示例所示:
public <T> T invoke(OperationsCallback<T> action, com.rabbitmq.client.ConfirmCallback acks,
        com.rabbitmq.client.ConfirmCallback nacks);
这些ConfirmCallback对象(对于ack和nack实例)是 Rabbit 客户端回调,而不是模板回调。 | 
以下示例日志ack和nack实例:
Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
    messages.forEach(m -> t.convertAndSend(ROUTE, m));
    t.waitForConfirmsOrDie(10_000);
    return true;
}, (tag, multiple) -> {
        log.info("Ack: " + tag + ":" + multiple);
}, (tag, multiple) -> {
        log.info("Nack: " + tag + ":" + multiple);
}));
| 作用域作绑定到线程。 有关多线程环境中严格排序的讨论,请参阅多线程环境中的严格消息排序。 | 
多线程环境中的严格消息排序
作用域作中的讨论仅适用于在同一线程上执行作时。
考虑以下情况:
- 
thread-1将消息发送到队列并将工作移交给thread-2 - 
thread-2将消息发送到同一队列 
由于 RabbitMQ 的异步性质和缓存通道的使用;不确定是否会使用相同的通道,因此无法保证消息到达队列的顺序。
(在大多数情况下,它们会按顺序到达,但无序交付的概率不是零)。
要解决此用例,您可以使用大小1(连同channelCheckoutTimeout),以确保消息始终在同一渠道发布,并且有秩序有保证。
为此,如果连接工厂有其他用途(例如使用者),则应为模板使用专用连接工厂,或将模板配置为使用嵌入在主连接工厂中的发布者连接工厂(请参阅使用单独的连接)。
用一个简单的 Spring Boot 应用程序来说明这一点:
@SpringBootApplication
public class Application {
	private static final Logger log = LoggerFactory.getLogger(Application.class);
	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}
	@Bean
	TaskExecutor exec() {
		ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
		exec.setCorePoolSize(10);
		return exec;
	}
	@Bean
	CachingConnectionFactory ccf() {
		CachingConnectionFactory ccf = new CachingConnectionFactory("localhost");
		CachingConnectionFactory publisherCF = (CachingConnectionFactory) ccf.getPublisherConnectionFactory();
		publisherCF.setChannelCacheSize(1);
		publisherCF.setChannelCheckoutTimeout(1000L);
		return ccf;
	}
	@RabbitListener(queues = "queue")
	void listen(String in) {
		log.info(in);
	}
	@Bean
	Queue queue() {
		return new Queue("queue");
	}
	@Bean
	public ApplicationRunner runner(Service service, TaskExecutor exec) {
		return args -> {
			exec.execute(() -> service.mainService("test"));
		};
	}
}
@Component
class Service {
	private static final Logger LOG = LoggerFactory.getLogger(Service.class);
	private final RabbitTemplate template;
	private final TaskExecutor exec;
	Service(RabbitTemplate template, TaskExecutor exec) {
		template.setUsePublisherConnection(true);
		this.template = template;
		this.exec = exec;
	}
	void mainService(String toSend) {
		LOG.info("Publishing from main service");
		this.template.convertAndSend("queue", toSend);
		this.exec.execute(() -> secondaryService(toSend.toUpperCase()));
	}
	void secondaryService(String toSend) {
		LOG.info("Publishing from secondary service");
		this.template.convertAndSend("queue", toSend);
	}
}
即使发布是在两个不同的线程上执行的,它们也会使用相同的通道,因为缓存的上限为单个通道。
从 2.3.7 版本开始,ThreadChannelConnectionFactory支持将一个线程的通道转移到另一个线程,使用prepareContextSwitch和switchContext方法。
第一个方法返回一个上下文,该上下文传递给调用第二个方法的第二个线程。
线程可以绑定到它的非事务通道或事务通道(或每个通道之一);除非使用两个连接工厂,否则不能单独传输它们。
示例如下:
@SpringBootApplication
public class Application {
	private static final Logger log = LoggerFactory.getLogger(Application.class);
	public static void main(String[] args) {
		SpringApplication.run(Application.class, args);
	}
	@Bean
	TaskExecutor exec() {
		ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
		exec.setCorePoolSize(10);
		return exec;
	}
	@Bean
	ThreadChannelConnectionFactory tccf() {
		ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
		rabbitConnectionFactory.setHost("localhost");
		return new ThreadChannelConnectionFactory(rabbitConnectionFactory);
	}
	@RabbitListener(queues = "queue")
	void listen(String in) {
		log.info(in);
	}
	@Bean
	Queue queue() {
		return new Queue("queue");
	}
	@Bean
	public ApplicationRunner runner(Service service, TaskExecutor exec) {
		return args -> {
			exec.execute(() -> service.mainService("test"));
		};
	}
}
@Component
class Service {
	private static final Logger LOG = LoggerFactory.getLogger(Service.class);
	private final RabbitTemplate template;
	private final TaskExecutor exec;
	private final ThreadChannelConnectionFactory connFactory;
	Service(RabbitTemplate template, TaskExecutor exec,
			ThreadChannelConnectionFactory tccf) {
		this.template = template;
		this.exec = exec;
		this.connFactory = tccf;
	}
	void mainService(String toSend) {
		LOG.info("Publishing from main service");
		this.template.convertAndSend("queue", toSend);
		Object context = this.connFactory.prepareSwitchContext();
		this.exec.execute(() -> secondaryService(toSend.toUpperCase(), context));
	}
	void secondaryService(String toSend, Object threadContext) {
		LOG.info("Publishing from secondary service");
		this.connFactory.switchContext(threadContext);
		this.template.convertAndSend("queue", toSend);
		this.connFactory.closeThreadChannel();
	}
}
一旦prepareSwitchContext被调用时,如果当前线程执行更多作,它们将在新通道上执行。
当不再需要线程绑定通道时,关闭它很重要。 | 
消息传递集成
从 1.4 版本开始,RabbitMessagingTemplate(建立在RabbitTemplate)提供了与 Spring Framework 消息传递抽象的集成——即,org.springframework.messaging.Message.
这样,您就可以使用spring-messaging Message<?>抽象化。
其他 Spring 项目也使用此抽象,例如 Spring Integration 和 Spring 的 STOMP 支持。
涉及两个消息转换器:一个用于在 spring-messaging 之间进行转换Message<?>和 Spring AMQP 的Message抽象和一个在 Spring AMQP 之间转换的Message抽象和底层 RabbitMQ 客户端库所需的格式。
默认情况下,消息有效负载由提供的RabbitTemplate实例的消息转换器。
或者,您可以注入自定义MessagingMessageConverter使用其他一些有效负载转换器,如以下示例所示:
MessagingMessageConverter amqpMessageConverter = new MessagingMessageConverter();
amqpMessageConverter.setPayloadConverter(myPayloadConverter);
rabbitMessagingTemplate.setAmqpMessageConverter(amqpMessageConverter);
已验证的用户 ID
从 1.6 版开始,模板现在支持user-id-expression (userIdExpression使用 Java 配置时)。
如果发送了消息,那么在计算此表达式后设置用户标识属性(如果尚未设置)。
评估的根对象是要发送的消息。
以下示例演示如何使用user-id-expression属性:
<rabbit:template ... user-id-expression="'guest'" />
<rabbit:template ... user-id-expression="@myConnectionFactory.username" />
第一个示例是文字表达式。
第二个获取username应用程序上下文中连接工厂 bean 的属性。
使用单独的连接
从 2.0.2 版开始,您可以将usePublisherConnection属性设置为true尽可能使用与侦听器容器使用的连接不同的连接。
这是为了避免当生产者因任何原因被阻止时,消费者被阻止。
为此,连接工厂维护第二个内部连接工厂;默认情况下,它与主工厂类型相同,但如果您希望使用不同的工厂类型进行发布,则可以显式设置。
如果 rabbit 模板在侦听器容器启动的事务中运行,则无论此设置如何,都会使用容器的通道。
通常,您不应该使用RabbitAdmin将此设置为true.
使用RabbitAdmin采用连接工厂的构造函数。
如果使用采用模板的另一个构造函数,请确保模板的属性为false.
这是因为,管理员通常用于声明侦听器容器的队列。
使用属性设置为true意味着独占队列(例如AnonymousQueue) 将在与侦听器容器使用的连接不同的连接上声明。
在这种情况下,容器无法使用队列。 |