消息消费

1. @ReactivePulsarListener

对于 Pulsar 消费者,我们建议最终用户应用程序使用ReactivePulsarListener注解。 使用ReactivePulsarListener,您需要使用@EnableReactivePulsar注解。 当您使用 Spring Boot 支持时,它会自动启用此 Comments 并配置所有必要的组件,例如消息侦听器基础设施(负责创建底层 Pulsar 消费者)。spring-doc.cadn.net.cn

让我们重新审视一下ReactivePulsarListener我们在快速浏览部分看到的代码片段:spring-doc.cadn.net.cn

@ReactivePulsarListener(subscriptionName = "hello-pulsar-sub", topics = "hello-pulsar-topic")
Mono<Void> listen(String message) {
    System.out.println(message);
    return Mono.empty();
}
监听器方法返回一个Mono<Void>以指示消息是否已成功处理。Mono.empty()表示成功(确认),并Mono.error()表示失败(否定确认)。

您还可以进一步简化此方法:spring-doc.cadn.net.cn

@ReactivePulsarListener
Mono<Void> listen(String message) {
    System.out.println(message);
    return Mono.empty();
}

在这种最基本的形式中,当topics未直接提供,则使用主题解析过程来确定目标主题。 同样,当subscriptionName上未提供@ReactivePulsarListener注释将使用自动生成的订阅名称。spring-doc.cadn.net.cn

ReactivePulsarListener方法,我们收到的数据为String,但我们没有指定任何模式类型。 在内部,该框架依赖于 Pulsar 的模式机制将数据转换为所需的类型。spring-doc.cadn.net.cn

框架检测到您期望String类型,然后根据该信息推断架构类型,并将该架构提供给使用者。 框架对所有基元类型进行此推断。 对于所有非原始类型,默认模式假定为 JSON。 如果复杂类型使用 JSON 以外的任何内容(例如 AVRO 或 KEY_VALUE),则必须使用schemaType财产。spring-doc.cadn.net.cn

此示例显示了我们如何从主题中使用复杂类型:spring-doc.cadn.net.cn

@ReactivePulsarListener(topics = "my-topic-2", schemaType = SchemaType.JSON)
Mono<Void> listen(Foo message) {
    System.out.println(message);
    return Mono.empty();
}

让我们看看更多我们可以消费的方式。spring-doc.cadn.net.cn

此示例直接使用 Pulsar 消息:spring-doc.cadn.net.cn

@ReactivePulsarListener(topics = "my-topic")
Mono<Void> listen(org.apache.pulsar.client.api.Message<String> message) {
    System.out.println(message.getValue());
    return Mono.empty();
}

此示例使用包装在 Spring 消息信封中的记录:spring-doc.cadn.net.cn

@ReactivePulsarListener(topics = "my-topic")
Mono<Void> listen(org.springframework.messaging.Message<String> message) {
    System.out.println(message.getPayload());
    return Mono.empty();
}

1.1. 流媒体

以上都是逐条消费单条记录的示例。 然而,使用 Reactive 的令人信服的原因之一是支持背压的流功能。spring-doc.cadn.net.cn

以下示例使用ReactivePulsarListener要使用POJO流:spring-doc.cadn.net.cn

@ReactivePulsarListener(topics = "streaming-1", stream = true)
Flux<MessageResult<Void>> listen(Flux<org.apache.pulsar.client.api.Message<String>> messages) {
    return messages
        .doOnNext((msg) -> System.out.println("Received: " + msg.getValue()))
        .map(MessageResult::acknowledge);
}

在这里,我们将记录作为Flux脉冲星消息。 此外,要在ReactivePulsarListener级别,您需要将stream属性设置为true.spring-doc.cadn.net.cn

监听器方法返回一个Flux<MessageResult<Void>>其中每个元素表示已处理的消息,并保存消息 ID、值以及是否已确认。 这MessageResult具有一组静态工厂方法,可用于创建适当的MessageResult实例。

根据Flux,框架会尝试推断要使用的模式。 如果它包含复杂类型,您仍然需要提供schemaTypeReactivePulsarListener.spring-doc.cadn.net.cn

