对于最新稳定版本,请使用Spring for Apache Kafka 4.0.4spring-doc.cadn.net.cn

配置

对于默认设置,通过在@KafkaListener方法上添加@RetryableTopic注解即可启用非阻塞重试。 这是推荐且最简单的做法,因为它会自动配置所需的重试基础设施,并使用默认设置创建重试和DLT主题。spring-doc.cadn.net.cn

为了导入非阻塞重试基础设施并将其组件作为 beans 暴露出来,给一个 @Configuration 类添加 @EnableKafkaRetryTopic 注解。 这使得可以对特征的组件进行注入和运行时查找,并为高级和全局配置提供基础。spring-doc.cadn.net.cn

如果不添加@EnableKafka,也不需要添加该注解,因为@EnableKafkaRetryTopic@EnableKafka元注解所注解。

对于高级和全球自定义,扩展 RetryTopicConfigurationSupport 在一个 @Configuration 类中并重写相关方法。 了解更多详情,请参阅 全球设置和功能配置spring-doc.cadn.net.cn

默认情况下,重试主题的容器将与主容器具有相同的并发性。 从3.0版本开始,您可以为重试容器设置不同的concurrency(可以在注解中设置,或在RetryTopicConfigurationBuilder中设置)。spring-doc.cadn.net.cn

仅使用上述两种全局配置方法中的一种(@EnableKafkaRetryTopic 或继承 RetryTopicConfigurationSupport)。 此外,仅应有一个 @Configuration 类继承 RetryTopicConfigurationSupportspring-doc.cadn.net.cn

使用@RetryableTopic注解

为了配置一个标注为@KafkaListener的方法的重试主题和死信主题,你只需要在该方法上添加@RetryableTopic注解,Spring for Apache Kafka 将会使用默认配置自动创建所有必要的主题和消费者。spring-doc.cadn.net.cn

@RetryableTopic(kafkaTemplate = "myRetryableTopicKafkaTemplate")
@KafkaListener(topics = "my-annotated-topic", groupId = "myGroupId")
public void processMessage(MyPojo message) {
    // ... message processing
}

自 3.2 起,@RetryableTopic 对于类上的 @KafkaListener 支持将:spring-doc.cadn.net.cn

@RetryableTopic(listenerContainerFactory = "my-retry-topic-factory")
@KafkaListener(topics = "my-annotated-topic")
public class ClassLevelRetryListener {

    @KafkaHandler
    public void processMessage(MyPojo message) {
        // ... message processing
    }

}

您可以使用同一类中的方法来处理dlt消息,通过使用@DltHandler注解进行标注。 如果未提供DltHandler方法,则会创建一个默认的消费者,仅记录消费。spring-doc.cadn.net.cn

@DltHandler
public void processMessage(MyPojo message) {
    // ... message processing, persistence, etc
}
如果未指定kafkaTemplate的名称,将查找名为defaultRetryTopicKafkaTemplate的bean。 如果未找到该bean将抛出异常。

从 3.0 版本开始,@RetryableTopic 注解可以作为元注解用于自定义注解;例如:spring-doc.cadn.net.cn

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@RetryableTopic
static @interface MetaAnnotatedRetryableTopic {

    @AliasFor(attribute = "concurrency", annotation = RetryableTopic.class)
    String parallelism() default "3";

}

使用RetryTopicConfigurationBean

您也可以通过在一个@Configuration注解类中创建RetryTopicConfiguration个beans来配置非阻塞重试支持。spring-doc.cadn.net.cn

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, Object> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .create(template);
}

这将为所有在方法上使用默认配置并带有@KafkaListener注解的主题创建重试主题和DLT,以及相应的消费者。KafkaTemplate实例用于消息转发。spring-doc.cadn.net.cn

为更精细地控制每个主题的非阻塞重试处理,可以提供多个 RetryTopicConfiguration bean。spring-doc.cadn.net.cn

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackOff(3000)
            .maxAttempts(5)
            .concurrency(1)
            .includeTopics(List.of("my-topic", "my-other-topic"))
            .create(template);
}

