|
对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.6! |
监测
监视侦听器性能
从版本 2.3 开始,侦听器容器将自动创建和更新 MicrometerTimers 表示侦听器,如果Micrometer,并且单个MeterRegistry存在于应用程序上下文中。
可以通过设置ContainerProperty的micrometerEnabled自false.
维护两个计时器 - 一个用于成功调用侦听器,另一个用于失败。
计时器被命名为spring.kafka.listener并具有以下标签:
-
name:(容器 Bean 名称) -
result:success或failure -
exception:none或ListenerExecutionFailedException
您可以使用ContainerProperties的micrometerTags财产。
从版本 2.9.8、3.0.6 开始,您可以在ContainerProperties的micrometerTagsProvider;该函数接收ConsumerRecord<?, ?>并返回可以基于该记录的标签,并与micrometerTags.
使用并发容器,将为每个线程创建计时器,并且name标签的后缀为-n其中 n 是0自concurrency-1. |
监控 KafkaTemplate 性能
从版本 2.5 开始,模板将自动创建并更新 MicrometerTimers 用于发送作,如果Micrometer,并且单个MeterRegistry存在于应用程序上下文中。
可以通过设置模板的micrometerEnabledproperty 设置为false.
维护两个计时器 - 一个用于成功调用侦听器,另一个用于失败。
计时器被命名为spring.kafka.template并具有以下标签:
-
name:(模板 Bean 名称) -
result:success或failure -
exception:none或失败的异常类名称
您可以使用模板的micrometerTags财产。
从版本 2.9.8、3.0.6 开始,您可以提供KafkaTemplate.setMicrometerTagsProvider(Function<ProducerRecord<?, ?>, Map<String, String>>)财产;该函数接收ProducerRecord<?, ?>并返回可以基于该记录的标签,并与micrometerTags.
Micrometer 原生指标
从版本 2.5 开始,框架提供了 Factory Listeners 来管理 MicrometerKafkaClientMetrics实例。
要启用此功能,只需将侦听器添加到您的 producer 和 consumer 工厂:
@Bean
public ConsumerFactory<String, String> myConsumerFactory() {
Map<String, Object> configs = consumerConfigs();
...
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(configs);
...
cf.addListener(new MicrometerConsumerListener<String, String>(meterRegistry(),
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
...
return cf;
}
@Bean
public ProducerFactory<String, String> myProducerFactory() {
Map<String, Object> configs = producerConfigs();
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
...
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
...
pf.addListener(new MicrometerProducerListener<String, String>(meterRegistry(),
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
...
return pf;
}
消费者/生产者id传递给侦听器的 Tag 将添加到带有 Tag Name 的计量器标签中spring.id.
double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total")
.tag("customTag", "customTagValue")
.tag("spring.id", "myProducerFactory.myClientId-1")
.functionCounter()
.count();
为StreamsBuilderFactoryBean- 请参阅 KafkaStreams Micrometer 支持。
千分尺观察
从 3.0 版本开始,现在支持使用 Micrometer 进行观察,因为KafkaTemplate和侦听器容器。
设置observationEnabled自true在KafkaTemplate和ContainerProperties使观察成为可能;这将禁用 Micrometer Timers,因为计时器现在将随每个观测一起管理。
| 千分尺观测不支持批量侦听器;这将启用 Micrometer Timers |
有关更多信息,请参阅 Micrometer Tracing 。
要向计时器/跟踪添加标签,请配置自定义KafkaTemplateObservationConvention或KafkaListenerObservationConvention分别添加到模板或侦听器容器中。
默认实现会添加bean.name标签用于模板观察项,而listener.id标记。
你可以子类化DefaultKafkaTemplateObservationConvention或DefaultKafkaListenerObservationConvention或提供全新的实现。
请参阅千分尺观测文档,了解记录的默认观测值的详细信息。
从版本 3.0.6 开始,您可以根据使用者或创建者记录中的信息向计时器和跟踪添加动态标签。
为此,请添加自定义KafkaListenerObservationConvention和/或KafkaTemplateObservationConvention添加到侦听器容器属性中,或者KafkaTemplate分别。
这record属性都包含ConsumerRecord或ProducerRecord分别。
发送方和接收方上下文remoteServiceName属性设置为 KafkaclusterId财产;这由KafkaAdmin.
如果由于某种原因 - 可能缺少管理员权限,您无法检索集群 ID,则从版本 3.1 开始,您可以设置手动clusterId在KafkaAdmin并将其注入KafkaTemplates 和侦听器容器。
当它是null(默认),管理员将调用describeClusteradmin作从 broker 中检索它。