此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.9! |
Kafka 队列(共享消费者)
从 4.0 版开始,Spring for Apache Kafka 通过共享消费者提供对 Kafka 队列的支持,共享消费者是 Apache Kafka 4.0.0 的一部分,并实现了 KIP-932(Kafka 队列)。 此功能目前处于抢先体验阶段。
与传统消费者组相比,Kafka 队列支持不同的消费模型。 与基于分区的分配模型不同,其中每个分区都专门分配给一个使用者,共享使用者可以从相同的分区协作使用,并在共享组中的使用者之间分配记录。
分享消费工厂
这ShareConsumerFactory
负责创建共享消费者实例。
Spring Kafka 提供了DefaultShareConsumerFactory
实现。
配置
您可以配置DefaultShareConsumerFactory
类似于配置常规ConsumerFactory
:
@Bean
public ShareConsumerFactory<String, String> shareConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-share-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultShareConsumerFactory<>(props);
}
构造函数选项
这DefaultShareConsumerFactory
提供了几个构造函数选项:
// Basic configuration
new DefaultShareConsumerFactory<>(configs);
// With deserializer suppliers
new DefaultShareConsumerFactory<>(configs, keyDeserializerSupplier, valueDeserializerSupplier);
// With deserializer instances
new DefaultShareConsumerFactory<>(configs, keyDeserializer, valueDeserializer, configureDeserializers);
解序列化器配置
您可以通过多种方式配置反序列化程序:
-
通过配置属性(推荐用于简单情况):
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-
通过二传器:
factory.setKeyDeserializer(new StringDeserializer()); factory.setValueDeserializer(new StringDeserializer());
-
通过提供商(对于需要为每个使用者创建解串化器的情况):
factory.setKeyDeserializerSupplier(() -> new StringDeserializer()); factory.setValueDeserializerSupplier(() -> new StringDeserializer());
设置configureDeserializers
自false
如果您的解序列化程序已完全配置,并且不应由工厂重新配置。
生命周期侦听器
您可以添加监听器来监控共享消费者的生命周期:
factory.addListener(new ShareConsumerFactory.Listener<String, String>() {
@Override
public void consumerAdded(String id, ShareConsumer<String, String> consumer) {
// Called when a new consumer is created
System.out.println("Consumer added: " + id);
}
@Override
public void consumerRemoved(String id, ShareConsumer<String, String> consumer) {
// Called when a consumer is closed
System.out.println("Consumer removed: " + id);
}
});
共享消息侦听器容器
共享KafkaMessageListenerContainer
这ShareKafkaMessageListenerContainer
为共享使用者提供了一个简单的单线程容器:
@Bean
public ShareKafkaMessageListenerContainer<String, String> container(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ContainerProperties containerProps = new ContainerProperties("my-topic");
containerProps.setGroupId("my-share-group");
ShareKafkaMessageListenerContainer<String, String> container =
new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProps);
container.setupMessageListener(new MessageListener<String, String>() {
@Override
public void onMessage(ConsumerRecord<String, String> record) {
System.out.println("Received: " + record.value());
}
});
return container;
}
注释驱动的侦听器
与共享消费者@KafkaListener
您可以使用@KafkaListener
与共享消费者一起配置ShareKafkaListenerContainerFactory
:
@Configuration
@EnableKafka
public class ShareConsumerConfig {
@Bean
public ShareConsumerFactory<String, String> shareConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultShareConsumerFactory<>(props);
}
@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
return new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
}
}
然后在监听器中使用它:
@Component
public class ShareMessageListener {
@KafkaListener(
topics = "my-queue-topic",
containerFactory = "shareKafkaListenerContainerFactory",
groupId = "my-share-group"
)
public void listen(ConsumerRecord<String, String> record) {
System.out.println("Received from queue: " + record.value());
// Record is automatically acknowledged with ACCEPT
}
}
共享组配置
共享组需要特定的代理配置才能正常运行。 对于使用嵌入式 Kafka 进行测试,请使用:
@EmbeddedKafka(
topics = {"my-queue-topic"},
brokerProperties = {
"unstable.api.versions.enable=true",
"group.coordinator.rebalance.protocols=classic,share",
"share.coordinator.state.topic.replication.factor=1",
"share.coordinator.state.topic.min.isr=1"
}
)
共享组偏移重置
与常规使用者组不同,共享组使用不同的配置进行偏移重置行为。 您可以通过编程方式进行配置:
private void configureShareGroup(String bootstrapServers, String groupId) throws Exception {
Map<String, Object> adminProps = new HashMap<>();
adminProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
try (Admin admin = Admin.create(adminProps)) {
ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId);
ConfigEntry configEntry = new ConfigEntry("share.auto.offset.reset", "earliest");
Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
configResource, List.of(new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET))
);
admin.incrementalAlterConfigs(configs).all().get();
}
}
与普通消费者的区别
份额消费者与普通消费者在几个关键方面有所不同:
-
无分区分配:无法为共享使用者分配特定分区
-
无主题模式:共享消费者不支持订阅主题模式
-
合作消费:同一共享组中的多个消费者可以同时消费同一分区
-
自动确认:记录在处理后自动确认
-
不同的组管理:共享组使用不同的协调器协议