该版本仍在开发中,尚未被视为稳定。对于最新稳定版本,请使用 spring-cloud-stream 5.0.0!spring-doc.cadn.net.cn

记录序列化与反串列化

Kafka Streams 装订器允许你通过两种方式序列化和反序列化记录。 一是Kafka提供的原生序列化和反序列化功能,另一是Spring Cloud Stream框架的消息转换功能。 让我们来看一些细节。spring-doc.cadn.net.cn

入站反序列化

密钥总是通过原生Serdes进行反序列化。spring-doc.cadn.net.cn

对于值,默认情况下,Kafka原生执行入站的反序列化。 请注意,这与之前版本的 Kafka Streams 绑装器相比,默认行为发生了重大变化,之前的反序列化由框架完成。spring-doc.cadn.net.cn

Kafka Streams 的活页夹会尝试推断匹配Serde通过观察 的类型签名来确定类型java.util.function.Function|消费者. 以下是它与Serdes匹配的顺序。spring-doc.cadn.net.cn

  • 如果应用程序提供了 类型的豆子Serde如果返回类型参数化为输入键或值的实际类型,则会使用Serde用于入站反序列化。 例如,如果你在应用程序中有以下内容,绑定器会检测到输入值的类型KStream与参数化在Serde豆。 它会用这些数据进行入库反序列化。spring-doc.cadn.net.cn

@Bean
public Serde<Foo> customSerde() {
 ...
}

@Bean
public Function<KStream<String, Foo>, KStream<String, Foo>> process() {
}
  • 接下来,它会观察这些类型,看看它们是否是卡夫卡流(Kafka Streams)暴露的类型之一。如果有,就用它们。 以下是活页夹会尝试匹配的 Kafka Streams 中的 Serde 字体。spring-doc.cadn.net.cn

    Integer, Long, Short, Double, Float, byte[], UUID and String.
  • 如果 Kafka Streams 提供的 Serde 都不符合类型,那么它将使用 Spring Kafka 提供的 JsonSerde。在这种情况下,绑定器假设这些类型对JSON友好。 如果你有多个值对象作为输入,这很有用,因为绑定器会在内部推断它们以纠正 Java 类型。 在退回JacksonJsonSerde不过,绑定器默认会检查Serde在 Kafka Streams 配置中设置了 s,以判断它是否是Serde它能与新来的KStream类型匹配。spring-doc.cadn.net.cn

如果上述策略均无效,应用程序必须提供Serde通过配置。 这可以通过两种方式配置——绑定或默认。spring-doc.cadn.net.cn

首先,活页夹会检查是否Serde在绑定层面提供。 例如,如果你有以下处理器,spring-doc.cadn.net.cn

@Bean
public BiFunction<KStream<CustomKey, AvroIn1>, KTable<CustomKey, AvroIn2>, KStream<CustomKey, AvroOutput>> process() {...}

然后,你可以提供一个绑定级别Serde使用以下方法:spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果你提供Serde作为输入绑定的 abover,那么该绑定优先级更高,绑定器将避免使用任何Serde推理。

如果你想让默认的键值Serdes用于入站反序列化,可以在绑定器层面实现。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde

如果你不想要Kafka提供的原生解码,可以依赖Spring Cloud Stream提供的消息转换功能。 由于原生解码是默认,为了让 Spring Cloud Stream 反序列化入站值对象,你需要明确禁用原生解码。spring-doc.cadn.net.cn

例如,如果你使用的是与上面相同的 BiFunction 处理器,spring.cloud.stream.bindings.process-in-0.consumer.nativeDecoding: false你需要单独关闭所有输入的原生解码。否则,对于未禁用的部分,仍会应用原生解码。spring-doc.cadn.net.cn

默认情况下,Spring Cloud Stream 将使用application/json作为内容类型,并使用合适的 JSON 消息转换器。 你可以通过以下属性和适当的条件来使用自定义消息转换器消息转换器豆。spring-doc.cadn.net.cn

spring.cloud.stream.bindings.process-in-0.contentType

出站序列化

出站序列化基本上遵循上述的入站反序列化规则。 与入站反序列化类似,与之前版本的 Spring Cloud Stream 有一个重大变化是出站的序列化由 Kafka 原生处理。 在3.0版本之前,这都是由框架本身完成的。spring-doc.cadn.net.cn

出境键总是由卡夫卡用匹配序列化Serde这由活页夹推断。 如果无法推断密钥的类型,那就需要用配置来指定。spring-doc.cadn.net.cn

值服务通过与进站反序列化相同的规则推断。 首先匹配的是外出类型是否来自应用中提供的豆子。 如果不匹配,它会检查是否与Serde卡夫卡揭露了这些作品,例如——整数,,,,,字节[],UUID字符串. 如果不行,那就退回去JacksonJsonSerde由Spring Kafka项目提供,但先看默认Serde配置以判断是否匹配。 请记住,所有这些都是对应用程序透明的。 如果这些都不行,用户必须提供Serde通过配置使用。spring-doc.cadn.net.cn

假设你用的是同样的双功能处理器如上所述。然后你可以配置出站键/值Serdes,具体如下。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

如果Serde推断失败,且没有提供绑定级别Serdes,则绑定器退回到JacksonJsonSerde但看看默认的Serdes匹配。spring-doc.cadn.net.cn

默认 serdes 的配置方式与上述相同,描述在反序列化中。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde spring.cloud.stream.kafka.streams.binder.configuration.default.value.serdespring-doc.cadn.net.cn

如果你的应用使用分支功能并且有多个输出绑定,那么这些绑定必须在每个绑定中配置。 同样,如果活页夹能够推断Serde类型,你不需要做这个配置。spring-doc.cadn.net.cn

如果你不想要Kafka提供的原生编码,但想使用框架提供的消息转换,那么你需要明确禁用原生编码,因为原生编码是默认的。 例如,如果你使用的是与上面相同的 BiFunction 处理器,spring.cloud.stream.bindings.process-out-0.producer.useNativeEncoding: false在分支情况下,你需要单独禁用所有输出的原生编码。否则,对于未禁用的部分,仍会应用原生编码。spring-doc.cadn.net.cn

当Spring Cloud Stream进行转换时,默认情况下,它会使用application/json作为内容类型,并使用合适的 JSON 消息转换器。 你可以通过以下属性和相应的条件使用自定义消息转换器消息转换器豆。spring-doc.cadn.net.cn

spring.cloud.stream.bindings.process-out-0.contentType

当本地编码/解码被禁用时,binder不会像原生Serdes那样进行任何推理。 应用程序需要明确提供所有配置选项。 因此,通常建议在编写 Spring Cloud Stream Kafka Streams 应用时,保持默认的反序列化选项,并坚持使用 Kafka Streams 提供的原生反序列化功能。 唯一必须使用框架提供的消息转换能力的情况是,当上游生产者使用特定的序列化策略时。 在这种情况下,你需要采用匹配的反序列化策略,因为原生机制可能会失败。 当依赖默认时Serde机制中,应用程序必须确保活页夹有前进路径,能够正确映射进出的进站和出站Serde否则可能会失败。spring-doc.cadn.net.cn

值得一提的是,上述数据反序列化方法仅适用于处理器的边缘,即进站和出站。 你的业务逻辑可能仍然需要调用明确需要的 Kafka Streams APISerde对象。 这些仍然是应用的责任,开发者必须相应处理。spring-doc.cadn.net.cn