该版本仍在开发中,尚未被视为稳定。对于最新稳定版本,请使用 spring-cloud-stream 5.0.0!spring-doc.cadn.net.cn

响应式卡夫卡活字器中的可观测性

本节介绍了响应式Kafka结合器中如何实现基于微米的可观测性。spring-doc.cadn.net.cn

制作人装订

生产者结合内置了可观察性支持。 要启用它,请设置以下属性:spring-doc.cadn.net.cn

spring.cloud.stream.kafka.binder.enable-observation

当该属性被设置为true你可以观察记录的发布过程。 两者均通过流桥以及正规提供商<?>可以观察到Beans。spring-doc.cadn.net.cn

消费者绑定

在消费者端实现可观测性比生产者端更复杂。 消费者装订有两个起点:spring-doc.cadn.net.cn

  1. 一个通过生产者装订发布数据的主题spring-doc.cadn.net.cn

  2. 这是一个数据产出于Spring Cloud Stream之外的主题spring-doc.cadn.net.cn

在第一种情况下,应用程序理想情况下希望将可观测性头部传递给消费者的入站。 第二种情况下,如果没有开始上游观测,则会重新开始观测。spring-doc.cadn.net.cn

示例:具有可观测性的函数

@Bean
Function<Flux<ReceiverRecord<byte[], byte[]>>, Flux<Message<String>>> receive(ObservationRegistry observationRegistry) {

	return s -> s.flatMap(record -> {
		Observation receiverObservation = KafkaReceiverObservation.RECEIVER_OBSERVATION.start(
		null,
		KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
		() -> new KafkaRecordReceiverContext(record, "user.receiver", "localhost:9092"),
		observationRegistry
		);

		return Mono.deferContextual(contextView -> Mono.just(record)
			.map(rec -> new String(rec.value()).toLowerCase())
			.map(rec -> MessageBuilder.withPayload(rec)
				.setHeader(IntegrationMessageHeaderAccessor.REACTOR_CONTEXT, contextView)
				.build()))
			.doOnTerminate(receiverObservation::stop)
			.doOnError(receiverObservation::error)
			.contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, receiverObservation));
    });
}

在这个例子中:spring-doc.cadn.net.cn

  1. 当收到记录时,会创建一个观测值。spring-doc.cadn.net.cn

  2. 如果有上游观测,它将成为卡夫卡记录接收器上下文.spring-doc.cadn.net.cn

  3. 一个是在上下文延迟的情况下创建的。spring-doc.cadn.net.cn

  4. 地图作被调用时,上下文可以访问正确的观察。spring-doc.cadn.net.cn

  5. 结果平面地图运算返回绑定,表示为Flux<Message<?>>.spring-doc.cadn.net.cn

  6. 出站记录的可观测头部与输入绑定相同。spring-doc.cadn.net.cn

示例:具有可观察性的消费者

@Bean
Consumer<Flux<ReceiverRecord<?, String>>> receive(ObservationRegistry observationRegistry, @Value("${spring.kafka.bootstrap-servers}") String bootstrap) {
	return f -> f.doOnNext(record -> KafkaReceiverObservation.RECEIVER_OBSERVATION.observation(
			null,
			KafkaReceiverObservation.DefaultKafkaReceiverObservationConvention.INSTANCE,
			() -> new KafkaRecordReceiverContext(record, "user.receiver", bootstrap),
			observationRegistry).observe(() -> System.out.println(record)))
		.subscribe();
}

在这种情况下:spring-doc.cadn.net.cn

  1. 由于没有输出绑定,doOnNext(点点下一个)用于通量而不是平面地图.spring-doc.cadn.net.cn

  2. 直接呼唤观察开始观察,完成后正确关闭。spring-doc.cadn.net.cn