Apache Kafka Streams 支持

从版本 1.1.4 开始,Spring for Apache Kafka 提供了对 Kafka Streams 的全面支持。 要在 Spring 应用程序中使用它,kafka-streams jar 必须在 classpath 上。 它是 Spring for Apache Kafka 项目的可选依赖,并且不会通过传递依赖进行下载。spring-doc.cadn.net.cn

基础

The reference Apache Kafka Streams documentation suggests the following way of using the API:spring-doc.cadn.net.cn

// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.

StreamsBuilder builder = ...;  // when using the Kafka Streams DSL

// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;

KafkaStreams streams = new KafkaStreams(builder, config);

// Start the Kafka Streams instance
streams.start();

// Stop the Kafka Streams instance
streams.close();

我们有两个主要组件:spring-doc.cadn.net.cn

所有由单个 KStream 通过 KafkaStreams 曝光给 StreamsBuilderStreamsBuilder 实例都会在相同的时间启动和停止,即使它们有不同的逻辑。 换句话说,由一个 StreamsBuilder 定义的所有流都与单个生命周期控制绑定。 一旦 KafkaStreams 实例被 streams.close() 关闭,就无法重新启动。 相反,必须创建一个新的 KafkaStreams 实例以重新开始流处理。

Spring 管理

为简化从 Spring 应用上下文视角使用 Kafka Streams,并通过容器使用生命周期管理,Spring for Apache Kafka 引入了 StreamsBuilderFactoryBean。 这是 AbstractFactoryBean 的实现,用于将 StreamsBuilder 的单例实例作为 bean 暴露出来。 以下示例创建了这样的 bean:spring-doc.cadn.net.cn

@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
从 2.2 版本开始,stream 配置现在以一个 KafkaStreamsConfiguration 对象提供,而不是一个 StreamsConfig 对象。

The StreamsBuilderFactoryBean 也实现了 SmartLifecycle 以管理内部 KafkaStreams 实例的生命周期。 类似 Kafka Streams API,你必须在启动 KafkaStreams 之前定义 KStream 实例。 同样适用于 Spring API for Kafka Streams。 因此,当你在 StreamsBuilderFactoryBean 上使用默认的 autoStartup = true 时,你必须在应用程序上下文刷新之前在 StreamsBuilder 上声明 KStream 实例。 例如,KStream 可以是一个常规的 bean 定义,而 Kafka Streams API 的使用不受影响。 以下示例展示了如何实现:spring-doc.cadn.net.cn

@Bean
public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
    KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
    // Fluent KStream API
    return stream;
}

如果你希望手动控制生命周期(例如根据某些条件停止和启动),可以通过使用工厂bean(&)加上StreamsBuilderFactoryBean bean的前缀prefix)来直接引用StreamsBuilderFactoryBean bean。 由于StreamsBuilderFactoryBean使用其内部的KafkaStreams实例,因此可以安全地停止并重新启动它。 每次start()调用都会创建一个新的KafkaStreams。 如果你希望为不同的KStream实例分别控制生命周期,也可以考虑使用不同的StreamsBuilderFactoryBean实例。spring-doc.cadn.net.cn

您也可以在StreamsBuilderFactoryBean上指定KafkaStreams.StateListenerThread.UncaughtExceptionHandlerStateRestoreListener选项,这些选项会委托给内部的KafkaStreams实例。spring-doc.cadn.net.cn

另外,除了通过在 StreamsBuilderFactoryBean 上间接设置这些选项外,你还可以使用一个 KafkaStreamsCustomizer 回调接口来:spring-doc.cadn.net.cn

  1. (从 version 2.1.5) 使用 customize(KafkaStreams) 配置内部 KafkaStreams 实例spring-doc.cadn.net.cn

  2. (从 version 3.3.0) 实例化一个自定义实现的 KafkaStreams 使用 initKafkaStreams(Topology, Properties, KafkaClientSupplier)spring-doc.cadn.net.cn

注意 KafkaStreamsCustomizer 会覆盖 StreamsBuilderFactoryBean 提供的选项。spring-doc.cadn.net.cn

如果需要直接执行一些 KafkaStreams 操作,可以通过使用 StreamsBuilderFactoryBean.getKafkaStreams() 来访问内部的 KafkaStreams 实例。spring-doc.cadn.net.cn

