此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.9spring-doc.cadn.net.cn

处理异常

本节介绍如何处理使用 Spring for Apache Kafka 时可能出现的各种异常。spring-doc.cadn.net.cn

侦听器错误处理程序

从 2.0 版开始,@KafkaListenerannotation 有一个新属性:errorHandler.spring-doc.cadn.net.cn

您可以使用errorHandler提供KafkaListenerErrorHandler实现。 此功能接口具有一种方法,如以下列表所示:spring-doc.cadn.net.cn

@FunctionalInterface
public interface KafkaListenerErrorHandler {

    Object handleError(Message<?> message, ListenerExecutionFailedException exception) throws Exception;

}

您可以访问 spring-messagingMessage<?>消息转换器生成的对象和侦听器抛出的异常,该异常包装在ListenerExecutionFailedException. 错误处理程序可以抛出原始异常或新异常,这些异常会抛出到容器。错误处理程序返回的任何内容都将被忽略。spring-doc.cadn.net.cn

从 2.7 版开始,您可以将rawRecordHeader属性MessagingMessageConverterBatchMessagingMessageConverter这导致原始的ConsumerRecord添加到转换后的Message<?>KafkaHeaders.RAW_DATA页眉。 例如,如果您希望使用DeadLetterPublishingRecoverer在侦听器错误处理程序中。它可用于请求/回复方案,在该方案中,您希望在重试一定次数后,在捕获死信主题中的失败记录后将失败结果发送给发件人。spring-doc.cadn.net.cn

@Bean
public KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) {
    return (msg, ex) -> {
        if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 9) {
            recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), ex);
            return "FAILED";
        }
        throw ex;
    };
}

它有一个子接口(ConsumerAwareListenerErrorHandler) 通过以下方法访问消费者对象:spring-doc.cadn.net.cn

Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer);

另一个子接口 (ManualAckListenerErrorHandler) 提供对Acknowledgment使用手动时的对象AckModes.spring-doc.cadn.net.cn

Object handleError(Message<?> message, ListenerExecutionFailedException exception,
			Consumer<?, ?> consumer, @Nullable Acknowledgment ack);

无论哪种情况,您都不应对使用者执行任何搜索,因为容器不会意识到它们。spring-doc.cadn.net.cn

容器错误处理程序

从 2.8 版开始,旧版ErrorHandlerBatchErrorHandler接口已被新的CommonErrorHandler. 这些错误处理程序可以处理记录侦听器和批处理侦听器的错误,从而允许单个侦听器容器工厂为这两种类型的侦听器创建容器。CommonErrorHandler提供了替换大多数旧框架错误处理程序实现的实现。spring-doc.cadn.net.cn

将自定义旧版错误处理程序实现迁移到CommonErrorHandler有关将自定义错误处理程序迁移到CommonErrorHandler.spring-doc.cadn.net.cn

使用事务时,默认情况下不会配置错误处理程序,因此异常将回滚事务。事务容器的错误处理由AfterRollbackProcessor. 如果您在使用事务时提供自定义错误处理程序,则如果要回滚事务,它必须引发异常。spring-doc.cadn.net.cn

此接口具有默认方法isAckAfterHandle()由容器调用,以确定如果错误处理程序返回而不抛出异常,是否应提交偏移量;默认情况下,它返回 true。spring-doc.cadn.net.cn

通常,当错误未被“处理”时(例如,在执行查找作之后),框架提供的错误处理程序将抛出异常。默认情况下,此类异常由容器记录在ERROR水平。 所有框架错误处理程序都扩展了KafkaExceptionLogLevelAware它允许您控制记录这些异常的级别。spring-doc.cadn.net.cn

/**
 * Set the level at which the exception thrown by this handler is logged.
 * @param logLevel the level (default ERROR).
 */
public void setLogLevel(KafkaException.Level logLevel) {
    ...
}

您可以指定一个全局错误处理程序,用于容器工厂中的所有侦听器。以下示例显示了如何执行此作:spring-doc.cadn.net.cn

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
        kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.setCommonErrorHandler(myErrorHandler);
    ...
    return factory;
}

默认情况下,如果带 Comments 的监听器方法抛出异常,则将其抛给容器,并根据容器配置处理消息。spring-doc.cadn.net.cn

容器在调用错误处理程序之前提交任何挂起的偏移量提交。spring-doc.cadn.net.cn

如果您使用的是 Spring Boot,则只需将错误处理程序添加为@Bean并且 Boot 会将其添加到自动配置的工厂中。spring-doc.cadn.net.cn

后退处理程序

错误处理程序(例如 DefaultErrorHandler)使用BackOff以确定在重试投放之前等待多长时间。 从 2.9 版开始,您可以配置自定义BackOffHandler. 默认处理程序只是挂起线程,直到回退时间过去(或容器停止)。 该框架还提供了ContainerPausingBackOffHandler这将暂停侦听器容器,直到回退时间过去,然后恢复容器。 当延迟长于max.poll.interval.ms消费者财产。 请注意,实际退避时间的分辨率将受到pollTimeoutcontainer 属性。spring-doc.cadn.net.cn

