Kafka 队列(共享消费者)

Starting with version 4.0, Spring for Apache Kafka provides support for Kafka Queues through share consumers, which are part of Apache Kafka 4.0.0 and implement KIP-932 (Queues for Kafka). This feature is currently in early access.spring-doc.cadn.net.cn

Kafka Queues 采用了一种与传统消费者组不同的消费模型。 不同于基于分区的分配模型(每个分区仅分配给一个消费者),共享消费者可以协作地从相同的分区消费,记录会在共享组的消费者之间被分配。spring-doc.cadn.net.cn

共享消费者工厂

The ShareConsumerFactory 是负责创建 share consumer 实例的。 Spring Kafka 提供了 DefaultShareConsumerFactory 的实现。spring-doc.cadn.net.cn

配置

您可以像配置常规的 ConsumerFactory 一样配置 DefaultShareConsumerFactoryspring-doc.cadn.net.cn

@Bean
public ShareConsumerFactory<String, String> shareConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-share-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new DefaultShareConsumerFactory<>(props);
}

构造函数选项

DefaultShareConsumerFactory 提供了多种构造函数选项:spring-doc.cadn.net.cn

// Basic configuration
new DefaultShareConsumerFactory<>(configs);

// With deserializer suppliers
new DefaultShareConsumerFactory<>(configs, keyDeserializerSupplier, valueDeserializerSupplier);

// With deserializer instances
new DefaultShareConsumerFactory<>(configs, keyDeserializer, valueDeserializer, configureDeserializers);

反序列化器配置

您可以以几种方式配置反序列化器:spring-doc.cadn.net.cn

  1. 通过配置属性(推荐用于简单情况):spring-doc.cadn.net.cn

    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  2. 通过setter注入:spring-doc.cadn.net.cn

    factory.setKeyDeserializer(new StringDeserializer());
    factory.setValueDeserializer(new StringDeserializer());
  3. 通过提供商创建(在反序列化器需要按消费者创建的情况下):spring-doc.cadn.net.cn

    factory.setKeyDeserializerSupplier(() -> new StringDeserializer());
    factory.setValueDeserializerSupplier(() -> new StringDeserializer());

设置 0 到 1 如果您的反序列化器已经完全配置,且不应由工厂重新配置。spring-doc.cadn.net.cn

生命周期监听器

您可以添加监听器来监视共享消费者的生命周期:spring-doc.cadn.net.cn

factory.addListener(new ShareConsumerFactory.Listener<String, String>() {
    @Override
    public void consumerAdded(String id, ShareConsumer<String, String> consumer) {
        // Called when a new consumer is created
        System.out.println("Consumer added: " + id);
    }

    @Override
    public void consumerRemoved(String id, ShareConsumer<String, String> consumer) {
        // Called when a consumer is closed
        System.out.println("Consumer removed: " + id);
    }
});

共享消息监听容器

共享Kafka消息监听容器

ShareKafkaMessageListenerContainer提供了一个容器,用于共享消费者,并支持并发处理:spring-doc.cadn.net.cn

@Bean
public ShareKafkaMessageListenerContainer<String, String> container(
        ShareConsumerFactory<String, String> shareConsumerFactory) {

    ContainerProperties containerProps = new ContainerProperties("my-topic");
    containerProps.setGroupId("my-share-group");

    ShareKafkaMessageListenerContainer<String, String> container =
        new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProps);

    container.setupMessageListener(new MessageListener<String, String>() {
        @Override
        public void onMessage(ConsumerRecord<String, String> record) {
            System.out.println("Received: " + record.value());
        }
    });

    return container;
}

容器属性

共享容器支持常规消费者可用的一组容器属性:spring-doc.cadn.net.cn

共享消费者不支持:spring-doc.cadn.net.cn

并发

ShareKafkaMessageListenerContainer 支持通过在单个容器内创建多个消费者线程来实现并发处理。 每个线程运行其自己的 ShareConsumer 实例,该实例参与同一共享组。spring-doc.cadn.net.cn

与传统的消费者组不同,传统消费者组的并发涉及分区分布,共享消费者利用了Kafka在代理端记录级别的分发。 这意味着同一容器中的多个消费者线程作为共享组的一部分协同工作,其中Kafka代理会将记录跨所有消费者实例进行分发。spring-doc.cadn.net.cn

