该版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring AMQP 3.2.6! |
使用 RabbitMQ Stream 插件
版本 2.4 引入了对 RabbitMQ Stream 插件的初始支持RabbitMQ Stream 插件 Java 客户端。
-
RabbitStreamTemplate
-
StreamListenerContainer
添加spring-rabbit-stream
对项目的依赖:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-stream</artifactId>
<version>4.0.0-M4</version>
</dependency>
compile 'org.springframework.amqp:spring-rabbit-stream:4.0.0-M4'
您可以像往常一样配置队列,使用RabbitAdmin
bean,使用QueueBuilder.stream()
方法来指定队列类型。
例如:
@Bean
Queue stream() {
return QueueBuilder.durable("stream.queue1")
.stream()
.build();
}
但是,这仅在您还使用非流组件(例如SimpleMessageListenerContainer
或DirectMessageListenerContainer
),因为在打开 AMQP 连接时,会触发管理员声明定义的 bean。
如果您的应用程序仅使用流组件,或者您希望使用高级流配置功能,则应配置StreamAdmin
相反:
@Bean
StreamAdmin streamAdmin(Environment env) {
return new StreamAdmin(env, sc -> {
sc.stream("stream.queue1").maxAge(Duration.ofHours(2)).create();
sc.stream("stream.queue2").create();
});
}
有关StreamCreator
.
发送消息
这RabbitStreamTemplate
提供了RabbitTemplate
(AMQP) 功能。
public interface RabbitStreamOperations extends AutoCloseable {
CompletableFuture<Boolean> send(Message message);
CompletableFuture<Boolean> convertAndSend(Object message);
CompletableFuture<Boolean> convertAndSend(Object message, @Nullable MessagePostProcessor mpp);
CompletableFuture<Boolean> send(com.rabbitmq.stream.Message message);
MessageBuilder messageBuilder();
MessageConverter messageConverter();
StreamMessageConverter streamMessageConverter();
@Override
void close() throws AmqpException;
}
这RabbitStreamTemplate
实现具有以下构造函数和属性:
public RabbitStreamTemplate(Environment environment, String streamName) {
}
public void setMessageConverter(MessageConverter messageConverter) {
}
public void setStreamConverter(StreamMessageConverter streamConverter) {
}
public void setProducerCustomizer(ProducerCustomizer producerCustomizer) {
}
这MessageConverter
用于convertAndSend
将对象转换为 Spring AMQP 的方法Message
.
这StreamMessageConverter
用于从 Spring AMQP 转换Message
到本机流Message
.
您还可以发送原生流Message
直接;使用messageBuilder()
提供对Producer
的消息生成器。
这ProducerCustomizer
提供了一种在构建生产者之前对其进行自定义的机制。
请参阅 Java 客户端文档,了解如何自定义Environment
和Producer
.
接收消息
异步消息接收由StreamListenerContainer
(和StreamRabbitListenerContainerFactory
使用时@RabbitListener
).
侦听器容器需要一个Environment
以及单个流名称。
您可以接收 Spring AMQPMessage
s 使用经典MessageListener
,或者您可以接收原生流Message
s 使用新接口:
public interface StreamMessageListener extends MessageListener {
void onStreamMessage(Message message, Context context);
}
有关支持的属性的信息,请参阅消息侦听器容器配置。
与模板类似,容器有一个ConsumerCustomizer
财产。
请参阅 Java 客户端文档,了解如何自定义Environment
和Consumer
.
使用时@RabbitListener
,配置一个StreamRabbitListenerContainerFactory
;此时,大多数@RabbitListener
属性 (concurrency
等)被忽略。只id
,queues
,autoStartup
和containerFactory
被支持。
另外queues
只能包含一个流名称。
例子
@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
RabbitStreamTemplate template = new RabbitStreamTemplate(env, "test.stream.queue1");
template.setProducerCustomizer((name, builder) -> builder.name("test"));
return template;
}
@Bean
RabbitListenerContainerFactory<StreamListenerContainer> rabbitListenerContainerFactory(Environment env) {
return new StreamRabbitListenerContainerFactory(env);
}
@RabbitListener(queues = "test.stream.queue1")
void listen(String in) {
...
}
@Bean
RabbitListenerContainerFactory<StreamListenerContainer> nativeFactory(Environment env) {
StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(env);
factory.setNativeListener(true);
factory.setConsumerCustomizer((id, builder) -> {
builder.name("myConsumer")
.offset(OffsetSpecification.first())
.manualTrackingStrategy();
});
return factory;
}
@RabbitListener(id = "test", queues = "test.stream.queue2", containerFactory = "nativeFactory")
void nativeMsg(Message in, Context context) {
...
context.storeOffset();
}
@Bean
Queue stream() {
return QueueBuilder.durable("test.stream.queue1")
.stream()
.build();
}
@Bean
Queue stream() {
return QueueBuilder.durable("test.stream.queue2")
.stream()
.build();
}
版本 2.4.5 添加了adviceChain
属性设置为StreamListenerContainer
(及其工厂)。
还提供了一个新的工厂 bean 来创建一个无状态重试拦截器,其中包含可选的StreamMessageRecoverer
用于使用原始流消息时。
@Bean
public StreamRetryOperationsInterceptorFactoryBean sfb(RetryTemplate retryTemplate) {
StreamRetryOperationsInterceptorFactoryBean rfb =
new StreamRetryOperationsInterceptorFactoryBean();
rfb.setRetryOperations(retryTemplate);
rfb.setStreamMessageRecoverer((msg, context, throwable) -> {
...
});
return rfb;
}
此容器不支持有状态重试。 |
超级流
超级流是分区流的一个抽象概念,通过将多个流队列绑定到具有参数的交换来实现x-super-stream: true
.
供应
为方便起见,可以通过定义类型为SuperStream
.
@Bean
SuperStream superStream() {
return new SuperStream("my.super.stream", 3);
}
这RabbitAdmin
检测到此 bean 并将声明交换 (my.super.stream
) 和 3 个队列(分区) -my.super-stream-n
哪里n
是0
,1
,2
,绑定路由键等于n
.
如果您还希望通过 AMQP 发布到交易所,则可以提供自定义路由密钥:
@Bean
SuperStream superStream() {
return new SuperStream("my.super.stream", 3, (q, i) -> IntStream.range(0, i)
.mapToObj(j -> "rk-" + j)
.collect(Collectors.toList()));
}
键的数量必须等于分区的数量。
生产到 SuperStream
您必须添加一个superStreamRoutingFunction
到RabbitStreamTemplate
:
@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
RabbitStreamTemplate template = new RabbitStreamTemplate(env, "stream.queue1");
template.setSuperStreamRouting(message -> {
// some logic to return a String for the client's hashing algorithm
});
return template;
}
您还可以使用RabbitTemplate
.
使用单个活跃消费者的超级流
调用superStream
方法,以在超级流上启用单个活动使用者。
@Bean
StreamListenerContainer container(Environment env, String name) {
StreamListenerContainer container = new StreamListenerContainer(env);
container.superStream("ss.sac", "myConsumer", 3); // concurrency = 3
container.setupMessageListener(msg -> {
...
});
container.setConsumerCustomizer((id, builder) -> builder.offset(OffsetSpecification.last()));
return container;
}
这时,当并发大于1时,实际并发由Environment ;要实现完全并发,请将环境的maxConsumersByConnection 到 1.
请参阅配置环境。 |
千分尺观察
从版本 3.0.5 开始,现在支持使用 Micrometer 进行观察,用于RabbitStreamTemplate
和流侦听器容器。
容器现在还支持千分尺计时器(未启用观察时)。
设置observationEnabled
在每个组件上进行观察;这将禁用千分尺计时器,因为计时器现在将通过每次观测进行管理。
使用带注释的监听器时,将observationEnabled
在集装箱工厂。
有关更多信息,请参阅千分尺追踪。
要向计时器/跟踪添加标记,请配置自定义RabbitStreamTemplateObservationConvention
或RabbitStreamListenerObservationConvention
分别添加到模板或侦听器容器。
默认实现将name
标签用于模板观察,以及listener.id
标签。
您可以将子类化DefaultRabbitStreamTemplateObservationConvention
或DefaultStreamRabbitListenerObservationConvention
或提供全新的实现。
有关更多详细信息,请参阅千分尺观察文档。