默认错误处理程序

这个新的错误处理程序将SeekToCurrentErrorHandlerRecoveringBatchErrorHandler,它们现在一直是多个版本的默认错误处理程序。 一个区别是批处理侦听器的回退行为(当BatchListenerFailedException引发)等效于重试完整批次spring-doc.cadn.net.cn

从 2.9 版开始,DefaultErrorHandler可以配置为提供与搜索未处理的记录偏移量相同的语义,如下所述,但实际上不进行搜索。 相反,记录由侦听器容器保留,并在错误处理程序退出后(以及执行单个暂停后)重新提交给侦听器poll(),让消费者活着;如果非阻塞重试或ContainerPausingBackOffHandler,暂停可能会延长到多个轮询)。 错误处理程序向容器返回一个结果,指示当前失败的记录是否可以重新提交,或者是否已恢复,然后不会再次发送到侦听器。 要启用此模式,请将属性seekAfterErrorfalse.

错误处理程序可以恢复(跳过)不断失败的记录。 默认情况下,在十次失败后,将记录失败的记录(在ERROR水平)。 您可以使用自定义恢复器 (BiConsumer) 和BackOff控制每个之间的交付尝试和延迟。 使用FixedBackOffFixedBackOff.UNLIMITED_ATTEMPTS导致(实际上)无限重试。 以下示例配置三次尝试后的恢复:spring-doc.cadn.net.cn

DefaultErrorHandler errorHandler =
    new DefaultErrorHandler((record, exception) -> {
        // recover after 3 failures, with no back off - e.g. send to a dead-letter topic
    }, new FixedBackOff(0L, 2L));

要使用此处理程序的自定义实例配置侦听器容器,请将其添加到容器工厂。spring-doc.cadn.net.cn

例如,使用@KafkaListener集装箱工厂,可以添加DefaultErrorHandler如下:spring-doc.cadn.net.cn

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.getContainerProperties().setAckMode(AckMode.RECORD);
    factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L)));
    return factory;
}

对于记录侦听器,这将重试最多 2 次(3 次投放尝试),回退 1 秒,而不是默认配置(FixedBackOff(0L, 9)). 重试用尽后,只会记录失败。spring-doc.cadn.net.cn

例如,如果poll返回六条记录(每个分区 0、1、2 中的两条),侦听器在第四条记录上抛出异常,则容器通过提交前三条消息的偏移量来确认前三条消息。 这DefaultErrorHandler寻求偏移分区 1 的偏移量 1 和分区 2 的偏移量 0。 下一个poll()返回三条未处理的记录。spring-doc.cadn.net.cn

如果AckModeBATCH,容器在调用错误处理程序之前提交前两个分区的偏移量。spring-doc.cadn.net.cn

对于批处理侦听器,侦听器必须抛出BatchListenerFailedException指示批处理中的哪些记录失败。spring-doc.cadn.net.cn

事件的顺序为:spring-doc.cadn.net.cn

  • 在索引之前提交记录的偏移量。spring-doc.cadn.net.cn

  • 如果重试未用尽,请执行搜索,以便重新传递所有剩余记录(包括失败的记录)。spring-doc.cadn.net.cn

  • 如果重试用尽,请尝试恢复失败的记录(仅限默认日志)并执行搜索,以便重新传递剩余的记录(不包括失败的记录)。 已提交已恢复记录的偏移量。spring-doc.cadn.net.cn

  • 如果重试用尽且恢复失败,则执行搜索,就像重试未用尽一样。spring-doc.cadn.net.cn

从 2.9 版开始,DefaultErrorHandler可以配置为提供与上面讨论的查找未处理的记录偏移量相同的语义,但实际上不查找。 相反,错误处理程序会创建一个新的ConsumerRecords<?, ?>仅包含未处理的记录,然后将提交给侦听器(在执行单个暂停后)poll(),以保持消费者的活力)。 要启用此模式,请将属性seekAfterErrorfalse.

默认恢复程序在重试用尽后记录失败的记录。 您可以使用自定义恢复器,也可以使用框架提供的恢复器,例如DeadLetterPublishingRecoverer.spring-doc.cadn.net.cn

当使用 POJO 批处理监听器时(例如List<Thing>),并且您没有要添加到异常的完整使用者记录,您可以只添加失败记录的索引:spring-doc.cadn.net.cn

@KafkaListener(id = "recovering", topics = "someTopic")
public void listen(List<Thing> things) {
    for (int i = 0; i < things.size(); i++) {
        try {
            process(things.get(i));
        }
        catch (Exception e) {
            throw new BatchListenerFailedException("Failed to process", i);
        }
    }
}