@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .exponentialBackoff(1000, 2, 5000)
            .maxAttempts(4)
            .excludeTopics(List.of("my-topic", "my-other-topic"))
            .retryOn(MyException.class)
            .create(template);
}
重试主题和DLT的消费者将被分配到一个具有组ID的消费者组,该组ID是由你在groupId参数中的@KafkaListener注解中提供的ID与主题后缀组合而成。如果你没有提供任何ID,它们将全部属于同一个组,重试主题上的重新平衡将导致主主题上的不必要的重新平衡。
如果消费者配置了ErrorHandlingDeserializer,以处理反序列化异常,重要的是要配置KafkaTemplate及其生产者使用能够处理普通对象以及来自反序列化异常的原始byte[]值的序列化器。 模板的通用值类型应为Object。 一种技术是使用DelegatingByTypeSerializer;以下是一个示例:
@Bean
public ProducerFactory<String, Object> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
        new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
               MyNormalObject.class, new JsonSerializer<Object>())));
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}
多个 @KafkaListener 注解可以用于同一主题,可与手动分区分配和非阻塞重试同时使用,但针对同一主题只会应用一个配置。 最好为此类主题使用一个单独的 RetryTopicConfiguration bean 进行配置;如果在同一主题上使用多个 @RetryableTopic 注解,所有注解的值必须相同,否则其中一个配置将应用于该主题所有监听器,其他注解的值会被忽略。

配置全局设置和功能

自 2.9 起,针对组件配置的先前组件覆盖方法已被移除(没有弃用,由于该 API 具有上述实验性质)。 这并不改变 RetryTopicConfiguration 个 beans 的方法——仅基础设施组件的配置受到影响。 现在应在一个(单一的)RetryTopicConfigurationSupport 类中扩展 @Configuration 类,并重写适当的方法。 以下是一个示例:spring-doc.cadn.net.cn

@EnableKafka
@Configuration
public class MyRetryTopicConfiguration extends RetryTopicConfigurationSupport {

    @Override
    protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
        blockingRetries
                .retryOn(MyBlockingRetriesException.class, MyOtherBlockingRetriesException.class)
                .backOff(new FixedBackOff(3000, 3));
    }

    @Override
    protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
        nonBlockingFatalExceptions.add(MyNonBlockingException.class);
    }

    @Override
    protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
        // Use the new 2.9 mechanism to avoid re-fetching the same records after a pause
        customizersConfigurer.customizeErrorHandler(eh -> {
            eh.setSeekAfterError(false);
        });
    }

}
当使用这种配置方法时,不应使用@EnableKafkaRetryTopic注解,以防止由于重复的bean导致上下文启动失败。 请改用简单的@EnableKafka注解。

autoCreateTopics 为 true 时,主主题和重试主题将使用指定的分区数和复制因子创建。 从 3.0 版本开始,默认复制因子为 -1,即使用 broker 的默认值。 如果您的 broker 版本早于 2.4,则需要显式设置值。 要为特定主题(例如主主题或 DLT)覆盖这些值,只需添加一个 NewTopic @Bean 以及所需的属性;这将覆盖自动创建属性。spring-doc.cadn.net.cn

默认情况下,记录会使用接收到记录的原始分区发布到重试主题(s)。 如果重试主题的分区数少于主主题,你应该相应地配置框架;以下是一个示例。
@EnableKafka
@Configuration
public class Config extends RetryTopicConfigurationSupport {

    @Override
    protected Consumer<DeadLetterPublishingRecovererFactory> configureDeadLetterPublishingContainerFactory() {
        return dlprf -> dlprf.setPartitionResolver((cr, nextTopic) -> null);
    }

    ...

}

The parameters to the function are the consumer record and the name of the next topic. 你可以返回一个特定的分区号,或 null 表示应由 KafkaProducer 来决定分区。spring-doc.cadn.net.cn

默认情况下,重试主题中记录在重试过程中传递时,重试头(尝试次数、时间戳)的所有值都会保留。 从 2.9.6 版本开始,如果您只想保留这些头的最后一个值,请使用上面所示的 configureDeadLetterPublishingContainerFactory() 方法,将工厂的 retainAllRetryHeaderValues 属性设置为 falsespring-doc.cadn.net.cn

查找重试主题配置

尝试通过从 RetryTopicConfiguration 注解创建实例,或在没有注解时从 bean 容器创建 @RetryableTopic 实例。spring-doc.cadn.net.cn

如果在容器中发现了 beans,会检查是否应由这些实例处理提供的主题。spring-doc.cadn.net.cn

如果提供了@RetryableTopic注解,会查找一个DltHandler注解的方法。spring-doc.cadn.net.cn

自 3.2 起,当类上使用 @RetryableTopic 注解时,提供了新的 API 来创建 RetryTopicConfigurationspring-doc.cadn.net.cn

@Bean
public RetryTopicConfiguration myRetryTopic() {
    RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(beanFactory);
    return provider.findRetryConfigurationFor(topics, null, AnnotatedClass.class, bean);
}

@RetryableTopic
public static class AnnotatedClass {
    // NoOps
}