对于最新稳定版本,请使用Spring for Apache Kafka 4.0.4spring-doc.cadn.net.cn

寻找特定偏移

为了搜索,您的监听器必须实现 ConsumerSeekAware,其包含以下方法:spring-doc.cadn.net.cn

void registerSeekCallback(ConsumerSeekCallback callback);

void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

void onPartitionsRevoked(Collection<TopicPartition> partitions);

void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

The registerSeekCallback 会在容器启动时以及分区分配时被调用。 你应该在初始化后任意时间点使用此回调时调用它。 你应该保存对该回调的引用。 如果你在同一 ConcurrentMessageListenerContainer 中使用相同的监听器,你应该将回调保存在一个 ThreadLocal 或其他以监听器 Thread 为键的结构中。spring-doc.cadn.net.cn

When 使用 group 管理时, onPartitionsAssigned 是在分区分配时调用的。 你可以使用此方法,例如,通过调用回调来设置分区的初始偏移量。 你也可以使用此方法将当前线程的回调与分配到的分区进行关联(参见下面的示例)。 必须使用传入 registerSeekCallback 的 callback 参数,而不是该方法的参数。 从 2.5.5 版本开始,即使使用 手动分区分配,也会调用此方法。spring-doc.cadn.net.cn

onPartitionsRevoked 在容器停止或Kafka撤销分配时被调用。 你应该丢弃此线程的回调并移除与撤销分区的任何关联。spring-doc.cadn.net.cn

回调具有以下方法:spring-doc.cadn.net.cn

void seek(String topic, int partition, long offset);

void seek(String topic, int partition, Function<Long, Long> offsetComputeFunction);

void seekToBeginning(String topic, int partition);

void seekToBeginning(Collection<TopicPartitions> partitions);

void seekToEnd(String topic, int partition);

void seekToEnd(Collection<TopicPartitions> partitions);

void seekRelative(String topic, int partition, long offset, boolean toCurrent);

void seekToTimestamp(String topic, int partition, long timestamp);

void seekToTimestamp(Collection<TopicPartition> topicPartitions, long timestamp);

String getGroupId();

The two different variants of the seek methods provide a way to seek to an arbitrary offset. The method that takes a Function as an argument to compute the offset was added in version 3.2 of the framework. This function provides access to the current offset (the current position returned by the consumer, which is the next offset to be fetched). The user can decide what offset to seek to based on the current offset in the consumer as part of the function definition。spring-doc.cadn.net.cn

seekRelative 在 2.3 版本中添加,用于执行相对定位。spring-doc.cadn.net.cn

The seekToTimestamp 方法在 2.3 版本中也已添加。spring-doc.cadn.net.cn

当在 onIdleContaineronPartitionsAssigned 方法中为多个分区查找相同时间戳时,优先选择第二种方法,因为在一个调用到消费者 offsetsForTimes 方法中查找时间戳对应偏移量更为高效。 当从其他位置调用时,容器会收集所有时间戳查找请求,并在一个调用到 offsetsForTimes 上进行处理。

你也可以在检测到空闲容器时从onIdleContainer()执行seek操作。 见检测空闲和无响应消费者了解如何启用空闲容器检测。spring-doc.cadn.net.cn

The seekToBeginning 方法接受一个集合作为参数,在处理压缩主题时特别有用,例如每次应用程序启动时都希望定位到主题的开始位置:
public class MyListener implements ConsumerSeekAware {

    ...

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        callback.seekToBeginning(assignments.keySet());
    }

}

在运行时任意查找,请使用来自registerSeekCallback的适当线程的回调引用。spring-doc.cadn.net.cn

这里是使用回调的简单Spring Boot应用程序示例;它向主题发送10条记录;在控制台输入<Enter>会使所有分区跳转到开头。spring-doc.cadn.net.cn

