在本节中,我们将解释框架提供的错误处理机制背后的一般思想。 我们将使用 Rabbit 活页夹作为示例,因为各个活页夹定义了不同的集合 特定于底层代理功能(如 Kafka 绑定器)的某些受支持机制的属性。

错误时有发生,Spring Cloud Stream 提供了几种灵活的机制来处理它们。请注意,这些技术依赖于 binder 实现和 底层消息传递中间件的功能以及编程模型(稍后会详细介绍)。

每当消息处理程序(函数)抛出异常时,它都会传播回绑定程序,此时绑定程序将多次尝试重试 使用 Spring Retry 库提供的相同消息(默认为 3)。 如果重试不成功,则取决于错误处理机制,该机制可能会丢弃消息、将消息重新排队以进行重新处理或将失败的消息发送到 DLQRetryTemplate

Rabbit 和 Kafka 都支持这些概念(尤其是 DLQ)。但是,其他活页夹可能没有,因此请参阅您个人活页夹的文档,了解支持的详细信息 错误处理选项。

但是请记住,反应式函数不符合消息处理程序的条件,因为它不处理单个消息和 相反,它提供了一种将框架提供的流(即 Flux)与用户提供的流连接起来的方法。为什么这很重要?这是因为您在本节后面阅读的有关重试模板、删除失败消息、重试、 DLQ 和配置属性仅适用于消息处理程序(即命令式函数)。

Reactive API 提供了一个非常丰富的库,其中包含自己的运算符和机制,以帮助您处理特定于 各种反应式用例,这些用例比简单的消息处理程序用例复杂得多,因此请使用它们,例如 正如您可以在 .public final Flux<T> retryWhen(Retry retrySpec);reactor.core.publisher.Flux

@Bean
public Function<Flux<String>, Flux<String>> uppercase() {
	return flux -> flux
			.retryWhen(Retry.backoff(3, Duration.ofMillis(1000)))
			.map(v -> v.toUpperCase());
}

删除失败的消息

默认情况下,系统提供错误处理程序。第一个错误处理程序将仅记录错误消息。第二个错误处理程序是特定于活页夹的错误处理程序 它负责在特定消息传递系统的上下文中处理错误消息(例如,发送到 DLQ)。但是,由于没有提供其他错误处理配置(在当前方案中),因此此处理程序不会执行任何操作。因此,基本上在记录后,消息将被丢弃。

虽然在某些情况下是可以接受的,但在大多数情况下,它不是,我们需要一些恢复机制来避免消息丢失。

处理错误消息

在上一节中,我们提到,默认情况下,导致错误的消息会被有效地记录和删除。该框架还为您公开了机制 提供自定义错误处理程序(即发送通知或写入数据库等)。为此,您可以添加专门设计用于接受有关错误的所有信息(例如,堆栈跟踪等)包含原始消息(触发错误的消息)的表格。 注意:自定义错误处理程序与框架提供的错误处理程序(即日志记录和绑定程序错误处理程序 - 请参阅上一节)是互斥的,以确保它们不会干扰。ConsumerErrorMessage

@Bean
public Consumer<ErrorMessage> myErrorHandler() {
	return v -> {
		// send SMS notification code
	};
}

要将此类使用者标识为错误处理程序,您只需要提供指向函数名称 - 的属性。error-handler-definitionspring.cloud.stream.bindings.<binding-name>.error-handler-definition=myErrorHandler

例如,对于绑定名称,属性将如下所示:uppercase-in-0

spring.cloud.stream.bindings.uppercase-in-0.error-handler-definition=myErrorHandler

如果使用特殊的映射指令将绑定映射到更易读的名称 - ,则此属性将如下所示:spring.cloud.stream.function.bindings.uppercase-in-0=upper

spring.cloud.stream.bindings.upper.error-handler-definition=myErrorHandler.
如果您不小心将此类处理程序声明为 ,它仍然可以工作,但不会对其输出执行任何操作。但是,鉴于此类处理程序仍然依赖于 Spring Cloud Function 提供的功能,如果您的处理程序具有一些复杂性,您希望通过函数组合来解决(尽管不太可能),您也可以从函数组合中受益。Function

默认错误处理程序

如果要为所有函数 Bean 使用单个错误处理程序,则可以使用标准的 spring-cloud-stream 机制来定义默认属性spring.cloud.stream.default.error-handler-definition=myErrorHandler

如果您不小心将此类处理程序声明为 ,它仍然可以工作,但不会对其输出执行任何操作。但是,鉴于此类处理程序仍然依赖于 Spring Cloud Function 提供的功能,如果您的处理程序具有一些复杂性,您希望通过函数组合来解决(尽管不太可能),您也可以从函数组合中受益。Function

DLQ - 死信队列