你可以通过类型自动装配StreamsBuilderFactoryBean个bean,但你应该确保在bean定义中使用完整的类型,如下例所示:spring-doc.cadn.net.cn

@Bean
public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;

或者,如果你使用接口类型的 bean 定义,可以添加 @Qualifier 以通过名称进行注入。 以下示例展示了如何操作:spring-doc.cadn.net.cn

@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
    return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
@Qualifier("&myKStreamBuilder")
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;

从版本 2.4.1 开始,工厂 bean 新增了一个类型为 KafkaStreamsInfrastructureCustomizerinfrastructureCustomizer 属性;这允许自定义 StreamsBuilder(例如添加一个状态存储)和/或在创建流之前自定义 Topologyspring-doc.cadn.net.cn

public interface KafkaStreamsInfrastructureCustomizer {

    void configureBuilder(StreamsBuilder builder);

    void configureTopology(Topology topology);

}

默认提供无操作实现,以避免在不需要某方法时同时实现两个方法。spring-doc.cadn.net.cn

一个 CompositeKafkaStreamsInfrastructureCustomizer 提供,用于在需要应用多个自定义器时使用。spring-doc.cadn.net.cn

Kafka Streams 微计量支持

从2.5.3版本开始,你可以配置一个KafkaStreamsMicrometerListener,以自动为工厂bean管理的KafkaStreams对象注册micrometer meters:spring-doc.cadn.net.cn

streamsBuilderFactoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry,
        Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));

流式处理的 JSON 序列化和反序列化

对于在向topics或state stores读取或写入数据时以JSON格式进行序列化和反序列化,Spring for Apache Kafka提供了一个使用JSON的JacksonJsonSerde实现,将其委托给序列化、反序列化和消息转换中描述的JacksonJsonSerializerJacksonJsonDeserializerJacksonJsonSerde实现通过其构造函数提供相同的配置选项(目标类型或ObjectMapper)。 在以下示例中,我们使用JacksonJsonSerde来序列化和反序列化Kafka流中Cat负载(JacksonJsonSerde可以以类似方式在需要实例的地方使用):spring-doc.cadn.net.cn

stream.through(Serdes.Integer(), new JacksonJsonSerde<>(Cat.class), "cats");

在使用2.3版本之后,当程序化地构建用于生产者/消费者的序列化器/反序列化器时,您可以使用流畅API(fluent API),这简化了配置。spring-doc.cadn.net.cn

stream.through(
    new JacksonJsonSerde<>(MyKeyType.class)
        .forKeys()
        .noTypeInfo(),
    new JacksonJsonSerde<>(MyValueType.class)
        .noTypeInfo(),
    "myTypes");

使用KafkaStreamBrancher

The KafkaStreamBrancher类引入了一种在 KStream 之上构建更方便的条件分支的方式。spring-doc.cadn.net.cn

考虑以下不使用 KafkaStreamBrancher 的示例:spring-doc.cadn.net.cn

KStream<String, String>[] branches = builder.stream("source").branch(
        (key, value) -> value.contains("A"),
        (key, value) -> value.contains("B"),
        (key, value) -> true
);
branches[0].to("A");
branches[1].to("B");
branches[2].to("C");

以下示例使用 KafkaStreamBrancherspring-doc.cadn.net.cn

new KafkaStreamBrancher<String, String>()
        .branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
        .branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
        //default branch should not necessarily be defined in the end of the chain!
        .defaultBranch(ks -> ks.to("C"))
        .onTopOf(builder.stream("source"));
        //onTopOf method returns the provided stream so we can continue with method chaining

配置

要配置Kafka Streams环境,需要一个StreamsBuilderFactoryBean实例。 查看Apache Kafka的文档以了解所有可能的选项。spring-doc.cadn.net.cn

从 2.2 版本开始,stream 配置现在以一个 KafkaStreamsConfiguration 对象提供,而不是以一个 StreamsConfig 提供。

