该版本仍在开发中,尚未被视为稳定。对于最新稳定版本,请使用 spring-cloud-stream 5.0.0!spring-doc.cadn.net.cn

多重活页夹,结合基于Kafka Streams的活页夹和普通Kafka活页夹

你可以有一个应用程序,既有基于普通Kafka活页夹的功能/消费者/提供商,也有基于Kafka Streams的处理器。 然而,你不能在一个功能或消费者中同时使用这两者。spring-doc.cadn.net.cn

这里有个例子,你在同一个应用里有两个基于活页夹的组件。spring-doc.cadn.net.cn

@Bean
public Function<String, String> process() {
    return s -> s;
}

@Bean
public Function<KStream<Object, String>, KStream<?, WordCount>> kstreamProcess() {

    return input -> input;
}

以下是配置中相关的部分:spring-doc.cadn.net.cn

spring.cloud.function.definition=process;kstreamProcess
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar

如果你的应用和上面一样,但涉及两个不同的Kafka簇,情况会更复杂,比如普通的过程同时作用于 Kafka 集群 1 和 cluster 2(接收来自 cluster-1 的数据并发送到 cluster-2),而 Kafka Streams 处理器则作用于 Kafka 集群 2。 然后你得用Spring Cloud Stream提供的多活页夹功能。spring-doc.cadn.net.cn

以下是你在那种情况下配置可能会发生变化的情况。spring-doc.cadn.net.cn

# multi binder configuration
spring.cloud.stream.binders.kafka1.type: kafka
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-1} #Replace kafkaCluster-1 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka2.type: kafka
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster
spring.cloud.stream.binders.kafka3.type: kstream
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2} #Replace kafkaCluster-2 with the approprate IP of the cluster

spring.cloud.function.definition=process;kstreamProcess

# From cluster 1 to cluster 2 with regular process function
spring.cloud.stream.bindings.process-in-0.destination=foo
spring.cloud.stream.bindings.process-in-0.binder=kafka1 # source from cluster 1
spring.cloud.stream.bindings.process-out-0.destination=bar
spring.cloud.stream.bindings.process-out-0.binder=kafka2 # send to cluster 2

# Kafka Streams processor on cluster 2
spring.cloud.stream.bindings.kstreamProcess-in-0.destination=bar
spring.cloud.stream.bindings.kstreamProcess-in-0.binder=kafka3
spring.cloud.stream.bindings.kstreamProcess-out-0.destination=foobar
spring.cloud.stream.bindings.kstreamProcess-out-0.binder=kafka3

注意上述配置。 我们有两种结合剂,但总共有三种,第一种是基于第1组的普通卡夫卡粘合剂(卡夫卡1),然后基于第2个簇的另一个卡夫卡活页夹(卡夫卡2) 最后Kstream一 (卡夫卡3). 应用程序中的第一个处理器接收来自卡夫卡1并出版至以下卡夫卡2其中两种结合剂都基于普通卡夫卡结合剂,但簇不同。 第二个处理器是Kafka Streams处理器,消耗来自以下的数据。卡夫卡3该簇与卡夫卡2但用的是不同类型的活页夹。spring-doc.cadn.net.cn

由于Kafka Streams系列活页夹有三种不同类型的活页夹——Kstream,ktable全球克洛卡泰表- 如果你的应用程序基于这些绑定器中的任意一个有多个绑定,则需要明确提供该绑定器类型。spring-doc.cadn.net.cn

例如,如果你有一个像下面这样的处理器,spring-doc.cadn.net.cn

@Bean
public Function<KStream<Long, Order>,
        Function<KTable<Long, Customer>,
                Function<GlobalKTable<Long, Product>, KStream<Long, EnrichedOrder>>>> enrichOrder() {

    ...
}

然后,在多活页夹场景中,必须配置如下。 请注意,只有在真正的多绑定器场景下才需要,即单个应用内有多个处理器处理多个集群。 在这种情况下,绑定器需要明确提供绑定,以区别于其他处理器的绑定器类型和集群。spring-doc.cadn.net.cn

spring.cloud.stream.binders.kafka1.type: kstream
spring.cloud.stream.binders.kafka1.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka2.type: ktable
spring.cloud.stream.binders.kafka2.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}
spring.cloud.stream.binders.kafka3.type: globalktable
spring.cloud.stream.binders.kafka3.environment.spring.cloud.stream.kafka.streams.binder.brokers=${kafkaCluster-2}

spring.cloud.stream.bindings.enrichOrder-in-0.binder=kafka1  #kstream
spring.cloud.stream.bindings.enrichOrder-in-1.binder=kafka2  #ktablr
spring.cloud.stream.bindings.enrichOrder-in-2.binder=kafka3  #globalktable
spring.cloud.stream.bindings.enrichOrder-out-0.binder=kafka1 #kstream

# rest of the configuration is omitted.