|
对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.6! |
应用程序事件
以下 Spring 应用程序事件由侦听器容器及其使用者发布:
-
ConsumerStartingEvent:在使用线程首次启动时发布,在它开始轮询之前。 -
ConsumerStartedEvent:在使用者即将开始轮询时发布。 -
ConsumerFailedToStartEvent:如果没有,则发布ConsumerStartingEvent发布在consumerStartTimeoutcontainer 属性。 此事件可能表示配置的任务执行程序没有足够的线程来支持使用它的容器及其并发性。 出现此情况时,还会记录错误消息。 -
ListenerContainerIdleEvent:在 中未收到任何消息时发布idleEventInterval(如果已配置)。 -
ListenerContainerNoLongerIdleEvent:在之前发布ListenerContainerIdleEvent. -
ListenerContainerPartitionIdleEvent:在未收到来自该分区的消息时发布idlePartitionEventInterval(如果已配置)。 -
ListenerContainerPartitionNoLongerIdleEvent:当从之前发布了ListenerContainerPartitionIdleEvent. -
NonResponsiveConsumerEvent:当使用者似乎在poll方法。 -
ConsumerPartitionPausedEvent:当分区暂停时,由每个使用者发布。 -
ConsumerPartitionResumedEvent:每个 Consumer 在恢复分区时发布的 -
ConsumerPausedEvent:在容器暂停时由每个使用者发布。 -
ConsumerResumedEvent:容器恢复时由每个使用者发布。 -
ConsumerStoppingEvent:由每个使用者在停止前发布。 -
ConsumerStoppedEvent:在 Consumer 关闭后发布。 请参见线程安全。 -
ConsumerRetryAuthEvent:当消费者的身份验证或授权失败并正在重试时发布。 -
ConsumerRetryAuthSuccessfulEvent:成功重试身份验证或授权时发布。只能发生在存在ConsumerRetryAuthEvent以前。 -
ContainerStoppedEvent:当所有使用者都已停止时发布。
默认情况下,应用程序上下文的事件 multicaster 在调用线程上调用事件侦听器。
如果将 multicaster 更改为使用异步执行程序,则不得调用任何Consumer方法。 |
这ListenerContainerIdleEvent具有以下属性:
-
source:发布事件的侦听器容器实例。 -
container:侦听器容器或父侦听器容器(如果源容器是子容器)。 -
id:侦听器 ID(或容器 Bean 名称)。 -
idleTime:发布事件时容器处于空闲状态的时间。 -
topicPartitions:在生成事件时为容器分配的主题和分区。 -
consumer:对 Kafka 的引用Consumer对象。 例如,如果使用者的pause()方法,它可以resume()当收到事件时。 -
paused:容器当前是否暂停。 有关更多信息,请参阅暂停和恢复侦听器容器。
这ListenerContainerNoLongerIdleEvent具有相同的属性,但idleTime和paused.
这ListenerContainerPartitionIdleEvent具有以下属性:
-
source:发布事件的侦听器容器实例。 -
container:侦听器容器或父侦听器容器(如果源容器是子容器)。 -
id:侦听器 ID(或容器 Bean 名称)。 -
idleTime:发布事件时分区消耗处于空闲状态的时间。 -
topicPartition:触发事件的主题和分区。 -
consumer:对 Kafka 的引用Consumer对象。 例如,如果使用者的pause()方法,它可以resume()当收到事件时。 -
paused:该使用者的分区使用当前是否暂停。 有关更多信息,请参阅暂停和恢复侦听器容器。
这ListenerContainerPartitionNoLongerIdleEvent具有相同的属性,但idleTime和paused.
这NonResponsiveConsumerEvent具有以下属性:
-
source:发布事件的侦听器容器实例。 -
container:侦听器容器或父侦听器容器(如果源容器是子容器)。 -
id:侦听器 ID(或容器 Bean 名称)。 -
timeSinceLastPoll:容器上次调用之前的时间poll(). -
topicPartitions:在生成事件时为容器分配的主题和分区。 -
consumer:对 Kafka 的引用Consumer对象。 例如,如果使用者的pause()方法,它可以resume()当收到事件时。 -
paused:容器当前是否暂停。 有关更多信息,请参阅暂停和恢复侦听器容器。
这ConsumerPausedEvent,ConsumerResumedEvent和ConsumerStopping事件具有以下属性:
-
source:发布事件的侦听器容器实例。 -
container:侦听器容器或父侦听器容器(如果源容器是子容器)。 -
partitions:这TopicPartition涉及的实例。
这ConsumerPartitionPausedEvent,ConsumerPartitionResumedEvent事件具有以下属性:
-
source:发布事件的侦听器容器实例。 -
container:侦听器容器或父侦听器容器(如果源容器是子容器)。 -
partition:这TopicPartition实例。
这ConsumerRetryAuthEventevent 具有以下属性:
-
source:发布事件的侦听器容器实例。 -
container:侦听器容器或父侦听器容器(如果源容器是子容器)。 -
reason:-
AUTHENTICATION- 由于身份验证异常,事件已发布。 -
AUTHORIZATION- 事件因授权异常而发布。
-
这ConsumerStartingEvent,ConsumerStartedEvent,ConsumerFailedToStartEvent,ConsumerStoppedEvent,ConsumerRetryAuthSuccessfulEvent和ContainerStoppedEvent事件具有以下属性:
-
source:发布事件的侦听器容器实例。 -
container:侦听器容器或父侦听器容器(如果源容器是子容器)。
所有容器(无论是子容器还是父容器)发布ContainerStoppedEvent.
对于父容器,source 和 container 属性相同。
此外,ConsumerStoppedEvent具有以下附加属性:
-
reason:-
NORMAL- 消费者正常停止 (容器已停止)。 -
ERROR-一个java.lang.Error被抛出。 -
FENCED- 事务性生产者被隔离,并且stopContainerWhenFencedcontainer 属性为true. -
AUTH-一AuthenticationException或AuthorizationException被抛出,并且authExceptionRetryInterval未配置。 -
NO_OFFSET- 分区没有偏移量,并且auto.offset.resetpolicy 为none.
-
在出现此类情况后,您可以使用此事件重新启动容器:
if (event.getReason.equals(Reason.FENCED)) {
event.getSource(MessageListenerContainer.class).start();
}
检测空闲和无响应的使用者
虽然效率很高,但异步使用者的一个问题是检测它们何时处于空闲状态。 如果在一段时间内没有消息到达,您可能需要采取一些措施。
您可以配置侦听器容器以发布ListenerContainerIdleEvent当一段时间过去了,没有消息传递时。
当容器处于空闲状态时,每idleEventInterval毫秒。
要配置此功能,请将idleEventInterval在容器上。
以下示例显示了如何执行此作:
@Bean
public KafkaMessageListenerContainer(ConsumerFactory<String, String> consumerFactory) {
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
...
containerProps.setIdleEventInterval(60000L);
...
KafkaMessageListenerContainer<String, String> container = new KafKaMessageListenerContainer<>(consumerFactory, containerProps);
return container;
}
以下示例演示如何设置idleEventInterval对于@KafkaListener:
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.getContainerProperties().setIdleEventInterval(60000L);
...
return factory;
}
在上述每种情况下,当容器处于空闲状态时,每分钟发布一次事件。
如果出于某种原因,消费者poll()方法未退出,则不会收到任何消息,并且无法生成空闲事件(这是早期版本的kafka-clients当无法访问代理时)。
在这种情况下,容器会发布一个NonResponsiveConsumerEvent如果轮询未返回3x这pollTimeout财产。
默认情况下,此检查在每个容器中每 30 秒执行一次。
您可以通过设置monitorInterval(默认为 30 秒)和noPollThreshold(默认为 3.0)属性中的ContainerProperties配置 Listener 容器时。
这noPollThreshold应大于1.0以避免由于争用条件而获得虚假事件。
接收此类事件可让您停止容器,从而唤醒使用者,使其可以停止。
从版本 2.6.2 开始,如果容器发布了ListenerContainerIdleEvent,它将发布一个ListenerContainerNoLongerIdleEvent当随后收到记录时。
事件消耗
您可以通过实现ApplicationListener— 一般侦听器或缩小范围以仅接收此特定事件的侦听器。
您还可以使用@EventListener,引入于 Spring Framework 4.2 中。
下一个示例结合了@KafkaListener和@EventListener转换为单个类。
您应该了解应用程序侦听器会获取所有容器的事件,因此,如果您想根据哪个容器处于空闲状态来执行特定作,则可能需要检查侦听器 ID。
您还可以使用@EventListener的condition为此目的。
有关事件属性的信息,请参阅应用程序事件。
该事件通常在使用者线程上发布,因此可以安全地与Consumer对象。
以下示例同时使用@KafkaListener和@EventListener:
public class Listener {
@KafkaListener(id = "qux", topics = "annotated")
public void listen4(@Payload String foo, Acknowledgment ack) {
...
}
@EventListener(condition = "event.listenerId.startsWith('qux-')")
public void eventHandler(ListenerContainerIdleEvent event) {
...
}
}
事件侦听器查看所有容器的事件。
因此,在前面的示例中,我们根据侦听器 ID 缩小接收的事件范围。
由于为@KafkaListener支持并发,实际的容器命名为id-n其中,n是每个实例的唯一值,以支持并发。
这就是我们使用startsWith在条件。 |
如果您希望使用 idle 事件来停止 lister 容器,则不应调用container.stop()在调用侦听器的线程上。
这样做会导致延迟和不必要的日志消息。
相反,您应该将事件移交给其他线程,然后该线程可以停止容器。
此外,您不应stop()容器实例(如果它是子容器)。
您应该改为停止并发容器。 |
空闲时的当前位置
注意,当检测到空闲时,可以通过实现ConsumerSeekAware在你的侦听器中。
看onIdleContainer()在 seek.