为了避免在大多数情况下编写样板代码,特别是在开发微服务时,Spring for Apache Kafka 提供了 @EnableKafkaStreams 注解,你应该将其放在 @Configuration 类上。 你所需要做的就是声明一个名为 defaultKafkaStreamsConfigKafkaStreamsConfiguration bean。 一个名为 defaultKafkaStreamsBuilderStreamsBuilderFactoryBean bean 会自动在应用上下文中声明。 你也可以声明并使用任何额外的 StreamsBuilderFactoryBean bean。 你可以通过提供一个实现 StreamsBuilderFactoryBeanConfigurer 的 bean 来对该 bean 进行额外的自定义。 如果有多个这样的 bean,它们将根据其 Ordered.order 属性被应用。spring-doc.cadn.net.cn

清理配置 & 停止配置

当工厂停止时,KafkaStreams.close() 会使用2个参数被调用:spring-doc.cadn.net.cn

  • 关闭超时 : 等待线程关闭的等待时间(默认为 DEFAULT_CLOSE_TIMEOUT,设置为10秒)。可以通过 StreamsBuilderFactoryBean.setCloseTimeout() 配置。spring-doc.cadn.net.cn

  • leaveGroupOnClose : 从组中触发消费者离开调用,从 (默认值为 false)。可以使用 StreamsBuilderFactoryBean.setLeaveGroupOnClose() 配置。spring-doc.cadn.net.cn

默认情况下,当工厂bean被停止时,KafkaStreams.cleanUp()方法会被调用。 从2.1.2版本开始,工厂bean新增了带有一个CleanupConfig对象的构造函数,该对象具有属性,可让您在start()stop()时调用cleanUp()方法,或两者皆不调用。 从2.7版本开始,默认情况下从不清理本地状态。spring-doc.cadn.net.cn

头部信息增强器

Version 3.0 added the HeaderEnricherProcessor extension of ContextualProcessor; providing the same functionality as the deprecated HeaderEnricher which implemented the deprecated Transformer interface. This can be used to add headers within the stream processing; the header values are SpEL expressions; the root object of the expression evaluation has 3 properties:spring-doc.cadn.net.cn

这些表达式必须返回一个 byte[] 或一个 String(这将使用 UTF-8 转换为 byte[])。spring-doc.cadn.net.cn

使用 enricher 于流中:spring-doc.cadn.net.cn

.process(() -> new HeaderEnricherProcessor(expressions))

处理器不会更改keyvalue;它只是添加头信息。spring-doc.cadn.net.cn

您需要为每条记录创建一个新的实例。
.process(() -> new HeaderEnricherProcessor<..., ...>(expressionMap))

这里是简单的示例,添加一个字面量标题和一个变量:spring-doc.cadn.net.cn

Map<String, Expression> headers = new HashMap<>();
headers.put("header1", new LiteralExpression("value1"));
SpelExpressionParser parser = new SpelExpressionParser();
headers.put("header2", parser.parseExpression("record.timestamp() + ' @' + record.offset()"));
ProcessorSupplier supplier = () -> new HeaderEnricher<String, String>(headers);
KStream<String, String> stream = builder.stream(INPUT);
stream
        .process(() -> supplier)
        .to(OUTPUT);

MessagingProcessor

版本 3.0 增加了 MessagingProcessor 扩展的 ContextualProcessor,提供了与已弃用的 MessagingTransformer 相同的功能,该 MessagingTransformer 实现了已弃用的 Transformer 接口。 这允许 Kafka Streams 构型与 Spring 消息组件(如 Spring 集成流)进行交互。 转换器需要一个 MessagingFunction 的实现。spring-doc.cadn.net.cn

@FunctionalInterface
public interface MessagingFunction {

    Message<?> exchange(Message<?> message);

}

Spring Integration 自动提供一个实现,使用其 GatewayProxyFactoryBean。 它还需要一个 MessagingMessageConverter 来将键、值和元数据(包括头)与 Spring 消息 Message<?> 之间进行转换。 参见 KStream 调用 Spring Integration 流程 以获取更多信息。spring-doc.cadn.net.cn

反序列化异常的恢复

版本 2.3 引入了 RecoveringDeserializationExceptionHandler,当发生反序列化异常时可以采取一些操作。 参阅关于 DeserializationExceptionHandler 的 Kafka 文档,其中 RecoveringDeserializationExceptionHandler 是其实现。 RecoveringDeserializationExceptionHandler 配置使用 4 实现。 框架提供了 DeadLetterPublishingRecoverer,它可以将失败的记录发送到死信主题。 参阅 发布死信记录 了解更多关于此恢复器的信息。spring-doc.cadn.net.cn