当容器配置为AckMode.MANUAL_IMMEDIATE,可以将错误处理程序配置为提交恢复记录的偏移量;将commitRecovered属性设置为true.spring-doc.cadn.net.cn

使用事务时,类似的功能由DefaultAfterRollbackProcessor. 请参阅回滚后处理器spring-doc.cadn.net.cn

DefaultErrorHandler认为某些异常是致命的,并且跳过此类异常的重试;在第一次失败时调用恢复程序。 默认情况下,被视为致命的异常包括:spring-doc.cadn.net.cn

因为这些异常不太可能在重试投放时得到解决。spring-doc.cadn.net.cn

您可以向不可重试类别添加更多异常类型,或完全替换分类异常的映射。请参阅 JavadocsDefaultErrorHandler.addNotRetryableException()DefaultErrorHandler.setClassifications()更多信息,以及spring-retry BinaryExceptionClassifier.spring-doc.cadn.net.cn

这是一个示例,它添加了IllegalArgumentException到不可重试的异常:spring-doc.cadn.net.cn

@Bean
public DefaultErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) {
    DefaultErrorHandler handler = new DefaultErrorHandler(recoverer);
    handler.addNotRetryableExceptions(IllegalArgumentException.class);
    return handler;
}
DefaultErrorHandler仅处理继承自RuntimeException. 继承自Error完全绕过错误处理程序,导致使用者立即终止,关闭 Kafka 连接,并跳过所有重试/恢复机制。这种关键区别意味着应用程序可能会报告正常状态,尽管使用者已终止不再处理消息。始终确保消息处理代码中引发的异常显式扩展自RuntimeException而不是Error以允许正确的错误处理。换句话说,如果应用程序抛出异常,请确保它是从RuntimeException而不是无意中继承自Error. 标准错误,例如OutOfMemoryError,IllegalAccessError,其他超出应用程序控制范围的错误仍被视为Errors 并未重试。

错误处理程序可以配置一个或多个RetryListeners,接收重试和恢复进度的通知。从 2.8.10 版开始,添加了批处理侦听器的方法。spring-doc.cadn.net.cn

@FunctionalInterface
public interface RetryListener {

    void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);

    default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
    }

    default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
    }

    default void failedDelivery(ConsumerRecords<?, ?> records, Exception ex, int deliveryAttempt) {
    }

    default void recovered(ConsumerRecords<?, ?> records, Exception ex) {
    }

	default void recoveryFailed(ConsumerRecords<?, ?> records, Exception original, Exception failure) {
	}

}

有关更多信息,请参阅 JavaDocs。spring-doc.cadn.net.cn

如果恢复器失败(抛出异常),则失败的记录将包含在搜索中。如果恢复器失败,则BackOff将默认重置,并且在再次尝试恢复之前,重新传递将再次经过回退。要在恢复失败后跳过重试,请将错误处理程序的resetStateOnRecoveryFailurefalse.

您可以为错误处理程序提供BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>以确定BackOff使用,基于失败的记录和/或异常:spring-doc.cadn.net.cn

handler.setBackOffFunction((record, ex) -> { ... });

如果函数返回null,则处理程序的默认值BackOff将被使用。spring-doc.cadn.net.cn

设置resetStateOnExceptionChangetrue重试序列将重新启动(包括选择新的BackOff,如果已配置),如果异常类型在失败之间发生变化。 什么时候false(2.9 版之前的默认值),则不考虑异常类型。spring-doc.cadn.net.cn

从 2.9 版本开始,现在是true默认情况下。spring-doc.cadn.net.cn

使用批处理错误处理程序的转换错误

从 2.8 版开始,批处理监听器现在可以正确处理转换错误,当使用MessageConverter使用ByteArrayDeserializer一个BytesDeserializerStringDeserializer,以及DefaultErrorHandler. 当发生转换错误时,有效负载将设置为 null,并将反序列化异常添加到记录标头中,类似于ErrorHandlingDeserializer. 列表ConversionExceptions 在侦听器中可用,因此侦听器可以抛出BatchListenerFailedException指示发生转换异常的第一个索引。spring-doc.cadn.net.cn

@KafkaListener(id = "test", topics = "topic")
void listen(List<Thing> in, @Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> exceptions) {
    for (int i = 0; i < in.size(); i++) {
        Foo foo = in.get(i);
        if (foo == null && exceptions.get(i) != null) {
            throw new BatchListenerFailedException("Conversion error", exceptions.get(i), i);
        }
        process(foo);
    }
}

重试完整批次

现在,这是DefaultErrorHandler对于批处理侦听器,其中侦听器抛出的异常不是BatchListenerFailedException.spring-doc.cadn.net.cn

