Kafka Streams 应用中的基于事件类型的路由

基于常规消息通道的绑定器中可用的路由功能在 Kafka Streams 绑定器中不被支持。 然而,Kafka Streams 绑定器仍通过入站记录的事件类型记录头提供路由功能。spring-doc.cadn.net.cn

为了基于事件类型实现路由,应用程序必须提供以下属性。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer.eventTypes.spring-doc.cadn.net.cn

这可以是一个逗号分隔值。spring-doc.cadn.net.cn

例如,假设我们有这个函数:spring-doc.cadn.net.cn

@Bean
public Function<KStream<Integer, Foo>, KStream<Integer, Foo>> process() {
    return input -> input;
}

我们也假设只有当输入记录的事件类型为酒吧. 这可以通过以下表达方式表示:事件类型绑定上的属性。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypes=foo,barspring-doc.cadn.net.cn

现在,当应用程序运行时,绑定器会检查每个进入的记录是否有头部event_type并观察其值是否设为酒吧. 如果找不到这两个函数,则会跳过函数执行。spring-doc.cadn.net.cn

默认情况下,绑定器期望记录头键为event_type但这可以根据绑定进行调整。 例如,如果我们想将该绑定的头键更改为my_event默认设置可以更改如下。spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.eventTypeHeaderKey=my_event.spring-doc.cadn.net.cn

在使用 Kafka Streams 绑定器的事件路由功能时,它使用字节数组Serde去序列化所有进来记录。 如果记录头与事件类型匹配,则只有它使用实际事件Serde使用配置或推断的 进行适当的反序列化Serde. 如果你在绑定上设置了反序列化异常处理程序,这会带来问题,因为预期的反序列化只在栈层下发生,导致意想不到的错误。 为了解决这个问题,你可以在绑定上设置以下属性,强制绑定器使用配置或推断的Serde而不是字节数组Serde.spring-doc.cadn.net.cn

spring.cloud.stream.kafka.streams.bindings.<process-in-0>.consumer.useConfiguredSerdeWhenRoutingEventsspring-doc.cadn.net.cn

这样,应用程序在使用事件路由功能时可以立即检测反序列化问题,并做出适当的处理决策。spring-doc.cadn.net.cn