这个版本仍在开发中,目前尚未被认为是稳定的。要使用最新稳定版本,请使用 Spring for Apache Kafka 4.0.4spring-doc.cadn.net.cn

处理异常

本部分描述了在使用 Spring for Apache Kafka 时如何处理各种可能出现的异常。spring-doc.cadn.net.cn

监听器错误处理程序

从 2.0 版本开始,@KafkaListener 注解新增了属性:errorHandlerspring-doc.cadn.net.cn

你可以使用 errorHandler 来提供一个 KafkaListenerErrorHandler 实现的 bean 名称。 此函数式接口有一个方法,如下列示的代码所示:spring-doc.cadn.net.cn

@FunctionalInterface
public interface KafkaListenerErrorHandler {

    Object handleError(Message<?> message, ListenerExecutionFailedException exception) throws Exception;

}

你有权限访问由消息转换器产生的 spring-messaging Message<?> 对象,以及由监听器抛出的异常,该异常被包装在 ListenerExecutionFailedException 中。 错误处理器可以抛出原始或一个新的异常,该异常将被抛给容器。 错误处理器返回的任何内容都会被忽略。spring-doc.cadn.net.cn

从 2.7 版本开始,您可以将 rawRecordHeader 属性设置在 MessagingMessageConverterBatchMessagingMessageConverter 上,这会使原始的 ConsumerRecord 添加到在 KafkaHeaders.RAW_DATA 头部转换的 Message<?> 中。 这在例如您希望在监听器错误处理程序中使用 DeadLetterPublishingRecoverer 时很有用。 它可能在请求/回复场景中使用,您希望在重试一定次数后捕获失败记录并将其发送到发送者,其中失败记录已放入死信主题中。spring-doc.cadn.net.cn

@Bean
public KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) {
    return (msg, ex) -> {
        if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 9) {
            recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), ex);
            return "FAILED";
        }
        throw ex;
    };
}

它有一个子接口 (ConsumerAwareListenerErrorHandler),可以通过以下方法访问consumer对象:spring-doc.cadn.net.cn

Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer);

另一个子接口(ManualAckListenerErrorHandler)在使用手动 AckModes 时提供对 Acknowledgment 对象的访问。spring-doc.cadn.net.cn

Object handleError(Message<?> message, ListenerExecutionFailedException exception,
			Consumer<?, ?> consumer, @Nullable Acknowledgment ack);

在任何情况下,都不应对接收者执行任何seek操作,因为容器将无法感知这些操作。spring-doc.cadn.net.cn

容器错误处理器

从 2.8 版本开始,遗留的 ErrorHandlerBatchErrorHandler 接口已被 CommonErrorHandler 新接口取代。 这些错误处理程序可以同时处理记录型和批处理监听器的错误,允许单个监听器容器工厂创建这两种类型监听器的容器。 提供了 CommonErrorHandler 个实现,以替代大多数遗留框架错误处理程序实现。spring-doc.cadn.net.cn

将自定义遗留错误处理程序迁移到 CommonErrorHandler 以获取将自定义错误处理程序迁移到 CommonErrorHandler 的信息。spring-doc.cadn.net.cn

当使用事务时,如果没有配置错误处理程序,则异常将回滚事务。 事务容器的错误处理由 AfterRollbackProcessor 处理。 如果你在使用事务时提供自定义错误处理程序,并且希望回滚事务,则该处理程序必须抛出异常。spring-doc.cadn.net.cn

此接口具有一个默认方法 isAckAfterHandle(),由容器在错误处理程序在不抛出异常的情况下返回时,用来确定是否应提交偏移量;默认返回 true。spring-doc.cadn.net.cn

通常,框架提供的错误处理程序在错误未被“处理”时(例如在执行seek操作后)将抛出异常。 默认情况下,容器会在ERROR级别记录此类异常。 所有框架错误处理程序都扩展KafkaExceptionLogLevelAware,这允许您控制这些异常的日志记录级别。spring-doc.cadn.net.cn

/**
 * Set the level at which the exception thrown by this handler is logged.
 * @param logLevel the level (default ERROR).
 */
public void setLogLevel(KafkaException.Level logLevel) {
    ...
}

您可以指定一个全局错误处理器,用于容器工厂中所有监听器的错误处理。 以下示例展示了如何实现:spring-doc.cadn.net.cn

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
        kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.setCommonErrorHandler(myErrorHandler);
    ...
    return factory;
}

默认情况下,如果带有注解的监听器方法抛出异常,该异常将传递到容器,并根据容器的配置来处理消息。spring-doc.cadn.net.cn

容器会在调用错误处理程序之前先提交任何待处理的偏移提交。spring-doc.cadn.net.cn

如果您使用 Spring Boot,只需将错误处理程序作为 a @Bean 添加,Boot 将会将其添加到自动配置的工厂中。spring-doc.cadn.net.cn

退避处理程序

