|
该版本仍在开发中,尚未被视为稳定。对于最新稳定版本,请使用 spring-cloud-stream 5.0.0! |
错误处理
Apache Kafka Streams 提供了原生处理反序列化错误异常的能力。
有关此支持的详细信息,请参见此处。
开箱即用,Apache Kafka Streams 提供了两种类型的反序列化异常处理程序——LogAndContinueExceptionHandler和LogAndFailExceptionHandler.
顾名思义,前者会记录错误并继续处理后续记录,后者则会记录错误并失败。LogAndFailExceptionHandler是默认的反序列化异常处理程序。
在活页夹中处理反序列化异常
Kafka Streams 绑定器允许使用以下属性指定上述反序列化异常处理程序。
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndContinue
或
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: logAndFail
除了上述两个反序列化异常处理程序外,该绑定器还提供了第三个处理程序,用于将错误记录(毒丸)发送到DLQ(死信队列)主题。 以下是如何启用这个DLQ异常处理程序的方法。
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler: sendToDlq
当上述属性被设置时,所有处于反序列化错误中的记录都会自动发送到DLQ主题。
你可以设置DLQ消息发布的主题名称如下。
你可以提供一个实现DlqDestinationResolver这是一个功能性接口。DlqDestinationResolver需要消费者记录以及例外作为输入,然后允许将主题名称指定为输出。
通过获得卡夫卡的访问权消费者记录,头部记录可以在实现双功能.
这里有一个提供 的实现示例DlqDestinationResolver.
@Bean
public DlqDestinationResolver dlqDestinationResolver() {
return (rec, ex) -> {
if (rec.topic().equals("word1")) {
return "topic1-dlq";
}
else {
return "topic2-dlq";
}
};
}
在为 提供实现时,有一点很重要要记住DlqDestinationResolver就是 Binder 中的 provisioner 不会自动为应用创建主题。
这是因为绑定器无法推断实现可能发送的所有DLQ主题名称。
因此,如果你用这种策略提供DLQ名称,应用程序有责任确保这些主题事先被创建。
如果DlqDestinationResolver在应用中以豆子的形式存在,优先级更高。
如果你不想采用这种方法,而是通过配置提供静态的DLQ名称,可以设置以下属性。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.dlqName: custom-dlq (Change the binding name accordingly)
如果设置了这个,那么错误记录会发送到主题Custom-DLQ.
如果应用程序没有使用上述任何一种策略,那么它会创建一个带有名称的 DLQ 主题error.<input-topic-name>.<application-id>.
例如,如果你的装订目标主题是输入主题应用程序ID为process-applicationID,则默认的DLQ主题为error.inputTopic.process-applicationID.
如果你打算启用DLQ,建议为每个输入绑定明确创建一个DLQ主题。
DLQ每输入消费者绑定
该物业spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler适用于整个应用。
这意味着如果同一应用中有多个函数,该属性会应用到所有函数上。
然而,如果你在单个处理器内有多个处理器或多个输入绑定,那么你可以使用绑定器为每个输入消费者绑定提供的更细粒度的DLQ控制。
如果你有以下处理器,
@Bean
public BiFunction<KStream<String, Long>, KTable<String, String>, KStream<String, Long>> process() {
...
}
你只需要在第一个输入绑定上启用DLQ,在第二个绑定上启用skipAndContinue,然后你可以像下面那样在消费者端这样做。
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer.deserializationExceptionHandler: sendToDlq
spring.cloud.stream.kafka.streams.bindings.process-in-1.consumer.deserializationExceptionHandler: skipAndContinue
以这种方式设置反序列化异常处理程序的优先级高于在绑定器层面设置。
DLQ分区
默认情况下,记录会以与原始记录相同的分区发布到死信主题。 这意味着死信主题的分区数量必须至少与原始记录相当。
要改变这种行为,可以添加一个DlqPartitionFunction作为@Bean切换到应用上下文。
只能有一颗这样的豆子存在。
该函数由消费者组(在大多数情况下与应用ID相同)提供,失败消费者记录也是例外。
例如,如果你总是想路由到分区 0,可以使用:
@Bean
public DlqPartitionFunction partitionFunction() {
return (group, record, ex) -> 0;
}
如果你设置了消费者绑定dlqPartitions属性为1(以及绑定者的最小分区计数等于1),无需提供DlqPartitionFunction;框架始终使用分区0。
如果你设置了消费者绑定dlqPartitions属性为大于1(或者说是活页夹的最小分区计数大于1),你必须提供一个DlqPartitionFunction豆子,即使分区计数和原主题相同。 |
使用 Kafka Streams Binder 中的异常处理功能时,有几点需要注意。
-
该物业
spring.cloud.stream.kafka.streams.binder.deserializationExceptionHandler适用于整个应用。 这意味着如果同一应用中有多个函数,该属性会应用到所有函数上。 -
反序列化的异常处理与原生反序列化和框架提供的消息转换保持一致。
在活页夹中处理生产异常
与上述对反序列化异常处理程序的支持不同,该绑定器不提供处理生产异常的第一类机制。
不过,你仍然可以通过以下方式配置生产异常处理程序StreamsBuilderFactoryBean你可以在下面下一节找到更多关于定制器的详细信息。
运行时错误处理
在处理应用代码错误,即业务逻辑执行时,通常由应用程序来处理。
因为Kafka Streams绑定器没有干扰应用代码的方式。
不过,为了让应用更方便,活页夹提供了方便的可记录可恢复处理器通过该函数,你可以决定如何处理应用层的错误。
请考虑以下代码。
@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process() {
return input -> input
.map(...);
}
如果你内部的商业代码地图上述调用会抛出异常,处理该错误由你负责。
这里是可记录可恢复处理器变得方便。
默认情况下,可记录可恢复处理器,只需记录错误,然后让应用程序继续。
假设你想将失败记录发布到DLT,而不是在应用程序中处理。
在这种情况下,你必须使用自定义实现可记录可恢复处理器叫DltAware处理器.
以下是你可以做到这一点的方法。
@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process(DltPublishingContext dltSenderContext) {
return input -> input
.process(() -> new DltAwareProcessor<>(record -> {
throw new RuntimeException("error");
}, "hello-dlt-1", dltPublishingContext));
}
原版中的业务逻辑代码地图现在的电话已被迁移,作为KStream#process方法调用,该调用具有处理器提供商.
于是我们传承了这个习俗DltAwareProcessor,该系统能够发布到DLT。
的构造器DltAware处理器上述参数为三个 - A功能该记录先接收输入记录,然后作为业务逻辑作的一部分功能正体、DLT主题,最后是Dlt出版背景.
当函数的lambda表达式抛出一个异常,称为“DltAwareProcessor”将输入记录发送到DLT。
这Dlt出版背景提供DltAware处理器必要的出版基础设施Beans。
这Dlt出版背景由绑定器自动配置,这样你可以直接将它注入应用程序。
如果你不想让活页夹将失败记录发布到DLT,那么你必须使用可记录可恢复处理器直接而不是DltAware处理器.
你可以提供自己的恢复者作为双消费者该输入记录例外作为论证。
假设这样一种情景,你不想把记录发送到DLT,而是直接记录消息然后继续。
下面是一个实现这一目标的示例。
@Bean
public java.util.function.Function<KStream<String, String>, KStream<String, String>> process() {
return input -> input
.process(() -> new RecordRecoverableProcessor<>(record -> {
throw new RuntimeException("error");
},
(record, exception) -> {
// Handle the record
}));
}
在这种情况下,当记录失败时,可记录可恢复处理器,使用用户提供的恢复器,即双消费者这以失败记录和提出的例外作为论据。
在DltAwareProcessor中处理记录键
当发送失败记录到 DLT 时,使用DltAware处理器如果你想把记录键发送到DLT主题,那么你需要在DLT绑定上设置正确的序列化器。
这是因为,DltAware处理器使用流桥它使用常规的Kafka绑定器(基于消息通道),默认使用一个字节数列序列化器为了钥匙。
对于记录值,Spring Cloud Stream 会将有效载荷转换为正确的字节[];然而,密钥则不是这样,因为它只是将收到的地址作为密钥传递。
如果你提供的是非字节数组键,可能会导致类投射异常,为了避免这种情况,你需要像下面那样在DLT绑定上设置串行器。
假设DLT的目的地是你好-DLT-1记录键为字符串数据类型。
spring.cloud.stream.kafka.bindings.hello-dlt-1.producer.configuration.key.serializer=org.apache.kafka.common.serialization.StringSerializer