以下监听器使用 Spring 消息传递Message具有复杂类型的信封:spring-doc.cadn.net.cn

@ReactivePulsarListener(topics = "streaming-2", stream = true, schemaType = SchemaType.JSON)
Flux<MessageResult<Void>> listen2(Flux<org.springframework.messaging.Message<Foo>> messages) {
    return messages
        .doOnNext((msg) -> System.out.println("Received: " + msg.getPayload()))
        .map(MessageUtils::acknowledge);
}
监听器方法返回一个Flux<MessageResult<Void>>其中每个元素表示已处理的消息,并保存消息 ID、值以及是否已确认。 SpringMessageUtils具有一组静态工厂方法,可用于创建适当的MessageResult实例。 这MessageUtils为 Spring 消息提供与MessagResult对 Pulsar 消息执行。
不支持使用org.apache.pulsar.client.api.Messages<T>@ReactivePulsarListener

1.2. 配置 - 应用程序属性

监听器依赖于ReactivePulsarConsumerFactory创建和管理用于使用消息的底层 Pulsar 使用者。 Spring Boot 提供了这个消费者工厂,您可以通过指定spring.pulsar.consumer.*应用程序属性。spring-doc.cadn.net.cn

1.3. 带有AUTO_CONSUME的通用记录

如果没有机会提前知道 Pulsar 主题的 schema 类型,可以使用AUTO_CONSUMEschema 类型来使用通用记录。在这种情况下,主题将消息反序列化为GenericRecord对象使用与主题关联的架构信息。spring-doc.cadn.net.cn

要使用通用记录,请将schemaType = SchemaType.AUTO_CONSUME在您的@ReactivePulsarListener并使用GenericRecord作为 message 参数,如下所示。spring-doc.cadn.net.cn

@ReactivePulsarListener(topics = "my-generic-topic", schemaType = SchemaType.AUTO_CONSUME)
Mono<Void> listen(org.apache.pulsar.client.api.Message<GenericRecord> message) {
    GenericRecord record = message.getValue();
    record.getFields().forEach((f) ->
            System.out.printf("%s = %s%n", f.getName(), record.getField(f)));
	return Mono.empty();
}
GenericRecordAPI 允许访问字段及其关联值

1.4. 消费者定制

您可以指定一个ReactivePulsarListenerMessageConsumerBuilderCustomizer配置底层 Pulsar 消费者构建器,该构建器最终构造监听器用于接收消息的消费者。spring-doc.cadn.net.cn

请谨慎使用,因为这可以完全访问消费者构建器并调用其某些方法(例如create)可能会产生意想不到的副作用。

例如,以下代码演示如何将订阅的初始位置设置为主题上最早的消息。spring-doc.cadn.net.cn

@ReactivePulsarListener(topics = "hello-pulsar-topic", consumerCustomizer = "myConsumerCustomizer")
Mono<Void> listen(String message) {
    System.out.println(message);
    return Mono.empty();
}

@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> myConsumerCustomizer() {
    return b -> b.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
}
如果您的应用程序只有一个@ReactivePulsarListener和单个ReactivePulsarListenerMessageConsumerBuilderCustomizerbean 注册,则定制器将自动应用。

您还可以使用定制器向消费者构建器提供直接的 Pulsar 消费者属性。 如果您不想使用前面提到的启动配置属性或有多个ReactivePulsarListener配置不同的方法。spring-doc.cadn.net.cn

以下定制器示例使用直接的 Pulsar 使用者属性:spring-doc.cadn.net.cn

@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> directConsumerPropsCustomizer() {
    return b -> b.property("subscriptionName", "subscription-1").property("topicNames", "foo-1");
}
使用的属性是直接的 Pulsar 使用者属性,而不是spring.pulsar.consumerSpring Boot 配置属性

2. 指定模式信息

如前所述,对于 Java 原语,Spring for Apache Pulsar 框架可以推断出要在ReactivePulsarListener. 对于非原始类型,如果未在 Comments 上显式指定 Schema,则 Spring for Apache Pulsar 框架将尝试构建一个Schema.JSON从类型。spring-doc.cadn.net.cn