应用程序实例之间的并发是可叠加的spring-doc.cadn.net.cn

从共享组的角度来看,每个ShareConsumer实例都是一个独立成员,无论它在何处运行。在一个容器中设置concurrency=3会创建3个共享组成员。如果使用相同的共享组ID运行多个应用程序实例,则它们的所有消费者线程将组合成一个池。spring-doc.cadn.net.cn

例如:
* 应用实例1:concurrency=3 → 3个共享组成员
* 应用实例2:concurrency=3 → 3个共享组成员
* 总计:6个可用的共享组成员,用于代理分发记录spring-doc.cadn.net.cn

这意味着在单个容器中设置concurrency=5在操作上等同于运行5个单独的应用程序实例,每个实例都使用concurrency=1(全部使用相同的group.id)。
Kafka代理将所有消费者实例视为平等,并在整个池中分发记录。spring-doc.cadn.net.cn

编程方式配置并发

@Bean
public ShareKafkaMessageListenerContainer<String, String> concurrentContainer(
        ShareConsumerFactory<String, String> shareConsumerFactory) {

    ContainerProperties containerProps = new ContainerProperties("my-topic");
    containerProps.setGroupId("my-share-group");

    ShareKafkaMessageListenerContainer<String, String> container =
        new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProps);

    // Set concurrency to create 5 consumer threads
    container.setConcurrency(5);

    container.setupMessageListener(new MessageListener<String, String>() {
        @Override
        public void onMessage(ConsumerRecord<String, String> record) {
            System.out.println("Received on " + Thread.currentThread().getName() + ": " + record.value());
        }
    });

    return container;
}

通过工厂配置并发

您可以在工厂级别设置默认并发性,这将应用于该工厂创建的所有容器:spring-doc.cadn.net.cn

@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
        ShareConsumerFactory<String, String> shareConsumerFactory) {

    ShareKafkaListenerContainerFactory<String, String> factory =
        new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);

    // Set default concurrency for all containers created by this factory
    factory.setConcurrency(3);

    return factory;
}

每个监听器的并发数

并发设置可以使用 concurrency 属性在每个监听器上进行覆盖:spring-doc.cadn.net.cn

@Component
public class ConcurrentShareListener {

    @KafkaListener(
        topics = "high-throughput-topic",
        containerFactory = "shareKafkaListenerContainerFactory",
        groupId = "my-share-group",
        concurrency = "10"  // Override factory default
    )
    public void listen(ConsumerRecord<String, String> record) {
        // This listener will use 10 consumer threads
        System.out.println("Processing: " + record.value());
    }
}

并发性注意事项

  • 线程安全: 每个消费者线程都有自己的ShareConsumer实例,并独立管理自己的确认
    spring-doc.cadn.net.cn

  • 客户端 ID: 每个消费者线程都会收到一个带有数字后缀的唯一客户端 ID(例如,myContainer-0myContainer-1 等)spring-doc.cadn.net.cn

  • 指标: 所有消费者线程的指标被聚合并通过container.metrics()可访问spring-doc.cadn.net.cn

  • 生命周期: 所有消费者线程作为一个单元一起启动和停止spring-doc.cadn.net.cn

  • 工作分配: Kafka代理处理共享组中所有消费者实例之间的记录分发spring-doc.cadn.net.cn

  • 显式确认: 每个线程独立管理其记录的确认;一个线程中未确认的记录不会阻塞其他线程spring-doc.cadn.net.cn

使用显式确认的并发性

并发与显式确认模式无缝集成。每个消费者线程独立跟踪并确认自己的记录:spring-doc.cadn.net.cn

@KafkaListener(
    topics = "order-queue",
    containerFactory = "explicitShareKafkaListenerContainerFactory",
    groupId = "order-processors",
    concurrency = "5"
)
public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
    try {
        // Process the order
        processOrderLogic(record.value());
        acknowledgment.acknowledge(); // ACCEPT
    }
    catch (RetryableException e) {
        acknowledgment.release(); // Will be redelivered
    }
    catch (Exception e) {
        acknowledgment.reject(); // Permanent failure
    }
}

记录获取和分发行为:spring-doc.cadn.net.cn

共享消费者使用基于拉取的模型,每个消费者线程调用poll()从代理获取记录。
当一个消费者轮询时,代理的共享分区领导者:spring-doc.cadn.net.cn

