请使用 Spring AMQP 4.0.2(最新稳定版本)!spring-doc.cadn.net.cn

使用 RabbitMQ 流插件

版本 2.4 引入了对 RabbitMQ Stream 插件 Java 客户端 的初始支持,该客户端适用于 RabbitMQ Stream 插件spring-doc.cadn.net.cn

spring-rabbit-stream 依赖项添加到您的项目中:spring-doc.cadn.net.cn

Maven
<dependency>
  <groupId>org.springframework.amqp</groupId>
  <artifactId>spring-rabbit-stream</artifactId>
  <version>3.2.9</version>
</dependency>
Gradle
compile 'org.springframework.amqp:spring-rabbit-stream:3.2.9'

您可以像平常一样配置队列,使用 RabbitAdmin Bean,并通过 QueueBuilder.stream() 方法指定队列类型。例如:spring-doc.cadn.net.cn

@Bean
Queue stream() {
    return QueueBuilder.durable("stream.queue1")
            .stream()
            .build();
}

然而,这仅在您同时使用非流式组件(例如 SimpleMessageListenerContainerDirectMessageListenerContainer)时才有效,因为当建立 AMQP 连接时,管理器会触发以声明已定义的 Bean。
如果您的应用程序仅使用流式组件,或希望使用高级流式配置功能,则应改用配置 StreamAdminspring-doc.cadn.net.cn

@Bean
StreamAdmin streamAdmin(Environment env) {
    return new StreamAdmin(env, sc -> {
        sc.stream("stream.queue1").maxAge(Duration.ofHours(2)).create();
        sc.stream("stream.queue2").create();
    });
}

请参考 RabbitMQ 文档以获取有关 StreamCreator 的更多信息。spring-doc.cadn.net.cn

发送消息

The RabbitStreamTemplate provides a subset of the RabbitTemplate (AMQP) functionality.spring-doc.cadn.net.cn

RabbitStreamOperations
public interface RabbitStreamOperations extends AutoCloseable {

	CompletableFuture<Boolean> send(Message message);

	CompletableFuture<Boolean> convertAndSend(Object message);

	CompletableFuture<Boolean> convertAndSend(Object message, @Nullable MessagePostProcessor mpp);

	CompletableFuture<Boolean> send(com.rabbitmq.stream.Message message);

	MessageBuilder messageBuilder();

	MessageConverter messageConverter();

	StreamMessageConverter streamMessageConverter();

	@Override
	void close() throws AmqpException;

}

RabbitStreamTemplate 实现包含以下构造函数和属性:spring-doc.cadn.net.cn

RabbitStreamTemplate
public RabbitStreamTemplate(Environment environment, String streamName) {
}

public void setMessageConverter(MessageConverter messageConverter) {
}

public void setStreamConverter(StreamMessageConverter streamConverter) {
}

public synchronized void setProducerCustomizer(ProducerCustomizer producerCustomizer) {
}

MessageConverter 中使用于 convertAndSend 方法中,将对象转换为 Spring AMQP Messagespring-doc.cadn.net.cn

The StreamMessageConverter is used to convert from a Spring AMQP Message to a native stream Message.spring-doc.cadn.net.cn

您还可以直接发送原生流 Message;其中 messageBuilder() 方法提供对 Producer 的消息构建器的访问。spring-doc.cadn.net.cn

The ProducerCustomizer provides a mechanism to customize the producer before it is built.spring-doc.cadn.net.cn

参阅 Java 客户端文档,了解如何自定义 EnvironmentProducerspring-doc.cadn.net.cn

接收消息

异步消息接收由 StreamListenerContainer(在使用 @RabbitListener 时还包括 StreamRabbitListenerContainerFactory)提供。spring-doc.cadn.net.cn

监听器容器还需要一个 Environment 以及一个流名称。spring-doc.cadn.net.cn

您可以使用经典方式 MessageListener 接收 Spring AMQP Message 消息,也可以使用新接口接收原生流式 Message 消息:spring-doc.cadn.net.cn

public interface StreamMessageListener extends MessageListener {

	void onStreamMessage(Message message, Context context);

}

请参阅 消息监听容器配置,以获取有关支持属性的信息。spring-doc.cadn.net.cn

与模板类似,容器具有一个 ConsumerCustomizer 属性。spring-doc.cadn.net.cn

参阅 Java 客户端文档,了解如何自定义 EnvironmentConsumerspring-doc.cadn.net.cn

当使用 @RabbitListener 时,需配置一个 StreamRabbitListenerContainerFactory;此时,大多数 @RabbitListener 属性(concurrency 等)将被忽略。仅支持 idqueuesautoStartupcontainerFactory
此外,queues 中只能包含一个流名称。spring-doc.cadn.net.cn

示例

@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
    RabbitStreamTemplate template = new RabbitStreamTemplate(env, "test.stream.queue1");
    template.setProducerCustomizer((name, builder) -> builder.name("test"));
    return template;
}

@Bean
RabbitListenerContainerFactory<StreamListenerContainer> rabbitListenerContainerFactory(Environment env) {
    return new StreamRabbitListenerContainerFactory(env);
}

@RabbitListener(queues = "test.stream.queue1")
void listen(String in) {
    ...
}

