AMQP

高级消息排队协议(AMQP)是一种面向消息中间件的平台中立、有线级协议。 Spring AMQP 项目将 Spring 的核心概念应用于基于 AMQP 的消息解决方案开发。 Spring Boot 为通过 RabbitMQ 与 AMQP 工作提供了多种便利功能,包括Spring BootStarters AMQP起动机。spring-doc.cadn.net.cn

RabbitMQ 支持

RabbitMQ 是一款基于 AMQP 协议的轻量级、可靠、可扩展且可移植的消息代理。 Spring 使用 RabbitMQ 通过 AMQP 协议进行通信。spring-doc.cadn.net.cn

RabbitMQ 配置由外部配置属性控制,在Spring.兔子问题。*. 例如,你可以声明以下部分application.properties:spring-doc.cadn.net.cn

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret
spring:
  rabbitmq:
    host: "localhost"
    port: 5672
    username: "admin"
    password: "secret"

或者,你也可以用地址属性:spring-doc.cadn.net.cn

spring.rabbitmq.addresses=amqp://admin:secret@localhost
spring:
  rabbitmq:
    addresses: "amqp://admin:secret@localhost"
当以这种方式指定地址时,主机端口属性被忽略。 如果地址使用AMQPS协议,SSL支持会自动启用。

兔子属性更多支持的基于属性的配置选项。 配置RabbitMQ的底层细节连接工厂Spring AMQP 使用的,定义一个ConnectionFactory定制器豆。spring-doc.cadn.net.cn

如果连接名称策略BEAN 存在于上下文中,它会自动用于命名由自动配置创建的连接缓存连接工厂.spring-doc.cadn.net.cn

发送消息

斯普林斯Amqp模板AmqpAdmin是自动配置的,你可以直接将它们自动接线到你自己的豆子中,如下示例所示:spring-doc.cadn.net.cn

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	private final AmqpAdmin amqpAdmin;

	private final AmqpTemplate amqpTemplate;

	public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
		this.amqpAdmin = amqpAdmin;
		this.amqpTemplate = amqpTemplate;
	}

	// ...

	public void someMethod() {
		this.amqpAdmin.getQueueInfo("someQueue");
	}

	public void someOtherMethod() {
		this.amqpTemplate.convertAndSend("hello");
	}

}
import org.springframework.amqp.core.AmqpAdmin
import org.springframework.amqp.core.AmqpTemplate
import org.springframework.stereotype.Component

@Component
class MyBean(private val amqpAdmin: AmqpAdmin, private val amqpTemplate: AmqpTemplate) {

	// ...

	fun someMethod() {
		amqpAdmin.getQueueInfo("someQueue")
	}

	fun someOtherMethod() {
		amqpTemplate.convertAndSend("hello")
	}

}
兔子消息模板也可以以类似方式注射。 如果消息转换器BEAN 是定义的,它会自动关联到自动配置Amqp模板.

如有必要,任何队列定义为 bean 自动用于在 RabbitMQ 实例上声明对应的队列。spring-doc.cadn.net.cn

要重试作,你可以在Amqp模板(例如,如果经纪人连接丢失):spring-doc.cadn.net.cn

spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=2s
spring:
  rabbitmq:
    template:
      retry:
        enabled: true
        initial-interval: "2s"

重试默认是被禁用的。 你也可以自定义重试模板程序化方式是声明 a兔子模板重新尝试设置自定义器豆。spring-doc.cadn.net.cn

如果你需要更多兔子模板或者如果你想覆盖默认值,Spring Boot 提供了RabbitTemplateConfigurer你可以用来初始化一个兔子模板设置与自动配置中使用的工厂相同。spring-doc.cadn.net.cn

向流发送消息

要向特定流发送消息,请指定流的名称,如下例所示:spring-doc.cadn.net.cn

spring.rabbitmq.stream.name=my-stream
spring:
  rabbitmq:
    stream:
      name: "my-stream"

如果消息转换器,流消息转换器制作定制器BEAN 是定义的,它会自动关联到自动配置兔子流模板.spring-doc.cadn.net.cn