无法保证在重新交付批次时,该批次具有相同的记录数和/或重新交付的记录按相同的顺序。 因此,不可能轻松维护批处理的重试状态。 这FallbackBatchErrorHandler采取以下方法。 如果批处理侦听器抛出的异常不是BatchListenerFailedException,则从内存中的记录批次执行重试。 为了避免在扩展重试序列期间重新平衡,错误处理程序会暂停使用者,在休眠之前轮询它以进行回退,每次重试,然后再次调用侦听器。 如果/当重试用尽时,则ConsumerRecordRecoverer为批处理中的每条记录调用。 如果恢复器抛出异常,或者线程在其睡眠期间中断,则将在下一次轮询时重新传递这批记录。 在退出之前,无论结果如何,消费者都会恢复。spring-doc.cadn.net.cn

此机制不能用于事务。

在等待BackOffinterval,错误处理程序将以短暂的睡眠循环,直到达到所需的延迟,同时检查容器是否已停止,允许睡眠在stop()而不是造成延误。spring-doc.cadn.net.cn

容器停止错误处理程序

CommonContainerStoppingErrorHandler如果侦听器抛出异常,则停止容器。 对于记录侦听器,当AckModeRECORD,则提交已处理记录的偏移量。 对于记录侦听器,当AckMode是任何手动值,则提交已确认记录的偏移量。 对于记录侦听器,当AckModeBATCH,或者对于批处理侦听器,当容器重新启动时,将重播整个批处理。spring-doc.cadn.net.cn

容器停止后,将ListenerExecutionFailedException被抛出。这是为了导致事务回滚(如果启用了事务)。spring-doc.cadn.net.cn

委托错误处理程序

CommonDelegatingErrorHandler可以委托给不同的错误处理程序,具体取决于异常类型。例如,您可能希望调用DefaultErrorHandler对于大多数例外情况,或CommonContainerStoppingErrorHandler对于其他人来说。spring-doc.cadn.net.cn

所有委托必须共享相同的兼容属性 (ackAfterHandle,seekAfterError…​).spring-doc.cadn.net.cn

日志记录错误处理程序

CommonLoggingErrorHandler只是记录异常;使用记录侦听器时,将上一次轮询的剩余记录传递给侦听器。对于批处理侦听器,将记录批处理中的所有记录。spring-doc.cadn.net.cn

对记录侦听器和批处理侦听器使用不同的常见错误处理程序

如果您希望对记录和批处理侦听器使用不同的错误处理策略,请CommonMixedErrorHandler允许为每种侦听器类型配置特定的错误处理程序。spring-doc.cadn.net.cn

常见错误处理程序摘要

旧版错误处理程序及其替换程序

旧版错误处理程序 更换

LoggingErrorHandlerspring-doc.cadn.net.cn

CommonLoggingErrorHandlerspring-doc.cadn.net.cn

BatchLoggingErrorHandlerspring-doc.cadn.net.cn

CommonLoggingErrorHandlerspring-doc.cadn.net.cn

ConditionalDelegatingErrorHandlerspring-doc.cadn.net.cn

DelegatingErrorHandlerspring-doc.cadn.net.cn

ConditionalDelegatingBatchErrorHandlerspring-doc.cadn.net.cn

DelegatingErrorHandlerspring-doc.cadn.net.cn

ContainerStoppingErrorHandlerspring-doc.cadn.net.cn

CommonContainerStoppingErrorHandlerspring-doc.cadn.net.cn

ContainerStoppingBatchErrorHandlerspring-doc.cadn.net.cn

CommonContainerStoppingErrorHandlerspring-doc.cadn.net.cn

SeekToCurrentErrorHandlerspring-doc.cadn.net.cn

DefaultErrorHandlerspring-doc.cadn.net.cn

SeekToCurrentBatchErrorHandlerspring-doc.cadn.net.cn

无需更换,使用DefaultErrorHandler与无限BackOff.spring-doc.cadn.net.cn

RecoveringBatchErrorHandlerspring-doc.cadn.net.cn

DefaultErrorHandlerspring-doc.cadn.net.cn

RetryingBatchErrorHandlerspring-doc.cadn.net.cn

无需更换,使用DefaultErrorHandler并抛出BatchListenerFailedException.spring-doc.cadn.net.cn

将自定义旧版错误处理程序实现迁移到CommonErrorHandler

请参阅CommonErrorHandler.spring-doc.cadn.net.cn

要将ErrorHandlerConsumerAwareErrorHandler实现,你应该实现handleOne()并离开seeksAfterHandle()返回false(默认值)。 您还应该实现handleOtherException()处理记录处理范围之外发生的异常(例如使用者错误)。spring-doc.cadn.net.cn

要将RemainingRecordsErrorHandler实现,你应该实现handleRemaining()并覆盖seeksAfterHandle()返回true(错误处理程序必须执行必要的搜索)。 您还应该实现handleOtherException()- 处理记录处理范围之外发生的异常(例如消费者错误)。spring-doc.cadn.net.cn

