此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.9spring-doc.cadn.net.cn

Kafka 队列(共享消费者)

从 4.0 版开始,Spring for Apache Kafka 通过共享消费者提供对 Kafka 队列的支持,共享消费者是 Apache Kafka 4.0.0 的一部分,并实现了 KIP-932(Kafka 队列)。 此功能目前处于抢先体验阶段。spring-doc.cadn.net.cn

与传统消费者组相比,Kafka 队列支持不同的消费模型。 与基于分区的分配模型不同,其中每个分区都专门分配给一个使用者,共享使用者可以从相同的分区协作使用,并在共享组中的使用者之间分配记录。spring-doc.cadn.net.cn

分享消费工厂

ShareConsumerFactory负责创建共享消费者实例。 Spring Kafka 提供了DefaultShareConsumerFactory实现。spring-doc.cadn.net.cn

配置

您可以配置DefaultShareConsumerFactory类似于配置常规ConsumerFactory:spring-doc.cadn.net.cn

@Bean
public ShareConsumerFactory<String, String> shareConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-share-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new DefaultShareConsumerFactory<>(props);
}

构造函数选项

DefaultShareConsumerFactory提供了几个构造函数选项:spring-doc.cadn.net.cn

// Basic configuration
new DefaultShareConsumerFactory<>(configs);

// With deserializer suppliers
new DefaultShareConsumerFactory<>(configs, keyDeserializerSupplier, valueDeserializerSupplier);

// With deserializer instances
new DefaultShareConsumerFactory<>(configs, keyDeserializer, valueDeserializer, configureDeserializers);

解序列化器配置

您可以通过多种方式配置反序列化程序:spring-doc.cadn.net.cn

  1. 通过配置属性(推荐用于简单情况):spring-doc.cadn.net.cn

    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  2. 通过二传器spring-doc.cadn.net.cn

    factory.setKeyDeserializer(new StringDeserializer());
    factory.setValueDeserializer(new StringDeserializer());
  3. 通过提供商(对于需要为每个使用者创建解串化器的情况):spring-doc.cadn.net.cn

    factory.setKeyDeserializerSupplier(() -> new StringDeserializer());
    factory.setValueDeserializerSupplier(() -> new StringDeserializer());

设置configureDeserializersfalse如果您的解序列化程序已完全配置,并且不应由工厂重新配置。spring-doc.cadn.net.cn

生命周期侦听器

您可以添加监听器来监控共享消费者的生命周期:spring-doc.cadn.net.cn

factory.addListener(new ShareConsumerFactory.Listener<String, String>() {
    @Override
    public void consumerAdded(String id, ShareConsumer<String, String> consumer) {
        // Called when a new consumer is created
        System.out.println("Consumer added: " + id);
    }

    @Override
    public void consumerRemoved(String id, ShareConsumer<String, String> consumer) {
        // Called when a consumer is closed
        System.out.println("Consumer removed: " + id);
    }
});

共享消息侦听器容器

共享KafkaMessageListenerContainer

ShareKafkaMessageListenerContainer为共享使用者提供了一个简单的单线程容器:spring-doc.cadn.net.cn

@Bean
public ShareKafkaMessageListenerContainer<String, String> container(
        ShareConsumerFactory<String, String> shareConsumerFactory) {

    ContainerProperties containerProps = new ContainerProperties("my-topic");
    containerProps.setGroupId("my-share-group");

    ShareKafkaMessageListenerContainer<String, String> container =
        new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProps);

    container.setupMessageListener(new MessageListener<String, String>() {
        @Override
        public void onMessage(ConsumerRecord<String, String> record) {
            System.out.println("Received: " + record.value());
        }
    });

    return container;
}

容器属性

共享容器支持可供常规使用者使用的容器属性子集:spring-doc.cadn.net.cn

共享使用者不支持:spring-doc.cadn.net.cn

注释驱动的侦听器

与共享消费者@KafkaListener

您可以使用@KafkaListener与共享消费者一起配置ShareKafkaListenerContainerFactory:spring-doc.cadn.net.cn

@Configuration
@EnableKafka
public class ShareConsumerConfig {

    @Bean
    public ShareConsumerFactory<String, String> shareConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultShareConsumerFactory<>(props);
    }

    @Bean
    public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
            ShareConsumerFactory<String, String> shareConsumerFactory) {
        return new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
    }
}

然后在监听器中使用它:spring-doc.cadn.net.cn

@Component
public class ShareMessageListener {

    @KafkaListener(
        topics = "my-queue-topic",
        containerFactory = "shareKafkaListenerContainerFactory",
        groupId = "my-share-group"
    )
    public void listen(ConsumerRecord<String, String> record) {
        System.out.println("Received from queue: " + record.value());
        // Record is automatically acknowledged with ACCEPT
    }
}

共享组配置

共享组需要特定的代理配置才能正常运行。 对于使用嵌入式 Kafka 进行测试,请使用:spring-doc.cadn.net.cn

@EmbeddedKafka(
    topics = {"my-queue-topic"},
    brokerProperties = {
        "unstable.api.versions.enable=true",
        "group.coordinator.rebalance.protocols=classic,share",
        "share.coordinator.state.topic.replication.factor=1",
        "share.coordinator.state.topic.min.isr=1"
    }
)

共享组偏移重置

与常规使用者组不同,共享组使用不同的配置进行偏移重置行为。 您可以通过编程方式进行配置:spring-doc.cadn.net.cn

private void configureShareGroup(String bootstrapServers, String groupId) throws Exception {
    Map<String, Object> adminProps = new HashMap<>();
    adminProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

    try (Admin admin = Admin.create(adminProps)) {
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId);
        ConfigEntry configEntry = new ConfigEntry("share.auto.offset.reset", "earliest");

        Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
            configResource, List.of(new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET))
        );

        admin.incrementalAlterConfigs(configs).all().get();
    }
}

记录确认

目前,共享消费者会自动确认记录AcknowledgeType.ACCEPT处理成功后。 将来的版本中将添加更复杂的确认模式。spring-doc.cadn.net.cn

与普通消费者的区别

份额消费者与普通消费者在几个关键方面有所不同:spring-doc.cadn.net.cn

  1. 无分区分配:无法为共享使用者分配特定分区spring-doc.cadn.net.cn

  2. 无主题模式:共享消费者不支持订阅主题模式spring-doc.cadn.net.cn

  3. 合作消费:同一共享组中的多个消费者可以同时消费同一分区spring-doc.cadn.net.cn

  4. 自动确认:记录在处理后自动确认spring-doc.cadn.net.cn

  5. 不同的组管理:共享组使用不同的协调器协议spring-doc.cadn.net.cn

限制和注意事项

当前限制