此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.9! |
寻求特定偏移量
为了寻求,您的监听器必须实现ConsumerSeekAware
,它有以下方法:
void registerSeekCallback(ConsumerSeekCallback callback);
void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
void onPartitionsRevoked(Collection<TopicPartition> partitions);
void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
这registerSeekCallback
在启动容器时以及每当分配分区时调用。
在初始化后的某个任意时间进行搜索时,应使用此回调。
您应该保存对回调的引用。
如果您在多个容器(或在ConcurrentMessageListenerContainer
),您应该将回调存储在ThreadLocal
或由侦听器键控的其他结构Thread
.
使用组管理时,onPartitionsAssigned
在分配分区时调用。
例如,您可以使用此方法通过调用回调来设置分区的初始偏移量。
您还可以使用此方法将此线程的回调与分配的分区相关联(请参阅下面的示例)。
您必须使用回调参数,而不是传递给registerSeekCallback
.
从版本 2.5.5 开始,即使使用手动分区分配,也会调用此方法。
onPartitionsRevoked
在容器停止或 Kafka 撤销分配时调用。
您应该放弃此线程的回调,并删除与已吊销分区的任何关联。
回调具有以下方法:
void seek(String topic, int partition, long offset);
void seek(String topic, int partition, Function<Long, Long> offsetComputeFunction);
void seekToBeginning(String topic, int partition);
void seekToBeginning(Collection<TopicPartitions> partitions);
void seekToEnd(String topic, int partition);
void seekToEnd(Collection<TopicPartitions> partitions);
void seekRelative(String topic, int partition, long offset, boolean toCurrent);
void seekToTimestamp(String topic, int partition, long timestamp);
void seekToTimestamp(Collection<TopicPartition> topicPartitions, long timestamp);
String getGroupId();
的两种不同变体seek
方法提供了一种查找任意偏移量的方法。
采用Function
作为计算偏移量的参数,已添加到框架的 3.2 版中。
此函数提供对当前偏移量(消费者返回的当前位置,这是下一个要获取的偏移量)的访问。
作为函数定义的一部分,用户可以根据使用者中的当前偏移量来决定要寻求的偏移量。
seekRelative
在 2.3 版中添加了,以执行相对搜索。
-
offset
negative 和toCurrent
false
- 相对于分区末尾进行搜索。 -
offset
positive 和toCurrent
false
- 相对于分区的开头进行搜索。 -
offset
negative 和toCurrent
true
- 相对于当前位置进行搜索(倒带)。 -
offset
positive 和toCurrent
true
- 相对于当前位置进行搜索(快进)。
这seekToTimestamp
2.3 版中还添加了方法。
在onIdleContainer 或onPartitionsAssigned 方法,第二种方法是首选,因为在对消费者的offsetsForTimes 方法。
从其他位置调用时,容器将收集所有时间戳查找请求,并对offsetsForTimes . |
您还可以从onIdleContainer()
检测到空闲容器时。
有关如何启用空闲容器检测,请参阅检测空闲和无响应的使用者。
这seekToBeginning 方法很有用,例如,在处理压缩主题并且您希望每次启动应用程序时都搜索到开头: |
public class MyListener implements ConsumerSeekAware {
...
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
callback.seekToBeginning(assignments.keySet());
}
}
要在运行时任意查找,请使用registerSeekCallback
对于适当的线程。
这是一个简单的 Spring Boot 应用程序,演示了如何使用回调;它向主题发送 10 条记录;打<Enter>
导致所有分区从头开始搜索。
@SpringBootApplication
public class SeekExampleApplication {
public static void main(String[] args) {
SpringApplication.run(SeekExampleApplication.class, args);
}
@Bean
public ApplicationRunner runner(Listener listener, KafkaTemplate<String, String> template) {
return args -> {
IntStream.range(0, 10).forEach(i -> template.send(
new ProducerRecord<>("seekExample", i % 3, "foo", "bar")));
while (true) {
System.in.read();
listener.seekToStart();
}
};
}
@Bean
public NewTopic topic() {
return new NewTopic("seekExample", 3, (short) 1);
}
}
@Component
class Listener implements ConsumerSeekAware {
private static final Logger logger = LoggerFactory.getLogger(Listener.class);
private final ThreadLocal<ConsumerSeekCallback> callbackForThread = new ThreadLocal<>();
private final Map<TopicPartition, ConsumerSeekCallback> callbacks = new ConcurrentHashMap<>();
@Override
public void registerSeekCallback(ConsumerSeekCallback callback) {
this.callbackForThread.set(callback);
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
assignments.keySet().forEach(tp -> this.callbacks.put(tp, this.callbackForThread.get()));
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
partitions.forEach(tp -> this.callbacks.remove(tp));
this.callbackForThread.remove();
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
}
@KafkaListener(id = "seekExample", topics = "seekExample", concurrency = "3")
public void listen(ConsumerRecord<String, String> in) {
logger.info(in.toString());
}
public void seekToStart() {
this.callbacks.forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
}
}
为了让事情变得简单,2.3 版本添加了AbstractConsumerSeekAware
类,它跟踪要用于主题/分区的回调。
以下示例演示如何在每次容器空闲时查找每个分区中处理的最后一条记录。
它还具有允许任意外部调用按一条记录倒带分区的方法。
public class SeekToLastOnIdleListener extends AbstractConsumerSeekAware {
@KafkaListener(id = "seekOnIdle", topics = "seekOnIdle")
public void listen(String in) {
...
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments,
ConsumerSeekCallback callback) {
assignments.keySet().forEach(tp -> callback.seekRelative(tp.topic(), tp.partition(), -1, true));
}
/**
* Rewind all partitions one record.
*/
public void rewindAllOneRecord() {
getTopicsAndCallbacks()
.forEach((tp, callbacks) ->
callbacks.forEach(callback -> callback.seekRelative(tp.topic(), tp.partition(), -1, true))
);
}
/**
* Rewind one partition one record.
*/
public void rewindOnePartitionOneRecord(String topic, int partition) {
getSeekCallbacksFor(new TopicPartition(topic, partition))
.forEach(callback -> callback.seekRelative(topic, partition, -1, true));
}
}
2.6 版为抽象类添加了便利方法:
-
seekToBeginning()
- 查找所有分配到开头的分区。 -
seekToEnd()
- 查找所有分配的分区到最后。 -
seekToTimestamp(long timestamp)
- 查找所有分配给该时间戳表示的偏移量的分区。
例:
public class MyListener extends AbstractConsumerSeekAware {
@KafkaListener(...)
void listen(...) {
...
}
}
public class SomeOtherBean {
MyListener listener;
...
void someMethod() {
this.listener.seekToTimestamp(System.currentTimeMillis() - 60_000);
}
}
从 3.3 版本开始,一种新方法getGroupId()
在ConsumerSeekAware.ConsumerSeekCallback
接口。
当您需要识别与特定搜索回调关联的使用者组时,此方法特别有用。
当使用扩展的类时AbstractConsumerSeekAware ,则在一个侦听器中执行的查找作可能会影响同一类中的所有侦听器。
这可能并不总是期望的行为。
要解决此问题,您可以使用getGroupId() 回调提供的方法。
这允许您有选择地执行搜索作,仅针对感兴趣的使用者组。 |