@SpringBootApplication
public class SeekExampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(SeekExampleApplication.class, args);
    }

    @Bean
    public ApplicationRunner runner(Listener listener, KafkaTemplate<String, String> template) {
        return args -> {
            IntStream.range(0, 10).forEach(i -> template.send(
                new ProducerRecord<>("seekExample", i % 3, "foo", "bar")));
            while (true) {
                System.in.read();
                listener.seekToStart();
            }
        };
    }

    @Bean
    public NewTopic topic() {
        return new NewTopic("seekExample", 3, (short) 1);
    }

}

@Component
class Listener implements ConsumerSeekAware {

    private static final Logger logger = LoggerFactory.getLogger(Listener.class);

    private final ThreadLocal<ConsumerSeekCallback> callbackForThread = new ThreadLocal<>();

    private final Map<TopicPartition, ConsumerSeekCallback> callbacks = new ConcurrentHashMap<>();

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        this.callbackForThread.set(callback);
    }

    @Override
    public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
        assignments.keySet().forEach(tp -> this.callbacks.put(tp, this.callbackForThread.get()));
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        partitions.forEach(tp -> this.callbacks.remove(tp));
        this.callbackForThread.remove();
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
    }

    @KafkaListener(id = "seekExample", topics = "seekExample", concurrency = "3")
    public void listen(ConsumerRecord<String, String> in) {
        logger.info(in.toString());
    }

    public void seekToStart() {
        this.callbacks.forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
    }

}

为使事情更简单,版本 2.3 增加了 AbstractConsumerSeekAware 类,用于跟踪某个主题/分区应使用的回调。 以下示例展示了如何在容器空闲时,将每个分区的游标seek到上次处理的最后一条记录。 它还提供了方法,允许任意外部调用来将分区回退一位记录。spring-doc.cadn.net.cn

public class SeekToLastOnIdleListener extends AbstractConsumerSeekAware {

    @KafkaListener(id = "seekOnIdle", topics = "seekOnIdle")
    public void listen(String in) {
        ...
    }

    @Override
    public void onIdleContainer(Map<TopicPartition, Long> assignments,
            ConsumerSeekCallback callback) {

            assignments.keySet().forEach(tp -> callback.seekRelative(tp.topic(), tp.partition(), -1, true));
    }

    /**
    * Rewind all partitions one record.
    */
    public void rewindAllOneRecord() {
        getTopicsAndCallbacks()
            .forEach((tp, callbacks) ->
                    callbacks.forEach(callback -> callback.seekRelative(tp.topic(), tp.partition(), -1, true))
            );
    }

    /**
    * Rewind one partition one record.
    */
    public void rewindOnePartitionOneRecord(String topic, int partition) {
        getSeekCallbacksFor(new TopicPartition(topic, partition))
            .forEach(callback -> callback.seekRelative(topic, partition, -1, true));
    }

}

版本 2.6 为抽象类新增了便利方法:spring-doc.cadn.net.cn

public class MyListener extends AbstractConsumerSeekAware {

    @KafkaListener(...)
    void listen(...) {
        ...
    }
}

public class SomeOtherBean {

    MyListener listener;

    ...

    void someMethod() {
        this.listener.seekToTimestamp(System.currentTimeMillis() - 60_000);
    }

}

As of version 3.3, a new method getGroupId() was introduced in the ConsumerSeekAware.ConsumerSeekCallback interface. This method is particularly useful when you need to identify the consumer group associated with a specific seek callback.spring-doc.cadn.net.cn

When 使用 一个 继承 自 AbstractConsumerSeekAware 的 类 时, 一个 在 监听器 中 执行 的 seek 操作 可能 会 影响 同一 类 中 的 所有 监听器。 这 可能 并 不 始终 是 所需 的 行为。 为 了解 决 这个 问题, 可以 使用 getGroupId() 提供 的 方法。 这 允许 你 选 择 性 地 进行 seek 操作, 仅 针对 感兴趣 的 消费者 组。