|
此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.9! |
应用程序事件
以下 Spring 应用程序事件由侦听器容器及其使用者发布:
-
ConsumerStartingEvent:在使用者线程首次启动时发布,在开始轮询之前。 -
ConsumerStartedEvent:在消费者即将开始轮询时发布。 -
ConsumerFailedToStartEvent:如果没有,则已发布ConsumerStartingEvent在consumerStartTimeoutcontainer 属性。 此事件可能表示配置的任务执行器没有足够的线程来支持使用它的容器及其并发性。 发生这种情况时,还会记录一条错误消息。 -
ListenerContainerIdleEvent:在未收到任何消息时发布idleEventInterval(如果已配置)。 -
ListenerContainerNoLongerIdleEvent:在之前发布记录后使用记录时发布ListenerContainerIdleEvent. -
ListenerContainerPartitionIdleEvent:当未从该分区收到消息时发布idlePartitionEventInterval(如果已配置)。 -
ListenerContainerPartitionNoLongerIdleEvent:当从先前发布过ListenerContainerPartitionIdleEvent. -
NonResponsiveConsumerEvent:当消费者似乎在poll方法。 -
ConsumerPartitionPausedEvent:在分区暂停时由每个使用者发布。 -
ConsumerPartitionResumedEvent:由每个使用者在恢复分区时发布。 -
ConsumerPausedEvent:在容器暂停时由每个使用者发布。 -
ConsumerResumedEvent:在容器恢复时由每个使用者发布。 -
ConsumerStoppingEvent:由每个消费者在停止前发布。 -
ConsumerStoppedEvent:在使用者关闭后发布。 请参阅线程安全。 -
ConsumerRetryAuthEvent:在使用者的身份验证或授权失败并正在重试时发布。 -
ConsumerRetryAuthSuccessfulEvent:成功重试身份验证或授权时发布。只有当ConsumerRetryAuthEvent以前。 -
ContainerStoppedEvent:当所有消费者都停止时发布。 -
ConcurrentContainerStoppedEvent:在ConcurrentMessageListenerContainer已经停止了。
默认情况下,应用程序上下文的事件多播器在调用线程上调用事件侦听器。
如果将多播程序更改为使用异步执行器,则不得调用任何Consumer当事件包含对消费者的引用时的方法。 |
这ListenerContainerIdleEvent具有以下属性:
-
source:发布事件的侦听器容器实例。 -
container:侦听器容器或父侦听器容器(如果源容器是子容器)。 -
id:侦听器 ID(或容器 Bean 名称)。 -
idleTime:发布事件时容器处于空闲状态的时间。 -
topicPartitions:在生成事件时为容器分配的主题和分区。 -
consumer:对卡夫卡的引用Consumer对象。 例如,如果消费者的pause()方法之前调用过,它可以resume()收到事件时。 -
paused:容器当前是否已暂停。 有关更多信息,请参阅暂停和恢复侦听器容器。
这ListenerContainerNoLongerIdleEvent具有相同的属性,但idleTime和paused.
这ListenerContainerPartitionIdleEvent具有以下属性:
-
source:发布事件的侦听器容器实例。 -
container:侦听器容器或父侦听器容器(如果源容器是子容器)。 -
id:侦听器 ID(或容器 Bean 名称)。 -
idleTime:发布事件时,时间分区消耗处于空闲状态。 -
topicPartition:触发事件的主题和分区。 -
consumer:对卡夫卡的引用Consumer对象。 例如,如果消费者的pause()方法之前调用过,它可以resume()收到事件时。 -
paused:当前是否为该使用者暂停了该分区消耗。 有关更多信息,请参阅暂停和恢复侦听器容器。
这ListenerContainerPartitionNoLongerIdleEvent具有相同的属性,但idleTime和paused.
这NonResponsiveConsumerEvent具有以下属性:
-
source:发布事件的侦听器容器实例。 -
container:侦听器容器或父侦听器容器(如果源容器是子容器)。 -
id:侦听器 ID(或容器 Bean 名称)。 -
timeSinceLastPoll:容器上次调用之前的时间poll(). -
topicPartitions:在生成事件时为容器分配的主题和分区。 -
consumer:对卡夫卡的引用Consumer对象。 例如,如果消费者的pause()方法之前调用过,它可以resume()收到事件时。 -
paused:容器当前是否已暂停。 有关更多信息,请参阅暂停和恢复侦听器容器。
这ConsumerPausedEvent,ConsumerResumedEvent和ConsumerStopping事件具有以下属性:
-
source:发布事件的侦听器容器实例。 -
container:侦听器容器或父侦听器容器(如果源容器是子容器)。 -
partitions:这TopicPartition涉及的实例。
这ConsumerPartitionPausedEvent,ConsumerPartitionResumedEvent事件具有以下属性:
-
source:发布事件的侦听器容器实例。 -
container:侦听器容器或父侦听器容器(如果源容器是子容器)。 -
partition:这TopicPartition涉及的实例。
这ConsumerRetryAuthEventevent 具有以下属性:
-
source:发布事件的侦听器容器实例。 -
container:侦听器容器或父侦听器容器(如果源容器是子容器)。 -
reason:-
AUTHENTICATION- 由于身份验证异常,事件已发布。 -
AUTHORIZATION- 由于授权异常,事件已发布。
-
这ConsumerStartingEvent,ConsumerStartedEvent,ConsumerFailedToStartEvent,ConsumerStoppedEvent,ConsumerRetryAuthSuccessfulEvent和ContainerStoppedEvent事件具有以下属性:
-
source:发布事件的侦听器容器实例。 -
container:侦听器容器或父侦听器容器(如果源容器是子容器)。
所有容器(无论是子容器还是父容器)发布ContainerStoppedEvent.
对于父容器,源和容器属性相同。
此外,ConsumerStoppedEvent具有以下附加属性:
-
reason:-
NORMAL- 消费者正常停止(容器已停止)。 -
ERROR-一个java.lang.Error被扔了。 -
FENCED- 事务性生产者被隔离,并且stopContainerWhenFencedcontainer 属性为true. -
AUTH-一AuthenticationException或AuthorizationException被抛出,并且authExceptionRetryInterval未配置。 -
NO_OFFSET- 分区没有偏移量,并且auto.offset.reset政策是none.
-
您可以使用此事件在出现此类情况后重新启动容器:
if (event.getReason().equals(Reason.FENCED)) {
event.getSource(MessageListenerContainer.class).start();
}
检测空闲和无响应的使用者
异步使用者虽然高效,但一个问题是检测它们何时处于空闲状态。 如果一段时间内没有邮件到达,您可能需要采取一些措施。
您可以将侦听器容器配置为发布ListenerContainerIdleEvent当一段时间过去没有消息传递时。
当容器处于空余状态时,每隔一次就会发布一个事件idleEventInterval毫秒。
要配置此功能,请将idleEventInterval在容器上。
以下示例显示了如何执行此作:
@Bean
public KafkaMessageListenerContainer(ConsumerFactory<String, String> consumerFactory) {
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
...
containerProps.setIdleEventInterval(60000L);
...
KafkaMessageListenerContainer<String, String> container = new KafKaMessageListenerContainer<>(consumerFactory, containerProps);
return container;
}
以下示例演示如何设置idleEventInterval对于一个@KafkaListener:
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.getContainerProperties().setIdleEventInterval(60000L);
...
return factory;
}
在每种情况下,当容器处于空闲状态时,每分钟发布一次事件。
如果由于某种原因,消费者poll()方法不退出,没有收到任何消息,也无法生成空闲事件(这是早期版本的kafka-clients当无法联系到经纪人时)。
在这种情况下,容器会发布一个NonResponsiveConsumerEvent如果轮询未在3x这pollTimeout财产。
默认情况下,每个容器中每 30 秒执行一次此检查。
您可以通过将monitorInterval(默认 30 秒)和noPollThreshold(默认 3.0)属性ContainerProperties配置侦听器容器时。
这noPollThreshold应大于1.0以避免由于竞争条件而获得虚假事件。
接收此类事件可以停止容器,从而唤醒使用者以便它可以停止。
从 2.6.2 版开始,如果容器发布了ListenerContainerIdleEvent,它将发布一个ListenerContainerNoLongerIdleEvent随后收到记录时。
事件消耗
您可以通过实现ApplicationListener— 要么是普通侦听器,要么是缩小为仅接收此特定事件的侦听器。
您还可以使用@EventListener,在 Spring Framework 4.2 中引入。
下一个示例将@KafkaListener和@EventListener到单个类中。
您应该了解,应用程序侦听器会获取所有容器的事件,因此,如果要根据空闲的容器执行特定作,则可能需要检查侦听器 ID。
您还可以使用@EventListener的condition为此目的。
有关事件属性的信息,请参阅应用程序事件。
该事件通常发布在消费者线程上,因此可以安全地与Consumer对象。
以下示例同时使用@KafkaListener和@EventListener:
public class Listener {
@KafkaListener(id = "qux", topics = "annotated")
public void listen4(@Payload String foo, Acknowledgment ack) {
...
}
@EventListener(condition = "event.listenerId.startsWith('qux-')")
public void eventHandler(ListenerContainerIdleEvent event) {
...
}
}
事件侦听器可以看到所有容器的事件。
因此,在前面的示例中,我们根据侦听器 ID 缩小了接收到的事件范围。
由于为@KafkaListener支持并发,实际容器命名为id-n其中n是每个实例的唯一值,以支持并发。
这就是我们使用startsWith在条件下。 |
如果您希望使用 idle 事件来停止侦听器容器,则不应调用container.stop()在调用侦听器的线程上。
这样做会导致延迟和不必要的日志消息。
相反,您应该将事件移交给可以停止容器的其他线程。
另外,你不应该stop()容器实例(如果它是子容器)。
您应该停止并发容器。 |
空闲时的当前位置
请注意,您可以通过实现ConsumerSeekAware在您的听众中。
看onIdleContainer()在寻求。