对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.9spring-doc.cadn.net.cn

监测

监控侦听器性能

从 2.3 版本开始,监听器容器将自动创建和更新 MicrometerTimers 表示侦听器,如果Micrometer在类路径上检测到,并且单个MeterRegistry存在于应用程序上下文中。 可以通过设置ContainerPropertymicrometerEnabledfalse.spring-doc.cadn.net.cn

维护两个计时器 - 一个用于成功调用侦听器,一个用于失败。spring-doc.cadn.net.cn

计时器被命名为spring.kafka.listener并具有以下标签:spring-doc.cadn.net.cn

您可以使用ContainerPropertiesmicrometerTags财产。spring-doc.cadn.net.cn

从 2.9.8、3.0.6 版本开始,您可以在ContainerPropertiesmicrometerTagsProvider;函数接收ConsumerRecord<?, ?>并返回可以基于该记录的标签,并与micrometerTags.spring-doc.cadn.net.cn

使用并发容器,为每个线程创建计时器,并且name标签后缀为-n其中 n 是0concurrency-1.

监控 KafkaTemplate 性能

从 2.5 版开始,模板将自动创建和更新 MicrometerTimer发送作的 +++s,如果Micrometer在类路径上检测到,并且单个MeterRegistry存在于应用程序上下文中。 可以通过设置模板的micrometerEnabled属性设置为false.spring-doc.cadn.net.cn

维护两个计时器 - 一个用于成功调用侦听器,一个用于失败。spring-doc.cadn.net.cn

计时器被命名为spring.kafka.template并具有以下标签:spring-doc.cadn.net.cn

您可以使用模板的micrometerTags财产。spring-doc.cadn.net.cn

从版本 2.9.8、3.0.6 开始,您可以提供KafkaTemplate.setMicrometerTagsProvider(Function<ProducerRecord<?, ?>, Map<String, String>>)财产;函数接收ProducerRecord<?, ?>并返回可以基于该记录的标签,并与micrometerTags.spring-doc.cadn.net.cn

微米原生指标

从 2.5 版本开始,该框架提供了工厂监听器来管理千分尺KafkaClientMetrics每当创建和关闭生产者和消费者时,实例。spring-doc.cadn.net.cn

要启用此功能,只需将监听器添加到生产者和消费者工厂:spring-doc.cadn.net.cn

@Bean
public ConsumerFactory<String, String> myConsumerFactory() {
    Map<String, Object> configs = consumerConfigs();
    ...
    DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(configs);
    ...
    cf.addListener(new MicrometerConsumerListener<String, String>(meterRegistry(),
            Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
    ...
    return cf;
}

@Bean
public ProducerFactory<String, String> myProducerFactory() {
    Map<String, Object> configs = producerConfigs();
    configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
    ...
    DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
    ...
    pf.addListener(new MicrometerProducerListener<String, String>(meterRegistry(),
            Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
    ...
    return pf;
}

消费者/生产者id传递给侦听器的标记将添加到带有标记名称的仪表的标记中spring.id.spring-doc.cadn.net.cn

获取 Kafka 指标之一的示例
double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total")
                .tag("customTag", "customTagValue")
                .tag("spring.id", "myProducerFactory.myClientId-1")
                .functionCounter()
                .count();

StreamsBuilderFactoryBean- 请参阅 KafkaStreams 千分尺支持spring-doc.cadn.net.cn

千分尺观察

从 3.0 版开始,现在支持使用千分尺进行观察,用于KafkaTemplate和侦听器容器。spring-doc.cadn.net.cn

设置observationEnabledtrueKafkaTemplateContainerProperties使观察能够;这将禁用千分尺计时器,因为计时器现在将通过每次观测进行管理。spring-doc.cadn.net.cn

千分尺观察不支持批量监听器;这将启用千分尺定时器

有关更多信息,请参阅千分尺追踪spring-doc.cadn.net.cn

要向计时器/跟踪添加标记,请配置自定义KafkaTemplateObservationConventionKafkaListenerObservationConvention分别添加到模板或侦听器容器。spring-doc.cadn.net.cn

默认实现将bean.name标签用于模板观察,以及listener.id标签。spring-doc.cadn.net.cn

您可以将子类化DefaultKafkaTemplateObservationConventionDefaultKafkaListenerObservationConvention或提供全新的实现。spring-doc.cadn.net.cn

有关记录的缺省观测的详细信息,请参阅千分尺观测文档spring-doc.cadn.net.cn

从 V3.0.6 开始,您可以根据使用者或生产者记录中的信息向计时器和跟踪添加动态标记。 为此,请添加自定义KafkaListenerObservationConvention和/或KafkaTemplateObservationConvention到 listener 容器属性或KafkaTemplate分别。 这record属性包含ConsumerRecordProducerRecord分别。spring-doc.cadn.net.cn

发送方和接收方上下文的remoteServiceName属性设置为 KafkaclusterId财产;这是由KafkaAdmin. 如果由于某种原因 - 可能缺乏管理员权限,您无法检索集群 ID,从 3.1 版开始,您可以设置一个手册clusterIdKafkaAdmin并将其注入KafkaTemplates 和侦听器容器。 当它是null(默认),管理员将调用describeClusteradmin作以从代理中检索它。spring-doc.cadn.net.cn