当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF、AUTO_CONSUME KEY_VALUE 带内联编码。

2.1. 自定义模式映射

作为在ReactivePulsarListener对于复杂类型,可以使用类型的映射来配置模式解析器。 这样就无需在侦听器上设置模式,因为框架使用传入消息类型咨询解析器。spring-doc.cadn.net.cn

2.1.1. 配置属性

架构映射可以使用spring.pulsar.defaults.type-mappings财产。 以下示例使用application.ymlUserAddress复杂对象使用AVROJSONschemas,分别:spring-doc.cadn.net.cn

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.acme.User
          schema-info:
            schema-type: AVRO
        - message-type: com.acme.Address
          schema-info:
            schema-type: JSON
message-type是消息类的完全限定名称。

2.1.2. 模式解析器定制器

添加映射的首选方法是通过上述属性。 但是,如果需要更多控制,可以提供架构解析器定制器来添加映射。spring-doc.cadn.net.cn

以下示例使用架构解析器定制器为UserAddress复杂对象使用AVROJSONschemas,分别:spring-doc.cadn.net.cn

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
	return (schemaResolver) -> {
		schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
		schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
	}
}

2.1.3. 类型映射注解

指定要用于特定消息类型的默认模式信息的另一个选项是使用@PulsarMessage注解。 架构信息可以通过schemaType属性。spring-doc.cadn.net.cn

以下示例将系统配置为在生成或使用Foo:spring-doc.cadn.net.cn

@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}

使用此配置后,无需在侦听器上设置架构,例如:spring-doc.cadn.net.cn

@ReactivePulsarListener(topics = "user-topic")
Mono<Void> listen(User user) {
    System.out.println(user);
    return Mono.empty();
}

3. 消息监听器容器基础设施

在大多数情况下,我们建议使用ReactivePulsarListener注释,以便直接从 Pulsar 主题中使用,因为该模型涵盖了广泛的应用程序用例。 但是,重要的是要了解如何ReactivePulsarListener在内部工作。spring-doc.cadn.net.cn

当您使用 Spring for Apache Pulsar 时,消息侦听器容器是消息消费的核心。 这ReactivePulsarListener在幕后使用消息监听器容器基础设施来创建和管理底层 Pulsar 消费者。spring-doc.cadn.net.cn

3.1. ReactivePulsarMessageListenerContainer

此消息侦听器容器的合约通过ReactivePulsarMessageListenerContainer其默认实现创建一个响应式 Pulsar 使用者,并连接使用创建的使用者的响应式消息管道。spring-doc.cadn.net.cn

3.2. 响应式消息管道

管道是底层 Apache Pulsar 响应式客户端的一项功能,它完成了以反应方式接收数据,然后将其移交给提供的消息处理程序的繁重工作。响应式消息侦听器容器实现要简单得多,因为管道处理了大部分工作。spring-doc.cadn.net.cn

3.3. ReactivePulsarMessageHandler

“监听器”方面由ReactivePulsarMessageHandler其中有两个提供的实现:spring-doc.cadn.net.cn

如果在直接使用监听器容器时未指定主题信息,则ReactivePulsarListener使用,但有一个例外是省略了“消息类型默认”步骤。

3.4. 处理启动失败

消息侦听器容器在刷新应用程序上下文时启动。 默认情况下,启动期间遇到的任何故障都会重新抛出,并且应用程序将无法启动。 您可以使用StartupFailurePolicy在相应的容器属性上。spring-doc.cadn.net.cn

可用选项包括:spring-doc.cadn.net.cn

默认重试行为是重试 3 次,中间有 10 秒的延迟 每次尝试。 但是,可以在相应的容器属性上指定自定义重试模板。 如果容器在重试用尽后无法重启,则容器将处于非运行状态。spring-doc.cadn.net.cn

3.4.1. 配置

使用 Spring Boot

使用 Spring Boot 时,您可以注册一个PulsarContainerFactoryCustomizer<DefaultReactivePulsarListenerContainerFactory<?>>设置容器启动属性的 bean。spring-doc.cadn.net.cn

