响应式卡夫卡活字器中的可观测性
本节介绍了响应式Kafka结合器中如何实现基于微米的可观测性。
制作人装订
生产者结合内置了可观察性支持。 要启用它,请设置以下属性:
spring.cloud.stream.kafka.binder.enable-observation
当该属性被设置为true你可以观察记录的发布过程。
两者均通过流桥以及正规提供商<?>可以观察到Beans。
消费者绑定
在消费者端实现可观测性比生产者端更复杂。 消费者装订有两个起点:
-
一个通过生产者装订发布数据的主题
-
这是一个数据产出于Spring Cloud Stream之外的主题
在第一种情况下,应用程序理想情况下希望将可观测性头部传递给消费者的入站。 第二种情况下,如果没有开始上游观测,则会重新开始观测。
示例:具有可观测性的函数
@Bean
Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<Message<String>>> receive(ObservationRegistry observationRegistry) {
return s -> s.flatMap(record -> {
Observation receiverObservation = KafkaReceiverObservation.RECEIVER_OBSERVATION.start(
null,
KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
() -> new KafkaRecordReceiverContext(record, "user.receiver", "localhost:9092"),
observationRegistry
);
return Mono.deferContextual(contextView -> Mono.just(record)
.map(rec -> new String(rec.value()).toLowerCase())
.map(rec -> MessageBuilder.withPayload(rec)
.setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, contextView)
.build()))
.doOnTerminate(receiverObservation::stop)
.doOnError(receiverObservation::error)
.contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, receiverObservation));
});
}
在这个例子中:
-
当收到记录时,会创建一个观测值。
-
如果有上游观测,它将成为
卡夫卡记录接收器上下文. -
一个
单是在上下文延迟的情况下创建的。 -
当
地图作被调用时,上下文可以访问正确的观察。 -
结果
平面地图运算返回绑定,表示为Flux<Message<?>>. -
出站记录的可观测头部与输入绑定相同。
示例:具有可观察性的消费者
@Bean
Consumer<Flux<ReceiverRecord<?, String>>> receive(ObservationRegistry observationRegistry, @Value("${spring.kafka.bootstrap-servers}") String bootstrap) {
return f -> f.doOnNext(record -> KafkaReceiverObservation.RECEIVER_OBSERVATION.observation(
null,
KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
() -> new KafkaRecordReceiverContext(record, "user.receiver", bootstrap),
observationRegistry).observe(() -> System.out.println(record)))
.subscribe();
}
在这种情况下:
-
由于没有输出绑定,
doOnNext(点点下一个)用于通量而不是平面地图. -
直接呼唤
观察开始观察,完成后正确关闭。