|
该版本仍在开发中,尚未被视为稳定。对于最新稳定版本,请使用 spring-cloud-stream 5.0.0! |
错误处理
在本节中,我们将解释框架所提供的错误处理机制背后的一般思想。 我们将以兔子装订器为例,因为每个活页夹定义不同的集合 这些属性适用于特定支持机制,具体用于底层中介能力(如Kafka绑定器)。
错误时有发生,Spring Cloud Stream 提供了多种灵活的处理机制。注意,这些技术依赖于绑定器的实现和 底层消息中间件的能力以及编程模型(后文会详细说明)。
每当消息处理程序(函数)抛出异常时,该异常会被传播回绑定器,绑定器会多次尝试重新尝试
相同的消息(默认为3),使用重试模板由春季重试图书馆提供。
如果重试未成功,则由错误处理机制决定,可能会丢弃消息、重新排队重处理或将失败消息发送给DLQ。
《兔子》和《卡夫卡》都支持这些概念(尤其是《DLQ》)。不过,其他活页夹可能不支持,所以请参考你个人的活页夹文档来了解支持内容 错误处理选项。
但请记住,响应式函数不符合消息处理程序的标准,因为它不处理单个消息, 相反,它提供了一种方式,将框架提供的流(即 Flux)与用户提供的流连接起来。这为什么重要?这是因为你在本节后面看到的任何内容,关于重试模板、丢弃失败消息、重试, DLQ和所有这些配置属性仅适用于消息处理程序(即命令式函数)。
响应式API提供了非常丰富的自有作符和机制库,帮助你处理特定错误处理
各种响应式使用场景比简单的消息处理情况复杂得多,所以可以使用它们,比如
如公开最终 Flux<T>retryWhen(Retry retrySpec);你可以在reactor.core.publisher.Flux.
@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
return flux -> flux
.retryWhen(Retry.backoff(3, Duration.ofMillis(1000)))
.map(v -> v.toUpperCase());
}
丢弃失败消息
系统默认提供错误处理程序。第一个错误处理程序会简单地记录错误消息。第二个错误处理程序是绑定器专用错误处理程序 该系统负责在特定消息系统(例如发送到 DLQ)的上下文中处理错误消息。但由于当前场景中没有提供额外的错误处理配置,这个处理器不会做任何作。所以基本上,日志记录后,消息就会被丢弃。
虽然在某些情况下可以接受,但大多数情况下并非如此,我们需要某种恢复机制以避免消息丢失。
处理错误消息
在上一节我们提到,默认情况下导致错误的消息会被记录并丢弃。这个框架还会为你揭示机制
提供自定义错误处理程序(例如发送通知或写入数据库等)。你可以通过添加消费者专门设计来接受错误消息除了所有关于错误的信息(例如栈迹等),它还包含了原始消息(触发错误的消息)。
| 自定义错误处理程序与框架提供的错误处理程序(即日志错误处理程序和绑定器专用错误处理程序——见上一节)互斥,以确保它们不会相互干扰。当你提供自定义错误处理程序时,即使DLQ配置了发送失败消息也无法正常工作。 |
@Bean
public Consumer<ErrorMessage> myErrorHandler() {
return v -> {
// send SMS notification code
};
}
要识别此类消费者为错误处理者,只需提供错误处理定义指向函数名称的属性 -spring.cloud.stream.bindings.<binding-name>.error-handler-definition=myErrorHandler.
例如,对于绑定名称0大写该地产将呈现如下:
spring.cloud.stream.bindings.uppercase-in-0.error-handler-definition=myErrorHandler
如果你用特殊的映射指令把绑定映射到更易读的名称——spring.cloud.stream.function.bindings.uppercase-in-0=upper,那么该性质将表现为:
spring.cloud.stream.bindings.upper.error-handler-definition=myErrorHandler.
如果你不小心声明了该处理者为功能它仍然可以工作,但输出不会被处理。然而,鉴于该处理程序仍依赖 Spring Cloud 函数提供的功能,如果你的处理程序存在某些复杂性,想通过函数组合解决(尽管可能性不大),函数组合也能带来好处。 |
默认错误处理程序
如果你想为所有函数豆都用一个错误处理程序,可以用标准的 Spring-Cloud-Stream 机制来定义默认属性spring.cloud.stream.default.error-handler-definition=myErrorHandler
DLQ - 死信队列
也许最常见的机制是,DLQ允许将失败消息发送到特殊目的地:死信队列。
配置完成后,失败消息会发送到该目的地进行后续的重新处理或审计和对账。
请考虑以下例子:
@SpringBootApplication
public class SimpleStreamApplication {
public static void main(String[] args) throws Exception {
SpringApplication.run(SimpleStreamApplication.class,
"--spring.cloud.function.definition=uppercase",
"--spring.cloud.stream.bindings.uppercase-in-0.destination=uppercase",
"--spring.cloud.stream.bindings.uppercase-in-0.group=myGroup",
"--spring.cloud.stream.rabbit.bindings.uppercase-in-0.consumer.auto-bind-dlq=true"
);
}
@Bean
public Function<Person, Person> uppercase() {
return personIn -> {
throw new RuntimeException("intentional");
});
};
}
}
提醒一下,在这个例子中0大写属性的段对应输入目的绑定的名称。 这消费者分段表示它是消费物业。
使用DLQ时,至少群必须提供属性以正确命名DLQ目的地。 然而群通常一起使用 跟目的地财产,如我们的例子所示。 |
除了一些标准性质外,我们还设置了auto-bind-dlq指示绑定器创建并配置 DLQ 目的地0大写对应的绑定大写目的地(见对应性质),这会导致一个额外的兔子队列,名为uppercase.myGroup.dlq(有关 Kafka 特定的 DLQ 属性,请参见 Kafka 文档)。
配置完成后,所有失败消息会被路由到该目的地,保留原始消息以便后续作。
你可以看到错误信息包含了与原始错误相关的更多信息,如下所示:
. . . .
x-exception-stacktrace: org.springframework.messaging.MessageHandlingException: nested exception is
org.springframework.messaging.MessagingException: has an error, failedMessage=GenericMessage [payload=byte[15],
headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=input.hello, amqp_deliveryTag=1,
deliveryAttempt=3, amqp_consumerQueue=input.hello, amqp_redelivered=false, id=a15231e6-3f80-677b-5ad7-d4b1e61e486e,
amqp_consumerTag=amq.ctag-skBFapilvtZhDsn0k3ZmQg, contentType=application/json, timestamp=1522327846136}]
at org.spring...integ...han...MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:107)
at. . . . .
Payload: blah
你也可以通过设置来实现立即派遣到DLQ(无需重试)最大尝试次数到“1”。 例如
--spring.cloud.stream.bindings.uppercase-in-0.consumer.max-attempts=1
重试模板
本节将介绍与重试能力配置相关的配置属性。
这重试模板是 Spring Retry 库的一部分。虽然本文档不涵盖所有功能重试模板我们 将提及以下具体相关的消费性质 这重试模板:
- 最大尝试次数
-
处理该消息的次数。
默认:3。
- backOffInitialInterval
-
重试时的退回初始间隔。
默认是1000毫秒。
- backOffMaxInterval
-
最大后退间隔。
默认是10000毫秒。
- backOffMultiplier
-
退后倍数。
默认2.0。
- default可重试
-
听者是否抛出未在
retryableExceptions可以重试。违约:
true. - retryableExceptions
-
键中包含可投掷类名的映射,值中包含布尔值。指定哪些例外(及子类)会被重试或不会重试。另见
defaultRetriable. 例:spring.cloud.stream.bindings.input.consumer.retryable-exceptions.java.lang.IllegalStateException=false.默认:空。
虽然上述设置已足够满足大部分自定义需求,但它们可能无法满足某些复杂的要求,这时你可能需要提供你自己的实例重试模板. 为此,在你的应用配置中将其配置为 bean。所提供的应用实例将覆盖框架提供的实例。此外,为避免冲突,你必须限定重试模板你想被活页夹利用 如@StreamRetryTemplate. 例如
@StreamRetryTemplate
public RetryTemplate myRetryTemplate() {
return new RetryTemplate();
}
正如你从上面的例子中看到的,你不需要用@Bean因为@StreamRetryTemplate是合格的@Bean.
如果你需要更精确地处理你的情况重试模板你可以在你的消费者属性关联每个绑定的特定重试豆。
spring.cloud.stream.bindings.<foo>.consumer.retry-template-name=<your-retry-template-bean-name>