对于最新稳定版本,请使用Spring for Apache Kafka 4.0.4spring-doc.cadn.net.cn

主题命名

重试主题和死信主题通过在主主题名称后添加提供的或默认值,再附加该主题的延迟或索引来命名。spring-doc.cadn.net.cn

"my-topic" → "my-topic-retry-0", "my-topic-retry-1",…,"my-topic-dlt"spring-doc.cadn.net.cn

"my-other-topic" → "my-topic-我的重试后缀-1000", "my-topic-我的重试后缀-2000", …​, "my-topic-我的DLT后缀"spring-doc.cadn.net.cn

默认行为是为每个重试尝试创建单独的重试主题,并附加索引值:retry-0, retry-1, …​, retry-n。 因此,默认情况下,重试主题的数量是配置的maxAttempts减去1。

You can 配置后缀, 选择是否追加 尝试索引或延迟, 使用 固定退避时的单个重试主题, 并在使用指数退避时为具有 maxInterval 的尝试使用 单个重试主题.spring-doc.cadn.net.cn

重试主题和 DLT 后缀

您可以指定将由重试和 DLT 主题使用的后缀。spring-doc.cadn.net.cn

@RetryableTopic(retryTopicSuffix = "-my-retry-suffix", dltTopicSuffix = "-my-dlt-suffix")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .retryTopicSuffix("-my-retry-suffix")
            .dltTopicSuffix("-my-dlt-suffix")
            .create(template);
}
默认后缀为"-retry" 和 "-dlt",分别用于重试主题和死信队列。

追加主题索引或延迟

你可以将主题的索引或延迟值附加在后缀之后。spring-doc.cadn.net.cn

@RetryableTopic(topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .suffixTopicsWithIndexValues()
            .create(template);
    }
默认行为是对延迟值进行后缀处理,除了具有多个主题的固定延迟配置,在这种情况下,会使用主题的索引对主题进行后缀处理。

单个固定延迟重试主题

如果你使用固定的延迟策略,如FixedBackOffPolicyNoBackOffPolicy,你可以使用一个单一的主题来实现非阻塞重试。 该主题将加上提供的或默认后缀,且不会附加索引或延迟值。spring-doc.cadn.net.cn

之前的 FixedDelayStrategy 已不再推荐使用,可以替换为 SameIntervalTopicReuseStrategy
@RetryableTopic(backoff = @Backoff(2_000), sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.SINGLE_TOPIC)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackOff(3_000)
            .maxAttempts(5)
            .useSingleTopicForSameIntervals()
            .create(template);
}
默认行为是为每次尝试创建单独的重试主题,并在其后面附加其索引值:retry-0,retry-1,...

单一主题用于maxInterval指数延迟

如果你使用指数退避策略(0),可以使用一个重试主题来实现对那些延迟为配置的1的尝试的非阻塞重试。spring-doc.cadn.net.cn

这个以"final"结尾的重试主题将附加提供的或默认后缀,并会附加索引或maxInterval值。spring-doc.cadn.net.cn

通过选择使用延迟为maxInterval的单个主题进行重试,可能会更可行地配置指数重试策略,该策略会在较长时间内持续重试,因为在这种方法中不需要大量的主题。

从 3.2 开始,使用指数退避时,默认行为是为相同的时间间隔复用重试主题。重试主题会加上延迟值的后缀,具有相同间隔的重试会复用最后一个重试主题(对应于 maxInterval 延迟)。spring-doc.cadn.net.cn

例如,当使用 initialInterval=1_000multiplier=2maxInterval=16_000 配置指数退避时,为了在1小时内持续尝试,需要将 maxAttempts 配置为229,且默认需要的重试主题为:spring-doc.cadn.net.cn

当 使用 与 重试主题数量相等于 配置的 maxAttempts 减 1 的 策略 时, 对应于 maxInterval 延迟 的 最后一个 重试 主题(附加 一个 索引)会 是:
spring-doc.cadn.net.cn

如果需要多个主题,则可以使用以下配置。spring-doc.cadn.net.cn

@RetryableTopic(attempts = 230,
    backoff = @Backoff(delay = 1_000, multiplier = 2, maxDelay = 16_000),
    sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .exponentialBackoff(1_000, 2, 16_000)
            .maxAttempts(230)
            .useSingleTopicForSameIntervals()
            .create(template);
}

自定义命名策略

更复杂的命名策略可以通过注册一个实现 RetryTopicNamesProviderFactory 的 bean 来完成。 默认实现是 SuffixingRetryTopicNamesProviderFactory,可以通过以下方式注册不同的实现:spring-doc.cadn.net.cn

@Override
protected RetryTopicComponentFactory createComponentFactory() {
    return new RetryTopicComponentFactory() {
        @Override
        public RetryTopicNamesProviderFactory retryTopicNamesProviderFactory() {
            return new CustomRetryTopicNamesProviderFactory();
        }
    };
}

作为一个示例,以下实现除标准后缀外,还为 retry/dlt 主题名称添加前缀:spring-doc.cadn.net.cn

public class CustomRetryTopicNamesProviderFactory implements RetryTopicNamesProviderFactory {

    @Override
    public RetryTopicNamesProvider createRetryTopicNamesProvider(
                DestinationTopic.Properties properties) {

        if (properties.isMainEndpoint()) {
            return new SuffixingRetryTopicNamesProvider(properties);
        }
        else {
            return new SuffixingRetryTopicNamesProvider(properties) {

                @Override
                public String getTopicName(String topic) {
                    return "my-prefix-" + super.getTopicName(topic);
                }

            };
        }
    }

}