配置
对于默认设置,通过在@KafkaListener方法上添加@RetryableTopic注解即可启用非阻塞重试。
这是推荐且最简单的做法,因为它会自动配置所需的重试基础设施,并使用默认设置创建重试和DLT主题。
为了导入非阻塞重试基础设施并将其组件作为 beans 暴露出来,给一个 @Configuration 类添加 @EnableKafkaRetryTopic 注解。
这使得可以对特征的组件进行注入和运行时查找,并为高级和全局配置提供基础。
如果不添加@EnableKafka,也不需要添加该注解,因为@EnableKafkaRetryTopic被@EnableKafka元注解所注解。 |
对于高级和全球自定义,扩展 RetryTopicConfigurationSupport 在一个 @Configuration 类中并重写相关方法。
了解更多详情,请参阅 全球设置和功能配置。
默认情况下,重试主题的容器将与主容器具有相同的并发性。
从3.0版本开始,您可以为重试容器设置不同的concurrency(可以在注解中设置,或在RetryTopicConfigurationBuilder中设置)。
|
仅使用上述两种全局配置方法中的一种( |
使用@RetryableTopic注解
为了配置一个标注为@KafkaListener的方法的重试主题和死信主题,你只需要在该方法上添加@RetryableTopic注解,Spring for Apache Kafka 将会使用默认配置自动创建所有必要的主题和消费者。
@RetryableTopic(kafkaTemplate = "myRetryableTopicKafkaTemplate")
@KafkaListener(topics = "my-annotated-topic", groupId = "myGroupId")
public void processMessage(MyPojo message) {
// ... message processing
}
自 3.2 起,@RetryableTopic 对于类上的 @KafkaListener 支持将:
@RetryableTopic(listenerContainerFactory = "my-retry-topic-factory")
@KafkaListener(topics = "my-annotated-topic")
public class ClassLevelRetryListener {
@KafkaHandler
public void processMessage(MyPojo message) {
// ... message processing
}
}
您可以使用同一类中的方法来处理dlt消息,通过使用@DltHandler注解进行标注。
如果未提供DltHandler方法,则会创建一个默认的消费者,仅记录消费。
@DltHandler
public void processMessage(MyPojo message) {
// ... message processing, persistence, etc
}
如果未指定kafkaTemplate的名称,将查找名为defaultRetryTopicKafkaTemplate的bean。
如果未找到该bean将抛出异常。 |
从 3.0 版本开始,@RetryableTopic 注解可以作为元注解用于自定义注解;例如:
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@RetryableTopic
static @interface MetaAnnotatedRetryableTopic {
@AliasFor(attribute = "concurrency", annotation = RetryableTopic.class)
String parallelism() default "3";
}
使用RetryTopicConfigurationBean
您也可以通过在一个@Configuration注解类中创建RetryTopicConfiguration个beans来配置非阻塞重试支持。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, Object> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.create(template);
}
这将为所有在方法上使用默认配置并带有@KafkaListener注解的主题创建重试主题和DLT,以及相应的消费者。KafkaTemplate实例用于消息转发。
为更精细地控制每个主题的非阻塞重试处理,可以提供多个 RetryTopicConfiguration bean。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackOff(3000)
.maxAttempts(5)
.concurrency(1)
.includeTopics(List.of("my-topic", "my-other-topic"))
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.exponentialBackoff(1000, 2, 5000)
.maxAttempts(4)
.excludeTopics(List.of("my-topic", "my-other-topic"))
.retryOn(MyException.class)
.create(template);
}
重试主题和DLT的消费者将被分配到一个具有组ID的消费者组,该组ID是由你在groupId参数中的@KafkaListener注解中提供的ID与主题后缀组合而成。如果你没有提供任何ID,它们将全部属于同一个组,重试主题上的重新平衡将导致主主题上的不必要的重新平衡。 |
如果消费者配置了ErrorHandlingDeserializer,以处理反序列化异常,重要的是要配置KafkaTemplate及其生产者使用能够处理普通对象以及来自反序列化异常的原始byte[]值的序列化器。
模板的通用值类型应为Object。
一种技术是使用DelegatingByTypeSerializer;以下是一个示例: |
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
MyNormalObject.class, new JacksonJsonSerializer<Object>())));
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
多个 @KafkaListener 注解可以用于同一主题,可与手动分区分配和非阻塞重试同时使用,但针对同一主题只会应用一个配置。
最好为此类主题使用一个单独的 RetryTopicConfiguration bean 进行配置;如果在同一主题上使用多个 @RetryableTopic 注解,所有注解的值必须相同,否则其中一个配置将应用于该主题所有监听器,其他注解的值会被忽略。 |
配置全局设置和功能
自 2.9 起,针对组件配置的先前组件覆盖方法已被移除(没有弃用,由于该 API 具有上述实验性质)。
这并不改变 RetryTopicConfiguration 个 beans 的方法——仅基础设施组件的配置受到影响。
现在应在一个(单一的)RetryTopicConfigurationSupport 类中扩展 @Configuration 类,并重写适当的方法。
以下是一个示例:
@EnableKafka
@Configuration
public class MyRetryTopicConfiguration extends RetryTopicConfigurationSupport {
@Override
protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
blockingRetries
.retryOn(MyBlockingRetriesException.class, MyOtherBlockingRetriesException.class)
.backOff(new FixedBackOff(3000, 3));
}
@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
nonBlockingFatalExceptions.add(MyNonBlockingException.class);
}
@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
// Use the new 2.9 mechanism to avoid re-fetching the same records after a pause
customizersConfigurer.customizeErrorHandler(eh -> {
eh.setSeekAfterError(false);
});
}
}
当使用这种配置方法时,不应使用@EnableKafkaRetryTopic注解,以防止由于重复的bean导致上下文启动失败。
请改用简单的@EnableKafka注解。 |
当 autoCreateTopics 为 true 时,主主题和重试主题将使用指定的分区数和复制因子创建。
从 3.0 版本开始,默认复制因子为 -1,即使用 broker 的默认值。
如果您的 broker 版本早于 2.4,则需要显式设置值。
要为特定主题(例如主主题或 DLT)覆盖这些值,只需添加一个 NewTopic @Bean 以及所需的属性;这将覆盖自动创建属性。
| 默认情况下,记录会使用接收到记录的原始分区发布到重试主题(s)。 如果重试主题的分区数少于主主题,你应该相应地配置框架;以下是一个示例。 |
@EnableKafka
@Configuration
public class Config extends RetryTopicConfigurationSupport {
@Override
protected Consumer<DeadLetterPublishingRecovererFactory> configureDeadLetterPublishingContainerFactory() {
return dlprf -> dlprf.setPartitionResolver((cr, nextTopic) -> null);
}
...
}
The parameters to the function are the consumer record and the name of the next topic.
你可以返回一个特定的分区号,或 null 表示应由 KafkaProducer 来决定分区。
默认情况下,重试主题中记录在重试过程中传递时,重试头(尝试次数、时间戳)的所有值都会保留。
从 2.9.6 版本开始,如果您只想保留这些头的最后一个值,请使用上面所示的 configureDeadLetterPublishingContainerFactory() 方法,将工厂的 retainAllRetryHeaderValues 属性设置为 false。
查找重试主题配置
尝试通过从 RetryTopicConfiguration 注解创建实例,或在没有注解时从 bean 容器创建 @RetryableTopic 实例。
如果在容器中发现了 beans,会检查是否应由这些实例处理提供的主题。
如果提供了@RetryableTopic注解,会查找一个DltHandler注解的方法。
自 3.2 起,当类上使用 @RetryableTopic 注解时,提供了新的 API 来创建 RetryTopicConfiguration:
@Bean
public RetryTopicConfiguration myRetryTopic() {
RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(beanFactory);
return provider.findRetryConfigurationFor(topics, null, AnnotatedClass.class, bean);
}
@RetryableTopic
public static class AnnotatedClass {
// NoOps
}