|
对于最新稳定版本,请使用Spring for Apache Kafka 4.0.4! |
应用程序事件
以下由监听器容器发布并由其消费者监听的 Spring 应用程序事件:
-
ConsumerStartingEvent: 在消费者线程首次启动时发布,它开始轮询之前。 -
ConsumerStartedEvent: 在消费者即将开始轮询时发布。 -
ConsumerFailedToStartEvent: 在consumerStartTimeout容器属性内未发布ConsumerStartingEvent时触发发布。 该事件可能表示所配置的任务执行器线程数不足以支持其使用的容器及其并发性。 当发生此条件时还会记录错误消息。 -
ListenerContainerIdleEvent: 在未收到idleEventInterval条消息时触发发布(如果已配置)。 -
ListenerContainerNoLongerIdleEvent: 在之前发布过一个ListenerContainerIdleEvent之后消费记录时发布。 -
ListenerContainerPartitionIdleEvent: 在从该分区未收到消息的持续时间为idlePartitionEventInterval(如果已配置)时发布。 -
ListenerContainerPartitionNoLongerIdleEvent: 在从之前发布过ListenerContainerPartitionIdleEvent的分区消费记录时发布。 -
NonResponsiveConsumerEvent: 在消费者似乎在poll方法中被阻塞时发布。 "} -
ConsumerPartitionPausedEvent: 在分区暂停时由每个消费者发布。 -
ConsumerPartitionResumedEvent: 每个消费者在分区恢复时发布。 -
ConsumerPausedEvent: 每个消费者在容器暂停时发布。 -
ConsumerResumedEvent: 每个消费者在容器恢复时发布。 -
ConsumerStoppingEvent: 在消费者停止之前由每个消费者发布。 -
ConsumerStoppedEvent: 在消费者关闭之后发布。 查看 线程安全性。 -
ConsumerRetryAuthEvent: 在消费者的身份验证或授权失败且正在重试时发布。 -
ConsumerRetryAuthSuccessfulEvent: 在认证或授权重新尝试成功时发布。只能在之前出现过一个ConsumerRetryAuthEvent的情况下发生。 -
ContainerStoppedEvent: 在所有消费者都停止时发布。 -
ConcurrentContainerStoppedEvent: 在ConcurrentMessageListenerContainer停止时发布。
默认情况下,应用程序上下文的事件多播器会在调用线程上触发事件监听器。
如果你将多播器更改为使用异步执行器,那么在事件包含对消费者的引用时,不要调用任何Consumer方法。 |
代码 ListenerContainerIdleEvent 具有以下属性:
-
source: 事件的发布者监听器容器实例。 -
container: 侦听器容器或其父侦听器容器,如果源容器是子容器。 -
id: 监听器ID(或容器bean名称)。 -
idleTime: 容器在发布该事件时已空闲的时间。 -
topicPartitions: 生成事件时容器被分配的主题和分区。 -
consumer: 对 KafkaConsumer对象的引用。 例如,如果消费者之前调用了pause()方法,它可以在事件收到时进行resume()处理。 -
paused: 容器是否当前处于暂停状态。 请参阅 暂停和恢复监听器容器 以获取更多信息。
The ListenerContainerNoLongerIdleEvent 具有相同的属性,除了 idleTime 和 paused。
代码 ListenerContainerPartitionIdleEvent 具有以下属性:
-
source: 事件的发布者监听器容器实例。 -
container: 侦听器容器或其父侦听器容器,如果源容器是子容器。 -
id: 监听器ID(或容器bean名称)。 -
idleTime: 事件发布时,时间分区的消费处于空闲状态。 -
topicPartition: 触发事件的主题和分区。 -
consumer: 对 KafkaConsumer对象的引用。 例如,如果消费者之前调用了pause()方法,它可以在事件收到时进行resume()处理。 -
paused: 该分区的消费者当前是否暂停消费。 请参阅暂停和恢复监听容器以获取更多信息。
The ListenerContainerPartitionNoLongerIdleEvent 具有相同的属性,除了 idleTime 和 paused。
代码 NonResponsiveConsumerEvent 具有以下属性:
-
source: 事件的发布者监听器容器实例。 -
container: 侦听器容器或其父侦听器容器,如果源容器是子容器。 -
id: 监听器ID(或容器bean名称)。 -
timeSinceLastPoll: 容器上一次调用poll()之前的时刻。 -
topicPartitions: 生成事件时容器被分配的主题和分区。 -
consumer: 对 KafkaConsumer对象的引用。 例如,如果消费者之前调用了pause()方法,它可以在事件收到时进行resume()处理。 -
paused: 容器是否当前处于暂停状态。 请参阅 暂停和恢复监听器容器 以获取更多信息。
事件 ConsumerPausedEvent、ConsumerResumedEvent 和 ConsumerStopping 具有以下属性:
-
source: 事件的发布者监听器容器实例。 -
container: 侦听器容器或其父侦听器容器,如果源容器是子容器。 -
partitions: 涉及的TopicPartition实例。
事件ConsumerPartitionPausedEvent和ConsumerPartitionResumedEvent具有以下属性:
-
source: 事件的发布者监听器容器实例。 -
container: 侦听器容器或其父侦听器容器,如果源容器是子容器。 -
partition: Spring框架涉及的TopicPartition实例。
事件ConsumerRetryAuthEvent具有以下属性:
-
source: 事件的发布者监听器容器实例。 -
container: 侦听器容器或其父侦听器容器,如果源容器是子容器。 -
reason:-
AUTHENTICATION- 此事件因身份验证异常而发布。 -
AUTHORIZATION- 该事件因授权异常而发布。
-
0、1、2、3、4 和 5 事件具有以下属性:
-
source: 事件的发布者监听器容器实例。 -
container: 侦听器容器或其父侦听器容器,如果源容器是子容器。
所有容器(无论是子容器还是父容器)都发布 ContainerStoppedEvent。 对于一个父容器,源属性和容器属性是相同的。
此外,ConsumerStoppedEvent还具有以下附加属性:
-
reason:-
NORMAL- 消费者正常停止(容器已停止)。 -
ERROR- 当抛出java.lang.Error时。 -
FENCED- 事务性生产者被封存,stopContainerWhenFenced容器属性为true。 -
AUTH- 抛出了AuthenticationException或AuthorizationException,且未配置authExceptionRetryInterval。 -
NO_OFFSET- 分区没有偏移量,且auto.offset.reset策略是none。
-
在发生这种情况后,您可以使用此事件重启容器:
if (event.getReason().equals(Reason.FENCED)) {
event.getSource(MessageListenerContainer.class).start();
}
检测空闲和无响应的消费者
虽然异步消费者效率很高,但一个问题在于检测它们何时处于空闲状态。如果您希望在一段时间内没有收到消息时采取某些操作,那么这个问题就显得尤为重要。
您可以配置侦听器容器,当一段时间内没有消息传递时发布一个ListenerContainerIdleEvent。在容器空闲期间,每隔idleEventInterval毫秒就会发布一个事件。
要配置此功能,请在容器上设置idleEventInterval。以下示例显示了如何操作:
@Bean
public KafkaMessageListenerContainer(ConsumerFactory<String, String> consumerFactory) {
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
...
containerProps.setIdleEventInterval(60000L);
...
KafkaMessageListenerContainer<String, String> container = new KafKaMessageListenerContainer<>(consumerFactory, containerProps);
return container;
}
以下示例显示了如何为@KafkaListener设置idleEventInterval:
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.getContainerProperties().setIdleEventInterval(60000L);
...
return factory;
}
在这些情况下,每当容器空闲时,每分钟都会发布一次事件。
如果由于某种原因,消费者poll()方法没有退出,则不会收到任何消息,并且无法生成空闲事件(这是早期版本的kafka-clients中的一个问题,当时代理不可达)。在这种情况下,如果轮询在3x内未返回NonResponsiveConsumerEvent,则容器会发布pollTimeout属性。默认情况下,每个容器每30秒执行一次此检查。可以通过设置monitorInterval(默认为30秒)和noPollThreshold(默认为3.0)属性来修改此行为。ContainerProperties配置监听器容器时使用。为了防止因竞争条件而产生虚假事件,应确保noPollThreshold大于1.0。接收到此类事件后可以停止容器,从而唤醒消费者以便其停止。
从版本 2.6.2 开始,如果容器已发布ListenerContainerIdleEvent,则在接收到后续记录时会发布ListenerContainerNoLongerIdleEvent。
事件消费
通过实现ApplicationListener,您可以捕获这些事件——无论是通用监听器还是仅接收此特定事件的监听器。
您还可以使用@EventListener,它是在Spring Framework 4.2中引入的。
下一个示例将@KafkaListener和@EventListener合并到一个类中。
你应该明白,应用程序监听器会获取所有容器的事件,因此如果你想要根据哪个容器处于空闲状态来采取特定操作,则可能需要检查监听器ID。
你也可以使用@EventListener的condition来实现此目的。
有关事件属性的信息,请参阅应用程序事件。
事件通常在消费者线程上发布,因此可以安全地与Consumer对象进行交互。
以下示例同时使用@KafkaListener和@EventListener:
public class Listener {
@KafkaListener(id = "qux", topics = "annotated")
public void listen4(@Payload String foo, Acknowledgment ack) {
...
}
@EventListener(condition = "event.listenerId.startsWith('qux-')")
public void eventHandler(ListenerContainerIdleEvent event) {
...
}
}
| 事件监听器会看到所有容器的事件。 因此,在前面的示例中,我们根据监听器 ID 缩小了接收到的事件范围。 由于为 这就是为什么我们在条件中使用 |
如果您希望使用空闲事件来停止监听器容器,您不应该在调用监听器的线程上调用 container.stop()。这样做会导致延迟和不必要的日志消息。相反,您应该将事件交给另一个线程,以便它可以停止容器。 此外,如果容器实例是子容器,则不应 stop()它。您应该停止并发容器。 |
空闲时的位置
请注意,当检测到空闲时,您可以通过在监听器中实现ConsumerSeekAware来获取当前位置。
参见查找中的onIdleContainer()。