该版本仍在开发中,尚未被视为稳定。对于最新稳定版本,请使用 spring-cloud-stream 5.0.0!spring-doc.cadn.net.cn

死字母主题处理

启用DLQ

为了启用DLQ,基于Kafka绑定器的应用程序必须通过该属性提供消费者组spring.cloud.stream.bindings.<binding-name>.group. 匿名消费者组(即应用程序未明确提供组)无法启用DLQ功能。spring-doc.cadn.net.cn

当应用程序想要错误地将记录发送到DLQ主题时,该应用程序必须启用DLQ功能,因为默认情况下并未启用DLQ。 为了启用DLQ,性质spring.cloud.stream.kafka.bindings.<binding-name>.consumer.enable-dlq必须设置为 。spring-doc.cadn.net.cn

当DLQ启用时,处理过程中发生错误且所有重试次数都耗尽,基于spring.cloud.stream.bindings.<binding-name>.consumer.max-attempts属性,然后该记录会被发送到DLQ主题。spring-doc.cadn.net.cn

默认情况下,最大尝试次数属性设置为三。 什么时候最大尝试次数属性大于1,并且DLQ已启用,那么你会看到重试是对最大尝试次数财产。 当未启用DLQ(默认状态)时,则最大尝试次数属性不会影响重试的处理方式。 在这种情况下,重试会回退到 Apache Kafka 在 Spring 中的容器默认状态,也就是10重试。 如果应用在DLQ被禁用时完全禁用重试,那么最大尝试次数属性到1不行。 在这种情况下,要完全禁用重试,你需要提供ListenerContainerCustomizer然后使用 appropriate退避设置。 这里有一个例子。spring-doc.cadn.net.cn

@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
	return (container, destinationName, group) -> {
		var commonErrorHandler = new DefaultErrorHandler(new FixedBackOff(0L, 0l));
		container.setCommonErrorHandler(commonErrorHandler);
	};
}

这样,默认容器行为将被禁用,且不会尝试重试。 如上所述,启用DLQ时,绑定器设置会优先。spring-doc.cadn.net.cn

在死信话题中处理记录

由于该框架无法预见用户如何处理死字母消息,因此没有提供任何标准处理机制。 如果死字母的原因是暂时的,你可能需要将消息路由回原主题。 但如果问题是永久性的,可能会导致无限循环。 本主题中的示例 Spring Boot 应用展示了如何将这些消息路由回原始主题,但尝试三次后会将其移至“停车场”主题。 该应用是另一个春云流应用,读取死字母主题。 当5秒内无消息时,它会退出。spring-doc.cadn.net.cn

示例假设原始目的地为SO8400 出而消费者组为SO8400.spring-doc.cadn.net.cn

有几种策略需要考虑:spring-doc.cadn.net.cn

  • 考虑只在主应用程序未运行时运行重定向。 否则,瞬态错误的重试次数会很快被用尽。spring-doc.cadn.net.cn

  • 或者,采用两阶段方法:用该应用路由到第三个主题,再用另一个应用从那里路由回主主题。spring-doc.cadn.net.cn

以下代码列表展示了示例应用:spring-doc.cadn.net.cn

application.properties
spring.cloud.stream.bindings.input.group=so8400replay
spring.cloud.stream.bindings.input.destination=error.so8400out.so8400

spring.cloud.stream.bindings.output.destination=so8400out

spring.cloud.stream.bindings.parkingLot.destination=so8400in.parkingLot

spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=earliest

spring.cloud.stream.kafka.binder.headers=x-retries
应用
@SpringBootApplication
public class ReRouteDlqKApplication implements CommandLineRunner {

    private static final String X_RETRIES_HEADER = "x-retries";

    public static void main(String[] args) {
        SpringApplication.run(ReRouteDlqKApplication.class, args).close();
    }

    private final AtomicInteger processed = new AtomicInteger();

    @Autowired
    private StreamBridge streamBridge;

    @Bean
    public Function<Message<?>, Message<?>> reRoute() {
        return failed -> {
            processed.incrementAndGet();
            Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class);
            if (retries == null) {
                System.out.println("First retry for " + failed);
                return MessageBuilder.fromMessage(failed)
                        .setHeader(X_RETRIES_HEADER, 1)
                        .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                                failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                        .build();
            }
            else if (retries < 3) {
                System.out.println("Another retry for " + failed);
                return MessageBuilder.fromMessage(failed)
                        .setHeader(X_RETRIES_HEADER, retries + 1)
                        .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                                failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                        .build();
            }
            else {
                System.out.println("Retries exhausted for " + failed);
                streamBridge.send("parkingLot", MessageBuilder.fromMessage(failed)
                    .setHeader(BinderHeaders.PARTITION_OVERRIDE,
                            failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
                    .build());
            }
            return null;
        };
    }

    @Override
    public void run(String... args) throws Exception {
        while (true) {
            int count = this.processed.get();
            Thread.sleep(5000);
            if (count == this.processed.get()) {
                System.out.println("Idle, exiting");
                return;
            }
        }
    }
}