错误处理程序(如 DefaultErrorHandler)使用一个 BackOff 来决定在重试传递之前等待多长时间。 从 2.9 版本开始,您可以配置一个自定义的 BackOffHandler。 默认处理程序会在退避时间经过(或容器被停止)后挂起线程。 框架还提供了 ContainerPausingBackOffHandler,它会在退避时间经过后暂停监听容器,然后恢复容器。 这在延迟时间长于 max.poll.interval.ms 消费者属性时很有用。 请注意,实际退避时间的精度将受到 pollTimeout 容器属性的影响。spring-doc.cadn.net.cn

默认错误处理器

这个新的错误处理器取代了SeekToCurrentErrorHandlerRecoveringBatchErrorHandler,它们在过去多个发布版本中都是默认的错误处理器。 一个不同之处是,当抛出除BatchListenerFailedException以外的异常时,批处理监听器的回退行为等同于重试完整批次spring-doc.cadn.net.cn

从 2.9 版本开始,DefaultErrorHandler 可以配置为提供与讨论的未处理记录偏移量查找相同的语义,但不实际执行查找操作。 相反,记录由监听器容器保留,并在错误处理程序退出后(并在执行一次暂停的 poll() 以保持消费者存活后)重新提交给监听器(如果使用了非阻塞重试或 ContainerPausingBackOffHandler,暂停可能会持续多个轮询周期)。 错误处理程序会将结果返回给容器,指示当前失败的记录是否可以重新提交,或者是否已恢复且不再发送给监听器第二次。 要启用此模式,请将属性 seekAfterError 设置为 false

出错处理程序可以恢复(跳过)一直失败的记录。 默认情况下,在发生十次失败后,失败的记录会被记录(记录级别为ERROR)。 你可以通过配置自定义的恢复器(BiConsumer)以及一个BackOff来控制重试次数和每次重试之间的延迟。 使用FixedBackOff配合FixedBackOff.UNLIMITED_ATTEMPTS将导致(实际上)无限重试。 以下示例配置了在尝试三次后进行恢复:spring-doc.cadn.net.cn

DefaultErrorHandler errorHandler =
    new DefaultErrorHandler((record, exception) -> {
        // recover after 3 failures, with no back off - e.g. send to a dead-letter topic
    }, new FixedBackOff(0L, 2L));

为了使用自定义的此处理器实例配置监听器容器,请将其添加到容器工厂中。spring-doc.cadn.net.cn

例如,使用 @KafkaListener 容器工厂,你可以添加 DefaultErrorHandler 如下:spring-doc.cadn.net.cn

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(AckMode.RECORD);
    factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L)));
    return factory;
}

对于记录器监听器,这将使用1秒的退避策略重试最多2次(总共3次发送尝试),而不是默认配置(FixedBackOff(0L, 9))。 在重试耗尽后,失败将仅记录日志。spring-doc.cadn.net.cn

作为一个示例,如果 poll 返回六条记录(来自分区 0、1、2 各两条),而监听器在第四个记录上抛出异常,容器会通过确认前三个消息的偏移量来确认这三条记录。 The DefaultErrorHandler 会为分区 1 寻求到偏移量 1,为分区 2 寻求到偏移量 0。 下一个 poll() 会返回三条未处理的记录。spring-doc.cadn.net.cn

如果 AckModeBATCH,则容器会在调用错误处理程序之前,为前两个分区提交偏移量。spring-doc.cadn.net.cn

批处理监听器必须抛出一个BatchListenerFailedException,表示批处理中哪些记录失败。spring-doc.cadn.net.cn

事件序列是:spring-doc.cadn.net.cn

  • 提交索引前的记录的偏移量。spring-doc.cadn.net.cn

  • 如果重试次数未耗尽,则执行seek操作,使所有剩余记录(包括失败的记录)都会被重新投递。spring-doc.cadn.net.cn

  • 如果重试次数耗尽,将尝试恢复失败的记录(默认仅日志记录)并执行定位,使得排除该失败记录的其余记录将重新投递。 恢复的记录的偏移量会被提交。spring-doc.cadn.net.cn

  • 如果重试次数耗尽且恢复失败,那么将像重试次数未耗尽一样执行寻道操作。spring-doc.cadn.net.cn

从 2.9 版本开始,DefaultErrorHandler 可以配置提供与上述讨论中类似的效果,即像查找未处理记录的偏移量一样操作,但并不实际进行查找。 相反,错误处理器会创建一个新的 ConsumerRecords<?, ?>,其中只包含未处理的记录,然后在进行一次暂停的 poll() 以保持消费者存活后,将其提交给监听器。 要启用此模式,请将属性 seekAfterError 设置为 false

The 默认 recoverer 在尝试次数耗尽后会记录失败的记录。 您可以使用自定义 recoverer,或使用框架提供的 recoverer,例如 DeadLetterPublishingRecovererspring-doc.cadn.net.cn

当使用POJO批处理监听器(例如:List<Thing>),且你没有完整的消费记录可添加到异常中时,你只需要添加失败记录的索引:spring-doc.cadn.net.cn

