|
请使用 Spring AMQP 4.0.2(最新稳定版本)! |
使用 RabbitMQ 流插件
版本 2.4 引入了对 RabbitMQ Stream 插件 Java 客户端 的初始支持,该客户端适用于 RabbitMQ Stream 插件。
-
RabbitStreamTemplate -
StreamListenerContainer
将 spring-rabbit-stream 依赖项添加到您的项目中:
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-stream</artifactId>
<version>3.2.9</version>
</dependency>
compile 'org.springframework.amqp:spring-rabbit-stream:3.2.9'
您可以像平常一样配置队列,使用 RabbitAdmin Bean,并通过 QueueBuilder.stream() 方法指定队列类型。例如:
@Bean
Queue stream() {
return QueueBuilder.durable("stream.queue1")
.stream()
.build();
}
然而,这仅在您同时使用非流式组件(例如 SimpleMessageListenerContainer 或 DirectMessageListenerContainer)时才有效,因为当建立 AMQP 连接时,管理器会触发以声明已定义的 Bean。
如果您的应用程序仅使用流式组件,或希望使用高级流式配置功能,则应改用配置 StreamAdmin:
@Bean
StreamAdmin streamAdmin(Environment env) {
return new StreamAdmin(env, sc -> {
sc.stream("stream.queue1").maxAge(Duration.ofHours(2)).create();
sc.stream("stream.queue2").create();
});
}
请参考 RabbitMQ 文档以获取有关 StreamCreator 的更多信息。
发送消息
The RabbitStreamTemplate provides a subset of the RabbitTemplate (AMQP) functionality.
public interface RabbitStreamOperations extends AutoCloseable {
CompletableFuture<Boolean> send(Message message);
CompletableFuture<Boolean> convertAndSend(Object message);
CompletableFuture<Boolean> convertAndSend(Object message, @Nullable MessagePostProcessor mpp);
CompletableFuture<Boolean> send(com.rabbitmq.stream.Message message);
MessageBuilder messageBuilder();
MessageConverter messageConverter();
StreamMessageConverter streamMessageConverter();
@Override
void close() throws AmqpException;
}
该 RabbitStreamTemplate 实现包含以下构造函数和属性:
public RabbitStreamTemplate(Environment environment, String streamName) {
}
public void setMessageConverter(MessageConverter messageConverter) {
}
public void setStreamConverter(StreamMessageConverter streamConverter) {
}
public synchronized void setProducerCustomizer(ProducerCustomizer producerCustomizer) {
}
在 MessageConverter 中使用于 convertAndSend 方法中,将对象转换为 Spring AMQP Message。
The StreamMessageConverter is used to convert from a Spring AMQP Message to a native stream Message.
您还可以直接发送原生流 Message;其中 messageBuilder() 方法提供对 Producer 的消息构建器的访问。
The ProducerCustomizer provides a mechanism to customize the producer before it is built.
参阅 Java 客户端文档,了解如何自定义 Environment 和 Producer。
接收消息
异步消息接收由 StreamListenerContainer(在使用 @RabbitListener 时还包括 StreamRabbitListenerContainerFactory)提供。
监听器容器还需要一个 Environment 以及一个流名称。
您可以使用经典方式 MessageListener 接收 Spring AMQP Message 消息,也可以使用新接口接收原生流式 Message 消息:
public interface StreamMessageListener extends MessageListener {
void onStreamMessage(Message message, Context context);
}
请参阅 消息监听容器配置,以获取有关支持属性的信息。
与模板类似,容器具有一个 ConsumerCustomizer 属性。
参阅 Java 客户端文档,了解如何自定义 Environment 和 Consumer。
当使用 @RabbitListener 时,需配置一个 StreamRabbitListenerContainerFactory;此时,大多数 @RabbitListener 属性(concurrency 等)将被忽略。仅支持 id、queues、autoStartup 和 containerFactory。
此外,queues 中只能包含一个流名称。
示例
@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
RabbitStreamTemplate template = new RabbitStreamTemplate(env, "test.stream.queue1");
template.setProducerCustomizer((name, builder) -> builder.name("test"));
return template;
}
@Bean
RabbitListenerContainerFactory<StreamListenerContainer> rabbitListenerContainerFactory(Environment env) {
return new StreamRabbitListenerContainerFactory(env);
}
@RabbitListener(queues = "test.stream.queue1")
void listen(String in) {
...
}
@Bean
RabbitListenerContainerFactory<StreamListenerContainer> nativeFactory(Environment env) {
StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(env);
factory.setNativeListener(true);
factory.setConsumerCustomizer((id, builder) -> {
builder.name("myConsumer")
.offset(OffsetSpecification.first())
.manualTrackingStrategy();
});
return factory;
}
@RabbitListener(id = "test", queues = "test.stream.queue2", containerFactory = "nativeFactory")
void nativeMsg(Message in, Context context) {
...
context.storeOffset();
}
@Bean
Queue stream() {
return QueueBuilder.durable("test.stream.queue1")
.stream()
.build();
}
@Bean
Queue stream() {
return QueueBuilder.durable("test.stream.queue2")
.stream()
.build();
}
版本 2.4.5 为 StreamListenerContainer(及其工厂)新增了 adviceChain 属性。此外,还提供了一个新的工厂 Bean,用于创建无状态重试拦截器,并可选地使用 StreamMessageRecoverer 来处理原始流消息的消费。
@Bean
public StreamRetryOperationsInterceptorFactoryBean sfb(RetryTemplate retryTemplate) {
StreamRetryOperationsInterceptorFactoryBean rfb =
new StreamRetryOperationsInterceptorFactoryBean();
rfb.setRetryOperations(retryTemplate);
rfb.setStreamMessageRecoverer((msg, context, throwable) -> {
...
});
return rfb;
}
| 状态化重试在此容器中不受支持。 |
超级流
超级流(Super Stream)是一个用于分区流的抽象概念,通过将多个流队列绑定到一个交换机(exchange),并设置参数 x-super-stream: true 来实现。
配置
为了方便起见,可以通过定义一个类型为 SuperStream 的单一 Bean 来配置一个超级流。
@Bean
SuperStream superStream() {
return new SuperStream("my.super.stream", 3);
}
The RabbitAdmin 检测到此 Bean,并将声明交换机(my.super.stream)和 3 个队列(分区)— my.super-stream-n,其中 n 是 0、1、2,并使用路由键 n 进行绑定。
如果您也希望通过 AMQP 将消息发布到交换机,可以提供自定义的路由键:
@Bean
SuperStream superStream() {
return new SuperStream("my.super.stream", 3, (q, i) -> IntStream.range(0, i)
.mapToObj(j -> "rk-" + j)
.collect(Collectors.toList()));
}
键的数量必须等于分区的数量。
生产到超级流
您必须在 superStreamRoutingFunction 中添加 RabbitStreamTemplate:
@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
RabbitStreamTemplate template = new RabbitStreamTemplate(env, "stream.queue1");
template.setSuperStreamRouting(message -> {
// some logic to return a String for the client's hashing algorithm
});
return template;
}
您还可以通过 AMQP 发布,使用 RabbitTemplate。
使用单个活跃消费者消费超级流
调用监听器容器上的 superStream 方法,以在超流上启用单个活动消费者。
@Bean
StreamListenerContainer container(Environment env, String name) {
StreamListenerContainer container = new StreamListenerContainer(env);
container.superStream("ss.sac", "myConsumer", 3); // concurrency = 3
container.setupMessageListener(msg -> {
...
});
container.setConsumerCustomizer((id, builder) -> builder.offset(OffsetSpecification.last()));
return container;
}
此时,当并发数大于1时,实际并发数进一步由Environment进行控制;若要实现完全并发,请将环境的maxConsumersByConnection设置为1。参见配置环境。 |
Micrometer 观测
自版本 3.0.5 起,现已支持使用 Micrometer 进行观测,适用于 RabbitStreamTemplate 和流监听器容器。该容器现在也支持 Micrometer 计时器(当未启用观测时)。
将每个组件的 observationEnabled 设置为启用观察;这将禁用 Micrometer 计时器,因为计时器现在将由每次观察进行管理。当使用注解监听器时,请在容器工厂上设置 observationEnabled。
有关更多信息,请参阅 Micrometer 追踪。
要为计时器/跟踪添加标签,请分别在模板或监听器容器中配置自定义 RabbitStreamTemplateObservationConvention 或 RabbitStreamListenerObservationConvention。
默认实现为模板观察添加 name 标签,为容器添加 listener.id 标签。
您可以选择继承 DefaultRabbitStreamTemplateObservationConvention 或 DefaultStreamRabbitListenerObservationConvention,或者提供完全新的实现。
请参阅 Micrometer 观测文档 以获取更多详情。