对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.9spring-doc.cadn.net.cn

配置

从 2.9 版开始,对于默认配置,@EnableKafkaRetryTopic注释应该在@Configuration带注释的类。 这使该功能能够正确引导,并允许注入该功能的一些组件以在运行时查找。spring-doc.cadn.net.cn

没有必要还添加@EnableKafka,如果添加此注释,因为@EnableKafkaRetryTopic元注释为@EnableKafka.

此外,从该版本开始,为了对功能的组件和全局特征进行更高级的配置,请使用RetryTopicConfigurationSupport类应该在@Configuration类,并覆盖适当的方法。 有关更多详细信息,请参阅配置全局设置和功能spring-doc.cadn.net.cn

默认情况下,重试主题的容器将具有与主容器相同的并发性。 从 3.0 版开始,您可以设置不同的concurrency对于重试容器(在注释上,或在RetryConfigurationBuilder).spring-doc.cadn.net.cn

以上技术中只能使用一种,而且只能使用一种@Configuration类可以扩展RetryTopicConfigurationSupport.

使用@RetryableTopic注解

配置重试主题和 dlt@KafkaListenerannotated 方法,您只需添加@RetryableTopic注释,Spring for Apache Kafka 将使用默认配置引导所有必要的主题和消费者。spring-doc.cadn.net.cn

@RetryableTopic(kafkaTemplate = "myRetryableTopicKafkaTemplate")
@KafkaListener(topics = "my-annotated-topic", groupId = "myGroupId")
public void processMessage(MyPojo message) {
    // ... message processing
}

您可以在同一类中指定一个方法来处理 dlt 消息,方法是使用@DltHandler注解。 如果未提供 DltHandler 方法,则会创建一个默认使用者,该使用者仅记录使用情况。spring-doc.cadn.net.cn

@DltHandler
public void processMessage(MyPojo message) {
    // ... message processing, persistence, etc
}
如果您没有指定 kafkaTemplate 名称defaultRetryTopicKafkaTemplate将被查找。 如果未找到 bean,则抛出异常。

从 3.0 版开始,@RetryableTopicannotation 可以用作自定义 annotation 的元注解;例如:spring-doc.cadn.net.cn

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@RetryableTopic
static @interface MetaAnnotatedRetryableTopic {

    @AliasFor(attribute = "concurrency", annotation = RetryableTopic.class)
    String parallelism() default "3";

}

RetryTopicConfiguration

您还可以通过创建RetryTopicConfigurationbean 中的@Configuration带注释的类。spring-doc.cadn.net.cn

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, Object> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .create(template);
}

这将为方法中的所有主题创建重试主题和 dlt 以及相应的消费者@KafkaListener使用默认配置。这KafkaTemplate实例是消息转发所必需的。spring-doc.cadn.net.cn

为了实现对如何处理每个主题的非阻塞重审的更细粒度的控制,不止一个RetryTopicConfiguration可以提供 bean。spring-doc.cadn.net.cn

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackOff(3000)
            .maxAttempts(5)
            .concurrency(1)
            .includeTopics("my-topic", "my-other-topic")
            .create(template);
}

@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .exponentialBackoff(1000, 2, 5000)
            .maxAttempts(4)
            .excludeTopics("my-topic", "my-other-topic")
            .retryOn(MyException.class)
            .create(template);
}
重试主题和 dlt 的使用者将被分配给一个使用者组,该使用者组的组 ID 是您在groupId参数@KafkaListener带有主题后缀的注释。 如果您不提供任何内容,它们都将属于同一组,并且重试主题的重新平衡将导致主要主题不必要的重新平衡。
如果消费者配置了ErrorHandlingDeserializer,要处理反序列化异常,配置KafkaTemplate及其生产者带有一个序列化器,可以处理普通对象和原始对象byte[]值,这是由反序列化异常产生的。 模板的泛型值类型应为Object. 一种技术是使用DelegatingByTypeSerializer;下面是一个例子:
@Bean
public ProducerFactory<String, Object> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
        new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
               MyNormalObject.class, new JsonSerializer<Object>())));
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}
倍数@KafkaListener注释可以用于同一主题,无论是否手动分配分区以及非阻塞重试,但给定主题将仅使用一种配置。 最好使用单个RetryTopicConfiguration用于配置此类主题的 bean;如果多个@RetryableTopic注解用于同一主题,它们都应该具有相同的值,否则其中一个注解将应用于该主题的所有侦听器,而其他注解的值将被忽略。

配置全局设置和功能

从 2.9 开始,以前用于配置组件的 bean 覆盖方法已被删除(由于前面提到的 API 的实验性质,没有弃用)。 这不会改变RetryTopicConfigurationbeans 方法 - 仅基础设施组件的配置。 现在RetryTopicConfigurationSupport类应该扩展为(单个)@Configuration类,并覆盖正确的方法。 示例如下:spring-doc.cadn.net.cn

@EnableKafka
@Configuration
public class MyRetryTopicConfiguration extends RetryTopicConfigurationSupport {

    @Override
    protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
        blockingRetries
                .retryOn(MyBlockingRetriesException.class, MyOtherBlockingRetriesException.class)
                .backOff(new FixedBackOff(3000, 3));
    }

    @Override
    protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
        nonBlockingFatalExceptions.add(MyNonBlockingException.class);
    }

    @Override
    protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
        // Use the new 2.9 mechanism to avoid re-fetching the same records after a pause
        customizersConfigurer.customizeErrorHandler(eh -> {
            eh.setSeekAfterError(false);
        });
    }

}
使用此配置方法时,@EnableKafkaRetryTopic不应使用注释来防止上下文由于重复的 bean 而无法启动。 使用简单的@EnableKafka注释。

什么时候autoCreateTopics为 true,则将使用指定的分区数和复制因子创建 main 和 retry 主题。 从 3.0 版开始,默认复制因子为-1,这意味着使用代理默认值。 如果您的代理版本早于 2.4,则需要设置显式值。 要覆盖特定主题(例如主主题或 DLT)的这些值,只需添加一个NewTopic @Bean具有所需的属性;这将覆盖自动创建属性。spring-doc.cadn.net.cn

默认情况下,记录使用接收记录的原始分区发布到重试主题。 如果重试主题的分区少于主主题,则应适当配置框架;下面是一个例子。
@EnableKafka
@Configuration
public class Config extends RetryTopicConfigurationSupport {

    @Override
    protected Consumer<DeadLetterPublishingRecovererFactory> configureDeadLetterPublishingContainerFactory() {
        return dlprf -> dlprf.setPartitionResolver((cr, nextTopic) -> null);
    }

    ...

}

函数的参数是使用者记录和下一个主题的名称。 您可以返回特定的分区号,或者null以指示KafkaProducer应确定分区。spring-doc.cadn.net.cn

默认情况下,当记录通过重试主题转换时,将保留重试标头的所有值(尝试次数、时间戳)。 从 2.9.6 版开始,如果您只想保留这些标头的最后一个值,请使用configureDeadLetterPublishingContainerFactory()上面显示的方法来设置工厂的retainAllRetryHeaderValues属性设置为false.spring-doc.cadn.net.cn