要将任何BatchErrorHandler实现,你应该实现handleBatch()您还应该实现handleOtherException()- 处理记录处理范围之外发生的异常(例如消费者错误)。spring-doc.cadn.net.cn

回滚处理器后

使用事务时,如果侦听器抛出异常(并且错误处理程序(如果存在)抛出异常),则事务将回滚。 默认情况下,任何未处理的记录(包括失败的记录)都会在下一次轮询时重新提取。 这是通过执行seek作中的DefaultAfterRollbackProcessor. 使用批处理侦听器,将重新处理整批记录(容器不知道批处理中的哪条记录失败)。 要修改此行为,您可以使用自定义AfterRollbackProcessor. 例如,对于基于记录的侦听器,您可能希望跟踪失败的记录,并在尝试了几次后放弃,也许可以通过将其发布到死信主题。spring-doc.cadn.net.cn

从 2.2 版开始,DefaultAfterRollbackProcessor现在可以恢复(跳过)不断失败的记录。 默认情况下,在十次失败后,将记录失败的记录(在ERROR水平)。 您可以使用自定义恢复器 (BiConsumer)和最大故障。 设置maxFailures属性设置为负数会导致无限重试。 以下示例配置三次尝试后的恢复:spring-doc.cadn.net.cn

AfterRollbackProcessor<String, String> processor =
    new DefaultAfterRollbackProcessor((record, exception) -> {
        // recover after 3 failures, with no back off - e.g. send to a dead-letter topic
    }, new FixedBackOff(0L, 2L));

当您不使用事务时,您可以通过配置DefaultErrorHandler. 请参阅容器错误处理程序spring-doc.cadn.net.cn

从版本 3.2 开始,恢复现在可以恢复(跳过)持续失败的整批记录。 设置ContainerProperties.setBatchRecoverAfterRollback(true)以启用此功能。spring-doc.cadn.net.cn

默认行为是,使用批处理侦听器无法进行恢复,因为框架不知道批处理中的哪条记录不断失败。 在这种情况下,应用程序侦听器必须处理不断失败的记录。

从 2.2.5 版本开始,DefaultAfterRollbackProcessor可以在新事务中调用(在失败的事务回滚后启动)。 然后,如果您使用DeadLetterPublishingRecoverer要发布失败的记录,处理器会将恢复记录在原始主题/分区中的偏移量发送到事务。 要启用此功能,请将commitRecoveredkafkaTemplate属性DefaultAfterRollbackProcessor.spring-doc.cadn.net.cn

如果恢复器失败(抛出异常),则失败的记录将包含在查找中。 从 2.5.5 版开始,如果恢复器出现故障,BackOff将默认重置,并且重新传递将再次经历回退,然后再尝试恢复。 对于早期版本,BackOff未重置,并在下一次故障时重新尝试恢复。 要恢复到以前的行为,请将处理器的resetStateOnRecoveryFailure属性设置为false.

从 2.6 版开始,您现在可以为处理器提供BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>以确定BackOff使用,基于失败的记录和/或异常:spring-doc.cadn.net.cn

handler.setBackOffFunction((record, ex) -> { ... });

如果函数返回null,处理器的默认值BackOff将被使用。spring-doc.cadn.net.cn

从 2.6.3 版本开始,将resetStateOnExceptionChangetrue重试序列将重新启动(包括选择新的BackOff,如果已配置),如果异常类型在失败之间发生变化。 默认情况下,不考虑异常类型。spring-doc.cadn.net.cn

从 2.3.1 版本开始,类似于DefaultErrorHandlerDefaultAfterRollbackProcessor认为某些异常是致命的,并且跳过此类异常的重试;在第一次失败时调用恢复程序。 默认情况下,被视为致命的异常包括:spring-doc.cadn.net.cn

因为这些异常不太可能在重试投放时得到解决。spring-doc.cadn.net.cn

您可以向不可重试类别添加更多异常类型,或完全替换分类异常的映射。请参阅 JavadocsDefaultAfterRollbackProcessor.setClassifications()更多信息,以及spring-retry BinaryExceptionClassifier.spring-doc.cadn.net.cn

这是一个示例,它添加了IllegalArgumentException到不可重试的异常:spring-doc.cadn.net.cn

@Bean
public DefaultAfterRollbackProcessor errorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
    DefaultAfterRollbackProcessor processor = new DefaultAfterRollbackProcessor(recoverer);
    processor.addNotRetryableException(IllegalArgumentException.class);
    return processor;
}
与电流kafka-clients,则容器无法检测到ProducerFencedException是由重新平衡引起的,或者如果生产者的transactional.id由于超时或过期而被撤销。 因为在大多数情况下,它是由重新平衡引起的,所以容器不会调用AfterRollbackProcessor(因为不适合查找分区,因为我们不再被分配分区)。 如果您确保超时足够大以处理每个事务并定期执行“空”事务(例如,通过ListenerContainerIdleEvent)可以避免因超时和过期而进行围栏。 或者,您可以将stopContainerWhenFencedcontainer 属性设置为true并且容器将停止,避免记录丢失。 您可以使用ConsumerStoppedEvent并检查Reason属性FENCED以检测此情况。 由于该事件还具有对容器的引用,因此可以使用此事件重新启动容器。