@Bean
RabbitListenerContainerFactory<StreamListenerContainer> nativeFactory(Environment env) {
    StreamRabbitListenerContainerFactory factory = new StreamRabbitListenerContainerFactory(env);
    factory.setNativeListener(true);
    factory.setConsumerCustomizer((id, builder) -> {
        builder.name("myConsumer")
                .offset(OffsetSpecification.first())
                .manualTrackingStrategy();
    });
    return factory;
}

@RabbitListener(id = "test", queues = "test.stream.queue2", containerFactory = "nativeFactory")
void nativeMsg(Message in, Context context) {
    ...
    context.storeOffset();
}

@Bean
Queue stream() {
    return QueueBuilder.durable("test.stream.queue1")
            .stream()
            .build();
}

@Bean
Queue stream() {
    return QueueBuilder.durable("test.stream.queue2")
            .stream()
            .build();
}

版本 2.4.5 为 StreamListenerContainer(及其工厂)新增了 adviceChain 属性。此外,还提供了一个新的工厂 Bean,用于创建无状态重试拦截器,并可选地使用 StreamMessageRecoverer 来处理原始流消息的消费。spring-doc.cadn.net.cn

@Bean
public StreamRetryOperationsInterceptorFactoryBean sfb(RetryTemplate retryTemplate) {
    StreamRetryOperationsInterceptorFactoryBean rfb =
            new StreamRetryOperationsInterceptorFactoryBean();
    rfb.setRetryOperations(retryTemplate);
    rfb.setStreamMessageRecoverer((msg, context, throwable) -> {
        ...
    });
    return rfb;
}
状态化重试在此容器中不受支持。

超级流

超级流(Super Stream)是一个用于分区流的抽象概念,通过将多个流队列绑定到一个交换机(exchange),并设置参数 x-super-stream: true 来实现。spring-doc.cadn.net.cn

配置

为了方便起见,可以通过定义一个类型为 SuperStream 的单一 Bean 来配置一个超级流。spring-doc.cadn.net.cn

@Bean
SuperStream superStream() {
    return new SuperStream("my.super.stream", 3);
}

The RabbitAdmin 检测到此 Bean,并将声明交换机(my.super.stream)和 3 个队列(分区)— my.super-stream-n,其中 n012,并使用路由键 n 进行绑定。spring-doc.cadn.net.cn

如果您也希望通过 AMQP 将消息发布到交换机,可以提供自定义的路由键:spring-doc.cadn.net.cn

@Bean
SuperStream superStream() {
    return new SuperStream("my.super.stream", 3, (q, i) -> IntStream.range(0, i)
					.mapToObj(j -> "rk-" + j)
					.collect(Collectors.toList()));
}

键的数量必须等于分区的数量。spring-doc.cadn.net.cn

生产到超级流

您必须在 superStreamRoutingFunction 中添加 RabbitStreamTemplatespring-doc.cadn.net.cn

@Bean
RabbitStreamTemplate streamTemplate(Environment env) {
    RabbitStreamTemplate template = new RabbitStreamTemplate(env, "stream.queue1");
    template.setSuperStreamRouting(message -> {
        // some logic to return a String for the client's hashing algorithm
    });
    return template;
}

您还可以通过 AMQP 发布,使用 RabbitTemplatespring-doc.cadn.net.cn

使用单个活跃消费者消费超级流

调用监听器容器上的 superStream 方法,以在超流上启用单个活动消费者。spring-doc.cadn.net.cn

@Bean
StreamListenerContainer container(Environment env, String name) {
    StreamListenerContainer container = new StreamListenerContainer(env);
    container.superStream("ss.sac", "myConsumer", 3); // concurrency = 3
    container.setupMessageListener(msg -> {
        ...
    });
    container.setConsumerCustomizer((id, builder) -> builder.offset(OffsetSpecification.last()));
    return container;
}
此时,当并发数大于1时,实际并发数进一步由Environment进行控制;若要实现完全并发,请将环境的maxConsumersByConnection设置为1。参见配置环境

Micrometer 观测

自版本 3.0.5 起,现已支持使用 Micrometer 进行观测,适用于 RabbitStreamTemplate 和流监听器容器。该容器现在也支持 Micrometer 计时器(当未启用观测时)。spring-doc.cadn.net.cn

将每个组件的 observationEnabled 设置为启用观察;这将禁用 Micrometer 计时器,因为计时器现在将由每次观察进行管理。当使用注解监听器时,请在容器工厂上设置 observationEnabledspring-doc.cadn.net.cn

有关更多信息,请参阅 Micrometer 追踪spring-doc.cadn.net.cn

要为计时器/跟踪添加标签,请分别在模板或监听器容器中配置自定义 RabbitStreamTemplateObservationConventionRabbitStreamListenerObservationConventionspring-doc.cadn.net.cn

默认实现为模板观察添加 name 标签,为容器添加 listener.id 标签。spring-doc.cadn.net.cn

您可以选择继承 DefaultRabbitStreamTemplateObservationConventionDefaultStreamRabbitListenerObservationConvention,或者提供完全新的实现。spring-doc.cadn.net.cn

请参阅 Micrometer 观测文档 以获取更多详情。spring-doc.cadn.net.cn