该版本仍在开发中,尚未被视为稳定。对于最新稳定版本,请使用 spring-cloud-stream 5.0.0!spring-doc.cadn.net.cn

编程模型的辅助

单一应用中的多个Kafka Streams处理器

Binder 允许在单个 Spring Cloud Stream 应用中拥有多个 Kafka Streams 处理器。 你可以提交如下的申请表。spring-doc.cadn.net.cn

@Bean
public java.util.function.Function<KStream<Object, String>, KStream<Object, String>> process() {
   ...
}

@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
  ...
}

@Bean
public java.util.function.BiFunction<KStream<Object, String>, KTable<Integer, String>, KStream<Object, String>> yetAnotherProcess() {
   ...
}

在这种情况下,绑定器会创建3个不同的应用ID的Kafka Streams对象(下面会详细说明)。 但是,如果你的应用程序中有多个处理器,你必须告诉 Spring Cloud Stream 需要激活哪些功能。 以下是如何激活这些功能的方法。spring-doc.cadn.net.cn

spring.cloud.function.definition: process;anotherProcess;然而另一个过程spring-doc.cadn.net.cn

如果你希望某些功能不立即激活,可以从列表中移除。spring-doc.cadn.net.cn

当你只有一个Kafka Streams处理器和其他类型的功能同一应用程序中的豆子,但通过不同的装订器处理(例如,基于普通卡夫卡消息通道装订器的功能豆)spring-doc.cadn.net.cn

Kafka Streams Application ID

申请ID是Kafka Streams申请必须提供的属性。 Spring Cloud Stream Kafka Streams 绑定器允许你以多种方式配置该应用 ID。spring-doc.cadn.net.cn

如果应用程序中只有一个处理器,那么你可以在绑定器层面使用以下属性设置:spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.binder.applicationId.spring-doc.cadn.net.cn

为了方便,如果你只有一个处理器,也可以使用spring.application.name作为用于委托应用ID的属性。spring-doc.cadn.net.cn

如果你在应用程序中有多个 Kafka Streams 处理器,那么你需要为每个处理器设置应用 ID。 对于函数模型,你可以将其作为属性附加到每个函数上。spring-doc.cadn.net.cn

例如,假设你有以下函数。spring-doc.cadn.net.cn

@Bean
public java.util.function.Consumer<KStream<Object, String>> process() {
   ...
}
@Bean
public java.util.function.Consumer<KStream<Object, String>> anotherProcess() {
  ...
}

然后你可以用以下绑定器级别的属性为每个应用设置应用ID。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.binder.functions.process.applicationIdspring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.binder.functions.anotherProcess.applicationIdspring-doc.cadn.net.cn

对于基于函数的模型,将应用ID设置为绑定级别的方法同样有效。 然而,如上所述,使用函数模型时,在绑定器层面设置每个函数会容易得多。spring-doc.cadn.net.cn

对于生产部署,强烈建议通过配置明确指定应用ID。 如果你是自动扩展应用,这一点尤其关键,这时你需要确保每个实例都部署相同的应用ID。spring-doc.cadn.net.cn

如果应用程序没有提供应用ID,那么绑定器会自动生成一个静态应用ID。 这在开发场景中非常方便,因为避免了明确提供应用 ID 的需求。 以这种方式生成的应用ID在应用重启时将保持静态。 在函数模型的情况下,生成的应用ID将是函数豆名,后跟文字applicationID,例如process-applicationID如果过程如果函数 BEAN 名称。spring-doc.cadn.net.cn

应用ID设置总结

  • 默认情况下,Binder 会自动生成每个函数方法的应用 ID。spring-doc.cadn.net.cn

  • 如果你只有一个处理器,那你可以使用spring.kafka.streams.applicationId,spring.application.namespring.cloud.stream.kafka.streams.binder.applicationId.spring-doc.cadn.net.cn

  • 如果你有多个处理器,那么可以用以下属性为每个函数设置应用 ID -spring.cloud.stream.kafka.streams.binder.functions.<function-name>.applicationId.spring-doc.cadn.net.cn

覆盖绑定器生成的默认绑定名称,并用函数式样式

默认情况下,绑定器在使用函数样式时采用上述策略生成绑定名称,即 <function-bean-name>-<in>|<out>-[0..n]),例如 process-in-0、process-out-0 等。 如果你想覆盖这些绑定名称,可以通过指定以下属性来实现。spring-doc.cadn.net.cn

spring.cloud.stream.function.bindings.<default binding name>.默认绑定名是绑定器生成的原始绑定名。spring-doc.cadn.net.cn

比如说,你有这个函数。spring-doc.cadn.net.cn

@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}

Binder 会生成带有名称的绑定,process-in-0,process-in-1process-out-0. 如果你想完全改成别的,比如更专属领域的绑定名,可以按下面作。spring-doc.cadn.net.cn

spring.cloud.stream.function.bindings.process-in-0=usersspring-doc.cadn.net.cn

spring.cloud.stream.function.bindings.process-in-0=regionsspring-doc.cadn.net.cn

spring.cloud.stream.function.bindings.process-out-0=clicksspring-doc.cadn.net.cn

之后,你必须为这些新绑定名称设置所有绑定级别的属性。spring-doc.cadn.net.cn

请记住,在上述函数式编程模型中,在大多数情况下遵循默认的绑定名称是合理的。 你可能还想做覆盖的唯一原因是当配置属性更多,想把绑定映射到更适合域的配置时。spring-doc.cadn.net.cn

设置引导服务器配置

运行 Kafka Streams 应用时,必须提供 Kafka 代理服务器信息。 如果你不提供这些信息,绑定者会期望你默认运行经纪人本地主机:9092. 如果不是这样,那你需要覆盖这个。有几种方法可以做到这一点。spring-doc.cadn.net.cn

关于活页夹层的属性,无论你是否使用普通卡夫卡活页夹提供的经纪人属性——spring.cloud.stream.kafka.binder.brokers. Kafka Streams 绑定器首先会检查 Kafka Streams 绑订器的特定经纪人属性是否被设置 (spring.cloud.stream.kafka.streams.binder.brokers如果找不到,则寻找spring.cloud.stream.kafka.binder.brokers.spring-doc.cadn.net.cn