从 2.7 版本开始,在等待BackOffinterval,错误处理程序将以短暂的睡眠循环,直到达到所需的延迟,同时检查容器是否已停止,允许睡眠在stop()而不是造成延误。spring-doc.cadn.net.cn

从 2.7 版开始,处理器可以配置一个或多个RetryListeners,接收重试和恢复进度的通知。spring-doc.cadn.net.cn

@FunctionalInterface
public interface RetryListener {

    void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);

    default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
    }

    default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
    }

}

有关更多信息,请参阅 JavaDocs。spring-doc.cadn.net.cn

Delivery Attempts 标头

以下内容仅适用于记录侦听器,不适用于批处理侦听器。spring-doc.cadn.net.cn

从 2.5 版开始,当使用ErrorHandlerAfterRollbackProcessor实现DeliveryAttemptAware,则可以启用添加KafkaHeaders.DELIVERY_ATTEMPT标头 (kafka_deliveryAttempt) 到记录。 此标头的值是从 1 开始的递增整数。 接收原始数据时ConsumerRecord<?, ?>整数位于byte[4].spring-doc.cadn.net.cn

int delivery = ByteBuffer.wrap(record.headers()
    .lastHeader(KafkaHeaders.DELIVERY_ATTEMPT).value())
    .getInt();

使用时@KafkaListener使用JsonKafkaHeaderMapperSimpleKafkaHeaderMapper,可以通过添加@Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery作为监听器方法的参数。spring-doc.cadn.net.cn

要启用此标头的填充,请设置容器属性deliveryAttemptHeadertrue. 默认情况下,它是禁用的,以避免查找每条记录的状态和添加标头的(小)开销。spring-doc.cadn.net.cn

DefaultErrorHandlerDefaultAfterRollbackProcessor支持此功能。spring-doc.cadn.net.cn

批处理侦听器的 Delivery Attempts 标头

处理时ConsumerRecord使用BatchListenerKafkaHeaders.DELIVERY_ATTEMPTheader 可以以不同的方式呈现SingleRecordListener.spring-doc.cadn.net.cn

从 3.3 版本开始,如果要注入KafkaHeaders.DELIVERY_ATTEMPT标头到ConsumerRecord使用BatchListener,将DeliveryAttemptAwareRetryListener作为RetryListenerErrorHandler.spring-doc.cadn.net.cn

请参考下面的代码。spring-doc.cadn.net.cn

final FixedBackOff fixedBackOff = new FixedBackOff(1, 10);
final DefaultErrorHandler errorHandler = new DefaultErrorHandler(fixedBackOff);
errorHandler.setRetryListeners(new DeliveryAttemptAwareRetryListener());

ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setCommonErrorHandler(errorHandler);

然后,每当批处理无法完成时,DeliveryAttemptAwareRetryListener将注入一个KafkaHeaders.DELIVERY_ATTMPT标头到ConsumerRecord.spring-doc.cadn.net.cn

侦听器信息标头

在某些情况下,能够知道侦听器在哪个容器中运行是很有用的。spring-doc.cadn.net.cn

从 2.8.4 版开始,您现在可以将listenerInfo属性,或将info属性@KafkaListener注解。 然后,容器会在KafkaListener.LISTENER_INFO所有传入消息的标头;然后它可以用于记录拦截器、过滤器等,或用于侦听器本身。spring-doc.cadn.net.cn

@KafkaListener(id = "something", topics = "topic", filter = "someFilter",
        info = "this is the something listener")
public void listen(@Payload Thing thing,
        @Header(KafkaHeaders.LISTENER_INFO) String listenerInfo) {
    ...
}

当用于RecordInterceptorRecordFilterStrategy实现时,标头作为字节数组在消费者记录中,使用KafkaListenerAnnotationBeanPostProcessorcharSet财产。spring-doc.cadn.net.cn

标头映射器还转换为String创建时MessageHeaders从使用者记录中,并且永远不要将此标头映射到出站记录上。spring-doc.cadn.net.cn

对于 POJO 批处理侦听器,从 2.8.6 版开始,标头被复制到批处理的每个成员中,并且也可以作为单个String参数。spring-doc.cadn.net.cn

@KafkaListener(id = "list2", topics = "someTopic", containerFactory = "batchFactory",
        info = "info for batch")
public void listen(List<Thing> list,
        @Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
        @Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
        @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
        @Header(KafkaHeaders.OFFSET) List<Long> offsets,
        @Header(KafkaHeaders.LISTENER_INFO) String info) {
            ...
}
如果批处理侦听器具有筛选器,并且筛选器导致一个空批处理,则需要将required = false@Header参数,因为该信息不适用于空批次。