DLQ 也许是最常见的机制,它允许将失败的消息发送到一个特殊的目的地:死信队列

配置后,失败的消息将发送到此目标,以便进行后续重新处理或审核和协调。

请看以下示例:

@SpringBootApplication
public class SimpleStreamApplication {

	public static void main(String[] args) throws Exception {
		SpringApplication.run(SimpleStreamApplication.class,
		  "--spring.cloud.function.definition=uppercase",
		  "--spring.cloud.stream.bindings.uppercase-in-0.destination=uppercase",
		  "--spring.cloud.stream.bindings.uppercase-in-0.group=myGroup",
		  "--spring.cloud.stream.rabbit.bindings.uppercase-in-0.consumer.auto-bind-dlq=true"
		);
	}

	@Bean
	public Function<Person, Person> uppercase() {
		return personIn -> {
		   throw new RuntimeException("intentional");
	      });
		};
	}
}

提醒一下,在此示例中,属性段对应于输入目标绑定的名称。 该段表示它是消费者财产。uppercase-in-0consumer

使用 DLQ 时,至少必须提供该属性才能正确命名 DLQ 目标。但是经常一起使用 与属性,如我们的示例所示。groupgroupdestination

除了一些标准属性之外,我们还设置了 to 指示绑定程序创建和配置 DLQ 目标以进行绑定,该目标对应于目标(请参阅相应的属性),这会导致一个名为 Rabbit 队列的附加队列(参见 Kafka 特定 DLQ 属性的 Kafka 文档)。auto-bind-dlquppercase-in-0uppercaseuppercase.myGroup.dlq

配置完成后,所有失败的消息都将路由到此目标,保留原始消息以供进一步操作。

您可以看到错误消息包含与原始错误相关的更多信息,如下所示:

. . . .
x-exception-stacktrace:	org.springframework.messaging.MessageHandlingException: nested exception is
      org.springframework.messaging.MessagingException: has an error, failedMessage=GenericMessage [payload=byte[15],
      headers={amqp_receivedDeliveryMode=NON_PERSISTENT, amqp_receivedRoutingKey=input.hello, amqp_deliveryTag=1,
      deliveryAttempt=3, amqp_consumerQueue=input.hello, amqp_redelivered=false, id=a15231e6-3f80-677b-5ad7-d4b1e61e486e,
      amqp_consumerTag=amq.ctag-skBFapilvtZhDsn0k3ZmQg, contentType=application/json, timestamp=1522327846136}]
      at org.spring...integ...han...MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:107)
      at. . . . .
Payload: blah

您还可以通过设置为“1”来方便立即派往 DLQ(无需重试)。例如max-attempts

--spring.cloud.stream.bindings.uppercase-in-0.consumer.max-attempts=1
使用 DLQ 时,至少必须提供该属性才能正确命名 DLQ 目标。但是经常一起使用 与属性,如我们的示例所示。groupgroupdestination

重试模板

在本节中,我们将介绍与重试功能配置相关的配置属性。

Spring Retry 库的一部分。 虽然它超出了本文档的范围,无法涵盖 的所有功能,但我们 将提及以下与以下特定相关的消费者属性 这:RetryTemplateRetryTemplateRetryTemplate

最大尝试次数

尝试处理消息的次数。

默认值:3。

backOffInitialInterval

重试时的回退初始间隔。

默认值为 1000 毫秒。

backOffMaxInterval

最大回退间隔。

默认值为 10000 毫秒。

backOff乘法器

退避乘数。

默认 2.0。

default可重试

侦听器引发的未在 中列出的异常是否可重试。retryableExceptions

违约:。true

retryableExceptions

键中 Throwable 类名的映射和值中的布尔值的映射。 指定将要或不会重试的异常(和子类)。 另请参见。 例:。defaultRetriablespring.cloud.stream.bindings.input.consumer.retryable-exceptions.java.lang.IllegalStateException=false

默认值:空。

虽然上述设置足以满足大多数自定义要求,但它们可能无法满足某些复杂要求,其中 点,您可能希望提供自己的 .为此,请在应用程序配置中将其配置为 Bean。提供的应用程序 实例将覆盖框架提供的实例。此外,为了避免冲突,必须限定要由活页夹使用的实例 如。例如RetryTemplateRetryTemplate@StreamRetryTemplate

@StreamRetryTemplate
public RetryTemplate myRetryTemplate() {
    return new RetryTemplate();
}

从上面的例子中可以看出,你不需要用它来注释它,因为是一个合格的。@Bean@StreamRetryTemplate@Bean

如果你需要更精确地使用 ,你可以在 your 中按名称指定 bean 以关联 每个绑定的特定重试 Bean。RetryTemplateConsumerProperties

spring.cloud.stream.bindings.<foo>.consumer.retry-template-name=<your-retry-template-bean-name>