功能
大多数功能对于 @RetryableTopic 注解和 RetryTopicConfiguration 的 beans 都是可用的。
退避策略配置
The BackOff 配置依赖于来自 Spring Retry 项目的 BackOffPolicy 接口。
它包含:
-
固定退避
-
指数退避
-
随机指数退避
-
均匀随机退避
-
没有退避
-
自定义退避
@RetryableTopic(attempts = 5,
backOff = @BackOff(delay = 1000, multiplier = 2, maxDelay = 5000))
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackOff(3_000)
.maxAttempts(4)
.create(template);
}
你也可以提供 Spring Retry 的 SleepingBackOffPolicy 接口的自定义实现:
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.customBackoff(new MyCustomBackOffPolicy())
.maxAttempts(5)
.create(template);
}
默认退避策略为 FixedBackOffPolicy,最多尝试3次,间隔1000ms。 |
有一个30秒的默认最大延迟用于ExponentialBackOffPolicy。
如果你的退避策略需要比该值更大的延迟,请相应调整maxDelay属性。 |
第一次尝试计入 maxAttempts,所以如果你提供一个 maxAttempts 值为 4,那么将包括原来的 1 次尝试加 3 次重试。 |
全局超时
你可以为重试过程设置全局超时。 如果达到该时间,当消费者在下一次抛出异常时,消息会直接进入 DLT,或者在没有 DLT 可用时结束处理。
@RetryableTopic(backOff = @BackOff(2_000), timeout = 5_000)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackOff(2_000)
.timeoutAfter(5_000)
.create(template);
}
| 默认情况下不设置超时时间,也可以通过将超时值提供为 -1 来实现。 |
异常分类器
你可以指定希望重试的异常以及不希望重试的异常。 你还可以设置遍历原因以查找嵌套异常。
@RetryableTopic(include = {MyRetryException.class, MyOtherRetryException.class}, traversingCauses = "true")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
throw new RuntimeException(new MyRetryException()); // will retry
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.notRetryOn(MyDontRetryException.class)
.create(template);
}
| 默认行为是重试所有异常,并不遍历原因。 |
Since 2.8.3 已经提供了一个全局的致命异常列表,这些异常会导致记录被发送到 DLT,且不会进行重试。
请参阅 DefaultErrorHandler 以查看默认的致命异常列表。
你可以通过在扩展 RetryTopicConfigurationSupport 的 @Configuration 类中重写 configureNonBlockingRetries 方法,来添加或移除此列表中的异常。
有关更多信息,请参阅 Configuring Global Settings and Features。
@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
nonBlockingFatalExceptions.add(MyNonBlockingException.class);
}
| 禁用致命异常的分类,只需清除提供的列表。 |
包含和排除主题
你可以通过 .includeTopic(String topic),.include topics(Collection<String> topics) ,.excludeTopic(String topic) 和 .exclude topics(Collection<String> topics) 方法决定哪些主题将由一个 RetryTopicConfiguration bean 处理。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.includeTopics(List.of("my-included-topic", "my-other-included-topic"))
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.excludeTopic("my-excluded-topic")
.create(template);
}
| 默认行为是包含所有主题。 |
话题自动创建
除非另有指定,框架将使用被KafkaAdmin bean 消费的NewTopic bean 自动创建所需主题。
您可以指定创建主题的分区数和复制因子,并可关闭此功能。
从 3.0 版本起,默认复制因子为-1,表示使用 broker 默认值。
如果您的 broker 版本早于 2.4,需要显式设置值。
| 注意:如果你没有使用 Spring Boot,你将需要提供一个 KafkaAdmin 组件 才能使用该功能。 |
@RetryableTopic(numPartitions = "2", replicationFactor = "3")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@RetryableTopic(autoCreateTopics = "false")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.autoCreateTopicsWith(2, 3)
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.doNotAutoCreateRetryTopics()
.create(template);
}
| 默认情况下,主题将自动创建,包含一个分区和复制因子为 -1(表示使用经纪人默认值)。 如果你的经纪人版本早于 2.4,你需要设置一个显式值。 |
失败标题管理
在考虑如何管理失败头(原始头和异常头)时,框架会将决定是否追加或替换头的职责委托给DeadLetterPublishingRecoverer。
默认情况下,它显式地将 appendOriginalHeaders 设置为 false,并将 stripPreviousExceptionHeaders 保留为由 DeadLetterPublishingRecover 使用的默认值。
这表示默认配置下,只会保留第一个“原始”和最后一个异常头。 这是为了避免在涉及许多重试步骤时产生过大的消息(例如由于堆栈跟踪头)。
见 Managing Dead Letter Record Headers 的更多信息。
To reconfigure the framework to use different settings for these properties, configure a DeadLetterPublishingRecoverer customizer by overriding the configureCustomizers method in a @Configuration class that extends RetryTopicConfigurationSupport.
See 配置全局设置和特性 以获取更多详情。
@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
customizersConfigurer.customizeDeadLetterPublishingRecoverer(dlpr -> {
dlpr.setAppendOriginalHeaders(true);
dlpr.setStripPreviousExceptionHeaders(false);
});
}
从 2.8.4 版本开始,如果你想在工厂添加的重试信息头之外再添加自定义头,可以在工厂 - 0 添加一个 1。
默认情况下,添加的任何头信息都是累积的 - Kafka 头信息可以包含多个值。
从 2.9.5 版本开始,如果函数返回的 Headers 包含类型为 DeadLetterPublishingRecoverer.SingleRecordHeader 的头信息,则会移除该头信息已有的所有值,仅保留新的单个值。
自定义 DeadLetterPublishingRecoverer
如在失败头管理中所示,可以自定义框架创建的默认DeadLetterPublishingRecoverer实例。
然而,对于某些使用场景,需要子类化DeadLetterPublishingRecoverer,例如重写createProducerRecord()以修改发送到重试(或死信)主题的内容。
从3.0.9版本开始,可以重写RetryTopicConfigurationSupport.configureDeadLetterPublishingContainerFactory()方法以提供一个DeadLetterPublisherCreator实例,例如:
@Override
protected Consumer<DeadLetterPublishingRecovererFactory>
configureDeadLetterPublishingContainerFactory() {
return (factory) -> factory.setDeadLetterPublisherCreator(
(templateResolver, destinationResolver) ->
new CustomDLPR(templateResolver, destinationResolver));
}
建议在构建自定义实例时使用提供的解析器。
基于抛出的异常将消息路由到自定义DLTs
从3.2.0版本开始,可以根据处理过程中抛出的异常类型将消息路由到自定义DLT,这需要指定路由设置。
为此,需要指定额外的目的地。路由自定义包括额外目的地的指定。
目的地由两组设置组成:suffix 和 exceptions。
当在exceptions中指定的异常类型被抛出时,将考虑包含suffix的DLT作为消息的目标主题,在考虑通用目的DLT之前。
使用注解或RetryTopicConfiguration配置 beans 的配置示例:
@RetryableTopic(exceptionBasedDltRouting = {
@ExceptionBasedDltDestination(
suffix = "-deserialization", exceptions = {DeserializationException.class}
)}
)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.dltRoutingRules(Map.of("-deserialization", Set.of(DeserializationException.class)))
.create(template);
}
suffix 在自定义 DLT 名称中的通用 dltTopicSuffix 之前发生。
考虑到所呈现的例子,导致 DeserializationException 的消息将被路由到 my-annotated-topic-deserialization-dlt 而不是 my-annotated-topic-dlt。
自定义 DLT 将按照 主题自动创建 中所述的相同规则创建。