Kafka Streams 应用中的基于事件类型的路由
基于常规消息通道的绑定器中可用的路由功能在 Kafka Streams 绑定器中不被支持。 然而,Kafka Streams 绑定器仍通过入站记录的事件类型记录头提供路由功能。
为了基于事件类型实现路由,应用程序必须提供以下属性。
spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.eventTypes.
这可以是一个逗号分隔值。
例如,假设我们有这个函数:
@Bean
public Function<KStream<Integer, Foo>, KStream<Integer, Foo>> process() {
return input -> input;
}
我们也假设只有当输入记录的事件类型为福或酒吧.
这可以通过以下表达方式表示:事件类型绑定上的属性。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypes=foo,bar
现在,当应用程序运行时,绑定器会检查每个进入的记录是否有头部event_type并观察其值是否设为福或酒吧.
如果找不到这两个函数,则会跳过函数执行。
默认情况下,绑定器期望记录头键为event_type但这可以根据绑定进行调整。
例如,如果我们想将该绑定的头键更改为my_event默认设置可以更改如下。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypeHeaderKey=my_event.
在使用 Kafka Streams 绑定器的事件路由功能时,它使用字节数组Serde去序列化所有进来记录。
如果记录头与事件类型匹配,则只有它使用实际事件Serde使用配置或推断的 进行适当的反序列化Serde.
如果你在绑定上设置了反序列化异常处理程序,这会带来问题,因为预期的反序列化只在栈层下发生,导致意想不到的错误。
为了解决这个问题,你可以在绑定上设置以下属性,强制绑定器使用配置或推断的Serde而不是字节数组Serde.
spring.cloud.stream.kafka.streams.bindings.<process-in-0>.consumer.useConfiguredSerdeWhenRoutingEvents
这样,应用程序在使用事件路由功能时可以立即检测反序列化问题,并做出适当的处理决策。