此版本仍在开发中,尚不被认为是稳定的。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.9! |
特征
大多数功能都可用于@RetryableTopic
注释和RetryTopicConfiguration
豆。
回退配置
BackOff 配置依赖于BackOffPolicy
接口从Spring Retry
项目。
它包括:
-
固定后退
-
指数退缩
-
随机指数退避
-
均匀随机退缩
-
没有退缩
-
自定义退后
@RetryableTopic(attempts = 5,
backoff = @Backoff(delay = 1000, multiplier = 2, maxDelay = 5000))
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackOff(3_000)
.maxAttempts(4)
.create(template);
}
您还可以提供 Spring Retry 的SleepingBackOffPolicy
接口:
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.customBackoff(new MyCustomBackOffPolicy())
.maxAttempts(5)
.create(template);
}
默认退避策略为FixedBackOffPolicy 最多尝试 3 次,间隔为 1000 毫秒。 |
默认最大延迟为 30 秒ExponentialBackOffPolicy .
如果您的退避策略要求值大于该值的延迟,请调整maxDelay 属性相应地。 |
第一次尝试计入maxAttempts ,因此如果您提供maxAttempts 值为 4,则将有原始尝试加上 3 次重试。 |
全局超时
您可以设置重试过程的全局超时。 如果达到该时间,则下次使用者抛出异常时,消息将直接转到 DLT,或者如果没有可用的 DLT,则仅结束处理。
@RetryableTopic(backoff = @Backoff(2_000), timeout = 5_000)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackOff(2_000)
.timeoutAfter(5_000)
.create(template);
}
默认值是没有设置超时,这也可以通过提供 -1 作为超时值来实现。 |
异常分类器
您可以指定要重试的异常和不重试的异常。 您还可以将其设置为遍历原因以查找嵌套异常。
@RetryableTopic(include = {MyRetryException.class, MyOtherRetryException.class}, traversingCauses = "true")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
throw new RuntimeException(new MyRetryException()); // will retry
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.notRetryOn(MyDontRetryException.class)
.create(template);
}
默认行为是重试所有异常,而不是遍历原因。 |
从 2.8.3 开始,有一个致命异常的全局列表,这将导致记录无需任何重试即可发送到 DLT。
有关致命异常的默认列表,请参阅 DefaultErrorHandler。
您可以通过覆盖configureNonBlockingRetries
方法@Configuration
扩展的类RetryTopicConfigurationSupport
.
有关详细信息,请参阅配置全局设置和功能。
@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
nonBlockingFatalExceptions.add(MyNonBlockingException.class);
}
要禁用致命异常的分类,只需清除提供的列表。 |
包含和排除主题
您可以决定哪些主题将由RetryTopicConfiguration
bean 通过 .includeTopic(String topic)、.includeTopics(Collection<String> topics) .excludeTopic(String topic) 和 .excludeTopics(Collection<String> topics) 方法。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.includeTopics(List.of("my-included-topic", "my-other-included-topic"))
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.excludeTopic("my-excluded-topic")
.create(template);
}
默认行为是包含所有主题。 |
主题自动创建
除非另有说明,否则框架将使用NewTopic
由KafkaAdmin
豆。
您可以指定将用于创建主题的分区数和复制因子,并且可以关闭此功能。
从 3.0 版开始,默认复制因子为-1
,这意味着使用代理默认值。
如果您的代理版本早于 2.4,则需要设置显式值。
请注意,如果您不使用 Spring Boot,则必须提供 KafkaAdmin bean 才能使用此功能。 |
@RetryableTopic(numPartitions = "2", replicationFactor = "3")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@RetryableTopic(autoCreateTopics = "false")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.autoCreateTopicsWith(2, 3)
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.doNotAutoCreateRetryTopics()
.create(template);
}
默认情况下,主题是自动创建的,只有一个分区和复制因子为 -1(意味着使用代理默认值)。 如果您的代理版本早于 2.4,则需要设置显式值。 |
故障标头管理
在考虑如何管理故障标头(原始标头和异常标头)时,框架将委托给DeadLetterPublishingRecoverer
以决定是追加还是替换标头。
默认情况下,它显式地将appendOriginalHeaders
自false
和叶子stripPreviousExceptionHeaders
设置为DeadLetterPublishingRecover
.
这意味着默认配置中仅保留第一个“原始”和最后一个异常标头。 这是为了避免在涉及许多重试步骤时创建过大的消息(例如,由于堆栈跟踪标头)。
有关详细信息,请参阅管理死信记录标题。
若要重新配置框架以对这些属性使用不同的设置,请配置DeadLetterPublishingRecoverer
customizer 通过覆盖configureCustomizers
方法@Configuration
扩展的类RetryTopicConfigurationSupport
.
有关更多详细信息,请参阅配置全局设置和功能。
@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
customizersConfigurer.customizeDeadLetterPublishingRecoverer(dlpr -> {
dlpr.setAppendOriginalHeaders(true);
dlpr.setStripPreviousExceptionHeaders(false);
});
}
从 2.8.4 版本开始,如果您希望添加自定义标头(除了工厂添加的重试信息标头外,还可以添加headersFunction
到工厂 -factory.setHeadersFunction((rec, ex) -> { ... })
.
默认情况下,添加的任何标头都将是累积的 - Kafka 标头可以包含多个值。
从 2.9.5 版开始,如果Headers
函数返回的标头包含类型为DeadLetterPublishingRecoverer.SingleRecordHeader
,则该标头的任何现有值都将被删除,并且仅保留新的单个值。
自定义 DeadLetterPublishingRecoverer
如故障标头管理中所示,可以自定义默认值DeadLetterPublishingRecoverer
框架创建的实例。
但是,对于某些用例,有必要将DeadLetterPublishingRecoverer
,例如覆盖createProducerRecord()
修改发送到重试(或死信)主题的内容。
从版本 3.0.9 开始,您可以覆盖RetryTopicConfigurationSupport.configureDeadLetterPublishingContainerFactory()
方法提供DeadLetterPublisherCreator
实例,例如:
@Override
protected Consumer<DeadLetterPublishingRecovererFactory>
configureDeadLetterPublishingContainerFactory() {
return (factory) -> factory.setDeadLetterPublisherCreator(
(templateResolver, destinationResolver) ->
new CustomDLPR(templateResolver, destinationResolver));
}
建议您在构造自定义实例时使用提供的解析器。
根据引发的异常将消息路由到自定义 DLT
从版本 3.2.0 开始,可以根据在处理过程中引发的异常类型将消息路由到自定义 DLT。
为此,需要指定路由。
路由自定义包括附加目标的规范。
目标又由两个设置组成:suffix
和exceptions
.
当exceptions
已抛出,则包含suffix
在考虑通用 DLT 之前,将被视为消息的目标主题。
使用注释或RetryTopicConfiguration
豆:
@RetryableTopic(exceptionBasedDltRouting = {
@ExceptionBasedDltDestination(
suffix = "-deserialization", exceptions = {DeserializationException.class}
)}
)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.dltRoutingRules(Map.of("-deserialization", Set.of(DeserializationException.class)))
.create(template);
}
suffix
发生在将军之前dltTopicSuffix
在自定义 DLT 名称中。
考虑到所提供的示例,导致DeserializationException
将被路由到my-annotated-topic-deserialization-dlt
而不是my-annotated-topic-dlt
.
将按照主题自动创建中所述的相同规则创建自定义 DLT。