如果您收到List<Message<Thing>>信息在KafkaHeaders.LISTENER_INFO每个的标题Message<?>.spring-doc.cadn.net.cn

有关使用批处理的更多信息,请参阅批处理侦听器spring-doc.cadn.net.cn

发布死信记录

您可以配置DefaultErrorHandlerDefaultAfterRollbackProcessor当达到记录的最大故障数时,与记录恢复器一起使用。 该框架提供了DeadLetterPublishingRecoverer,它将失败的消息发布到另一个主题。 恢复器需要一个KafkaTemplate<Object, Object>,用于发送记录。 您还可以选择使用BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>,调用该函数来解析目标主题和分区。spring-doc.cadn.net.cn

默认情况下,死信记录被发送到名为<originalTopic>-dlt(原始主题名称后缀为-dlt) 并添加到与原始记录相同的分区。 因此,当您使用默认解析器时,死信主题必须至少具有与原始主题一样多的分区。

如果返回的TopicPartition有一个负分区,则该分区未在ProducerRecord,因此分区由 Kafka 选择。 从 2.2.4 版开始,任何ListenerExecutionFailedException(抛出,例如,当在@KafkaListener方法)通过groupId财产。 这允许目标解析器使用它,除了ConsumerRecord以选择死信主题。spring-doc.cadn.net.cn

以下示例显示了如何连接自定义目标解析器:spring-doc.cadn.net.cn

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
        (r, e) -> {
            if (e instanceof FooException) {
                return new TopicPartition(r.topic() + ".Foo.failures", r.partition());
            }
            else {
                return new TopicPartition(r.topic() + ".other.failures", r.partition());
            }
        });
CommonErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 2L));

发送到死信主题的记录使用以下标头进行增强:spring-doc.cadn.net.cn

关键异常仅由以下原因引起DeserializationExceptions 所以没有DLT_KEY_EXCEPTION_CAUSE_FQCN.spring-doc.cadn.net.cn

有两种机制可以添加更多标头。spring-doc.cadn.net.cn

  1. 对恢复器进行子类化并覆盖createProducerRecord()-叫super.createProducerRecord()并添加更多标题。spring-doc.cadn.net.cn

  2. 提供一个BiFunction接收消费者记录和异常,返回一个Headers对象;来自那里的标头将被复制到最终的制作人记录中;另请参阅管理死信记录头。 用setHeadersFunction()BiFunction.spring-doc.cadn.net.cn

第二个更易于实现,但第一个有更多信息可用,包括已经组装的标准标头。spring-doc.cadn.net.cn

从 2.3 版开始,当与ErrorHandlingDeserializer,发布者将恢复记录value(),到无法反序列化的原始值。 以前,value()为 null,用户代码必须解码DeserializationException从邮件头。 此外,还可以提供多个KafkaTemplates 给出版商;例如,如果要发布byte[]DeserializationException,以及使用与成功反序列化的记录不同的序列化程序的值。 下面是配置发布者的示例KafkaTemplateStringbyte[]序列化器:spring-doc.cadn.net.cn

@Bean
public DeadLetterPublishingRecoverer publisher(KafkaTemplate<?, ?> stringTemplate,
        KafkaTemplate<?, ?> bytesTemplate) {
    Map<Class<?>, KafkaOperations<?, ?>> templates = new LinkedHashMap<>();
    templates.put(String.class, stringTemplate);
    templates.put(byte[].class, bytesTemplate);
    return new DeadLetterPublishingRecoverer(templates);
}

发布者使用映射键来查找适合value()即将出版。 一个LinkedHashMap建议按顺序检查密钥。spring-doc.cadn.net.cn

发布时null值,并且有多个模板,则恢复器将为Void类; 如果不存在,则values().iterator()将被使用。spring-doc.cadn.net.cn

从 2.7 开始,您可以使用setFailIfSendResultIsError方法,以便在消息发布失败时引发异常。您还可以使用setWaitForSendResultTimeout.spring-doc.cadn.net.cn

如果恢复器失败(抛出异常),则失败的记录将包含在查找中。 从 2.5.5 版开始,如果恢复器出现故障,BackOff将默认重置,并且重新传递将再次经历回退,然后再尝试恢复。 对于早期版本,BackOff未重置,并在下一次故障时重新尝试恢复。 要恢复到以前的行为,请将错误处理程序的resetStateOnRecoveryFailure属性设置为false.

从 2.6.3 版本开始,将resetStateOnExceptionChangetrue重试序列将重新启动(包括选择新的BackOff,如果已配置),如果异常类型在失败之间发生变化。 默认情况下,不考虑异常类型。spring-doc.cadn.net.cn

从版本 2.3 开始,恢复器还可以与 Kafka Streams 一起使用 - 有关更多信息,请参阅从反序列化异常中恢复spring-doc.cadn.net.cn