当记录被一个消费者获取时,其他消费者无法使用这些记录。当获取锁过期后,未确认的记录会自动返回到“可用”状态,并可以交付给另一个消费者。spring-doc.cadn.net.cn

经纪人通过group.share.partition.max.record.locks限制每个分区可获取的记录数量。一旦达到此限制,后续轮询将暂时不返回任何记录,直到锁过期。spring-doc.cadn.net.cn

并发的影响:spring-doc.cadn.net.cn

基于注解的监听器

@KafkaListener 与共享消费者

您可以使用@KafkaListener与共享消费者通过配置ShareKafkaListenerContainerFactory
spring-doc.cadn.net.cn

@Configuration
@EnableKafka
public class ShareConsumerConfig {

    @Bean
    public ShareConsumerFactory<String, String> shareConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultShareConsumerFactory<>(props);
    }

    @Bean
    public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
            ShareConsumerFactory<String, String> shareConsumerFactory) {
        return new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
    }
}

然后在您的监听器中使用它:spring-doc.cadn.net.cn

@Component
public class ShareMessageListener {

    @KafkaListener(
        topics = "my-queue-topic",
        containerFactory = "shareKafkaListenerContainerFactory",
        groupId = "my-share-group"
    )
    public void listen(ConsumerRecord<String, String> record) {
        System.out.println("Received from queue: " + record.value());
        // Record is automatically acknowledged with ACCEPT
    }
}

共享组偏移重置

与常规消费者组不同,共享组对偏移重置行为使用了不同的配置。<br/>您可以进行编程方式的配置:spring-doc.cadn.net.cn

private void configureShareGroup(String bootstrapServers, String groupId) throws Exception {
    Map<String, Object> adminProps = new HashMap<>();
    adminProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

    try (Admin admin = Admin.create(adminProps)) {
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId);
        ConfigEntry configEntry = new ConfigEntry("share.auto.offset.reset", "earliest");

        Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
            configResource, List.of(new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET))
        );

        admin.incrementalAlterConfigs(configs).all().get();
    }
}

记录确认

共享消费者支持两种确认模式,这些模式控制记录在处理后如何被确认。spring-doc.cadn.net.cn

隐式确认(默认)

在隐式模式下,记录会根据处理结果自动确认:spring-doc.cadn.net.cn

处理成功:记录已确认为ACCEPT
处理错误:记录已确认为REJECTspring-doc.cadn.net.cn

@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
    ShareConsumerFactory<String, String> shareConsumerFactory) {
    // Implicit mode is the default - no additional configuration needed
    return new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
}

显式确认

在显式模式下,应用程序必须使用提供的 ShareAcknowledgment 手动确认每条记录。spring-doc.cadn.net.cn

配置显式确认模式有两种方法:spring-doc.cadn.net.cn

选项1:使用Kafka客户端配置

@Bean
public ShareConsumerFactory<String, String> explicitShareConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit"); // Official Kafka client config
    return new DefaultShareConsumerFactory<>(props);
}

选项2:使用Spring容器配置

@Bean
public ShareKafkaListenerContainerFactory<String, String> explicitShareKafkaListenerContainerFactory(
    ShareConsumerFactory<String, String> shareConsumerFactory) {

    ShareKafkaListenerContainerFactory<String, String> factory =
        new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);

    // Configure acknowledgment mode at container factory level
    // true means explicit acknowledgment is required
    factory.getContainerProperties().setExplicitShareAcknowledgment(true);

    return factory;
}

配置优先级

当同时使用两种配置方法时,Spring Kafka 按照以下优先级顺序(从高到低)进行处理:spring-doc.cadn.net.cn

  1. 容器属性: containerProperties.setExplicitShareAcknowledgment(true/false)spring-doc.cadn.net.cn

  2. 消费者配置: ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG("implicit" 或 "explicit")spring-doc.cadn.net.cn

  3. 默认值: false(隐式确认)spring-doc.cadn.net.cn

确认类型

分享消费者支持三种确认类型:spring-doc.cadn.net.cn

ACCEPT: Record processed successfully, mark as completed
RELEASE: Temporary failure, make record available for redelivery
REJECT: Permanent failure, do not retry

共享确认API