如果你需要更多兔子流模板或者如果你想覆盖默认值,Spring Boot 提供了RabbitStreamTemplateConfigurer你可以用来初始化一个兔子流模板设置与自动配置中使用的工厂相同。spring-doc.cadn.net.cn

接收消息

当兔子基础设施存在时,任何豆子都可以注释为@RabbitListener创建监听端点。 如果没有兔子听众容器工厂已被定义,成为默认值SimpleRabbitListenerContainerFactory会自动配置,你可以通过spring.rabbitmq.listener.type财产。 如果消息转换器或者消息恢复器豆子是定义好的,它会自动关联到默认工厂。spring-doc.cadn.net.cn

以下示例组件在某些队列队列:spring-doc.cadn.net.cn

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@RabbitListener(queues = "someQueue")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@RabbitListener(queues = ["someQueue"])
	fun processMessage(content: String?) {
		// ...
	}

}
@EnableRabbit更多细节请阅读。

如果你需要更多兔子听众容器工厂或者如果你想覆盖默认值,Spring Boot 提供了SimpleRabbitListenerContainerFactoryConfigurer以及一个DirectRabbitListenerContainerFactoryConfigurer你可以用它来初始化一个SimpleRabbitListenerContainerFactory以及一个DirectRabbitListenerContainerFactory设置与自动配置中使用的工厂相同。spring-doc.cadn.net.cn

无论你选择哪种容器类型。 这两颗豆子会被自动配置暴露出来。

例如,以下配置类暴露了另一个使用特定消息转换器:spring-doc.cadn.net.cn

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.amqp.autoconfigure.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration(proxyBeanMethods = false)
public class MyRabbitConfiguration {

	@Bean
	public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer) {
		SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
		ConnectionFactory connectionFactory = getCustomConnectionFactory();
		configurer.configure(factory, connectionFactory);
		factory.setMessageConverter(new MyMessageConverter());
		return factory;
	}

	private ConnectionFactory getCustomConnectionFactory() {
		return ...
	}

}
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.boot.amqp.autoconfigure.SimpleRabbitListenerContainerFactoryConfigurer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration

@Configuration(proxyBeanMethods = false)
class MyRabbitConfiguration {

	@Bean
	fun myFactory(configurer: SimpleRabbitListenerContainerFactoryConfigurer): SimpleRabbitListenerContainerFactory {
		val factory = SimpleRabbitListenerContainerFactory()
		val connectionFactory = getCustomConnectionFactory()
		configurer.configure(factory, connectionFactory)
		factory.setMessageConverter(MyMessageConverter())
		return factory
	}

	fun getCustomConnectionFactory() : ConnectionFactory {
		return ...
	}

}

然后你可以在任何地方使用工厂@RabbitListener-注释方法,具体如下:spring-doc.cadn.net.cn

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

	@RabbitListener(queues = "someQueue", containerFactory = "myFactory")
	public void processMessage(String content) {
		// ...
	}

}
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component

@Component
class MyBean {

	@RabbitListener(queues = ["someQueue"], containerFactory = "myFactory")
	fun processMessage(content: String?) {
		// ...
	}

}

你可以启用重试来处理监听者抛出异常的情况。 默认情况下,RejectAndDontRequeueRecoveryer使用了,但你可以定义一个消息恢复器属于你自己的。 当重试次数用尽时,消息被拒绝,并被丢弃或如果经纪人配置为死信交换机,则被路由到死信交换机。 默认情况下,重试是被禁用的。 你也可以自定义RetryPolicy程序化方式是声明 aRabbitListenerRetrySettingsCustomizer豆。spring-doc.cadn.net.cn

默认情况下,如果重试被禁用且监听者抛出异常,则该投递会无限期重试。 你可以通过两种方式修改这个行为:设置defaultRequeueRejected属性到false因此,尝试零重投或抛出AmqpRejectAndDontRequeueException信号应拒绝该消息。 后者是在启用重试且达到最大投递次数时使用的机制。