|
This version is still under development and is not yet considered stable. To use the latest stable version, use Spring for Apache Kafka 4.0.4!
|
Kafka Queues (Share Consumers)
Starting with version 4.0, Spring for Apache Kafka provides support for Kafka Queues through share consumers, which are part of Apache Kafka 4.0.0 and implement KIP-932 (Queues for Kafka). This feature is currently in early access.
Kafka Queues use a consumption model that differs from traditional consumer groups. Instead of the partition-based assignment model, where each partition is assigned to exactly one consumer, share consumers can consume cooperatively from the same partitions, and records are distributed among the consumers in a share group.
Share Consumer Factory
The ShareConsumerFactory is responsible for creating share consumer instances.
Spring for Apache Kafka provides the DefaultShareConsumerFactory implementation.
Configuration
You can configure a DefaultShareConsumerFactory in the same way as a regular ConsumerFactory:
@Bean
public ShareConsumerFactory<String, String> shareConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-share-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultShareConsumerFactory<>(props);
}
Constructor Options
DefaultShareConsumerFactory offers several constructor options:
// Basic configuration
new DefaultShareConsumerFactory<>(configs);
// With deserializer suppliers
new DefaultShareConsumerFactory<>(configs, keyDeserializerSupplier, valueDeserializerSupplier);
// With deserializer instances
new DefaultShareConsumerFactory<>(configs, keyDeserializer, valueDeserializer, configureDeserializers);
Deserializer Configuration
You can configure deserializers in several ways:
-
Via configuration properties (recommended for simple cases):
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-
Via setter injection:
factory.setKeyDeserializer(new StringDeserializer());
factory.setValueDeserializer(new StringDeserializer());
-
Via suppliers (when a deserializer must be created per consumer):
factory.setKeyDeserializerSupplier(() -> new StringDeserializer());
factory.setValueDeserializerSupplier(() -> new StringDeserializer());
Set configureDeserializers to false if your deserializers are already fully configured and should not be configured again by the factory.
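For instance, a factory might be handed pre-configured deserializer instances with re-configuration disabled. A minimal sketch using the four-argument constructor shown above:

```java
@Bean
public ShareConsumerFactory<String, String> preconfiguredShareConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // The deserializer instances are already fully configured, so pass
    // configureDeserializers = false to keep the factory from configuring them again.
    return new DefaultShareConsumerFactory<>(props,
            new StringDeserializer(), new StringDeserializer(), false);
}
```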
Lifecycle Listeners
You can add a listener to monitor the lifecycle of share consumers:
factory.addListener(new ShareConsumerFactory.Listener<String, String>() {
@Override
public void consumerAdded(String id, ShareConsumer<String, String> consumer) {
// Called when a new consumer is created
System.out.println("Consumer added: " + id);
}
@Override
public void consumerRemoved(String id, ShareConsumer<String, String> consumer) {
// Called when a consumer is closed
System.out.println("Consumer removed: " + id);
}
});
Share Message Listener Containers
ShareKafkaMessageListenerContainer
The ShareKafkaMessageListenerContainer provides a container for share consumers, with support for concurrent processing:
@Bean
public ShareKafkaMessageListenerContainer<String, String> container(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ContainerProperties containerProps = new ContainerProperties("my-topic");
containerProps.setGroupId("my-share-group");
ShareKafkaMessageListenerContainer<String, String> container =
new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProps);
container.setupMessageListener(new MessageListener<String, String>() {
@Override
public void onMessage(ConsumerRecord<String, String> record) {
System.out.println("Received: " + record.value());
}
});
return container;
}
Container Properties
The share container supports a subset of the container properties available to regular consumers:
-
topics: an array of topic names to subscribe to
-
groupId: the share group ID
-
clientId: the client ID for the consumer
-
kafkaConsumerProperties: additional consumer properties
|
Share consumers do not support explicit partition assignment or subscription by topic pattern; subscribe by topic name only.
|
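As a sketch of these properties used together (the extra consumer property shown is an arbitrary example, not a recommendation):

```java
ContainerProperties containerProps = new ContainerProperties("my-topic"); // topics
containerProps.setGroupId("my-share-group");   // share group ID
containerProps.setClientId("my-share-client"); // client ID

// Additional consumer properties passed through to the share consumer
Properties extra = new Properties();
extra.setProperty(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "30000");
containerProps.setKafkaConsumerProperties(extra);
```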
Concurrency
ShareKafkaMessageListenerContainer supports concurrent processing by creating multiple consumer threads within a single container. Each thread runs its own ShareConsumer instance that participates in the same share group.
Unlike traditional consumer groups, where concurrency involves distributing partitions, share consumers take advantage of Kafka's broker-side record-level distribution. This means that multiple consumer threads in the same container work together as part of the share group, and the Kafka broker distributes records across all consumer instances.
|
Concurrency is additive across application instances. From the share group's point of view, each consumer thread is a separate group member, so two application instances each running a container with the same concurrency setting double the number of members in the group. This means the concurrency set on a single container is only one part of the group's total parallelism.
|
Configuring Concurrency Programmatically
@Bean
public ShareKafkaMessageListenerContainer<String, String> concurrentContainer(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ContainerProperties containerProps = new ContainerProperties("my-topic");
containerProps.setGroupId("my-share-group");
ShareKafkaMessageListenerContainer<String, String> container =
new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProps);
// Set concurrency to create 5 consumer threads
container.setConcurrency(5);
container.setupMessageListener(new MessageListener<String, String>() {
@Override
public void onMessage(ConsumerRecord<String, String> record) {
System.out.println("Received on " + Thread.currentThread().getName() + ": " + record.value());
}
});
return container;
}
Configuring Concurrency via the Factory
You can set a default concurrency at the factory level, which applies to all containers created by that factory:
@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ShareKafkaListenerContainerFactory<String, String> factory =
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
// Set default concurrency for all containers created by this factory
factory.setConcurrency(3);
return factory;
}
Per-Listener Concurrency
The concurrency setting can be overridden per listener with the concurrency attribute:
@Component
public class ConcurrentShareListener {
@KafkaListener(
topics = "high-throughput-topic",
containerFactory = "shareKafkaListenerContainerFactory",
groupId = "my-share-group",
concurrency = "10" // Override factory default
)
public void listen(ConsumerRecord<String, String> record) {
// This listener will use 10 consumer threads
System.out.println("Processing: " + record.value());
}
}
Concurrency Considerations
-
Thread safety: each consumer thread has its own ShareConsumer instance and manages its own acknowledgments independently
-
Client IDs: each consumer thread receives a unique client ID with a numeric suffix (for example, myContainer-0, myContainer-1, and so on)
-
Metrics: metrics from all consumer threads are aggregated and accessible through container.metrics()
-
Lifecycle: all consumer threads start and stop together as a unit
-
Work distribution: the Kafka broker handles distributing records across all consumer instances in the share group
-
Explicit acknowledgment: each thread acknowledges its own records independently; unacknowledged records in one thread do not block other threads
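For example, the aggregated metrics can be inspected after the container has started. A minimal sketch, assuming a container reference obtained as in the examples above:

```java
// Metrics from every consumer thread in the container, keyed by MetricName
Map<MetricName, ? extends Metric> metrics = container.metrics();
metrics.forEach((name, metric) ->
        System.out.println(name.group() + "/" + name.name() + " = " + metric.metricValue()));
```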
Concurrency with Explicit Acknowledgment
Concurrency integrates seamlessly with the explicit acknowledgment mode. Each consumer thread tracks and acknowledges its own records independently:
@KafkaListener(
topics = "order-queue",
containerFactory = "explicitShareKafkaListenerContainerFactory",
groupId = "order-processors",
concurrency = "5"
)
public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
try {
// Process the order
processOrderLogic(record.value());
acknowledgment.acknowledge(); // ACCEPT
}
catch (RetryableException e) {
acknowledgment.release(); // Will be redelivered
}
catch (Exception e) {
acknowledgment.reject(); // Permanent failure
}
}
|
Record acquisition and distribution behavior: share consumers use a pull-based model in which each consumer thread calls poll() to fetch records.
While records are acquired by one consumer, they are not available to other consumers. Once the acquisition lock expires, unacknowledged records automatically return to the available state and can be delivered to another consumer. The broker distributes records across the consumers in the group, so increasing concurrency increases how many records can be processed in parallel.
|
Annotation-Based Listeners
@KafkaListener with Share Consumers
You can use @KafkaListener with share consumers by configuring a ShareKafkaListenerContainerFactory:
@Configuration
@EnableKafka
public class ShareConsumerConfig {
@Bean
public ShareConsumerFactory<String, String> shareConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultShareConsumerFactory<>(props);
}
@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
return new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
}
}
Then use it in your listener:
@Component
public class ShareMessageListener {
@KafkaListener(
topics = "my-queue-topic",
containerFactory = "shareKafkaListenerContainerFactory",
groupId = "my-share-group"
)
public void listen(ConsumerRecord<String, String> record) {
System.out.println("Received from queue: " + record.value());
// Record is automatically acknowledged with ACCEPT
}
}
Share Group Offset Reset
Unlike regular consumer groups, share groups use a different configuration for offset reset behavior. You can configure it programmatically:
private void configureShareGroup(String bootstrapServers, String groupId) throws Exception {
Map<String, Object> adminProps = new HashMap<>();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
try (Admin admin = Admin.create(adminProps)) {
ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId);
ConfigEntry configEntry = new ConfigEntry("share.auto.offset.reset", "earliest");
Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
configResource, List.of(new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET))
);
admin.incrementalAlterConfigs(configs).all().get();
}
}
Record Acknowledgment
Share consumers support two acknowledgment modes, which control how records are acknowledged after processing.
Implicit Acknowledgment (Default)
In implicit mode, records are acknowledged automatically based on the processing outcome:
-
Successful processing: the record is acknowledged as ACCEPT
-
Processing error: the record is acknowledged as REJECT
@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
// Implicit mode is the default - no additional configuration needed
return new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
}
Explicit Acknowledgment
In explicit mode, the application must acknowledge each record manually, using the ShareAcknowledgment provided to the listener.
There are two ways to configure explicit acknowledgment mode:
Option 1: Using the Kafka Client Configuration
@Bean
public ShareConsumerFactory<String, String> explicitShareConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit"); // Official Kafka client config
return new DefaultShareConsumerFactory<>(props);
}
Option 2: Using the Spring Container Configuration
@Bean
public ShareKafkaListenerContainerFactory<String, String> explicitShareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ShareKafkaListenerContainerFactory<String, String> factory =
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
// Configure acknowledgment mode at container factory level
// true means explicit acknowledgment is required
factory.getContainerProperties().setExplicitShareAcknowledgment(true);
return factory;
}
Acknowledgment Types
Share consumers support four acknowledgment types:
-
ACCEPT: the record was processed successfully; mark it as completed
-
RELEASE: temporary failure; make the record available for redelivery
-
REJECT: permanent failure; do not retry
-
RENEW: extend the acquisition lock (non-terminal; a terminal acknowledgment is still required when processing completes)
The ShareAcknowledgment API
The ShareAcknowledgment interface provides the methods for explicit acknowledgment:
public interface ShareAcknowledgment {
void acknowledge();
void release();
void reject();
void renew();
}
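renew() is the one non-terminal call: it can be combined with a terminal acknowledgment when processing may outlast the broker's acquisition lock. A sketch, in which steps and processStep are hypothetical helpers standing in for long-running business logic:

```java
@KafkaListener(topics = "slow-jobs", containerFactory = "explicitShareKafkaListenerContainerFactory")
public void listen(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
    for (String step : steps(record.value())) {
        processStep(step);
        acknowledgment.renew(); // non-terminal: extends the acquisition lock
    }
    acknowledgment.acknowledge(); // terminal: still required once processing completes
}
```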
Listener Interfaces
Share consumers support specialized listener interfaces for different use cases:
Basic Message Listener
Use the standard MessageListener for simple cases:
@KafkaListener(topics = "my-topic", containerFactory = "shareKafkaListenerContainerFactory")
public void listen(ConsumerRecord<String, String> record) {
System.out.println("Received: " + record.value());
// Automatically acknowledged in implicit mode
}
Acknowledging, Share-Consumer-Aware Message Listener
This listener style provides access to the ShareConsumer instance, with optional acknowledgment support.
The acknowledgment parameter may be null, depending on the container's acknowledgment mode:
Implicit Mode Example (acknowledgment is null)
@KafkaListener(
topics = "my-topic",
containerFactory = "shareKafkaListenerContainerFactory" // Implicit mode by default
)
public void listen(ConsumerRecord<String, String> record,
@Nullable ShareAcknowledgment acknowledgment,
ShareConsumer<?, ?> consumer) {
// In implicit mode, acknowledgment is null
System.out.println("Received: " + record.value());
// Access consumer metrics if needed
Map<MetricName, ? extends Metric> metrics = consumer.metrics();
// Record is auto-acknowledged as ACCEPT on success, REJECT on error
}
Explicit Mode Example (acknowledgment is non-null)
@Component
public class ExplicitAckListener {
@KafkaListener(
topics = "my-topic",
containerFactory = "explicitShareKafkaListenerContainerFactory"
)
public void listen(ConsumerRecord<String, String> record,
@Nullable ShareAcknowledgment acknowledgment,
ShareConsumer<?, ?> consumer) {
// In explicit mode, acknowledgment is non-null
try {
processRecord(record);
acknowledgment.acknowledge(); // ACCEPT
}
catch (RetryableException e) {
acknowledgment.release(); // Will be redelivered
}
catch (Exception e) {
acknowledgment.reject(); // Permanent failure
}
}
private void processRecord(ConsumerRecord<String, String> record) {
// Business logic here
}
}
Acknowledgment Constraints
In explicit acknowledgment mode, the container enforces several important constraints:
-
Poll blocking: subsequent polls are blocked until all records from the previous poll are acknowledged.
-
Terminal acknowledgment: each record must receive exactly one terminal acknowledgment (`acknowledge()`, `release()`, or `reject()`). You may call `renew()` zero or more times before that to extend the acquisition lock when processing exceeds the broker's lock duration.
-
Error handling: if processing throws an exception, the outcome is determined by the <<share-error-handling,ShareConsumerRecordRecoverer>> (default: `REJECT`).
| In explicit mode, failing to acknowledge a record blocks further message processing. Make sure records are acknowledged on all code paths. |
Acknowledgment Timeout Detection
To help identify missed acknowledgments, Spring for Apache Kafka provides configurable timeout detection. When a record is not acknowledged within the specified timeout, a warning is logged with details about the unacknowledged record.
@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ShareKafkaListenerContainerFactory<String, String> factory =
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
// Set acknowledgment timeout (default is 30 seconds)
factory.getContainerProperties().setShareAcknowledgmentTimeout(Duration.ofSeconds(30));
return factory;
}
When a record exceeds the timeout, you will see a warning like:
WARN: Record not acknowledged within timeout (30 seconds). In explicit acknowledgment mode, you must call ack.acknowledge(), ack.release(), or ack.reject() for every record. Call ack.renew() to extend the lock; a terminal ack is still required.
This feature helps developers quickly spot missing acknowledgment calls, preventing the common "Spring Kafka has stopped consuming new records" problem caused by a forgotten acknowledgment.
Error Handling
The share container handles errors at two levels: when poll() throws (the poll level) and when the listener throws an exception (the listener level).
Poll-Level Error Handling
If consumer.poll() throws an exception, the container handles it so that the consumer thread does not stop:
-
RecordDeserializationException: thrown when a record cannot be deserialized (for example, invalid UTF-8 for a StringDeserializer). The container catches the exception, rejects the affected record using the topic, partition, and offset from the exception, logs a warning, and continues polling. Subsequent records are processed normally.
-
CorruptRecordException: thrown when a CRC check fails (for example, with check.crcs enabled). The Kafka client automatically rejects the corrupted batch. The container catches the exception, logs it, and continues polling.
Without this handling, a single bad record would terminate the consumer thread.
Listener-Level Error Handling: ShareConsumerRecordRecoverer
When the listener throws an exception, the container delegates to a ShareConsumerRecordRecoverer to decide whether to accept, release, or reject the failed record. The default is ShareConsumerRecordRecoverer.REJECTING (log and reject); ShareConsumerRecordRecoverer.RELEASING (log and release for redelivery) is also available.
For transient failures (for example, a downstream timeout), you can provide a custom recoverer that releases such records so they can be redelivered to another consumer, while still rejecting permanent failures.
Configuring a Custom Recoverer
Set the recoverer on the container, or on the factory so that it applies to all containers the factory creates:
@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
ShareConsumerFactory<String, String> shareConsumerFactory) {
ShareKafkaListenerContainerFactory<String, String> factory =
new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
factory.setShareConsumerRecordRecoverer((record, ex) -> {
if (ex instanceof TransientException || ex.getCause() instanceof TimeoutException) {
return AcknowledgeType.RELEASE;
}
return AcknowledgeType.REJECT;
});
return factory;
}
You can also set the recoverer on an individual container via container.setShareConsumerRecordRecoverer(recoverer).
Relationship to Poison Message Protection
The broker-side delivery count (see Poison Message Protection and Delivery Counts) limits how many times a record can be acquired.
Even if your recoverer always returns RELEASE, the broker eventually archives the record once the configured limit is reached (default: 5), so poison messages do not loop forever.
Acknowledgment Examples
Mixed Acknowledgment Patterns
@KafkaListener(topics = "order-processing", containerFactory = "explicitShareKafkaListenerContainerFactory")
public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
String orderId = record.key();
String orderData = record.value();
try {
if (isValidOrder(orderData)) {
if (processOrder(orderData)) {
acknowledgment.acknowledge(); // Success - ACCEPT
}
else {
acknowledgment.release(); // Temporary failure - retry later
}
}
else {
acknowledgment.reject(); // Invalid order - don't retry
}
}
catch (Exception e) {
// Exception automatically triggers REJECT
throw e;
}
}
Conditional Acknowledgment
@KafkaListener(topics = "data-validation", containerFactory = "explicitShareKafkaListenerContainerFactory")
public void validateData(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
ValidationResult result = validator.validate(record.value());
switch (result.getStatus()) {
case VALID:
acknowledgment.acknowledge(); // ACCEPT
break;
case INVALID_RETRYABLE:
acknowledgment.release(); // RELEASE
break;
case INVALID_PERMANENT:
acknowledgment.reject(); // REJECT
break;
}
}
Poison Message Protection and Delivery Counts
KIP-932 includes broker-side poison message protection to prevent unprocessable records from being redelivered endlessly.
How It Works
Each time a consumer in a share group acquires a record, the broker increments an internal delivery count.
On first acquisition the delivery count is set to 1, and every subsequent acquisition increments it.
When the delivery count reaches the configured limit (default: 5), the record moves to the archived state and is no longer eligible for further delivery attempts.
|
The delivery count is not visible to applications.
|
For application-level retry logic, use the acknowledgment types:
-
RELEASE - makes the record available for redelivery (counts toward the delivery count)
-
REJECT - marks a permanent failure (no redelivery)
-
ACCEPT - successful processing (no redelivery)
The broker automatically prevents infinite retries: once group.share.delivery.count.limit is reached, the record is moved to the archived state.
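The limit itself is a broker-side setting. As a sketch of tuning the broker default (verify the exact key against your broker version):

```properties
# server.properties: maximum delivery attempts per record before it is archived (default: 5)
group.share.delivery.count.limit=5
```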
Recommended Retry Strategy
The following example shows how to use the various acknowledgment types based on the exception type.
@KafkaListener(topics = "orders", containerFactory = "explicitShareKafkaListenerContainerFactory")
public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment ack) {
try {
// Attempt to process the order
orderService.process(record.value());
ack.acknowledge(); // ACCEPT - successfully processed
}
catch (TransientException e) {
// Temporary failure (network issue, service unavailable, etc.)
// Release the record for redelivery
// Broker will retry up to group.share.delivery.count.limit times
logger.warn("Transient error processing order, will retry: {}", e.getMessage());
ack.release(); // RELEASE - make available for retry
}
catch (ValidationException e) {
// Permanent semantic error (invalid data format, business rule violation, etc.)
// Do not retry - this record will never succeed
logger.error("Invalid order data, rejecting: {}", e.getMessage());
ack.reject(); // REJECT - permanent failure, do not retry
}
catch (Exception e) {
// Unknown error - typically safer to reject to avoid infinite loops
// But could also release if you suspect it might be transient
logger.error("Unexpected error processing order, rejecting: {}", e.getMessage());
ack.reject(); // REJECT - avoid poison message loops
}
}
Broker-side poison message protection ensures that records are not retried indefinitely, even if you always use RELEASE on errors. Once the delivery attempt limit is reached, they are archived automatically.
Differences from Regular Consumers
Share consumers differ from regular consumers in several key ways:
-
No partition assignment: specific partitions cannot be assigned to a share consumer
-
No topic patterns: share consumers do not support subscribing by topic pattern
-
Cooperative consumption: multiple consumers in the same group can consume from the same partitions simultaneously
-
Record-level acknowledgment: explicit acknowledgment with the ACCEPT, RELEASE, and REJECT types is supported
-
Different group management: share groups use a different coordinator protocol
-
No batch processing: share consumers process records one at a time, not in batches
-
Broker-side retry management: delivery count tracking and poison message protection are managed by the broker and not exposed to the application