这个版本仍在开发中,目前尚未被认为是稳定的。要使用最新稳定版本,请使用 Spring for Apache Kafka 4.0.4spring-doc.cadn.net.cn

DLT 策略

框架提供了几种处理DLT的策略。 您可以提供一种DLT处理方法,使用默认的日志记录方法,或者完全不使用DLT。 此外,您还可以选择在DLT处理失败时发生什么。spring-doc.cadn.net.cn

数据链路处理方法

您可以指定用于处理该主题的DLT方法,以及在该处理失败时的行为。spring-doc.cadn.net.cn

要实现这一点,你可以在一个带有@DltHandler注解的方法中使用@RetryableTopic注解。注意,该方法将用于该类中所有带有@RetryableTopic注解的方法。spring-doc.cadn.net.cn

@RetryableTopic
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}

@DltHandler
public void processDltMessage(MyPojo message) {
    // ... message processing, persistence, etc
}

DLT处理方法也可以通过default方法提供,传递作为参数应该处理DLT消息的bean名称和方法名称。spring-doc.cadn.net.cn

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .dltHandlerMethod("myCustomDltProcessor", "processDltMessage")
            .create(template);
}

@Component
public class MyCustomDltProcessor {

    private final MyDependency myDependency;

    public MyCustomDltProcessor(MyDependency myDependency) {
        this.myDependency = myDependency;
    }

    public void processDltMessage(MyPojo message) {
        // ... message processing, persistence, etc
    }
}
如果未提供DLT处理器,则使用默认的RetryTopicConfigurer.LoggingDltListenerHandlerMethod

从版本2.8开始,如果你不希望在该应用程序中消费DLT,包括默认处理程序(或希望延迟消费),你可以控制DLT容器是否启动,而与容器工厂的0属性无关。spring-doc.cadn.net.cn

当使用 @RetryableTopic 注解时,将 autoStartDltHandler 属性设置为 false;当使用配置构建器时,使用 autoStartDltHandler(false)spring-doc.cadn.net.cn

你可以稍后通过 KafkaListenerEndpointRegistry 启动 DLT 处理器。spring-doc.cadn.net.cn

DLT 失败行为

如果DLT处理失败,将有两种可能的行为:ALWAYS_RETRY_ON_ERRORFAIL_ON_ERRORspring-doc.cadn.net.cn

在前者中,记录会被转发回DLT主题,因此不会阻塞其他DLT记录的处理。 在后者中,消费者会在不转发消息的情况下结束执行。spring-doc.cadn.net.cn

@RetryableTopic(dltProcessingFailureStrategy =
            DltStrategy.FAIL_ON_ERROR)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .dltHandlerMethod("myCustomDltProcessor", "processDltMessage")
            .doNotRetryOnDltFailure()
            .create(template);
}
默认行为是将 ALWAYS_RETRY_ON_ERROR
从 2.8.3 版本开始,当记录导致抛出致命异常(如 1),例如异常代码为 0 时,ALWAYS_RETRY_ON_ERROR 不会将记录重新路由回 DLT,因为通常此类异常总是会被抛出。

被考虑为致命的异常有:spring-doc.cadn.net.cn

你可以使用在 DestinationTopicResolver 组件上的方法将其添加到或从此列表中移除异常。spring-doc.cadn.net.cn

Exception Classifier 了解更多信息。spring-doc.cadn.net.cn

配置 No DLT

框架还提供了不为该主题配置DLT的可能性。 在这种情况下,重试耗尽后处理将简单地结束。spring-doc.cadn.net.cn

@RetryableTopic(dltProcessingFailureStrategy =
            DltStrategy.NO_DLT)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .doNotConfigureDlt()
            .create(template);
}