没有 Spring Boot

但是,如果要手动配置组件,则在构造消息侦听器容器工厂时,必须相应地更新容器启动属性。spring-doc.cadn.net.cn

4. 并发

在流式处理模式下使用记录时 (stream = true) 并发性通过客户端实现中的底层 Reactive 支持自然而然地实现。spring-doc.cadn.net.cn

但是,在逐个处理消息时,可以指定并发以提高处理吞吐量。 只需将concurrency属性@ReactivePulsarListener. 此外,当concurrency > 1您可以确保消息按键排序,因此通过设置useKeyOrderedProcessing = "true"在注释上。spring-doc.cadn.net.cn

同样,ReactiveMessagePipeline做繁重的工作,我们只需在其上设置属性即可。spring-doc.cadn.net.cn

响应式与命令式

响应式容器中的并发性与命令式容器不同。后者创建多个线程(每个线程都有一个 Pulsar 使用者),而前者将消息分派到响应式并行调度器上同时的多个处理程序实例。spring-doc.cadn.net.cn

响应式并发模型的一个优点是它可以与Exclusive订阅,而命令式并发模型则不能。spring-doc.cadn.net.cn

5. 脉冲星接头

Pulsar 消息元数据可以作为 Spring 消息头使用。可用头的列表可以在 PulsarHeaders.java 中找到。spring-doc.cadn.net.cn

5.1. 在OneByOne监听器中访问

以下示例显示了在使用逐个消息监听器时如何访问 Pulsar 标头:spring-doc.cadn.net.cn

@ReactivePulsarListener(topics = "some-topic")
Mono<Void> listen(String data,
        @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
        @Header("foo") String foo) {
    System.out.println("Received " + data + " w/ id=" + messageId + " w/ foo=" + foo);
    return Mono.empty();
}

在前面的示例中,我们访问messageId消息元数据以及名为foo. Spring@Header注释用于每个标题字段。spring-doc.cadn.net.cn

您还可以使用 Pulsar 的Message作为信封来承载有效负载。这样做时,用户可以直接调用 Pulsar 消息上的相应方法来检索元数据。但是,为了方便起见,您也可以使用Header注解。 请注意,您还可以使用 Spring 消息传递Message信封来承载有效负载,然后使用@Header.spring-doc.cadn.net.cn

5.2. 在流式监听器中访问

使用流式消息侦听器时,标头支持有限。 仅当Flux包含Springorg.springframework.messaging.Message元素将填充标题。 此外,Spring@Header注释不能用于检索数据。 必须直接调用Spring消息上的相应方法来检索数据。spring-doc.cadn.net.cn

6. 消息确认

该框架会自动处理消息确认。 但是,侦听器方法必须发送一个信号,指示消息是否已成功处理。 然后,容器实现使用该信号执行 ack 或 nack作。 这与命令式对应项略有不同,在命令式对应项中,除非方法抛出异常,否则信号隐含为正。spring-doc.cadn.net.cn

6.1. OneByOne 监听器

单个消息(又名 OneByOne)消息侦听器方法返回一个Mono<Void>以指示消息是否已成功处理。Mono.empty()表示成功(确认),并Mono.error()表示失败(否定确认)。spring-doc.cadn.net.cn

6.2. 流式监听器

流式侦听器方法返回一个Flux<MessageResult<Void>>其中每个MessageResult元素表示已处理的消息,并保存消息 ID、值以及是否已确认。这MessageResult有一组acknowledgenegativeAcknowledge静态工厂方法,可用于创建适当的MessageResult实例。spring-doc.cadn.net.cn

7. 消息重新传递和错误处理

Apache Pulsar 为消息重新传递和错误处理提供了各种本机策略。 我们将看看它们,并了解如何通过 Spring for Apache Pulsar 使用它们。spring-doc.cadn.net.cn

7.1. 确认超时

默认情况下,除非使用者崩溃,否则 Pulsar 使用者不会重新传递消息,但您可以通过在 Pulsar 使用者上设置 ack 超时来更改此行为。 如果 ack timeout 属性的值大于零,并且 Pulsar 使用者在该超时期限内未确认消息,则会重新传递该消息。spring-doc.cadn.net.cn