@KafkaListener(id = "recovering", topics = "someTopic")
public void listen(List<Thing> things) {
    for (int i = 0; i < things.size(); i++) {
        try {
            process(things.get(i));
        }
        catch (Exception e) {
            throw new BatchListenerFailedException("Failed to process", i);
        }
    }
}

当容器配置为使用 AckMode.MANUAL_IMMEDIATE 时,错误处理器可以配置在恢复记录后提交偏移量;将 commitRecovered 属性设置为 truespring-doc.cadn.net.cn

在使用事务时,提供了类似的 DefaultAfterRollbackProcessor 功能。 查看 回滚后处理器spring-doc.cadn.net.cn

The DefaultErrorHandler 将某些异常视为致命,对于此类异常会跳过重试;在第一次失败时会调用 recoverer。 默认视为致命的异常包括:spring-doc.cadn.net.cn

由于这些异常在重试投递时不太可能得到解决。spring-doc.cadn.net.cn

您可以将更多异常类型添加到不可重试类别,或完全替换分类异常的映射。 请参阅DefaultErrorHandler.addNotRetryableException()DefaultErrorHandler.setClassifications()的Javadocs以获取更多信息,以及spring-retryBinaryExceptionClassifier的Javadocs。spring-doc.cadn.net.cn

这里是将 IllegalArgumentException 添加到不可重试异常的示例:spring-doc.cadn.net.cn

@Bean
public DefaultErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) {
    DefaultErrorHandler handler = new DefaultErrorHandler(recoverer);
    handler.addNotRetryableExceptions(IllegalArgumentException.class);
    return handler;
}

错误处理程序可以配置一个或多个RetryListener,接收重试和恢复进度的通知。 从2.8.10版本开始,增加了批量监听器的方法。spring-doc.cadn.net.cn

@FunctionalInterface
public interface RetryListener {

    void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);

    default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
    }

    default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
    }

    default void failedDelivery(ConsumerRecords<?, ?> records, Exception ex, int deliveryAttempt) {
    }

    default void recovered(ConsumerRecords<?, ?> records, Exception ex) {
    }

	default void recoveryFailed(ConsumerRecords<?, ?> records, Exception original, Exception failure) {
	}

}

查看更多Java文档信息。spring-doc.cadn.net.cn

如果恢复器失败(抛出异常),则失败的记录将包含在seeks中。 如果恢复器失败,默认会将BackOff重置,并会在再次尝试恢复前按照回退策略重新进行重试。 要跳过恢复失败后的重试,将错误处理程序的resetStateOnRecoveryFailure设置为false

你可以通过提供一个BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>的错误处理程序来确定BackOff的使用,该BackOff将基于失败的记录和/或异常来决定:spring-doc.cadn.net.cn

handler.setBackOffFunction((record, ex) -> { ... });

如果函数返回 null,将使用处理器的默认 BackOffspring-doc.cadn.net.cn

resetStateOnExceptionChange 设置为 true 时,如果在多次失败间异常类型发生变化,重试序列将重新开始(包括根据配置选择新的 BackOff)。 当 false(在 2.9 版本之前为默认值)时,不考虑异常类型。spring-doc.cadn.net.cn

从 2.9 版本开始,现在默认值为 truespring-doc.cadn.net.cn

批处理错误处理的转换错误

从版本 2.8 开始,当使用一个 MessageConverter 与一个 ByteArrayDeserializer、一个 BytesDeserializer 或一个 StringDeserializer,以及一个 DefaultErrorHandler 时,批处理监听器现在可以正确处理转换错误。 当发生转换错误时,payload 会被设置为 null,并会在记录头中添加一个反序列化异常,类似于 ErrorHandlingDeserializer。 监听器中可用 ConversionException 的列表,以便监听器可以抛出 BatchListenerFailedException,表示转换异常首次发生的位置索引。spring-doc.cadn.net.cn

@KafkaListener(id = "test", topics = "topic")
void listen(List<Thing> in, @Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> exceptions) {
    for (int i = 0; i < in.size(); i++) {
        Foo foo = in.get(i);
        if (foo == null && exceptions.get(i) != null) {
            throw new BatchListenerFailedException("Conversion error", exceptions.get(i), i);
        }
        process(foo);
    }
}

重试完整批次

这是现在 批处理监听器 的回退行为,当监听器抛出除 BatchListenerFailedException 以外的异常时,DefaultErrorHandler 的行为将作为回退。spring-doc.cadn.net.cn

无法保证在批次重新投递时,该批次包含的记录数量相同,或重新投递的记录顺序相同。 因此,无法轻易地为一个批次维护重试状态。 FallbackBatchErrorHandler 采取以下方法。 如果批次监听器抛出的异常不是 BatchListenerFailedException,重试将从内存中的记录批次进行。 为避免在长时间的重试序列期间发生重新平衡,错误处理程序会在每次重试时暂停消费者,重试前进行轮询,并在休眠等待退避时间之前再次调用监听器。 如果/当重试耗尽时,ConsumerRecordRecoverer 将为该批次中的每条记录被调用。 如果恢复器抛出异常,或在休眠期间线程被中断,记录批次将在下一次轮询时重新投递。 无论结果如何,退出前都会恢复消费者。spring-doc.cadn.net.cn

