|
对于最新稳定版本,请使用 spring-cloud-stream 5.0.0! |
使用春云侦探进行追踪
当 Spring Cloud Sleuth 处于基于 Spring Cloud Stream Kafka Streams 绑定器的类路径上时,其消费者和生产者都会自动被安装跟踪信息。然而,为了追踪任何应用特定的作,用户代码需要显式进行这些作的检测。这可以通过注入KafkaStreams追踪在应用中调用 Spring Cloud Sleuth 的 bean,然后通过这个注入的 bean 调用各种 Kafka Streams作。以下是一些使用该作的示例。
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> clicks(KafkaStreamsTracing kafkaStreamsTracing) {
return (userClicksStream, userRegionsTable) -> (userClicksStream
.transformValues(kafkaStreamsTracing.peek("span-1", (key, value) -> LOG.info("key/value: " + key + "/" + value)))
.leftJoin(userRegionsTable, (clicks, region) -> new RegionWithClicks(region == null ?
"UNKNOWN" : region, clicks),
Joined.with(Serdes.String(), Serdes.Long(), null))
.transform(kafkaStreamsTracing.map("span-2", (key, value) -> {
LOG.info("Click Info: " + value.getRegion() + "/" + value.getClicks());
return new KeyValue<>(value.getRegion(),
value.getClicks());
}))
.groupByKey(Grouped.with(Serdes.String(), Serdes.Long()))
.reduce(Long::sum, Materialized.as(CLICK_UPDATES))
.toStream());
}
在上面的例子中,有两个地方它添加了显式的追踪工具。首先,我们记录来自输入的密钥/值信息KStream. 当这些信息被记录时,相关的跨度和轨距ID也会被记录,以便监控系统跟踪它们,并与相同的跨度ID进行关联。其次,当我们调用地图作,而不是直接调用KStream类,我们把它包裹在一个变换作,然后调用地图从KafkaStreams追踪. 在这种情况下,日志消息也会包含span ID和trace ID。
这里还有另一个例子,我们使用低级 transformer API 访问各种 Kafka Streams 头部。当 spring-cloud-sleuth 在类路径上时,所有追踪头部也都可以这样访问。
@Bean
public Function<KStream<String, String>, KStream<String, String>> process(KafkaStreamsTracing kafkaStreamsTracing) {
return input -> input.transform(kafkaStreamsTracing.transformer(
"transformer-1",
() -> new Transformer<String, String, KeyValue<String, String>>() {
ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public KeyValue<String, String> transform(String key, String value) {
LOG.info("Headers: " + this.context.headers());
LOG.info("K/V:" + key + "/" + value);
// More transformations, business logic execution, etc. go here.
return KeyValue.pair(key, value);
}
@Override
public void close() {
}
}));
}