RabbitMQ Stream插件的初始消费者支持

现已提供对RabbitMQ流插件的基本支持。 要启用此功能,您必须添加春兔溪jar 映射到类路径——它必须是与春季-AMQP春兔.spring-doc.cadn.net.cn

上述消费者属性在设置containerType属性到; 并发仅支持超级直播。 每次绑定只能消耗一个流队列。

配置绑定器以使用containerType=stream,Spring Boot 会自动配置环境 @Bean从应用属性中获得。 你可以选择添加自定义工具来自定义监听器容器。spring-doc.cadn.net.cn

@Bean
ListenerContainerCustomizer<MessageListenerContainer> customizer() {
    return (cont, dest, group) -> {
        StreamListenerContainer container = (StreamListenerContainer) cont;
        container.setConsumerCustomizer((name, builder) -> {
            builder.offset(OffsetSpecification.first());
        });
        // ...
    };
}

名称传递给定制器的参数为destination + '.' + group + '.container'.spring-doc.cadn.net.cn

溪流名称()(用于偏移跟踪)被设置为绑定目的地 + '.' + 组. 可以通过消费者定制器如上图所示。 如果你决定使用手动偏移跟踪,那上下文以消息头部形式提供:spring-doc.cadn.net.cn

int count;

@Bean
public Consumer<Message<?>> input() {
    return msg -> {
        System.out.println(msg);
        if (++count % 1000 == 0) {
            Context context = msg.getHeaders().get("rabbitmq_streamContext", Context.class);
            context.consumer().store(context.offset());
        }
    };
}

有关环境配置和消费者构建器的信息,请参阅RabbitMQ Stream Java客户端文档spring-doc.cadn.net.cn

RabbitMQ 超级流的消费者支持

有关超级直播的信息,请参见超级直播spring-doc.cadn.net.cn

使用超级流允许在每个超级流分区上自动进行扩展和缩放,每个分区上都有一个活跃的消费者。spring-doc.cadn.net.cn

配置示例:spring-doc.cadn.net.cn

@Bean
public Consumer<Thing> input() {
    ...
}
spring.cloud.stream.bindings.input-in-0.destination=super
spring.cloud.stream.bindings.input-in-0.group=test
spring.cloud.stream.bindings.input-in-0.consumer.instance-count=3
spring.cloud.stream.bindings.input-in-0.consumer.concurrency=3
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.container-type=STREAM
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.super-stream=true

框架会创建一个名为,有9个分区。 该应用最多可部署3个实例。spring-doc.cadn.net.cn