|
该版本仍在开发中,尚未被视为稳定。对于最新稳定版本,请使用 spring-cloud-stream 5.0.0! |
死字母主题处理
启用DLQ
为了启用DLQ,基于Kafka绑定器的应用程序必须通过该属性提供消费者组spring.cloud.stream.bindings.<binding-name>.group.
匿名消费者组(即应用程序未明确提供组)无法启用DLQ功能。
当应用程序想要错误地将记录发送到DLQ主题时,该应用程序必须启用DLQ功能,因为默认情况下并未启用DLQ。
为了启用DLQ,性质spring.cloud.stream.kafka.bindings.<binding-name>.consumer.enable-dlq必须设置为 。
当DLQ启用时,处理过程中发生错误且所有重试次数都耗尽,基于spring.cloud.stream.bindings.<binding-name>.consumer.max-attempts属性,然后该记录会被发送到DLQ主题。
默认情况下,最大尝试次数属性设置为三。
什么时候最大尝试次数属性大于1,并且DLQ已启用,那么你会看到重试是对最大尝试次数财产。
当未启用DLQ(默认状态)时,则最大尝试次数属性不会影响重试的处理方式。
在这种情况下,重试会回退到 Apache Kafka 在 Spring 中的容器默认状态,也就是10重试。
如果应用在DLQ被禁用时完全禁用重试,那么最大尝试次数属性到1不行。
在这种情况下,要完全禁用重试,你需要提供ListenerContainerCustomizer然后使用 appropriate退避设置。
这里有一个例子。
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
return (container, destinationName, group) -> {
var commonErrorHandler = new DefaultErrorHandler(new FixedBackOff(0L, 0l));
container.setCommonErrorHandler(commonErrorHandler);
};
}
这样,默认容器行为将被禁用,且不会尝试重试。 如上所述,启用DLQ时,绑定器设置会优先。
在死信话题中处理记录
由于该框架无法预见用户如何处理死字母消息,因此没有提供任何标准处理机制。 如果死字母的原因是暂时的,你可能需要将消息路由回原主题。 但如果问题是永久性的,可能会导致无限循环。 本主题中的示例 Spring Boot 应用展示了如何将这些消息路由回原始主题,但尝试三次后会将其移至“停车场”主题。 该应用是另一个春云流应用,读取死字母主题。 当5秒内无消息时,它会退出。
示例假设原始目的地为SO8400 出而消费者组为SO8400.
有几种策略需要考虑:
-
考虑只在主应用程序未运行时运行重定向。 否则,瞬态错误的重试次数会很快被用尽。
-
或者,采用两阶段方法:用该应用路由到第三个主题,再用另一个应用从那里路由回主主题。
以下代码列表展示了示例应用:
spring.cloud.stream.bindings.input.group=so8400replay
spring.cloud.stream.bindings.input.destination=error.so8400out.so8400
spring.cloud.stream.bindings.output.destination=so8400out
spring.cloud.stream.bindings.parkingLot.destination=so8400in.parkingLot
spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=earliest
spring.cloud.stream.kafka.binder.headers=x-retries
@SpringBootApplication
public class ReRouteDlqKApplication implements CommandLineRunner {
private static final String X_RETRIES_HEADER = "x-retries";
public static void main(String[] args) {
SpringApplication.run(ReRouteDlqKApplication.class, args).close();
}
private final AtomicInteger processed = new AtomicInteger();
@Autowired
private StreamBridge streamBridge;
@Bean
public Function<Message<?>, Message<?>> reRoute() {
return failed -> {
processed.incrementAndGet();
Integer retries = failed.getHeaders().get(X_RETRIES_HEADER, Integer.class);
if (retries == null) {
System.out.println("First retry for " + failed);
return MessageBuilder.fromMessage(failed)
.setHeader(X_RETRIES_HEADER, 1)
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build();
}
else if (retries < 3) {
System.out.println("Another retry for " + failed);
return MessageBuilder.fromMessage(failed)
.setHeader(X_RETRIES_HEADER, retries + 1)
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build();
}
else {
System.out.println("Retries exhausted for " + failed);
streamBridge.send("parkingLot", MessageBuilder.fromMessage(failed)
.setHeader(BinderHeaders.PARTITION_OVERRIDE,
failed.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID))
.build());
}
return null;
};
}
@Override
public void run(String... args) throws Exception {
while (true) {
int count = this.processed.get();
Thread.sleep(5000);
if (count == this.processed.get()) {
System.out.println("Idle, exiting");
return;
}
}
}
}