|
该版本仍在开发中,尚未被视为稳定。对于最新稳定版本,请使用 spring-cloud-stream 5.0.0! |
RabbitMQ Stream插件的初始消费者支持
现已提供对RabbitMQ流插件的基本支持。要启用此功能,您必须添加春兔溪jar 映射到类路径——它必须是与春季-AMQP和春兔.
上述消费者属性在设置containerType属性到流; 并发仅支持超级流。每次绑定只能消耗一个流队列。 |
配置绑定器以使用containerType=stream,Spring Boot 会自动配置环境 @Bean来自应用属性。你可以选择添加自定义工具来自定义监听器容器。
@Bean
ListenerContainerCustomizer<MessageListenerContainer> customizer() {
return (cont, dest, group) -> {
StreamListenerContainer container = (StreamListenerContainer) cont;
container.setConsumerCustomizer((name, builder) -> {
builder.offset(OffsetSpecification.first());
});
// ...
};
}
这名称传递给定制器的参数为destination + '.' + group + '.container'.
溪流名称()(用于偏移跟踪)被设置为绑定目的地 + '.' + 组. 可以通过消费者定制器如上图所示。如果你决定使用手动偏移跟踪,则上下文以消息头部形式提供:
int count;
@Bean
public Consumer<Message<?>> input() {
return msg -> {
System.out.println(msg);
if (++count % 1000 == 0) {
Context context = msg.getHeaders().get("rabbitmq_streamContext", Context.class);
context.consumer().store(context.offset());
}
};
}
有关环境配置和消费者构建器的信息,请参阅RabbitMQ Stream Java客户端文档。
RabbitMQ 超级流的消费者支持
有关超级直播的信息,请参见超级直播。
使用超级流允许在每个超级流分区上自动进行扩展和缩放,每个分区上都有一个活跃的消费者。
配置示例:
@Bean
public Consumer<Thing> input() {
...
}
spring.cloud.stream.bindings.input-in-0.destination=super
spring.cloud.stream.bindings.input-in-0.group=test
spring.cloud.stream.bindings.input-in-0.consumer.instance-count=3
spring.cloud.stream.bindings.input-in-0.consumer.concurrency=3
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.container-type=STREAM
spring.cloud.stream.rabbit.bindings.input-in-0.consumer.super-stream=true
框架会创建一个名为超,共有9个分区。最多可部署3个该应用实例。