|
这个版本仍在开发中,目前尚未被认为是稳定的。要使用最新稳定版本,请使用 Spring for Apache Kafka 4.0.4! |
监控
监控监听器性能
从版本 2.3 开始,如果类路径中检测到 Micrometer 并且应用程序上下文中存在一个 MeterRegistry,则监听器容器将自动为监听器创建和更新 Micrometer Timer。可以通过将 ContainerProperty 的 micrometerEnabled 设置为 false 来禁用计时器。
维护两个计时器——一个用于监听器调用成功,另一个用于失败。
计时器命名为 spring.kafka.listener,并具有以下标签:
-
name: (容器bean名称) -
result:successorfailure -
exception:noneorListenerExecutionFailedException
您可以使用ContainerProperties的micrometerTags属性添加额外的标签。
从版本 2.9.8、3.0.6 开始,您可以在 ContainerProperties 的 micrometerTagsProvider 中提供一个函数;该函数接收 ConsumerRecord<?, ?> 并返回基于该记录的标签,并与 micrometerTags 中的任何静态标签合并。
使用并发容器时,为每个线程创建计时器,并且在name标签后附加-n,其中n是0到concurrency-1。 |
监控 KafkaTemplate 性能
从版本 2.5 开始,如果类路径中检测到 Micrometer,并且应用程序上下文中存在一个 MeterRegistry,则模板将自动创建和更新发送操作的 Micrometer Timer。
可以通过将模板的 micrometerEnabled 属性设置为 false 来禁用计时器。
维护两个计时器——一个用于监听器调用成功,另一个用于失败。
计时器命名为 spring.kafka.template,并具有以下标签:
-
name: (模板bean名称) -
result:successorfailure -
exception:none或失败时的异常类名
您可以使用模板的micrometerTags属性添加其他标签。
从版本 2.9.8、3.0.6 开始,您可以提供一个 KafkaTemplate.setMicrometerTagsProvider(Function<ProducerRecord<?, ?>, Map<String, String>>) 属性;函数接收 ProducerRecord<?, ?> 并返回可以基于该记录的标签,并与 micrometerTags 中的任何静态标签合并。
Micrometer 原生指标
从2.5版本开始,框架提供了工厂监听器,以便在生产者和消费者被创建和关闭时管理Micrometer KafkaClientMetrics实例。
要启用此功能,请简单地向您的生产者和消费者工厂添加监听器:
@Bean
public ConsumerFactory<String, String> myConsumerFactory() {
Map<String, Object> configs = consumerConfigs();
...
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(configs);
...
cf.addListener(new MicrometerConsumerListener<String, String>(meterRegistry(),
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
...
return cf;
}
@Bean
public ProducerFactory<String, String> myProducerFactory() {
Map<String, Object> configs = producerConfigs();
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
...
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
...
pf.addListener(new MicrometerProducerListener<String, String>(meterRegistry(),
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
...
return pf;
}
传递给监听器的消费者/生产者 id 将使用标签名称 spring.id 添加到计数器的标签中。
double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total")
.tag("customTag", "customTagValue")
.tag("spring.id", "myProducerFactory.myClientId-1")
.functionCounter()
.count();
类似监听器提供给StreamsBuilderFactoryBean-请参阅KafkaStreams Micrometer支持。
从版本 3.3 开始,引入了一个 KafkaMetricsSupport 抽象类来管理 io.micrometer.core.instrument.binder.kafka.KafkaMetrics 绑定到提供的 Kafka 客户端的 MeterRegistry。
该类是上述 MicrometerConsumerListener、MicrometerProducerListener 和 KafkaStreamsMicrometerListener 的超类。
但是,它可用于任何 Kafka 客户端使用场景。
需要扩展此类并调用其 bindClient() 和 unbindClient() API 将 Kafka 客户端指标与 Micrometer 收集器连接起来。
Micrometer 观测
自版本 3.0 起,现已支持使用 Micrometer 进行观测,适用于 KafkaTemplate 和监听器容器。
将 observationEnabled 到 true 设置到 KafkaTemplate 和 ContainerProperties 以启用观察;这会禁用Micrometer 计时器,因为计时器现在将与每次观察一起进行管理。
| Micrometer 观察不支持批处理侦听器;这将启用 Micrometer 计时器 |
有关更多信息,请参阅 Micrometer 追踪。
要为计时器/跟踪添加标签,请分别在模板或监听器容器中配置自定义 KafkaTemplateObservationConvention 或 KafkaListenerObservationConvention。
默认实现为模板观察添加 bean.name 标签,为容器添加 listener.id 标签。
您可以选择继承 DefaultKafkaTemplateObservationConvention 或 DefaultKafkaListenerObservationConvention,或者提供完全新的实现。
有关记录的默认观察信息的详细信息,请参阅Micrometer 观察文档。
从版本 3.0.6 开始,您可以根据消费者或生产者记录中的信息向计时器和跟踪中添加动态标签。
为此,请分别将自定义 KafkaListenerObservationConvention 和/或 KafkaTemplateObservationConvention 添加到监听器容器属性或 KafkaTemplate 中。
在两种观察上下文中,record 属性都包含 ConsumerRecord 或 ProducerRecord。
发送方和接收方上下文remoteServiceName属性被设置为Kafka clusterId属性;这是通过KafkaAdmin检索的。
如果由于某种原因(可能是缺乏管理员权限),你无法检索集群ID,从版本3.1开始,你可以手动在clusterId上设置一个KafkaAdmin并将其注入到KafkaTemplate和监听器容器中。
当它是null(默认)时,管理员将调用describeCluster管理操作从中检索。
批量监听器观察
使用批处理监听器时,默认情况下不会创建任何观察结果,即使存在ObservationRegistry。
这是因为观察结果的作用域与线程相关联,而在批处理监听器中,并没有一个一对一的映射关系将观察结果和记录关联起来。
要在批处理侦听器中启用每个记录的观察,请将容器工厂属性recordObservationsInBatch设置为true。
@Bean
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
factory.getContainerProperties().setRecordObservationsInBatch(true);
return factory;
}
当此属性为 true 时,批处理中的每条记录都会创建一个观察值,但该观察值不会传递给监听器方法。
应用程序可以使用观察上下文来跟踪批处理中每条记录的处理情况。
这样即使在批处理环境中,您也可以了解每条记录的处理情况。