|
该版本仍在开发中,尚未被视为稳定。对于最新稳定版本,请使用 spring-cloud-stream 5.0.0! |
重试与死信处理
默认情况下,当你配置重试(例如,最大尝试次数)enableDlq在消费者绑定中,这些功能在绑定器内完成,监听器容器或 Kafka 消费者不参与。
在某些情况下,将此功能迁移到监听器容器会更为理想,例如:
-
重试和延误的总和将超过消费者
max.poll.interval.ms财产,可能导致分区重新平衡。 -
你想向另一个卡夫卡群发表死信。
-
你想向错误处理程序添加重试监听器。
-
…
要配置将此功能从绑定器迁移到容器,定义一个@Bean类型ListenerContainerWithDlqAndRetryCustomizer.
该接口具有以下方法:
/**
* Configure the container.
* @param container the container.
* @param destinationName the destination name.
* @param group the group.
* @param dlqDestinationResolver a destination resolver for the dead letter topic (if
* enableDlq).
* @param backOff the backOff using retry properties (if configured).
* @see #retryAndDlqInBinding(String, String)
*/
void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName, String group,
@Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
@Nullable BackOff backOff);
/**
* Return false to move retries and DLQ from the binding to a customized error handler
* using the retry metadata and/or a {@code DeadLetterPublishingRecoverer} when
* configured via
* {@link #configure(AbstractMessageListenerContainer, String, String, BiFunction, BackOff)}.
* @param destinationName the destination name.
* @param group the group.
* @return false to disable retries and DLQ in the binding
*/
default boolean retryAndDlqInBinding(String destinationName, String group) {
return true;
}
目的解析器和退避如果配置了,则由绑定属性创建。这卡夫卡模板使用来自的配置Spring。卡夫卡......性能。你可以用这些工具创建自定义的错误处理程序和死符发布器;例如:
@Bean
ListenerContainerWithDlqAndRetryCustomizer cust(KafkaTemplate<?, ?> template) {
return new ListenerContainerWithDlqAndRetryCustomizer() {
@Override
public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName,
String group,
@Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
@Nullable BackOff backOff) {
if (destinationName.equals("topicWithLongTotalRetryConfig")) {
ConsumerRecordRecoverer dlpr = new DeadLetterPublishingRecoverer(template,
dlqDestinationResolver);
container.setCommonErrorHandler(new DefaultErrorHandler(dlpr, backOff));
}
}
@Override
public boolean retryAndDlqInBinding(String destinationName, String group) {
return !destinationName.contains("topicWithLongTotalRetryConfig");
}
};
}
现在,只需一次重试延迟超过消费者的延迟max.poll.interval.ms财产。
在使用多个绑定器时,'ListenerContainerWithDlqAndRetryCustomizer' 豆子会被'DefaultBinderFactory'覆盖。为了豆子 要应用,你需要使用“BinderCustomizer”来设置容器自定义器(参见[binder-customizer]):
@Bean
public BinderCustomizer binderCustomizer(ListenerContainerWithDlqAndRetryCustomizer containerCustomizer) {
return (binder, binderName) -> {
if (binder instanceof KafkaMessageChannelBinder kafkaMessageChannelBinder) {
kafkaMessageChannelBinder.setContainerCustomizer(containerCustomizer);
}
else if (binder instanceof KStreamBinder) {
...
}
else if (binder instanceof RabbitMessageChannelBinder) {
...
}
};
}