您可以通过消费者定制器将此属性直接指定为 Pulsar 消费者属性,例如:spring-doc.cadn.net.cn

@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {
    return b -> b.property("ackTimeoutMillis", "60000");
}

7.2. 否定确认重新传递延迟

当确认否定时,Pulsar 消费者允许您指定应用程序希望如何重新传递消息。 默认设置是在一分钟内重新传递消息,但您可以通过消费者定制器进行更改,例如:spring-doc.cadn.net.cn

@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {
    return b -> b.property("negativeAckRedeliveryDelay", "10ms");
}

7.3. 死信主题

Apache Pulsar 允许应用程序在消费者上使用死信主题,并使用Shared订阅类型。 对于ExclusiveFailover订阅类型,此功能不可用。 基本思想是,如果消息重试了一定次数(可能是由于 ack 超时或 nack 重新传递),一旦重试次数用尽,就可以将消息发送到称为死信队列 (DLQ) 的特殊主题。 让我们通过检查一些代码片段来了解有关此功能的一些细节:spring-doc.cadn.net.cn

@Configuration(proxyBeanMethods = false)
class DeadLetterPolicyConfig {

    @ReactivePulsarListener(
            topics = "topic-with-dlp",
            subscriptionType = SubscriptionType.Shared,
            deadLetterPolicy = "myDeadLetterPolicy",
            consumerCustomizer = "ackTimeoutCustomizer" )
    void listen(String msg) {
        throw new RuntimeException("fail " + msg);
    }

    @ReactivePulsarListener(topics = "my-dlq-topic")
    void listenDlq(String msg) {
        System.out.println("From DLQ: " + msg);
    }

    @Bean
    DeadLetterPolicy myDeadLetterPolicy() {
        return DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("my-dlq-topic").build();
    }

    @Bean
    ReactiveMessageConsumerBuilderCustomizer<String> ackTimeoutCustomizer() {
        return b -> b.property("ackTimeoutMillis", "1000");
    }
}

首先,我们有一个特殊的豆子DeadLetterPolicy,并将其命名为deadLetterPolicy(它可以是任何你想要的名称)。 此 bean 指定了许多内容,例如最大传递量(在本例中为 10)和死信主题的名称 —my-dlq-topic,在这种情况下。 如果未指定 DLQ 主题名称,则默认为<topicname>-<subscriptionname>-DLQ在脉冲星中。 接下来,我们将此 bean 名称提供给ReactivePulsarListener通过将deadLetterPolicy财产。 请注意,ReactivePulsarListener订阅类型为Shared,因为 DLQ 功能仅适用于共享订阅。 此代码主要用于演示目的,因此我们提供了一个ackTimeoutMillis值为 1000。 这个想法是代码抛出异常,如果 Pulsar 在 1 秒内没有收到 ack,它就会重试。 如果该循环持续十次(因为这是我们在DeadLetterPolicy),Pulsar 消费者将消息发布到 DLQ 主题。 我们还有另一个ReactivePulsarListener侦听 DLQ 主题以接收发布到 DLQ 主题的数据。spring-doc.cadn.net.cn

使用分区主题时关于 DLQ 主题的特别说明

如果主主题是分区的,那么在幕后,每个分区都会被 Pulsar 视为一个单独的主题。Pulsar 将partition-<n>哪里n代表主主题名称的分区号。问题是,如果您没有指定 DLQ 主题(与我们上面所做的相反),Pulsar 会发布到具有此`partition-<n>其中的信息——例如:topic-with-dlp-partition-0-deadLetterPolicySubscription-DLQ. 解决此问题的简单方法是始终提供 DLQ 主题名称。spring-doc.cadn.net.cn

8. Pulsar 读卡器支持

该框架通过ReactivePulsarReaderFactory.spring-doc.cadn.net.cn

Spring Boot 提供了这个读取器工厂,可以配置任何spring.pulsar.reader.*应用程序属性。spring-doc.cadn.net.cn