此机制无法与事务一起使用。

在等待 BackOff 间隔时,错误处理程序将通过短暂停留进行循环,直到达到期望的延迟,同时检查容器是否已停止,从而在 stop() 处尽快退出循环,而不是造成延迟。spring-doc.cadn.net.cn

容器停止错误处理程序

The CommonContainerStoppingErrorHandler 会在监听器抛出异常时停止容器。 对于记录监听器,当 AckModeRECORD 时,已处理记录的偏移量会被提交。 对于记录监听器,当 AckMode 是任何手动值时,已确认记录的偏移量会被提交。 对于记录监听器,当 AckModeBATCH,或对于批次监听器,容器重启时会重新播放整个批次。spring-doc.cadn.net.cn

容器停止后,会抛出包装 ListenerExecutionFailedException 的异常。 这是为了在事务启用时导致事务回滚。spring-doc.cadn.net.cn

委托式错误处理器

The CommonDelegatingErrorHandler 可以根据异常类型委托到不同的错误处理器。 例如,您可以希望为大多数异常调用 DefaultErrorHandler,或者为其他异常调用 CommonContainerStoppingErrorHandlerspring-doc.cadn.net.cn

所有委托必须共享相同的兼容属性(ackAfterHandle, seekAfterError …​)。spring-doc.cadn.net.cn

日志记录错误处理程序

The CommonLoggingErrorHandler 仅仅记录异常;使用记录监听器时,上一次轮询的剩余记录将传递给监听器。 对于批量监听器,批量中的所有记录都会被记录。spring-doc.cadn.net.cn

使用不同的常见错误处理器为记录器和批处理监听器

如果您希望为record和batch监听器使用不同的错误处理策略,提供了CommonMixedErrorHandler,允许为每种监听器类型配置特定的错误处理器。spring-doc.cadn.net.cn

常见错误处理程序摘要

遗留错误处理器及其替换

遗留错误处理程序 替换

LoggingErrorHandlerspring-doc.cadn.net.cn

CommonLoggingErrorHandlerspring-doc.cadn.net.cn

BatchLoggingErrorHandlerspring-doc.cadn.net.cn

CommonLoggingErrorHandlerspring-doc.cadn.net.cn

ConditionalDelegatingErrorHandlerspring-doc.cadn.net.cn

DelegatingErrorHandlerspring-doc.cadn.net.cn

ConditionalDelegatingBatchErrorHandlerspring-doc.cadn.net.cn

DelegatingErrorHandlerspring-doc.cadn.net.cn

ContainerStoppingErrorHandlerspring-doc.cadn.net.cn

CommonContainerStoppingErrorHandlerspring-doc.cadn.net.cn

ContainerStoppingBatchErrorHandlerspring-doc.cadn.net.cn

CommonContainerStoppingErrorHandlerspring-doc.cadn.net.cn

SeekToCurrentErrorHandlerspring-doc.cadn.net.cn

DefaultErrorHandlerspring-doc.cadn.net.cn

SeekToCurrentBatchErrorHandlerspring-doc.cadn.net.cn

无替换,使用DefaultErrorHandler配合无限的BackOffspring-doc.cadn.net.cn

RecoveringBatchErrorHandlerspring-doc.cadn.net.cn

DefaultErrorHandlerspring-doc.cadn.net.cn

RetryingBatchErrorHandlerspring-doc.cadn.net.cn

没有替换,使用 DefaultErrorHandler 并抛出除 BatchListenerFailedException 以外的异常。spring-doc.cadn.net.cn

迁移自定义遗留错误处理实现到CommonErrorHandler

参考 CommonErrorHandler 中的 JavaDocs。spring-doc.cadn.net.cn

为了替换 ErrorHandlerConsumerAwareErrorHandler 的实现,你应该实现 handleOne(),并将 seeksAfterHandle() 设置为返回 false(默认值)。 你还应该实现 handleOtherException() 以处理记录处理范围之外发生的异常(例如消费者错误)。spring-doc.cadn.net.cn

To replace a RemainingRecordsErrorHandler implementation, you should implement handleRemaining() and override seeksAfterHandle() to return true (the error handler must perform the necessary seeks). You should also implement handleOtherException() - to handle exceptions that occur outside the scope of record processing (e.g. consumer errors).spring-doc.cadn.net.cn

To replace any BatchErrorHandler 实现,你应该实现 handleBatch() 你也应该实现 handleOtherException() - 以处理超出记录处理范围的异常(例如消费者错误)。spring-doc.cadn.net.cn

回滚处理器