ErrorHandlingDeserializer在标头中添加反序列化异常ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADERErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER(使用 Java 序列化)。 默认情况下,这些标头不会保留在发布到死信主题的邮件中。 从 2.7 版开始,如果键和值都反序列化失败,则两者的原始值将填充到发送到 DLT 的记录中。spring-doc.cadn.net.cn

如果传入记录相互依赖,但可能无序到达,则将失败的记录重新发布到原始主题的尾部(一定次数)可能很有用,而不是将其直接发送到死信主题。 有关示例,请参阅此 Stack Overflow 问题spring-doc.cadn.net.cn

以下错误处理程序配置将完全执行此作:spring-doc.cadn.net.cn

@Bean
public ErrorHandler eh(KafkaOperations<String, String> template) {
    return new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template,
            (rec, ex) -> {
                org.apache.kafka.common.header.Header retries = rec.headers().lastHeader("retries");
                if (retries == null) {
                    retries = new RecordHeader("retries", new byte[] { 1 });
                    rec.headers().add(retries);
                }
                else {
                    retries.value()[0]++;
                }
                return retries.value()[0] > 5
                        ? new TopicPartition("topic-dlt", rec.partition())
                        : new TopicPartition("topic", rec.partition());
            }), new FixedBackOff(0L, 0L));
}

从 V2.7 开始,恢复程序会检查目标解析器选择的分区是否实际存在。如果该分区不存在,则ProducerRecord设置为null,允许KafkaProducer以选择分区。您可以通过将verifyPartition属性设置为false.spring-doc.cadn.net.cn

从 3.1 版开始,将logRecoveryRecord属性设置为true将记录恢复记录和异常。spring-doc.cadn.net.cn

管理死信记录标题

参考上文发布死信记录,该DeadLetterPublishingRecoverer有两个属性,用于在标头已存在时管理这些标头(例如,在重新处理失败的死信记录时,包括使用非阻塞重试时)。spring-doc.cadn.net.cn

Apache Kafka 支持多个同名的标头;要获取“最新”值,您可以使用headers.lastHeader(headerName); 要获取多个标头的迭代器,请使用headers.headers(headerName).iterator().spring-doc.cadn.net.cn

当重复重新发布失败的记录时,这些标头可能会增长(并最终导致发布失败,因为RecordTooLargeException); 对于异常标头,尤其是堆栈跟踪标头,尤其如此。spring-doc.cadn.net.cn

使用这两个属性的原因是,虽然您可能只想保留最后一个异常信息,但您可能希望保留记录在每次失败中传递的主题的历史记录。spring-doc.cadn.net.cn

appendOriginalHeaders应用于所有名为ORIGINALstripPreviousExceptionHeaders应用于所有名为EXCEPTION.spring-doc.cadn.net.cn

从 2.8.4 版开始,您现在可以控制将哪些标准标头添加到输出记录中。 请参阅enum HeadersToAdd对于默认添加的(当前)10 个标准标头的通用名称(这些不是实际的标头名称,只是一个抽象;实际的标头名称由getHeaderNames()子类可以覆盖的方法。spring-doc.cadn.net.cn

要排除标头,请使用excludeHeaders()方法;例如,若要禁止在标头中添加异常堆栈跟踪,请使用:spring-doc.cadn.net.cn

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.excludeHeaders(HeaderNames.HeadersToAdd.EX_STACKTRACE);

此外,您可以通过添加ExceptionHeadersCreator;这也会禁用所有标准异常标头。spring-doc.cadn.net.cn

DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.setExceptionHeadersCreator((kafkaHeaders, exception, isKey, headerNames) -> {
    kafkaHeaders.add(new RecordHeader(..., ...));
});

同样从版本 2.8.4 开始,您现在可以通过addHeadersFunction方法。 这允许应用其他函数,即使已经注册了另一个函数,例如,在使用非阻塞重试时。spring-doc.cadn.net.cn

ExponentialBackOffWithMaxRetries实现

Spring Framework 提供了许多BackOff实现。 默认情况下,ExponentialBackOff将无限期重试;要在多次重试尝试后放弃,需要计算maxElapsedTime. 从版本 2.7.3 开始,Spring for Apache Kafka 提供了ExponentialBackOffWithMaxRetries这是一个接收maxRetries属性并自动计算maxElapsedTime,这更方便一些。spring-doc.cadn.net.cn

@Bean
DefaultErrorHandler handler() {
    ExponentialBackOffWithMaxRetries bo = new ExponentialBackOffWithMaxRetries(6);
    bo.setInitialInterval(1_000L);
    bo.setMultiplier(2.0);
    bo.setMaxInterval(10_000L);
    return new DefaultErrorHandler(myRecoverer, bo);
}

这将在1, 2, 4, 8, 10, 10秒,然后再调用恢复器。spring-doc.cadn.net.cn