对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.9! |
监测
监控侦听器性能
从 2.3 版本开始,监听器容器将自动创建和更新 MicrometerTimer
s 表示侦听器,如果Micrometer
在类路径上检测到,并且单个MeterRegistry
存在于应用程序上下文中。
可以通过设置ContainerProperty
的micrometerEnabled
自false
.
维护两个计时器 - 一个用于成功调用侦听器,一个用于失败。
计时器被命名为spring.kafka.listener
并具有以下标签:
-
name
:(容器 bean 名称) -
result
:success
或failure
-
exception
:none
或ListenerExecutionFailedException
您可以使用ContainerProperties
的micrometerTags
财产。
从 2.9.8、3.0.6 版本开始,您可以在ContainerProperties
的micrometerTagsProvider
;函数接收ConsumerRecord<?, ?>
并返回可以基于该记录的标签,并与micrometerTags
.
使用并发容器,为每个线程创建计时器,并且name 标签后缀为-n 其中 n 是0 自concurrency-1 . |
监控 KafkaTemplate 性能
从 2.5 版开始,模板将自动创建和更新 MicrometerTimer
发送作的 +++s,如果Micrometer
在类路径上检测到,并且单个MeterRegistry
存在于应用程序上下文中。
可以通过设置模板的micrometerEnabled
属性设置为false
.
维护两个计时器 - 一个用于成功调用侦听器,一个用于失败。
计时器被命名为spring.kafka.template
并具有以下标签:
-
name
:(模板 bean 名称) -
result
:success
或failure
-
exception
:none
或失败的异常类名称
您可以使用模板的micrometerTags
财产。
从版本 2.9.8、3.0.6 开始,您可以提供KafkaTemplate.setMicrometerTagsProvider(Function<ProducerRecord<?, ?>, Map<String, String>>)
财产;函数接收ProducerRecord<?, ?>
并返回可以基于该记录的标签,并与micrometerTags
.
微米原生指标
从 2.5 版本开始,该框架提供了工厂监听器来管理千分尺KafkaClientMetrics
每当创建和关闭生产者和消费者时,实例。
要启用此功能,只需将监听器添加到生产者和消费者工厂:
@Bean
public ConsumerFactory<String, String> myConsumerFactory() {
Map<String, Object> configs = consumerConfigs();
...
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(configs);
...
cf.addListener(new MicrometerConsumerListener<String, String>(meterRegistry(),
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
...
return cf;
}
@Bean
public ProducerFactory<String, String> myProducerFactory() {
Map<String, Object> configs = producerConfigs();
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
...
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
...
pf.addListener(new MicrometerProducerListener<String, String>(meterRegistry(),
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
...
return pf;
}
消费者/生产者id
传递给侦听器的标记将添加到带有标记名称的仪表的标记中spring.id
.
double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total")
.tag("customTag", "customTagValue")
.tag("spring.id", "myProducerFactory.myClientId-1")
.functionCounter()
.count();
为StreamsBuilderFactoryBean
- 请参阅 KafkaStreams 千分尺支持。
千分尺观察
从 3.0 版开始,现在支持使用千分尺进行观察,用于KafkaTemplate
和侦听器容器。
设置observationEnabled
自true
在KafkaTemplate
和ContainerProperties
使观察能够;这将禁用千分尺计时器,因为计时器现在将通过每次观测进行管理。
千分尺观察不支持批量监听器;这将启用千分尺定时器 |
有关更多信息,请参阅千分尺追踪。
要向计时器/跟踪添加标记,请配置自定义KafkaTemplateObservationConvention
或KafkaListenerObservationConvention
分别添加到模板或侦听器容器。
默认实现将bean.name
标签用于模板观察,以及listener.id
标签。
您可以将子类化DefaultKafkaTemplateObservationConvention
或DefaultKafkaListenerObservationConvention
或提供全新的实现。
有关记录的缺省观测的详细信息,请参阅千分尺观测文档。
从 V3.0.6 开始,您可以根据使用者或生产者记录中的信息向计时器和跟踪添加动态标记。
为此,请添加自定义KafkaListenerObservationConvention
和/或KafkaTemplateObservationConvention
到 listener 容器属性或KafkaTemplate
分别。
这record
属性包含ConsumerRecord
或ProducerRecord
分别。
发送方和接收方上下文的remoteServiceName
属性设置为 KafkaclusterId
财产;这是由KafkaAdmin
.
如果由于某种原因 - 可能缺乏管理员权限,您无法检索集群 ID,从 3.1 版开始,您可以设置一个手册clusterId
在KafkaAdmin
并将其注入KafkaTemplate
s 和侦听器容器。
当它是null
(默认),管理员将调用describeCluster
admin作以从代理中检索它。