在使用事务时,如果监听器抛出异常(且如果存在错误处理程序也抛出异常),事务将被回滚。 默认情况下,任何未处理的记录(包括失败的记录)将在下次轮询时重新获取。 这通过在DefaultAfterRollbackProcessor中执行seek个操作实现。 对于批处理监听器,整个记录批次将被重新处理(容器不知道哪个记录在批次中失败)。 要修改此行为,可以使用自定义的AfterRollbackProcessor配置。 例如,对于基于记录的监听器,您可能需要跟踪失败的记录,并在尝试一定次数后放弃,或许将其发布到死信主题。spring-doc.cadn.net.cn

From version 2.2, the DefaultAfterRollbackProcessor can now recover (skip) a record that keeps failing. By default, after ten failures, the failed record is logged (at the ERROR level). You can configure the processor with a custom recoverer (BiConsumer) and maximum failures. Setting the maxFailures property to a negative number causes infinite retries. The following example configures recovery after three tries:spring-doc.cadn.net.cn

AfterRollbackProcessor<String, String> processor =
    new DefaultAfterRollbackProcessor((record, exception) -> {
        // recover after 3 failures, with no back off - e.g. send to a dead-letter topic
    }, new FixedBackOff(0L, 2L));

当你不使用事务时,可以通过配置一个 DefaultErrorHandler。 查看 容器错误处理器spring-doc.cadn.net.cn

从3.2版本开始,Recovery现在可以跳过(恢复)持续失败的整个记录批次。 设置ContainerProperties.setBatchRecoverAfterRollback(true)以启用此功能。spring-doc.cadn.net.cn

默认行为是,使用批处理监听器时无法恢复,因为框架并不知道是批处理中的哪一条记录导致失败。 在这种情况下,应用程序监听器必须处理持续失败的记录。

从 2.2.5 版本开始,DefaultAfterRollbackProcessor 可以在新的事务中被调用(新事务在失败事务回滚后启动)。 然后,如果你使用 DeadLetterPublishingRecoverer 发布失败记录,处理器会将恢复记录的偏移发送到原始主题/分区的事务。 要启用此功能,请在 DefaultAfterRollbackProcessor 上设置 commitRecoveredkafkaTemplate 属性。spring-doc.cadn.net.cn

如果恢复器失败(抛出异常),则失败的记录将包含在seeks中。 从2.5.5版本开始,如果恢复器失败,默认会将BackOff重置,并在恢复尝试再次时按照退避策略重新投递。 在更早的版本中,BackOff不会被重置,恢复会在下一次失败时再次尝试。 要还原到以前的行为,请将处理器的resetStateOnRecoveryFailure属性设置为false

从版本 2.6 开始,您可以向处理器提供一个BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>来根据失败记录和/或异常确定要使用的BackOffspring-doc.cadn.net.cn

handler.setBackOffFunction((record, ex) -> { ... });

如果该函数返回null,处理器的默认BackOff将被使用。spring-doc.cadn.net.cn

从版本 2.6.3 开始,如果在失败之间异常类型发生变化,则将resetStateOnExceptionChange设置为true并将重新启动重试序列(如果配置了选择新的BackOff)。默认情况下,不考虑异常类型。spring-doc.cadn.net.cn

从版本2.3.1开始,与DefaultErrorHandler类似,DefaultAfterRollbackProcessor将某些异常视为致命错误,在发生这些异常时跳过重试;在首次失败时调用recoverer。
默认情况下被视为致命的异常包括:spring-doc.cadn.net.cn

由于这些异常在重试投递时不太可能得到解决。spring-doc.cadn.net.cn

您可以在非重试类别中添加更多的异常类型,或者完全替换分类异常的映射。 请参阅DefaultAfterRollbackProcessor.setClassifications()的Javadoc,以及spring-retryBinaryExceptionClassifier的相关文档以获取更多信息。spring-doc.cadn.net.cn

这里是将 IllegalArgumentException 添加到不可重试异常的示例:spring-doc.cadn.net.cn

@Bean
public DefaultAfterRollbackProcessor errorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
    DefaultAfterRollbackProcessor processor = new DefaultAfterRollbackProcessor(recoverer);
    processor.addNotRetryableException(IllegalArgumentException.class);
    return processor;
}
在当前的kafka-clients下,容器无法检测到ProducerFencedException是由再平衡引起的还是由于生产者的transactional.id因超时或到期而被撤销。 因为大多数情况下是由于再平衡引起的,所以容器不会调用AfterRollbackProcessor(因为不再分配给它们,因此不合适的重新定位分区)。 如果您确保超时时间足够长以处理每个事务,并定期执行“空”事务(例如通过ListenerContainerIdleEvent),则可以避免由于超时和到期导致的围栏问题。 或者,您可以将stopContainerWhenFenced容器属性设置为true,这样容器将会停止,从而避免记录丢失。 您可以消费一个ConsumerStoppedEvent并检查Reason属性是否为FENCED来检测此条件。 由于事件还包含对容器的引用,您可以使用此事件重启容器。

从版本 2.7 开始,在等待 BackOff 时间间隔期间,错误处理器将循环并短暂休眠,直到达到所需延迟,并同时检查容器是否已停止,以便在收到 stop() 后尽快退出休眠而不是造成延迟。spring-doc.cadn.net.cn