ShareAcknowledgment接口提供了明确确认的方法:spring-doc.cadn.net.cn

public interface ShareAcknowledgment {
    void acknowledge();
    void release();
    void reject();
}

监听器接口

共享消费者支持针对不同用例的专业化监听器接口:spring-doc.cadn.net.cn

基本消息监听器

在简单情况下使用标准的MessageListener:spring-doc.cadn.net.cn

@KafkaListener(topics = "my-topic", containerFactory = "shareKafkaListenerContainerFactory")
public void listen(ConsumerRecord<String, String> record) {
    System.out.println("Received: " + record.value());
    // Automatically acknowledged in implicit mode
}

确认分享消费者感知消息监听器

此接口提供了对ShareConsumer实例的访问,并可选择性地支持确认。
确认参数可以为null,具体取决于容器的确认模式:spring-doc.cadn.net.cn

隐式模式示例(确认为空)
@KafkaListener(
    topics = "my-topic",
    containerFactory = "shareKafkaListenerContainerFactory"  // Implicit mode by default
)
public void listen(ConsumerRecord<String, String> record,
                  @Nullable ShareAcknowledgment acknowledgment,
                  ShareConsumer<?, ?> consumer) {

    // In implicit mode, acknowledgment is null
    System.out.println("Received: " + record.value());

    // Access consumer metrics if needed
    Map<MetricName, ? extends Metric> metrics = consumer.metrics();

    // Record is auto-acknowledged as ACCEPT on success, REJECT on error
}
显式模式示例(确认不为 null)
@Component
public class ExplicitAckListener {
    @KafkaListener(
        topics = "my-topic",
        containerFactory = "explicitShareKafkaListenerContainerFactory"
    )
    public void listen(ConsumerRecord<String, String> record,
                      @Nullable ShareAcknowledgment acknowledgment,
                      ShareConsumer<?, ?> consumer) {

        // In explicit mode, acknowledgment is non-null
        try {
            processRecord(record);
            acknowledgment.acknowledge(); // ACCEPT
        }
		catch (RetryableException e) {
            acknowledgment.release(); // Will be redelivered
        }
		catch (Exception e) {
            acknowledgment.reject(); // Permanent failure
        }
    }

    private void processRecord(ConsumerRecord<String, String> record) {
        // Business logic here
    }
}

\u786e\u8bba\u7ea6\u675f

在显式确认模式中,容器强制执行一些重要的约束:spring-doc.cadn.net.cn

Poll Blocking: Subsequent polls are blocked until all records from the previous poll are acknowledged.
One-time Acknowledgment: Each record can only be acknowledged once.
Error Handling: If processing throws an exception, the record is automatically acknowledged as `REJECT`.
在显式模式下,如果未能确认记录,则会阻止进一步的消息处理。 请确保在所有代码路径中都确认记录。

确认超时检测

为了帮助识别遗漏的确认,Spring Kafka 提供了可配置的超时检测功能。 当记录在指定的超时时间内未被确认时,将记录一条警告,并包含有关未确认记录的详细信息。spring-doc.cadn.net.cn

@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
    ShareConsumerFactory<String, String> shareConsumerFactory) {
    ShareKafkaListenerContainerFactory<String, String> factory =
        new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);

    // Set acknowledgment timeout (default is 30 seconds)
    factory.getContainerProperties().setShareAcknowledgmentTimeout(Duration.ofSeconds(30));

    return factory;
}

当记录超过超时时间时,您会看到如下警告:<br>spring-doc.cadn.net.cn

WARN: Record not acknowledged within timeout (30 seconds).
In explicit acknowledgment mode, you must call ack.acknowledge(), ack.release(),
or ack.reject() for every record.

此功能帮助开发人员快速识别代码中何时缺少确认调用,防止因遗忘确认而导致“Spring Kafka不再消耗新记录”的常见问题。spring-doc.cadn.net.cn

确认示例

混合确认模式

@KafkaListener(topics = "order-processing", containerFactory = "explicitShareKafkaListenerContainerFactory")
    public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
        String orderId = record.key();
        String orderData = record.value();
        try {
            if (isValidOrder(orderData)) {
                if (processOrder(orderData)) {
                    acknowledgment.acknowledge(); // Success - ACCEPT
                }
                else {
                    acknowledgment.release(); // Temporary failure - retry later
                }
            }
            else {
                acknowledgment.reject(); // Invalid order - don't retry
            }
        }
        catch (Exception e) {
            // Exception automatically triggers REJECT
            throw e;
        }
}