配置恢复器,请在流配置中添加以下属性:spring-doc.cadn.net.cn

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
    Map<String, Object> props = new HashMap<>();
    ...
    props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
            RecoveringDeserializationExceptionHandler.class);
    props.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, recoverer());
    ...
    return new KafkaStreamsConfiguration(props);
}

@Bean
public DeadLetterPublishingRecoverer recoverer() {
    return new DeadLetterPublishingRecoverer(kafkaTemplate(),
            (record, ex) -> new TopicPartition("recovererDLQ", -1));
}

当然,recoverer() 这个 bean 可以是 ConsumerRecordRecoverer 的自己实现。spring-doc.cadn.net.cn

交互式查询支持

从3.2版本开始,Spring for Apache Kafka 提供了实现 Kafka Streams 交互查询所需的基本功能。 交互查询在具有状态的 Kafka Streams 应用程序中很有用,因为它们提供了一种持续查询应用程序中状态存储的方式。 因此,如果一个应用程序想要materialize系统当前视图,交互查询提供了一种实现方法。 要了解更多关于交互查询的信息,请参阅这篇 文章。 Spring for Apache Kafka 对该功能的支持围绕着一个名为 KafkaStreamsInteractiveQueryService 的API,它是 Kafka Streams 库中交互查询API的封装。 一个应用程序可以创建该服务的实例作为bean,然后稍后使用它通过其名称检索状态存储。spring-doc.cadn.net.cn

以下代码片段展示了示例。spring-doc.cadn.net.cn

@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
    final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
            new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
    return kafkaStreamsInteractiveQueryService;
}

假设一个Kafka Streams应用程序有一个名为app-store的状态存储,那么可以通过KafkaStreamsInteractiveQuery API 获取该存储,如下所示。spring-doc.cadn.net.cn

@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;

ReadOnlyKeyValueStore<Object, Object>  appStore = interactiveQueryService.retrieveQueryableStore("app-store", QueryableStoreTypes.keyValueStore());

一旦应用程序获得对状态存储的访问权限,就可以查询其中的键值信息。spring-doc.cadn.net.cn

在这种情况下,应用程序使用的状态存储是一个只读的键值存储。 还存在其他类型的状态存储,Kafka Streams 应用程序也可以使用。 例如,如果一个应用程序更倾向于查询一个窗口基于的存储,它可以在业务逻辑中构建该存储,然后稍后在应用程序中检索它。 由于这个原因,KafkaStreamsInteractiveQueryService 中可查询存储的API具有一个泛型存储类型签名,以便最终用户可以指定正确的类型。spring-doc.cadn.net.cn

这里是来自API的类型签名。spring-doc.cadn.net.cn

public <T> T retrieveQueryableStore(String storeName, QueryableStoreType<T> storeType)

调用此方法时,用户可以具体指定适当的状态存储类型,正如上面示例中所做的那样。spring-doc.cadn.net.cn

重试获取状态存储

当尝试使用 KafkaStreamsInteractiveQueryService 从获取状态存储时,由于各种原因,可能会找不到状态存储。 如果这些原因是暂时性的,KafkaStreamsInteractiveQueryService 提供了通过允许注入自定义 RetryTemplate 来重试获取状态存储的选项。 在 KafkaStreamsInteractiveQueryService 中使用的 RetryTemplate 默认情况下使用最多三次尝试,固定回退时间为一秒。spring-doc.cadn.net.cn

这里是如何将自定义RetryTemplate注入到KafkaStreamsInteractiveQueryService中,最多尝试十次。spring-doc.cadn.net.cn

@Bean
public KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
    final KafkaStreamsInteractiveQueryService kafkaStreamsInteractiveQueryService =
            new KafkaStreamsInteractiveQueryService(streamsBuilderFactoryBean);
    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setBackOffPolicy(new FixedBackOffPolicy());
    RetryPolicy retryPolicy = new SimpleRetryPolicy(10);
    retryTemplate.setRetryPolicy(retryPolicy);
    kafkaStreamsInteractiveQueryService.setRetryTemplate(retryTemplate);
    return kafkaStreamsInteractiveQueryService;
}