从版本 2.7 开始,处理器可以配置一个或多个 RetryListener,接收重试和恢复进度的通知。spring-doc.cadn.net.cn

@FunctionalInterface
public interface RetryListener {

    void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);

    default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
    }

    default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
    }

}

查看更多Java文档信息。spring-doc.cadn.net.cn

配送尝试标题

以下规则仅适用于记录侦听器,不适用于批处理侦听器。spring-doc.cadn.net.cn

从版本 2.5 开始,当使用实现 DeliveryAttemptAwareErrorHandlerAfterRollbackProcessor 时,可以启用向记录添加 KafkaHeaders.DELIVERY_ATTEMPT 头(kafka_deliveryAttempt)的功能。
此头的值是从 1 开始递增的整数。
在接收原始 ConsumerRecord<?, ?> 时,该整数位于 byte[4] 中。spring-doc.cadn.net.cn

int delivery = ByteBuffer.wrap(record.headers()
    .lastHeader(KafkaHeaders.DELIVERY_ATTEMPT).value())
    .getInt();

当使用@KafkaListenerDefaultKafkaHeaderMapperSimpleKafkaHeaderMapper时,可以通过将@Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery作为参数添加到监听器方法中来获取。spring-doc.cadn.net.cn

要启用此标题的填充,请将容器属性 deliveryAttemptHeader 设置为 true。 默认情况下它是禁用的,以避免每次查找记录状态并添加标题时产生的(微小)开销。spring-doc.cadn.net.cn

数字 DefaultErrorHandlerDefaultAfterRollbackProcessor 支持此功能。spring-doc.cadn.net.cn

批处理监听器的交付尝试标题

在使用BatchListener处理ConsumerRecord时,KafkaHeaders.DELIVERY_ATTEMPT标题可以以与SingleRecordListener不同的方式出现。spring-doc.cadn.net.cn

从版本 3.3 开始,如果您希望在使用 BatchListener 时将 KafkaHeaders.DELIVERY_ATTEMPT 头注入到 ConsumerRecord 中,请在 ErrorHandler 中将 DeliveryAttemptAwareRetryListener 设置为 RetryListenerspring-doc.cadn.net.cn

请参考下面的代码。spring-doc.cadn.net.cn

final FixedBackOff fixedBackOff = new FixedBackOff(1, 10);
final DefaultErrorHandler errorHandler = new DefaultErrorHandler(fixedBackOff);
errorHandler.setRetryListeners(new DeliveryAttemptAwareRetryListener());

ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setCommonErrorHandler(errorHandler);

然后,每当批处理未能完成时,DeliveryAttemptAwareRetryListener 都会在 KafkaHeaders.DELIVERY_ATTMPT 标头中注入一个 ConsumerRecordspring-doc.cadn.net.cn

监听器信息标题

在某些情况下,能够知道监听器正在哪个容器中运行是有用的。spring-doc.cadn.net.cn

从版本 2.8.4 开始,现在可以在监听器容器上设置 listenerInfo 属性,或者在 info 注解上设置 @KafkaListener 属性。然后,容器将在所有传入消息的 KafkaListener.LISTENER_INFO 头中添加此属性;它可以用作记录拦截器、过滤器等中的条件判断,或在监听器本身中使用。spring-doc.cadn.net.cn

@KafkaListener(id = "something", topics = "topic", filter = "someFilter",
        info = "this is the something listener")
public void listen(@Payload Thing thing,
        @Header(KafkaHeaders.LISTENER_INFO) String listenerInfo) {
    ...
}

在使用 RecordInterceptorRecordFilterStrategy 实现时,标头作为字节数组存在于消费者记录中,并使用 KafkaListenerAnnotationBeanPostProcessorcharSet 属性进行转换。spring-doc.cadn.net.cn

标头映射器在从消费者记录创建1时也会转换为String,并且永远不会将此标头映射到传出记录上。spring-doc.cadn.net.cn

对于POJO批处理监听器,从版本2.8.6开始,标头被复制到每个批次成员中,并且在转换后也作为单个String参数可用。spring-doc.cadn.net.cn

@KafkaListener(id = "list2", topics = "someTopic", containerFactory = "batchFactory",
        info = "info for batch")
public void listen(List<Thing> list,
        @Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
        @Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
        @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
        @Header(KafkaHeaders.OFFSET) List<Long> offsets,
        @Header(KafkaHeaders.LISTENER_INFO) String info) {
            ...
}
如果批处理监听器具有过滤器,并且该过滤器导致批次为空,您需要向参数@Header添加required = false,因为对于空批次而言这些信息不可用。

如果您收到List<Message<Thing>>,则信息位于每个Message<?>KafkaHeaders.LISTENER_INFO标题中。spring-doc.cadn.net.cn

有关处理批次的更多信息,请参阅批处理侦听器spring-doc.cadn.net.cn

发布死信记录

