| 
         此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.4!  | 
    
| 
         此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.2.4!  | 
    
大多数功能都可用于 Comments 和 Bean。@RetryableTopicRetryTopicConfiguration
BackOff 配置
BackOff 配置依赖于项目中的接口。BackOffPolicySpring Retry
它包括:
- 
固定退后
 - 
指数回退
 - 
Random Exponential Back Off(随机指数回退)
 - 
统一随机回退
 - 
无退缩
 - 
自定义回退
 
@RetryableTopic(attempts = 5,
    backoff = @Backoff(delay = 1000, multiplier = 2, maxDelay = 5000))
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackoff(3_000)
            .maxAttempts(4)
            .create(template);
}
你还可以提供 Spring Retry 接口的自定义实现:SleepingBackOffPolicy
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .customBackOff(new MyCustomBackOffPolicy())
            .maxAttempts(5)
            .create(template);
}
默认的回退策略最多尝试 3 次,间隔为 1000 毫秒。FixedBackOffPolicy | 
默认最大延迟为 30 秒。
如果您的退避策略需要值大于该值的延迟,请相应地调整属性。ExponentialBackOffPolicymaxDelay | 
第一次尝试对 进行计数,因此,如果您提供的值为 4,则原始尝试将加上 3 次重试。maxAttemptsmaxAttempts | 
默认的回退策略最多尝试 3 次,间隔为 1000 毫秒。FixedBackOffPolicy | 
默认最大延迟为 30 秒。
如果您的退避策略需要值大于该值的延迟,请相应地调整属性。ExponentialBackOffPolicymaxDelay | 
第一次尝试对 进行计数,因此,如果您提供的值为 4,则原始尝试将加上 3 次重试。maxAttemptsmaxAttempts | 
全局超时
您可以为重试过程设置全局超时。 如果达到该时间,则下次使用者引发异常时,消息将直接发送到 DLT,或者如果没有可用的 DLT,则直接结束处理。
@RetryableTopic(backoff = @Backoff(2_000), timeout = 5_000)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackoff(2_000)
            .timeoutAfter(5_000)
            .create(template);
}
| 默认值是没有设置超时,这也可以通过提供 -1 作为 timeout 值来实现。 | 
| 默认值是没有设置超时,这也可以通过提供 -1 作为 timeout 值来实现。 | 
异常分类器
您可以指定要重试的异常和不重试的异常。 您还可以将其设置为遍历原因以查找嵌套异常。
@RetryableTopic(include = {MyRetryException.class, MyOtherRetryException.class}, traversingCauses = true)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    throw new RuntimeException(new MyRetryException()); // will retry
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .notRetryOn(MyDontRetryException.class)
            .create(template);
}
| 默认行为是对所有异常重试,而不是遍历原因。 | 
从 2.8.3 开始,有一个致命异常的全局列表,这将导致记录被发送到 DLT 而无需任何重试。
有关致命异常的默认列表,请参见DefaultErrorHandler。
您可以通过覆盖扩展 .
有关更多信息,请参阅配置全局设置和功能。configureNonBlockingRetries@ConfigurationRetryTopicConfigurationSupport
@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
    nonBlockingFatalExceptions.add(MyNonBlockingException.class);
}
| 要禁用致命异常的分类,只需清除提供的列表即可。 | 
| 默认行为是对所有异常重试,而不是遍历原因。 | 
| 要禁用致命异常的分类,只需清除提供的列表即可。 | 
包含和排除主题
您可以通过 .includeTopic(String topic)、.includeTopics(Collection<String> topics) .excludeTopic(String topic) 和 .excludeTopics(Collection<String> topics) 方法来决定 Bean 将处理和不处理哪些主题。RetryTopicConfiguration
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .includeTopics(List.of("my-included-topic", "my-other-included-topic"))
            .create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .excludeTopic("my-excluded-topic")
            .create(template);
}
| 默认行为是包含所有主题。 | 
| 默认行为是包含所有主题。 | 
主题自动创建
除非另有指定,否则框架将使用 Bean 使用的 bean 自动创建所需的主题。
您可以指定分区数和用于创建主题的复制因子,并且可以关闭此功能。
从版本 3.0 开始,默认复制因子为 ,表示使用代理默认值。
如果您的代理版本低于 2.4,则需要设置一个显式值。NewTopicKafkaAdmin-1
| 请注意,如果您不使用 Spring Boot,则必须提供 KafkaAdmin bean 才能使用此功能。 | 
@RetryableTopic(numPartitions = 2, replicationFactor = 3)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@RetryableTopic(autoCreateTopics = false)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .autoCreateTopicsWith(2, 3)
            .create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .doNotAutoCreateRetryTopics()
            .create(template);
}
| 默认情况下,主题是使用一个分区和复制因子 -1 自动创建的(意味着使用代理默认值)。 如果您的代理版本低于 2.4,则需要设置一个显式值。 | 
| 请注意,如果您不使用 Spring Boot,则必须提供 KafkaAdmin bean 才能使用此功能。 | 
| 默认情况下,主题是使用一个分区和复制因子 -1 自动创建的(意味着使用代理默认值)。 如果您的代理版本低于 2.4,则需要设置一个显式值。 | 
失败标头管理
在考虑如何管理失败标头(原始标头和异常标头)时,框架会委托 来决定是附加还是替换标头。DeadLetterPublishingRecover
默认情况下,它显式设置为 .appendOriginalHeadersfalsestripPreviousExceptionHeadersDeadLetterPublishingRecover
这意味着只有第一个 “original” 和最后一个 exception headers 会保留 default configuration。 这是为了避免在涉及许多重试步骤时创建过大的消息(例如,由于堆栈跟踪标头)。
有关更多信息,请参阅 管理死信记录标头 。
要重新配置框架以对这些属性使用不同的设置,请通过覆盖 extends 的类中的方法来配置定制器。
有关更多详细信息,请参阅配置全局设置和功能。DeadLetterPublishingRecovererconfigureCustomizers@ConfigurationRetryTopicConfigurationSupport
@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
    customizersConfigurer.customizeDeadLetterPublishingRecoverer(dlpr -> {
        dlpr.setAppendOriginalHeaders(true);
        dlpr.setStripPreviousExceptionHeaders(false);
    });
}
从版本 2.8.4 开始,如果您希望添加自定义 Headers(除了工厂添加的重试信息 Headers 之外,您还可以向工厂添加 - .headersFunctionfactory.setHeadersFunction((rec, ex) -> { ... })
默认情况下,添加的任何标头都是累积的 - Kafka 标头可以包含多个值。
从版本 2.9.5 开始,如果函数返回的 title 包含 type 为 , 则将删除该 Headers 的任何现有值,只保留新的单个值。HeadersDeadLetterPublishingRecoverer.SingleRecordHeader
自定义 DeadLetterPublishingRecoverer
从 Failure Header Management 中可以看出,可以自定义框架创建的默认实例。
但是,对于某些用例,有必要对 进行子类化,例如覆盖以修改发送到重试(或死信)主题的内容。
从版本 3.0.9 开始,您可以覆盖该方法以提供实例,例如:DeadLetterPublishingRecovererDeadLetterPublishingRecoverercreateProducerRecord()RetryConfigurationSupport.configureDeadLetterPublishingContainerFactory()DeadLetterPublisherCreator
@Override
protected Consumer<DeadLetterPublishingRecovererFactory>
        configureDeadLetterPublishingContainerFactory() {
    return (factory) -> factory.setDeadLetterPublisherCreator(
            (templateResolver, destinationResolver) ->
                    new CustomDLPR(templateResolver, destinationResolver));
}
建议您在构建自定义实例时使用提供的解析程序。