查询远程状态存储

上面显示的用于检索状态存储的API - retrieveQueryableStore 是专为本地可用的键值状态存储设计的。在生产环境设置中,Kafka Streams 应用程序最可能根据分区数量进行分布。如果一个主题有四个分区,且有四个相同Kafka Streams处理器的实例在运行,那么每个实例可能负责处理该主题中的单个分区。在该场景中,调用 retrieveQueryableStore 可能不会给出实例所期望的正确结果,尽管它可能会返回一个有效的存储。让我们假设一个包含四个分区的主题,其数据涉及各种键,而一个特定的分区总是负责一个特定的键。如果调用retrieveQueryableStore的实例在查找本实例不托管的键的信息时,将不会收到任何数据。这是因为在当前的Kafka Streams实例中,它并不了解此键。To fix this, 调用实例首先需要确保拥有托管特定键的Kafka Streams处理实例的主机信息。可以从同一application.id下的任何Kafka Streams实例中检索到如下内容。spring-doc.cadn.net.cn

@Autowired
private KafkaStreamsInteractiveQueryService interactiveQueryService;

HostInfo kafkaStreamsApplicationHostInfo = this.interactiveQueryService.getKafkaStreamsApplicationHostInfo("app-store", 12345, new IntegerSerializer());

在上面的示例代码中,调用实例正在从名为app-store的状态存储中查询特定键12345。 API还需要一个相应的键序列化器,该序列化器在此处为IntegerSerializer。 Kafka Streams会遍历同一application.id下的所有实例,尝试找到托管此特定键的实例,一旦找到,就会返回该主机信息作为HostInfo对象。spring-doc.cadn.net.cn

这是该API的外观:spring-doc.cadn.net.cn

public <K> HostInfo getKafkaStreamsApplicationHostInfo(String store, K key, Serializer<K> serializer)

When using multiple instances of the Kafka Streams processors of the same application.id in a distributed way like this, the application is supposed to provide an RPC layer where the state stores can be queried over an RPC endpoint such as a REST one. See this article for more details on this. When using Spring for Apache Kafka, it is very easy to add a Spring based REST endpoint by using the spring-web technologies. Once there is a REST endpoint, then that can be used to query the state stores from any Kafka Streams instance, given the HostInfo where the key is hosted is known to the instance.spring-doc.cadn.net.cn

如果键所持有的实例就是当前实例,那么应用程序无需调用RPC机制,而是直接进行JVM内部调用。 然而,问题在于应用程序可能不知道正在调用的实例是否就是持有键的实例,因为某个服务器可能因消费者重新平衡而失去分区。 为了解决这个问题,KafkaStreamsInteractiveQueryService 提供了一个方便的API,通过API方法getCurrentKafkaStreamsApplicationHostInfo() 查询当前主机信息,该方法返回当前的HostInfo。 想法是,应用程序可以首先获取键所在的位置信息,然后将HostInfo与当前实例的信息进行比较。 如果HostInfo数据匹配,则可以通过retrieveQueryableStore进行简单的JVM调用,否则选择RPC选项。spring-doc.cadn.net.cn

Kafka Streams 示例

以下示例结合了本章中我们已涵盖的各个主题:spring-doc.cadn.net.cn

@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfig {

    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration kStreamsConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
        return new KafkaStreamsConfiguration(props);
    }

    @Bean
    public StreamsBuilderFactoryBeanConfigurer configurer() {
        return fb -> fb.setStateListener((newState, oldState) -> {
            System.out.println("State transition from " + oldState + " to " + newState);
        });
    }

    @Bean
    public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
        KStream<Integer, String> stream = kStreamBuilder.stream("streamingTopic1");
        stream
                .mapValues((ValueMapper<String, String>) String::toUpperCase)
                .groupByKey()
                .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(1_000)))
                .reduce((String value1, String value2) -> value1 + value2,
                		Named.as("windowStore"))
                .toStream()
                .map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
                .filter((i, s) -> s.length() > 40)
                .to("streamingTopic2");

        stream.print(Printed.toSysOut());

        return stream;
    }

}