当记录的最大失败次数达到时,您可以使用记录恢复器来配置DefaultErrorHandlerDefaultAfterRollbackProcessor。 框架提供了DeadLetterPublishingRecoverer,它会将失败的消息发布到另一个主题。 恢复程序需要一个KafkaTemplate<Object, Object>,用于发送记录。 您还可以选择性地用BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>进行配置,该代码用于解析目标主题和分区。spring-doc.cadn.net.cn

默认情况下,死信记录会被发送到一个名为 <originalTopic>-dlt 的主题(原始主题名称后缀为 -dlt)以及与原始记录相同的分区。

spring-doc.cadn.net.cn

因此,当您使用默认解析器时,死信主题必须至少具有与原始主题一样多的分区。spring-doc.cadn.net.cn

如果返回的 TopicPartition 具有负分区,则该分区在 ProducerRecord 中未被设置,因此由 Kafka 选择分区。
从版本 2.2.4 开始,任何 ListenerExecutionFailedException(例如,在检测到 @KafkaListener 方法中的异常时抛出)都会增强 groupId 属性。
这使得目标解析器可以使用此属性,结合 ConsumerRecord 中的信息来选择死信主题。spring-doc.cadn.net.cn

以下示例显示了如何绑定自定义目标解析器:spring-doc.cadn.net.cn

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
        (r, e) -> {
            if (e instanceof FooException) {
                return new TopicPartition(r.topic() + ".Foo.failures", r.partition());
            }
            else {
                return new TopicPartition(r.topic() + ".other.failures", r.partition());
            }
        });
CommonErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 2L));

发送到死信主题的记录会附加以下标题:<br/>spring-doc.cadn.net.cn

关键异常仅由 DeserializationException 引起,因此不存在 DLT_KEY_EXCEPTION_CAUSE_FQCNspring-doc.cadn.net.cn

有两种机制可以添加更多标题。spring-doc.cadn.net.cn

  1. 继承恢复器并重写createProducerRecord() - 调用super.createProducerRecord()并添加更多标题。spring-doc.cadn.net.cn

  2. 提供一个BiFunction来接收消费者记录和异常,并返回一个Headers对象;该对象中的标题将被复制到最终的生产者记录中;另请参阅管理死信记录标题。使用setHeadersFunction()来设置BiFunctionspring-doc.cadn.net.cn

第二个实现起来更简单,但第一个有更多可用信息,包括已组装的标准标题。spring-doc.cadn.net.cn

从版本 2.3 开始,与 ErrorHandlingDeserializer 结合使用时,发布者将恢复死信生产记录中的记录 value(),使其回到原始值(即未能反序列化的失败值)。
此前,value() 是 null,用户代码必须从消息头中解码 DeserializationException
此外,您可以向发布者提供多个 KafkaTemplate;例如,如果您想发布来自 DeserializationExceptionbyte[],以及使用不同序列化器对成功反序列化的记录进行处理,则可能需要这样做。
以下是配置使用 Stringbyte[] 序列化器的 KafkaTemplate 的发布者的示例:spring-doc.cadn.net.cn

@Bean
public DeadLetterPublishingRecoverer publisher(KafkaTemplate<?, ?> stringTemplate,
        KafkaTemplate<?, ?> bytesTemplate) {
    Map<Class<?>, KafkaOperations<?, ?>> templates = new LinkedHashMap<>();
    templates.put(String.class, stringTemplate);
    templates.put(byte[].class, bytesTemplate);
    return new DeadLetterPublishingRecoverer(templates);
}

发布者使用映射键来定位适合于即将发布的value()的模板。LinkedHashMap是推荐使用的,这样键就可以按顺序进行检查。spring-doc.cadn.net.cn

发布null值时,如果有多个模板,则恢复器会查找Void类的模板;如果不存在,则使用values().iterator()中的第一个模板。spring-doc.cadn.net.cn

从2.7版本开始,当消息发布失败时,您可以使用setFailIfSendResultIsError方法抛出异常。您还可以使用setWaitForSendResultTimeout为验证发送者成功设置超时时间。spring-doc.cadn.net.cn

如果恢复器失败(抛出异常),则失败的记录将包含在查找中。

spring-doc.cadn.net.cn

从版本 2.5.5 开始,如果恢复器失败,默认情况下BackOff将被重置,并且再次尝试恢复之前,重新交付将继续通过退避机制。spring-doc.cadn.net.cn

对于较早的版本,BackOff不会被重置,并且在下一次失败时会再次尝试恢复。spring-doc.cadn.net.cn

要恢复到以前的行为,请将错误处理器的resetStateOnRecoveryFailure属性设置为falsespring-doc.cadn.net.cn

从版本 2.6.3 开始,如果在失败之间异常类型发生变化,则将resetStateOnExceptionChange设置为true并将重新启动重试序列(如果配置了选择新的BackOff)。默认情况下,不考虑异常类型。spring-doc.cadn.net.cn

从版本 2.3 开始,恢复器也可以与 Kafka Streams 一起使用 - 有关详细信息,请参阅 反序列化异常恢复spring-doc.cadn.net.cn