条件确认

@KafkaListener(topics = "data-validation", containerFactory = "explicitShareKafkaListenerContainerFactory")
public void validateData(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
    ValidationResult result = validator.validate(record.value());
    switch (result.getStatus()) {
        case VALID:
            acknowledgment.acknowledge(AcknowledgeType.ACCEPT);
            break;
        case INVALID_RETRYABLE:
            acknowledgment.acknowledge(AcknowledgeType.RELEASE);
            break;
        case INVALID_PERMANENT:
            acknowledgment.acknowledge(AcknowledgeType.REJECT);
            break;
    }
}

毒消息保护和投递次数

KIP-932 包括代理端的毒消息保护,以防止无法处理的消息记录被无休止地重传。spring-doc.cadn.net.cn

工作原理

每当消费者在共享组中获取一条记录时,代理会增加一个内部传递计数。
首次获取时,传递计数设置为1,并且每次后续获取都会递增它。
当传递计数达到配置的限制(默认值:5)时,该记录将进入归档状态,并且不再适合额外的传递尝试。spring-doc.cadn.net.cn

投递次数对应用程序不可见
投递次数由代理内部维护,不对消费者应用程序暴露
这是 KIP-932 中有意设计的决定。
投递次数是近似的,用作有毒消息保护机制,而不是精确的重新投递计数器。
应用程序无法通过任何 API 查询或访问此值。spring-doc.cadn.net.cn

对于应用级别的重试逻辑,请使用确认类型:spring-doc.cadn.net.cn

代理自动阻止无限重试,一旦达到group.share.delivery.count.limit,就将记录移到归档状态。spring-doc.cadn.net.cn

重试策略建议

以下是基于各种确认类型异常类型的用法示例。spring-doc.cadn.net.cn

@KafkaListener(topics = "orders", containerFactory = "explicitShareKafkaListenerContainerFactory")
public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment ack) {
    try {
        // Attempt to process the order
        orderService.process(record.value());
        ack.acknowledge(); // ACCEPT - successfully processed
    }
    catch (TransientException e) {
        // Temporary failure (network issue, service unavailable, etc.)
        // Release the record for redelivery
        // Broker will retry up to group.share.delivery.count.limit times
        logger.warn("Transient error processing order, will retry: {}", e.getMessage());
        ack.release(); // RELEASE - make available for retry
    }
    catch (ValidationException e) {
        // Permanent semantic error (invalid data format, business rule violation, etc.)
        // Do not retry - this record will never succeed
        logger.error("Invalid order data, rejecting: {}", e.getMessage());
        ack.reject(); // REJECT - permanent failure, do not retry
    }
    catch (Exception e) {
        // Unknown error - typically safer to reject to avoid infinite loops
        // But could also release if you suspect it might be transient
        logger.error("Unexpected error processing order, rejecting: {}", e.getMessage());
        ack.reject(); // REJECT - avoid poison message loops
    }
}

经纪人中毒消息保护确保了即使您始终对错误使用RELEASE,记录也不会无限重试。达到交付尝试次数限制后,它们将自动存档。spring-doc.cadn.net.cn

与普通消费者的不同之处

共享消费者与普通消费者在几个关键方面有所不同:spring-doc.cadn.net.cn

  1. 无分区分配: 共享消费者的分区无法被指定spring-doc.cadn.net.cn

  2. 无主题模式: 共享消费者不支持订阅主题模式spring-doc.cadn.net.cn

  3. 合作消费: 同一分组中的多个消费者可以同时从相同分区中消费spring-doc.cadn.net.cn

  4. 记录级确认: 支持显式确认,包括ACCEPTRELEASEREJECT类型spring-doc.cadn.net.cn

  5. 不同组管理: 分享群组使用不同的协调器协议spring-doc.cadn.net.cn

  6. 无批处理:共享消费者逐个处理记录,而非批量处理spring-doc.cadn.net.cn

  7. 代理端重试管理: 传递计数跟踪和有害消息保护由代理管理,不暴露给应用程序spring-doc.cadn.net.cn

限制与注意事项

当前限制