记录序列化与反串列化
Kafka Streams 装订器允许你通过两种方式序列化和反序列化记录。 一是Kafka提供的原生序列化和反序列化功能,另一是Spring Cloud Stream框架的消息转换功能。 让我们来看一些细节。
入站反序列化
密钥总是通过原生Serdes进行反序列化。
对于值,默认情况下,Kafka原生执行入站的反序列化。 请注意,这与之前版本的 Kafka Streams 绑装器相比,默认行为发生了重大变化,之前的反序列化由框架完成。
Kafka Streams 的活页夹会尝试推断匹配Serde通过观察 的类型签名来确定类型java.util.function.Function|消费者.
以下是它与Serdes匹配的顺序。
-
如果应用程序提供了 类型的豆子
Serde如果返回类型参数化为输入键或值的实际类型,则会使用Serde用于入站反序列化。 例如,如果你在应用程序中有以下内容,绑定器会检测到输入值的类型KStream与参数化在Serde豆。 它会用这些数据进行入库反序列化。
@Bean
public Serde<Foo> customSerde() {
...
}
@Bean
public Function<KStream<String, Foo>, KStream<String, Foo>> process() {
}
-
接下来,它会观察这些类型,看看它们是否是卡夫卡流(Kafka Streams)暴露的类型之一。如果有,就用它们。 以下是活页夹会尝试匹配的 Kafka Streams 中的 Serde 字体。
Integer, Long, Short, Double, Float, byte[], UUID and String.
-
如果 Kafka Streams 提供的 Serde 都不符合类型,那么它将使用 Spring Kafka 提供的 JsonSerde。在这种情况下,绑定器假设这些类型对JSON友好。 如果你有多个值对象作为输入,这很有用,因为绑定器会在内部推断它们以纠正 Java 类型。 在退回
JacksonJsonSerde不过,绑定器默认会检查Serde在 Kafka Streams 配置中设置了 s,以判断它是否是Serde它能与新来的KStream类型匹配。
如果上述策略均无效,应用程序必须提供Serde通过配置。
这可以通过两种方式配置——绑定或默认。
首先,活页夹会检查是否Serde在绑定层面提供。
例如,如果你有以下处理器,
@Bean
public BiFunction<KStream<CustomKey, AvroIn1>, KTable<CustomKey, AvroIn2>, KStream<CustomKey, AvroOutput>> process() {...}
然后,你可以提供一个绑定级别Serde使用以下方法:
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果你提供Serde作为输入绑定的 abover,那么该绑定优先级更高,绑定器将避免使用任何Serde推理。 |
如果你想让默认的键值Serdes用于入站反序列化,可以在绑定器层面实现。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde
如果你不想要Kafka提供的原生解码,可以依赖Spring Cloud Stream提供的消息转换功能。 由于原生解码是默认,为了让 Spring Cloud Stream 反序列化入站值对象,你需要明确禁用原生解码。
例如,如果你使用的是与上面相同的 BiFunction 处理器,spring.cloud.stream.bindings.process-in-0.consumer.nativeDecoding: false你需要单独关闭所有输入的原生解码。否则,对于未禁用的部分,仍会应用原生解码。
默认情况下,Spring Cloud Stream 将使用application/json作为内容类型,并使用合适的 JSON 消息转换器。
你可以通过以下属性和适当的条件来使用自定义消息转换器消息转换器豆。
spring.cloud.stream.bindings.process-in-0.contentType
出站序列化
出站序列化基本遵循上述的入站反序列化规则。与入站反序列化一样,Spring Cloud Stream 与之前版本的一个主要变化是,出站的序列化由 Kafka 原生处理。在 3.0 版本之前,这由框架本身完成。
出境键总是由卡夫卡用匹配序列化Serde这由绑定器推断出来。如果无法推断密钥的类型,则需要通过配置来指定。
值 serdes 的推断与进站反序列化相同的规则。首先匹配以判断出站类型是否来自应用程序中提供的豆子。如果不是,则检查是否与Serde卡夫卡揭露了这些作品,例如——整数,长,短,双,浮,字节[],UUID和字符串. 如果不行,那就退回去JacksonJsonSerde由Spring Kafka项目提供,但先看默认Serde配置以查看是否匹配。请记住,所有这些都是对应用程序透明发生的。如果这些都不行,用户必须提供Serde通过配置使用。
假设你用的是同样的双功能处理器如上所述。然后你可以配置出站键值Serdes如下配置。
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.keySerde=CustomKeySerde
spring.cloud.stream.kafka.streams.bindings.process-out-0.producer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
如果Serde推断失败,且没有提供绑定级别Serdes,则绑定器退回到JacksonJsonSerde但看看默认的Serdes匹配。
默认 serdes 的配置方式与上述相同,描述在反序列化中。
spring.cloud.stream.kafka.streams.binder.configuration.default.key.serde
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde
如果你的应用使用分支功能并且有多个输出绑定,那么这些绑定必须针对每个绑定配置。同样,如果绑定器能够推断Serde类型,你不需要做这个配置。
如果你不想要Kafka提供的原生编码,但想使用框架提供的消息转换,那么你需要明确禁用原生编码,因为原生编码是默认的。例如,如果你使用的是上面相同的BiFunction处理器,spring.cloud.stream.bindings.process-out-0.producer.useNativeEncoding: false在分支的情况下,你需要单独禁用所有输出的原生编码。否则,对于未禁用的输出,原生编码仍会被应用。
当Spring Cloud Stream进行转换时,默认情况下,它会使用application/json作为内容类型,并使用合适的JSON消息转换器。你可以通过以下属性和相应的条件使用自定义消息转换器消息转换器豆。
spring.cloud.stream.bindings.process-out-0.contentType
当本地编码/解码被禁用时,binder 不会像原生 Serdes 那样进行推理。应用程序需要明确提供所有配置选项。因此,通常建议在编写 Spring Cloud Stream Kafka Streams 应用时,保持默认的反序列化选项,并坚持使用 Kafka Streams 提供的原生去序列化功能。唯一必须使用框架提供的消息转换能力的情况是当上游生产者使用特定的序列化策略时。此时,你需要使用匹配的反序列化策略,因为本地机制可能会失败。当依赖默认Serde机制中,应用程序必须确保活页夹有前进路径,能够正确映射进出的进站和出站Serde否则可能会失败。
值得一提的是,上述数据解序列化方法仅适用于处理器的边缘,即入站和出站。你的业务逻辑可能仍需调用明确需要的Kafka Streams API。Serde对象。 这些仍然是应用的责任,开发者必须相应处理。