在标题ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADERErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER中添加序列化异常(使用Java序列化)。ErrorHandlingDeserializer。默认情况下,这些标题不会保留在发布到死信主题的消息中。从版本2.7开始,如果键和值都失败了反序列化,则原始值都会填充到发送到DLT的记录中。spring-doc.cadn.net.cn

如果传入的记录相互依赖,但可能以无序方式到达,则重新发布失败的记录到原始主题末尾(重复若干次)可能是有用的,而不是直接将其发送到死信主题。有关示例,请参阅此Stack Overflow问题spring-doc.cadn.net.cn

以下错误处理程序配置将完全实现这一点:spring-doc.cadn.net.cn

@Bean
public ErrorHandler eh(KafkaOperations<String, String> template) {
    return new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template,
            (rec, ex) -> {
                org.apache.kafka.common.header.Header retries = rec.headers().lastHeader("retries");
                if (retries == null) {
                    retries = new RecordHeader("retries", new byte[] { 1 });
                    rec.headers().add(retries);
                }
                else {
                    retries.value()[0]++;
                }
                return retries.value()[0] > 5
                        ? new TopicPartition("topic-dlt", rec.partition())
                        : new TopicPartition("topic", rec.partition());
            }), new FixedBackOff(0L, 0L));
}

从版本 2.7 开始,恢复程序会检查目标解析器选择的分区是否确实存在。如果该分区不存在,则将 ProducerRecord 的值设置为 null,以便 KafkaProducer 可以选择该分区。您可以通过将 verifyPartition 属性设置为 false 来禁用此检查。spring-doc.cadn.net.cn

从版本 3.1 开始,将 logRecoveryRecord 属性设置为 true 将记录恢复记录和异常。spring-doc.cadn.net.cn

处理死信记录标头

参考上文发布死信记录DeadLetterPublishingRecoverer有两个属性用于在这些标头已经存在时进行管理(例如,在重新处理失败的死信记录时包括使用非阻塞重试)。spring-doc.cadn.net.cn

Apache Kafka 支持具有相同名称的多个标头;要获取最新的值,可以使用 headers.lastHeader(headerName);若要迭代多个标头,请使用 headers.headers(headerName).iterator()spring-doc.cadn.net.cn

在反复重新发布失败的记录时,这些标头可能会增长(最终导致由于 RecordTooLargeException 而发布失败);对于异常标头以及特别是堆栈跟踪标头来说尤其如此。spring-doc.cadn.net.cn

设置两个属性的原因在于,虽然您可能只想保留最后一次异常信息,但您也可能想保留每次记录失败时经过的主题历史。spring-doc.cadn.net.cn

appendOriginalHeaders 应用于所有名为 ORIGINAL 的标题,而 stripPreviousExceptionHeaders 应用于所有名为 EXCEPTION 的标题。spring-doc.cadn.net.cn

从版本 2.8.4 开始,您可以控制哪些标准标头将被添加到输出记录中。
请参阅enum HeadersToAdd获取默认情况下(目前)由 10 个标准标头的通用名称(这些不是实际的标头名称,而只是抽象;实际的标头名称由子类可以覆盖的getHeaderNames()方法设置。spring-doc.cadn.net.cn

要排除标头,请使用excludeHeaders()方法;例如,要禁止在标头中添加异常堆栈跟踪,请使用:spring-doc.cadn.net.cn

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.excludeHeaders(HeaderNames.HeadersToAdd.EX_STACKTRACE);

此外,您可以通过添加一个 ExceptionHeadersCreator 来完全自定义异常头信息的添加;这也会禁用所有标准的异常头信息。spring-doc.cadn.net.cn

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.setExceptionHeadersCreator((kafkaHeaders, exception, isKey, headerNames) -> {
    kafkaHeaders.add(new RecordHeader(..., ...));
});

从版本 2.8.4 开始,您可以使用 addHeadersFunction 方法提供多个标题函数。这允许应用额外的函数,即使另一个函数已经被注册,例如在使用非阻塞重试时。spring-doc.cadn.net.cn

ExponentialBackOffWithMaxRetries实施

Spring 框架提供了多个BackOff实现。
默认情况下,ExponentialBackOff会无限重试;若要在一定次数的重试后放弃,则需要计算maxElapsedTime
自版本 2.7.3 起,用于 Apache Kafka 的 Spring 提供了ExponentialBackOffWithMaxRetries,它是一个子类,接收maxRetries属性并自动计算maxElapsedTime,这稍微更方便一些。spring-doc.cadn.net.cn

@Bean
DefaultErrorHandler handler() {
    ExponentialBackOffWithMaxRetries bo = new ExponentialBackOffWithMaxRetries(6);
    bo.setInitialInterval(1_000L);
    bo.setMultiplier(2.0);
    bo.setMaxInterval(10_000L);
    return new DefaultErrorHandler(myRecoverer, bo);
}

将在调用恢复程序之前重试1, 2, 4, 8, 10, 10秒。spring-doc.cadn.net.cn