处理异常
本部分描述了在使用 Spring for Apache Kafka 时如何处理各种可能出现的异常。
监听器错误处理程序
从 2.0 版本开始,@KafkaListener 注解新增了属性:errorHandler。
你可以使用 errorHandler 来提供一个 KafkaListenerErrorHandler 实现的 bean 名称。
此函数式接口有一个方法,如下列示的代码所示:
@FunctionalInterface
public interface KafkaListenerErrorHandler {
Object handleError(Message<?> message, ListenerExecutionFailedException exception) throws Exception;
}
你有权限访问由消息转换器产生的 spring-messaging Message<?> 对象,以及由监听器抛出的异常,该异常被包装在 ListenerExecutionFailedException 中。
错误处理器可以抛出原始或一个新的异常,该异常将被抛给容器。
错误处理器返回的任何内容都会被忽略。
从 2.7 版本开始,您可以将 rawRecordHeader 属性设置在 MessagingMessageConverter 和 BatchMessagingMessageConverter 上,这会使原始的 ConsumerRecord 添加到在 KafkaHeaders.RAW_DATA 头部转换的 Message<?> 中。
这在例如您希望在监听器错误处理程序中使用 DeadLetterPublishingRecoverer 时很有用。
它可能在请求/回复场景中使用,您希望在重试一定次数后捕获失败记录并将其发送到发送者,其中失败记录已放入死信主题中。
@Bean
public KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) {
return (msg, ex) -> {
if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 9) {
recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), ex);
return "FAILED";
}
throw ex;
};
}
它有一个子接口 (ConsumerAwareListenerErrorHandler),可以通过以下方法访问consumer对象:
Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer);
另一个子接口(ManualAckListenerErrorHandler)在使用手动 AckModes 时提供对 Acknowledgment 对象的访问。
Object handleError(Message<?> message, ListenerExecutionFailedException exception,
Consumer<?, ?> consumer, @Nullable Acknowledgment ack);
在任何情况下,都不应对接收者执行任何seek操作,因为容器将无法感知这些操作。
容器错误处理器
从 2.8 版本开始,遗留的 ErrorHandler 和 BatchErrorHandler 接口已被 CommonErrorHandler 新接口取代。
这些错误处理程序可以同时处理记录型和批处理监听器的错误,允许单个监听器容器工厂创建这两种类型监听器的容器。
提供了 CommonErrorHandler 个实现,以替代大多数遗留框架错误处理程序实现。
见 将自定义遗留错误处理程序迁移到 CommonErrorHandler 以获取将自定义错误处理程序迁移到 CommonErrorHandler 的信息。
当使用事务时,如果没有配置错误处理程序,则异常将回滚事务。
事务容器的错误处理由 AfterRollbackProcessor 处理。
如果你在使用事务时提供自定义错误处理程序,并且希望回滚事务,则该处理程序必须抛出异常。
此接口具有一个默认方法 isAckAfterHandle(),由容器在错误处理程序在不抛出异常的情况下返回时,用来确定是否应提交偏移量;默认返回 true。
通常,框架提供的错误处理程序在错误未被“处理”时(例如在执行seek操作后)将抛出异常。
默认情况下,容器会在ERROR级别记录此类异常。
所有框架错误处理程序都扩展KafkaExceptionLogLevelAware,这允许您控制这些异常的日志记录级别。
/**
* Set the level at which the exception thrown by this handler is logged.
* @param logLevel the level (default ERROR).
*/
public void setLogLevel(KafkaException.Level logLevel) {
...
}
您可以指定一个全局错误处理器,用于容器工厂中所有监听器的错误处理。 以下示例展示了如何实现:
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.setCommonErrorHandler(myErrorHandler);
...
return factory;
}
默认情况下,如果带有注解的监听器方法抛出异常,该异常将传递到容器,并根据容器的配置来处理消息。
容器会在调用错误处理程序之前先提交任何待处理的偏移提交。
如果您使用 Spring Boot,只需将错误处理程序作为 a @Bean 添加,Boot 将会将其添加到自动配置的工厂中。
退避处理程序
错误处理程序(如 DefaultErrorHandler)使用一个 BackOff 来决定在重试传递之前等待多长时间。
从 2.9 版本开始,您可以配置一个自定义的 BackOffHandler。
默认处理程序会在退避时间经过(或容器被停止)后挂起线程。
框架还提供了 ContainerPausingBackOffHandler,它会在退避时间经过后暂停监听容器,然后恢复容器。
这在延迟时间长于 max.poll.interval.ms 消费者属性时很有用。
请注意,实际退避时间的精度将受到 pollTimeout 容器属性的影响。
默认错误处理器
这个新的错误处理器取代了SeekToCurrentErrorHandler和RecoveringBatchErrorHandler,它们在过去多个发布版本中都是默认的错误处理器。
一个不同之处是,当抛出除BatchListenerFailedException以外的异常时,批处理监听器的回退行为等同于重试完整批次。
从 2.9 版本开始,DefaultErrorHandler 可以配置为提供与讨论的未处理记录偏移量查找相同的语义,但不实际执行查找操作。
相反,记录由监听器容器保留,并在错误处理程序退出后(并在执行一次暂停的 poll() 以保持消费者存活后)重新提交给监听器(如果使用了非阻塞重试或 ContainerPausingBackOffHandler,暂停可能会持续多个轮询周期)。
错误处理程序会将结果返回给容器,指示当前失败的记录是否可以重新提交,或者是否已恢复且不再发送给监听器第二次。
要启用此模式,请将属性 seekAfterError 设置为 false。 |
出错处理程序可以恢复(跳过)一直失败的记录。
默认情况下,在发生十次失败后,失败的记录会被记录(记录级别为ERROR)。
你可以通过配置自定义的恢复器(BiConsumer)以及一个BackOff来控制重试次数和每次重试之间的延迟。
使用FixedBackOff配合FixedBackOff.UNLIMITED_ATTEMPTS将导致(实际上)无限重试。
以下示例配置了在尝试三次后进行恢复:
DefaultErrorHandler errorHandler =
new DefaultErrorHandler((record, exception) -> {
// recover after 3 failures, with no back off - e.g. send to a dead-letter topic
}, new FixedBackOff(0L, 2L));
为了使用自定义的此处理器实例配置监听器容器,请将其添加到容器工厂中。
例如,使用 @KafkaListener 容器工厂,你可以添加 DefaultErrorHandler 如下:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AckMode.RECORD);
factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L)));
return factory;
}
对于记录器监听器,这将使用1秒的退避策略重试最多2次(总共3次发送尝试),而不是默认配置(FixedBackOff(0L, 9))。
在重试耗尽后,失败将仅记录日志。
作为一个示例,如果 poll 返回六条记录(来自分区 0、1、2 各两条),而监听器在第四个记录上抛出异常,容器会通过确认前三个消息的偏移量来确认这三条记录。
The DefaultErrorHandler 会为分区 1 寻求到偏移量 1,为分区 2 寻求到偏移量 0。
下一个 poll() 会返回三条未处理的记录。
如果 AckMode 是 BATCH,则容器会在调用错误处理程序之前,为前两个分区提交偏移量。
批处理监听器必须抛出一个BatchListenerFailedException,表示批处理中哪些记录失败。
事件序列是:
-
提交索引前的记录的偏移量。
-
如果重试次数未耗尽,则执行seek操作,使所有剩余记录(包括失败的记录)都会被重新投递。
-
如果重试次数耗尽,将尝试恢复失败的记录(默认仅日志记录)并执行定位,使得排除该失败记录的其余记录将重新投递。 恢复的记录的偏移量会被提交。
-
如果重试次数耗尽且恢复失败,那么将像重试次数未耗尽一样执行寻道操作。
从 2.9 版本开始,DefaultErrorHandler 可以配置提供与上述讨论中类似的效果,即像查找未处理记录的偏移量一样操作,但并不实际进行查找。
相反,错误处理器会创建一个新的 ConsumerRecords<?, ?>,其中只包含未处理的记录,然后在进行一次暂停的 poll() 以保持消费者存活后,将其提交给监听器。
要启用此模式,请将属性 seekAfterError 设置为 false。 |
The 默认 recoverer 在尝试次数耗尽后会记录失败的记录。
您可以使用自定义 recoverer,或使用框架提供的 recoverer,例如 DeadLetterPublishingRecoverer。
当使用POJO批处理监听器(例如:List<Thing>),且你没有完整的消费记录可添加到异常中时,你只需要添加失败记录的索引:
@KafkaListener(id = "recovering", topics = "someTopic")
public void listen(List<Thing> things) {
for (int i = 0; i < things.size(); i++) {
try {
process(things.get(i));
}
catch (Exception e) {
throw new BatchListenerFailedException("Failed to process", i);
}
}
}
当容器配置为使用 AckMode.MANUAL_IMMEDIATE 时,错误处理器可以配置在恢复记录后提交偏移量;将 commitRecovered 属性设置为 true。
参见 发布死信记录。
在使用事务时,提供了类似的 DefaultAfterRollbackProcessor 功能。
查看 回滚后处理器。
The DefaultErrorHandler 将某些异常视为致命,对于此类异常会跳过重试;在第一次失败时会调用 recoverer。
默认视为致命的异常包括:
-
DeserializationException -
MessageConversionException -
ConversionException -
MethodArgumentResolutionException -
NoSuchMethodException -
ClassCastException
由于这些异常在重试投递时不太可能得到解决。
你可以将更多异常类型添加到“不可重试”类别中,或完全替换分类异常的映射。
查看 DefaultErrorHandler.addNotRetryableException() 和 DefaultErrorHandler.setClassifications() 的 Javadocs 以获取更多信息,以及 ExceptionMatcher。
这里是将 IllegalArgumentException 添加到不可重试异常的示例:
@Bean
public DefaultErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) {
DefaultErrorHandler handler = new DefaultErrorHandler(recoverer);
handler.addNotRetryableExceptions(IllegalArgumentException.class);
return handler;
}
The DefaultErrorHandler only processes exceptions that inherit from RuntimeException.
Exceptions inheriting from Error bypass the error handler entirely, causing the consumer to terminate immediately, close the Kafka connection, and skip all retry/recovery mechanisms.
This critical distinction means applications may report healthy status despite having terminated consumers that no longer process messages.
Always ensure that exceptions thrown in message processing code explicitly extend from RuntimeException rather than Error to allow proper error handling.
In other words, if the application throws an exception, ensure that it is extended from RuntimeException and not inadvertently inherited from Error.
Standard errors like OutOfMemoryError, IllegalAccessError, and other errors beyond the control of the application are still treated as Errors and not retried. |
错误处理程序可以配置一个或多个RetryListener,接收重试和恢复进度的通知。
从2.8.10版本开始,增加了批量监听器的方法。
@FunctionalInterface
public interface RetryListener {
void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);
default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
}
default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
}
default void failedDelivery(ConsumerRecords<?, ?> records, Exception ex, int deliveryAttempt) {
}
default void recovered(ConsumerRecords<?, ?> records, Exception ex) {
}
default void recoveryFailed(ConsumerRecords<?, ?> records, Exception original, Exception failure) {
}
}
查看更多Java文档信息。
如果恢复器失败(抛出异常),则失败的记录将包含在seeks中。
如果恢复器失败,默认会将BackOff重置,并会在再次尝试恢复前按照回退策略重新进行重试。
要跳过恢复失败后的重试,将错误处理程序的resetStateOnRecoveryFailure设置为false。 |
你可以通过提供一个BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>的错误处理程序来确定BackOff的使用,该BackOff将基于失败的记录和/或异常来决定:
handler.setBackOffFunction((record, ex) -> { ... });
如果函数返回 null,将使用处理器的默认 BackOff。
将 resetStateOnExceptionChange 设置为 true 时,如果在多次失败间异常类型发生变化,重试序列将重新开始(包括根据配置选择新的 BackOff)。
当 false(在 2.9 版本之前为默认值)时,不考虑异常类型。
从 2.9 版本开始,现在默认值为 true。
也请参见 Delivery Attempts 头部。
批量监听器死信主题错误处理
非阻塞重试(使用@RetryableTopic注解)与批处理监听器不兼容。
要与批处理监听器使用死信主题功能,请使用DefaultErrorHandler与DeadLetterPublishingRecoverer。 |
使用 BatchListenerFailedException
为了指示批处理中具体哪条记录失败,抛出一个 BatchListenerFailedException:
@KafkaListener(id = "batch-listener", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<String, Order>> records) {
for (ConsumerRecord<String, Order> record : records) {
try {
process(record.value());
}
catch (Exception e) {
// Identifies the failed record for error handling
throw new BatchListenerFailedException("Failed to process", e, record);
}
}
}
对于不具有ConsumerRecord的POJO批处理监听器,使用索引代替:
@KafkaListener(id = "batch-listener", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<Order> orders) {
for (int i = 0; i < orders.size(); i++) {
try {
process(orders.get(i));
}
catch (Exception e) {
throw new BatchListenerFailedException("Failed to process", e, i);
}
}
}
配置批处理监听器的死信主题
配置您的批处理监听器容器工厂上的一个 DefaultErrorHandler 与一个 DeadLetterPublishingRecoverer:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Order> batchFactory(
ConsumerFactory<String, Order> consumerFactory,
KafkaTemplate<String, Order> kafkaTemplate) {
ConcurrentKafkaListenerContainerFactory<String, Order> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setBatchListener(true);
// Configure Dead Letter Publishing
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate,
(record, ex) -> new TopicPartition(record.topic() + "-dlt", record.partition()));
// Configure retries: 3 attempts with 1 second between each
DefaultErrorHandler errorHandler = new DefaultErrorHandler(recoverer,
new FixedBackOff(1000L, 2L)); // 2 retries = 3 total attempts
factory.setCommonErrorHandler(errorHandler);
return factory;
}
批次错误处理如何工作
当一个 BatchListenerFailedException 被抛出时,DefaultErrorHandler:
-
在失败记录之前的所有记录的提交偏移量
-
重试 失败的记录(以及后续记录)根据
BackOff配置 -
重试耗尽后发布到 DLT - 仅将失败的记录发送到 DLT
-
提交失败记录的偏移量 并重新投递剩余记录以供处理
示例流程:一批6条记录,其中索引为2的记录失败:
-
首次尝试:处理了记录 0, 1 成功;记录 2 失败
-
容器为记录 0,1 提交偏移量
-
重试尝试 1: 记录 2, 3, 4, 5 将被重试
-
重试尝试 2: 记录 2, 3, 4, 5 将再次重试
-
尝试重试耗尽后:记录 2 被发布到 DLT,其偏移量已提交
-
容器继续显示记录 3, 4, 5
跳过特定异常的重试
默认情况下,除了致命异常(如 DeserializationException, MessageConversionException, 等等)外,DefaultErrorHandler 次重试会重试所有异常。要跳过您自己异常类型的重试,请使用错误处理器配置异常分类。
错误处理程序会检查 原因 为 BatchListenerFailedException 的情况,以确定是否应跳过重试:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Order> batchFactory(
ConsumerFactory<String, Order> consumerFactory,
KafkaTemplate<String, Order> kafkaTemplate) {
ConcurrentKafkaListenerContainerFactory<String, Order> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setBatchListener(true);
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(kafkaTemplate);
DefaultErrorHandler errorHandler = new DefaultErrorHandler(recoverer,
new FixedBackOff(1000L, 2L));
// Add custom exception types that should skip retries and go directly to DLT
errorHandler.addNotRetryableExceptions(ValidationException.class, InvalidFormatException.class);
factory.setCommonErrorHandler(errorHandler);
return factory;
}
现在在你的监听器中:
@KafkaListener(id = "batch-listener", topics = "orders", containerFactory = "batchFactory")
public void processOrders(List<ConsumerRecord<String, Order>> records) {
for (ConsumerRecord<String, Order> record : records) {
try {
process(record.value());
}
catch (DatabaseException e) {
// Will be retried 3 times (according to BackOff configuration)
throw new BatchListenerFailedException("Database error", e, record);
}
catch (ValidationException e) {
// Skips retries - goes directly to DLT
// (because ValidationException is configured as not retryable)
throw new BatchListenerFailedException("Validation failed", e, record);
}
}
}
错误处理程序会检查BatchListenerFailedException的原因(第二个参数)。
如果原因被分类为不可重试,记录将立即发送到DLT,不会重试。 |
偏移量提交行为
理解偏移提交对于批量错误处理很重要:
-
AckMode.BATCH(批处理监听器中最常见):
-
失败记录之前的偏移量在错误处理之前被提交
-
失败记录的偏移量在成功恢复后提交(DLT发布)
-
-
AckMode.MANUAL_IMMEDIATE:
-
设置
errorHandler.setCommitRecovered(true)以提交恢复的记录偏移量 -
您在监听器中控制确认时间
-
示例:手动确认
@KafkaListener(id = "manual-batch", topics = "myTopic", containerFactory = "manualBatchFactory")
public void listen(List<ConsumerRecord<String, Order>> records, Acknowledgment ack) {
for (ConsumerRecord<String, Order> record : records) {
try {
process(record.value());
}
catch (Exception e) {
throw new BatchListenerFailedException("Processing failed", e, record);
}
}
ack.acknowledge();
}
批处理错误处理的转换错误
从版本 2.8 开始,当使用一个 MessageConverter 与一个 ByteArrayDeserializer、一个 BytesDeserializer 或一个 StringDeserializer,以及一个 DefaultErrorHandler 时,批处理监听器现在可以正确处理转换错误。
当发生转换错误时,payload 会被设置为 null,并会在记录头中添加一个反序列化异常,类似于 ErrorHandlingDeserializer。
监听器中可用 ConversionException 的列表,以便监听器可以抛出 BatchListenerFailedException,表示转换异常首次发生的位置索引。
示例:
@KafkaListener(id = "test", topics = "topic")
void listen(List<Thing> in, @Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> exceptions) {
for (int i = 0; i < in.size(); i++) {
Foo foo = in.get(i);
if (foo == null && exceptions.get(i) != null) {
throw new BatchListenerFailedException("Conversion error", exceptions.get(i), i);
}
process(foo);
}
}
反序列化错误与批量监听器
Batch listeners require manual handling of deserialization errors.
Unlike record listeners, there is no automatic error handler that detects and routes deserialization failures to the DLT.
You must explicitly check for failed records and throw BatchListenerFailedException. |
使用 ErrorHandlingDeserializer 可以防止反序列化错误导致整个批处理停止:
@Bean
public ConsumerFactory<String, Order> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
// Wrap your deserializer with ErrorHandlingDeserializer
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
return new DefaultKafkaConsumerFactory<>(props);
}
在你的监听器中,必须手动检查null值,这些值表示反序列化失败:
@KafkaListener(id = "batch-deser", topics = "orders", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<String, Order>> records) {
for (ConsumerRecord<String, Order> record : records) {
if (record.value() == null) {
// Deserialization failed - throw exception to send to DLT
throw new BatchListenerFailedException("Deserialization failed", record);
}
process(record.value());
}
}
When DeadLetterPublishingRecoverer 在 DLT 中发布反序列化失败:
-
原始无法反序列化的
byte[]数据将作为记录值恢复 -
异常信息(类名,消息,堆栈轨迹)被添加到标准 DLT 异常头中
-
原始的
ErrorHandlingDeserializer异常标题头默认移除(在recoverer上设置setRetainExceptionHeader(true)以保留它)
重试完整批次
这是现在 批处理监听器 的回退行为,当监听器抛出除 BatchListenerFailedException 以外的异常时,DefaultErrorHandler 的行为将作为回退。
无法保证在批次重新投递时,该批次包含的记录数量相同,或重新投递的记录顺序相同。
因此,无法轻易地为一个批次维护重试状态。
FallbackBatchErrorHandler 采取以下方法。
如果批次监听器抛出的异常不是 BatchListenerFailedException,重试将从内存中的记录批次进行。
为避免在长时间的重试序列期间发生重新平衡,错误处理程序会在每次重试时暂停消费者,重试前进行轮询,并在休眠等待退避时间之前再次调用监听器。
如果/当重试耗尽时,ConsumerRecordRecoverer 将为该批次中的每条记录被调用。
如果恢复器抛出异常,或在休眠期间线程被中断,记录批次将在下一次轮询时重新投递。
无论结果如何,退出前都会恢复消费者。
| 此机制无法与事务一起使用。 |
在等待 BackOff 间隔时,错误处理程序将通过短暂停留进行循环,直到达到期望的延迟,同时检查容器是否已停止,从而在 stop() 处尽快退出循环,而不是造成延迟。
容器停止错误处理程序
The CommonContainerStoppingErrorHandler 会在监听器抛出异常时停止容器。
对于记录监听器,当 AckMode 是 RECORD 时,已处理记录的偏移量会被提交。
对于记录监听器,当 AckMode 是任何手动值时,已确认记录的偏移量会被提交。
对于记录监听器,当 AckMode 是 BATCH,或对于批次监听器,容器重启时会重新播放整个批次。
容器停止后,会抛出包装 ListenerExecutionFailedException 的异常。
这是为了在事务启用时导致事务回滚。
委托式错误处理器
The CommonDelegatingErrorHandler 可以根据异常类型委托到不同的错误处理器。
例如,您可以希望为大多数异常调用 DefaultErrorHandler,或者为其他异常调用 CommonContainerStoppingErrorHandler。
所有委托必须共享相同的兼容属性(ackAfterHandle, seekAfterError …)。
使用不同的常见错误处理器为记录器和批处理监听器
如果您希望为record和batch监听器使用不同的错误处理策略,提供了CommonMixedErrorHandler,允许为每种监听器类型配置特定的错误处理器。
常见错误处理程序摘要
-
DefaultErrorHandler -
CommonContainerStoppingErrorHandler -
CommonDelegatingErrorHandler -
CommonLoggingErrorHandler -
CommonMixedErrorHandler
遗留错误处理器及其替换
| 遗留错误处理程序 | 替换 |
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
无替换,使用 |
|
|
|
没有替换,使用 |
迁移自定义遗留错误处理实现到CommonErrorHandler
参考 CommonErrorHandler 中的 JavaDocs。
为了替换 ErrorHandler 或 ConsumerAwareErrorHandler 的实现,你应该实现 handleOne(),并将 seeksAfterHandle() 设置为返回 false(默认值)。
你还应该实现 handleOtherException() 以处理记录处理范围之外发生的异常(例如消费者错误)。
To replace a RemainingRecordsErrorHandler implementation, you should implement handleRemaining() and override seeksAfterHandle() to return true (the error handler must perform the necessary seeks).
You should also implement handleOtherException() - to handle exceptions that occur outside the scope of record processing (e.g. consumer errors).
To replace any BatchErrorHandler 实现,你应该实现 handleBatch()
你也应该实现 handleOtherException() - 以处理超出记录处理范围的异常(例如消费者错误)。
回滚处理器
在使用事务时,如果监听器抛出异常(且如果存在错误处理程序也抛出异常),事务将被回滚。
默认情况下,任何未处理的记录(包括失败的记录)将在下次轮询时重新获取。
这通过在DefaultAfterRollbackProcessor中执行seek个操作实现。
对于批处理监听器,整个记录批次将被重新处理(容器不知道哪个记录在批次中失败)。
要修改此行为,可以使用自定义的AfterRollbackProcessor配置。
例如,对于基于记录的监听器,您可能需要跟踪失败的记录,并在尝试一定次数后放弃,或许将其发布到死信主题。
From version 2.2, the DefaultAfterRollbackProcessor can now recover (skip) a record that keeps failing.
By default, after ten failures, the failed record is logged (at the ERROR level).
You can configure the processor with a custom recoverer (BiConsumer) and maximum failures.
Setting the maxFailures property to a negative number causes infinite retries.
The following example configures recovery after three tries:
AfterRollbackProcessor<String, String> processor =
new DefaultAfterRollbackProcessor((record, exception) -> {
// recover after 3 failures, with no back off - e.g. send to a dead-letter topic
}, new FixedBackOff(0L, 2L));
当你不使用事务时,可以通过配置一个 DefaultErrorHandler。
查看 容器错误处理器。
从3.2版本开始,Recovery现在可以跳过(恢复)持续失败的整个记录批次。
设置ContainerProperties.setBatchRecoverAfterRollback(true)以启用此功能。
| 默认行为是,使用批处理监听器时无法恢复,因为框架并不知道是批处理中的哪一条记录导致失败。 在这种情况下,应用程序监听器必须处理持续失败的记录。 |
参见 发布死信记录。
从 2.2.5 版本开始,DefaultAfterRollbackProcessor 可以在新的事务中被调用(新事务在失败事务回滚后启动)。
然后,如果你使用 DeadLetterPublishingRecoverer 发布失败记录,处理器会将恢复记录的偏移发送到原始主题/分区的事务。
要启用此功能,请在 DefaultAfterRollbackProcessor 上设置 commitRecovered 和 kafkaTemplate 属性。
如果恢复器失败(抛出异常),则失败的记录将包含在seeks中。
从2.5.5版本开始,如果恢复器失败,默认会将BackOff重置,并在恢复尝试再次时按照退避策略重新投递。
在更早的版本中,BackOff不会被重置,恢复会在下一次失败时再次尝试。
要还原到以前的行为,请将处理器的resetStateOnRecoveryFailure属性设置为false。 |
从版本 2.6 开始,您可以向处理器提供一个BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>来根据失败记录和/或异常确定要使用的BackOff:
handler.setBackOffFunction((record, ex) -> { ... });
如果该函数返回null,处理器的默认BackOff将被使用。
从版本 2.6.3 开始,如果在失败之间异常类型发生变化,则将resetStateOnExceptionChange设置为true并将重新启动重试序列(如果配置了选择新的BackOff)。默认情况下,不考虑异常类型。
从版本2.3.1开始,与DefaultErrorHandler类似,DefaultAfterRollbackProcessor将某些异常视为致命错误,在发生这些异常时跳过重试;在首次失败时调用recoverer。
默认情况下被视为致命的异常包括:
-
DeserializationException -
MessageConversionException -
ConversionException -
MethodArgumentResolutionException -
NoSuchMethodException -
ClassCastException
由于这些异常在重试投递时不太可能得到解决。
您可以向不可重试异常类型类别中添加更多异常类型,或完全替换已分类的异常映射。有关更多信息,请参阅DefaultAfterRollbackProcessor.setClassifications()和ExceptionMatcher的Javadoc。
这里是将 IllegalArgumentException 添加到不可重试异常的示例:
@Bean
public DefaultAfterRollbackProcessor errorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
DefaultAfterRollbackProcessor processor = new DefaultAfterRollbackProcessor(recoverer);
processor.addNotRetryableException(IllegalArgumentException.class);
return processor;
}
也请参见 Delivery Attempts 头部。
在当前的kafka-clients下,容器无法检测到ProducerFencedException是由再平衡引起的还是由于生产者的transactional.id因超时或到期而被撤销。
因为大多数情况下是由于再平衡引起的,所以容器不会调用AfterRollbackProcessor(因为不再分配给它们,因此不合适的重新定位分区)。
如果您确保超时时间足够长以处理每个事务,并定期执行“空”事务(例如通过ListenerContainerIdleEvent),则可以避免由于超时和到期导致的围栏问题。
或者,您可以将stopContainerWhenFenced容器属性设置为true,这样容器将会停止,从而避免记录丢失。
您可以消费一个ConsumerStoppedEvent并检查Reason属性是否为FENCED来检测此条件。
由于事件还包含对容器的引用,您可以使用此事件重启容器。 |
从版本 2.7 开始,在等待 BackOff 时间间隔期间,错误处理器将循环并短暂休眠,直到达到所需延迟,并同时检查容器是否已停止,以便在收到 stop() 后尽快退出休眠而不是造成延迟。
从版本 2.7 开始,处理器可以配置一个或多个 RetryListener,接收重试和恢复进度的通知。
@FunctionalInterface
public interface RetryListener {
void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);
default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
}
default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
}
}
查看更多Java文档信息。
配送尝试标题
以下规则仅适用于记录侦听器,不适用于批处理侦听器。
从版本 2.5 开始,当使用实现 DeliveryAttemptAware 的 ErrorHandler 或 AfterRollbackProcessor 时,可以启用向记录添加 KafkaHeaders.DELIVERY_ATTEMPT 头(kafka_deliveryAttempt)的功能。
此头的值是从 1 开始递增的整数。
在接收原始 ConsumerRecord<?, ?> 时,该整数位于 byte[4] 中。
int delivery = ByteBuffer.wrap(record.headers()
.lastHeader(KafkaHeaders.DELIVERY_ATTEMPT).value())
.getInt();
当使用@KafkaListener与JsonKafkaHeaderMapper或SimpleKafkaHeaderMapper时,可以通过将@Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery作为参数添加到监听器方法中来获取。
要启用此标题的填充,请将容器属性 deliveryAttemptHeader 设置为 true。 默认情况下它是禁用的,以避免每次查找记录状态并添加标题时产生的(微小)开销。
数字 DefaultErrorHandler 和 DefaultAfterRollbackProcessor 支持此功能。
批处理监听器的交付尝试标题
在使用BatchListener处理ConsumerRecord时,KafkaHeaders.DELIVERY_ATTEMPT标题可以以与SingleRecordListener不同的方式出现。
从版本 3.3 开始,如果您希望在使用 BatchListener 时将 KafkaHeaders.DELIVERY_ATTEMPT 头注入到 ConsumerRecord 中,请在 ErrorHandler 中将 DeliveryAttemptAwareRetryListener 设置为 RetryListener。
请参考下面的代码。
final FixedBackOff fixedBackOff = new FixedBackOff(1, 10);
final DefaultErrorHandler errorHandler = new DefaultErrorHandler(fixedBackOff);
errorHandler.setRetryListeners(new DeliveryAttemptAwareRetryListener());
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setCommonErrorHandler(errorHandler);
然后,每当批处理未能完成时,DeliveryAttemptAwareRetryListener 都会在 KafkaHeaders.DELIVERY_ATTMPT 标头中注入一个 ConsumerRecord。
监听器信息标题
在某些情况下,能够知道监听器正在哪个容器中运行是有用的。
从版本 2.8.4 开始,现在可以在监听器容器上设置 listenerInfo 属性,或者在 info 注解上设置 @KafkaListener 属性。然后,容器将在所有传入消息的 KafkaListener.LISTENER_INFO 头中添加此属性;它可以用作记录拦截器、过滤器等中的条件判断,或在监听器本身中使用。
@KafkaListener(id = "something", topics = "topic", filter = "someFilter",
info = "this is the something listener")
public void listen(@Payload Thing thing,
@Header(KafkaHeaders.LISTENER_INFO) String listenerInfo) {
...
}
在使用 RecordInterceptor 或 RecordFilterStrategy 实现时,标头作为字节数组存在于消费者记录中,并使用 KafkaListenerAnnotationBeanPostProcessor 的 charSet 属性进行转换。
标头映射器在从消费者记录创建1时也会转换为String,并且永远不会将此标头映射到传出记录上。
对于POJO批处理监听器,从版本2.8.6开始,标头被复制到每个批次成员中,并且在转换后也作为单个String参数可用。
@KafkaListener(id = "list2", topics = "someTopic", containerFactory = "batchFactory",
info = "info for batch")
public void listen(List<Thing> list,
@Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets,
@Header(KafkaHeaders.LISTENER_INFO) String info) {
...
}
如果批处理监听器具有过滤器,并且该过滤器导致批次为空,您需要向参数@Header添加required = false,因为对于空批次而言这些信息不可用。 |
如果您收到List<Message<Thing>>,则信息位于每个Message<?>的KafkaHeaders.LISTENER_INFO标题中。
有关处理批次的更多信息,请参阅批处理侦听器。
发布死信记录
当记录的最大失败次数达到时,您可以使用记录恢复器来配置DefaultErrorHandler和DefaultAfterRollbackProcessor。 框架提供了DeadLetterPublishingRecoverer,它会将失败的消息发布到另一个主题。 恢复程序需要一个KafkaTemplate<Object, Object>,用于发送记录。 您还可以选择性地用BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>进行配置,该代码用于解析目标主题和分区。
默认情况下,死信记录会被发送到一个名为 <originalTopic>-dlt 的主题(原始主题名称后缀为 -dlt)以及与原始记录相同的分区。因此,当您使用默认解析器时,死信主题必须至少具有与原始主题一样多的分区。 |
如果返回的 TopicPartition 具有负分区,则该分区在 ProducerRecord 中未被设置,因此由 Kafka 选择分区。
从版本 2.2.4 开始,任何 ListenerExecutionFailedException(例如,在检测到 @KafkaListener 方法中的异常时抛出)都会增强 groupId 属性。
这使得目标解析器可以使用此属性,结合 ConsumerRecord 中的信息来选择死信主题。
以下示例显示了如何绑定自定义目标解析器:
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
(r, e) -> {
if (e instanceof FooException) {
return new TopicPartition(r.topic() + ".Foo.failures", r.partition());
}
else {
return new TopicPartition(r.topic() + ".other.failures", r.partition());
}
});
CommonErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 2L));
发送到死信主题的记录会附加以下标题:<br/>
-
KafkaHeaders.DLT_EXCEPTION_FQCN: 异常类名(通常为ListenerExecutionFailedException,但也可能是其他值)。 -
KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN: 异常原因类名(如果存在的话)(自版本 2.8 起)。 -
KafkaHeaders.DLT_EXCEPTION_STACKTRACE: 异常堆栈跟踪。 -
KafkaHeaders.DLT_EXCEPTION_MESSAGE: 异常消息。 -
KafkaHeaders.DLT_KEY_EXCEPTION_FQCN: 异常类名称(仅反序列化错误)。 -
KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE: 异常堆栈跟踪(仅键反序列化错误)。 -
KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE: 异常消息(仅键反序列化错误)。 -
KafkaHeaders.DLT_ORIGINAL_TOPIC: 原始主题。 -
KafkaHeaders.DLT_ORIGINAL_PARTITION: 原始分区。 -
KafkaHeaders.DLT_ORIGINAL_OFFSET: 原始偏移量。 -
KafkaHeaders.DLT_ORIGINAL_TIMESTAMP: 原始时间戳。 -
KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE: 原始时间戳类型。 -
KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP: 消费者组在处理记录时失败(自版本 2.8 起)。
关键异常仅由 DeserializationException 引起,因此不存在 DLT_KEY_EXCEPTION_CAUSE_FQCN。
有两种机制可以添加更多标题。
-
继承恢复器并重写
createProducerRecord()- 调用super.createProducerRecord()并添加更多标题。 -
提供一个
BiFunction来接收消费者记录和异常,并返回一个Headers对象;该对象中的标题将被复制到最终的生产者记录中;另请参阅管理死信记录标题。使用setHeadersFunction()来设置BiFunction。
第二个实现起来更简单,但第一个有更多可用信息,包括已组装的标准标题。
从版本 2.3 开始,与 ErrorHandlingDeserializer 结合使用时,发布者将恢复死信生产记录中的记录 value(),使其回到原始值(即未能反序列化的失败值)。
此前,value() 是 null,用户代码必须从消息头中解码 DeserializationException。
此外,您可以向发布者提供多个 KafkaTemplate;例如,如果您想发布来自 DeserializationException 的 byte[],以及使用不同序列化器对成功反序列化的记录进行处理,则可能需要这样做。
以下是配置使用 String 和 byte[] 序列化器的 KafkaTemplate 的发布者的示例:
@Bean
public DeadLetterPublishingRecoverer publisher(KafkaTemplate<?, ?> stringTemplate,
KafkaTemplate<?, ?> bytesTemplate) {
Map<Class<?>, KafkaOperations<?, ?>> templates = new LinkedHashMap<>();
templates.put(String.class, stringTemplate);
templates.put(byte[].class, bytesTemplate);
return new DeadLetterPublishingRecoverer(templates);
}
发布者使用映射键来定位适合于即将发布的value()的模板。LinkedHashMap是推荐使用的,这样键就可以按顺序进行检查。
发布null值时,如果有多个模板,则恢复器会查找Void类的模板;如果不存在,则使用values().iterator()中的第一个模板。
从2.7版本开始,当消息发布失败时,您可以使用setFailIfSendResultIsError方法抛出异常。您还可以使用setWaitForSendResultTimeout为验证发送者成功设置超时时间。
| 如果恢复器失败(抛出异常),则失败的记录将包含在查找中。 从版本 2.5.5 开始,如果恢复器失败,默认情况下 对于较早的版本, 要恢复到以前的行为,请将错误处理器的 |
从版本 2.6.3 开始,如果在失败之间异常类型发生变化,则将resetStateOnExceptionChange设置为true并将重新启动重试序列(如果配置了选择新的BackOff)。默认情况下,不考虑异常类型。
从版本 2.3 开始,恢复器也可以与 Kafka Streams 一起使用 - 有关详细信息,请参阅 反序列化异常恢复。
在标题ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER和ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER中添加序列化异常(使用Java序列化)。ErrorHandlingDeserializer。默认情况下,这些标题不会保留在发布到死信主题的消息中。从版本2.7开始,如果键和值都失败了反序列化,则原始值都会填充到发送到DLT的记录中。
如果传入的记录相互依赖,但可能以无序方式到达,则重新发布失败的记录到原始主题末尾(重复若干次)可能是有用的,而不是直接将其发送到死信主题。有关示例,请参阅此Stack Overflow问题。
以下错误处理程序配置将完全实现这一点:
@Bean
public ErrorHandler eh(KafkaOperations<String, String> template) {
return new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template,
(rec, ex) -> {
org.apache.kafka.common.header.Header retries = rec.headers().lastHeader("retries");
if (retries == null) {
retries = new RecordHeader("retries", new byte[] { 1 });
rec.headers().add(retries);
}
else {
retries.value()[0]++;
}
return retries.value()[0] > 5
? new TopicPartition("topic-dlt", rec.partition())
: new TopicPartition("topic", rec.partition());
}), new FixedBackOff(0L, 0L));
}
从版本 2.7 开始,恢复程序会检查目标解析器选择的分区是否确实存在。如果该分区不存在,则将 ProducerRecord 的值设置为 null,以便 KafkaProducer 可以选择该分区。您可以通过将 verifyPartition 属性设置为 false 来禁用此检查。
从版本 3.1 开始,将 logRecoveryRecord 属性设置为 true 将记录恢复记录和异常。
处理死信记录标头
-
appendOriginalHeaders(默认值true) -
stripPreviousExceptionHeaders(默认值为true,自版本 2.8 起)
Apache Kafka 支持具有相同名称的多个标头;要获取最新的值,可以使用 headers.lastHeader(headerName);若要迭代多个标头,请使用 headers.headers(headerName).iterator()。
在反复重新发布失败的记录时,这些标头可能会增长(最终导致由于 RecordTooLargeException 而发布失败);对于异常标头以及特别是堆栈跟踪标头来说尤其如此。
设置两个属性的原因在于,虽然您可能只想保留最后一次异常信息,但您也可能想保留每次记录失败时经过的主题历史。
appendOriginalHeaders 应用于所有名为 ORIGINAL 的标题,而 stripPreviousExceptionHeaders 应用于所有名为 EXCEPTION 的标题。
从版本 2.8.4 开始,您可以控制哪些标准标头将被添加到输出记录中。
请参阅enum HeadersToAdd获取默认情况下(目前)由 10 个标准标头的通用名称(这些不是实际的标头名称,而只是抽象;实际的标头名称由子类可以覆盖的getHeaderNames()方法设置。
要排除标头,请使用excludeHeaders()方法;例如,要禁止在标头中添加异常堆栈跟踪,请使用:
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.excludeHeaders(HeaderNames.HeadersToAdd.EX_STACKTRACE);
此外,您可以通过添加一个 ExceptionHeadersCreator 来完全自定义异常头信息的添加;这也会禁用所有标准的异常头信息。
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.setExceptionHeadersCreator((kafkaHeaders, exception, isKey, headerNames) -> {
kafkaHeaders.add(new RecordHeader(..., ...));
});
从版本 2.8.4 开始,您可以使用 addHeadersFunction 方法提供多个标题函数。这允许应用额外的函数,即使另一个函数已经被注册,例如在使用非阻塞重试时。
ExponentialBackOffWithMaxRetries实施
Spring 框架提供了多个BackOff实现。
默认情况下,ExponentialBackOff会无限重试;若要在一定次数的重试后放弃,则需要计算maxElapsedTime。
自版本 2.7.3 起,用于 Apache Kafka 的 Spring 提供了ExponentialBackOffWithMaxRetries,它是一个子类,接收maxRetries属性并自动计算maxElapsedTime,这稍微更方便一些。
@Bean
DefaultErrorHandler handler() {
ExponentialBackOffWithMaxRetries bo = new ExponentialBackOffWithMaxRetries(6);
bo.setInitialInterval(1_000L);
bo.setMultiplier(2.0);
bo.setMaxInterval(10_000L);
return new DefaultErrorHandler(myRecoverer, bo);
}
将在调用恢复程序之前重试1, 2, 4, 8, 10, 10秒。