|
该版本仍在开发中,尚未被视为稳定。对于最新稳定版本,请使用 spring-cloud-stream 5.0.0! |
时间戳提取器
Kafka Streams 允许你根据各种时间戳概念控制消费者记录的处理。
默认情况下,Kafka Streams 会提取嵌入在消费者记录中的时间戳元数据。
你可以通过提供不同的方式来改变这种默认行为时间戳提取器实现按输入绑定。
以下是一些具体的实现方式。
@Bean
public Function<KStream<Long, Order>,
Function<KTable<Long, Customer>,
Function<GlobalKTable<Long, Product>, KStream<Long, Order>>>> process() {
return orderStream ->
customers ->
products -> orderStream;
}
@Bean
public TimestampExtractor timestampExtractor() {
return new WallclockTimestampExtractor();
}
然后你设置上述时间戳提取器根据消费者装订的豆子名称。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.timestampExtractorBeanName=timestampExtractor
spring.cloud.stream.kafka.streams.bindings.process-in-2.consumer.timestampExtractorBeanName=timestampExtractor"
如果你跳过设置自定义时间戳提取器的输入消费者绑定,该消费者将使用默认设置。