消息消费

1. @ReactivePulsarListener

对于Pulsar消费者,我们建议终端用户应用使用反应脉冲监听器注解。 使用反应脉冲监听器,你需要使用@EnableReactivePulsar注解。 当你使用Spring Boot支持时,它会自动启用该注释并配置所有必要的组件,比如负责创建底层Pulsar消费者的消息监听器基础设施。spring-doc.cadn.net.cn

让我们重新审视反应脉冲监听器我们在快速参观区看到的代码片段:spring-doc.cadn.net.cn

@ReactivePulsarListener(subscriptionName = "hello-pulsar-sub", topics = "hello-pulsar-topic")
Mono<Void> listen(String message) {
    System.out.println(message);
    return Mono.empty();
}
监听器方法返回单<虚空>用以表示消息是否已成功处理。Mono.empty()表示成功(承认),且Mono.error()表示失败(负面确认)。

你也可以进一步简化这个方法:spring-doc.cadn.net.cn

@ReactivePulsarListener
Mono<Void> listen(String message) {
    System.out.println(message);
    return Mono.empty();
}

在这种最基本的形式中,当主题这些内容并未直接提供,而是通过主题解决过程来确定目标主题。 同样地,当订阅名称@ReactivePulsarListener注释:将使用自动生成的订阅名称。spring-doc.cadn.net.cn

反应脉冲监听器前面展示的方法,我们接收数据为字符串但我们没有指定任何模式类型。 在内部,该框架依赖于Pulsar的模式机制来将数据转换为所需的类型。spring-doc.cadn.net.cn

框架检测到你期望字符串类型,然后根据该信息推断模式类型,并将该模式提供给消费者。 该框架对所有原始类型都进行这种推断。 对于所有非原例类型,默认模式假设为 JSON。 如果复杂类型使用了除 JSON 以外的任何形式(如 AVRO 或 KEY_VALUE),你必须在注释中提供模式类型,使用schemaType财产。spring-doc.cadn.net.cn

这个例子展示了我们如何从一个主题中获取复杂类型:spring-doc.cadn.net.cn

@ReactivePulsarListener(topics = "my-topic-2", schemaType = SchemaType.JSON)
Mono<Void> listen(Foo message) {
    System.out.println(message);
    return Mono.empty();
}

让我们看看更多可以消费的方式。spring-doc.cadn.net.cn

此示例直接消耗脉冲星消息:spring-doc.cadn.net.cn

@ReactivePulsarListener(topics = "my-topic")
Mono<Void> listen(org.apache.pulsar.client.api.Message<String> message) {
    System.out.println(message.getValue());
    return Mono.empty();
}

这个例子是用 Spring 消息封装的记录:spring-doc.cadn.net.cn

@ReactivePulsarListener(topics = "my-topic")
Mono<Void> listen(org.springframework.messaging.Message<String> message) {
    System.out.println(message.getPayload());
    return Mono.empty();
}

1.1. 流媒体

以上都是逐条消费单张唱片的例子。 然而,使用Reactive的一个重要原因是支持背压的流媒体功能。spring-doc.cadn.net.cn

以下示例使用反应脉冲监听器以消耗一连串POJOs:spring-doc.cadn.net.cn

@ReactivePulsarListener(topics = "streaming-1", stream = true)
Flux<MessageResult<Void>> listen(Flux<org.apache.pulsar.client.api.Message<String>> messages) {
    return messages
        .doOnNext((msg) -> System.out.println("Received: " + msg.getValue()))
        .map(MessageResult::acknowledge);
}

我们在这里收到记录,作为通量脉冲星信息。 此外,为了实现在反应脉冲监听器你需要设置注释上的属性为true.spring-doc.cadn.net.cn

监听器方法返回Flux<MessageResult<Void>>每个元素代表已处理中的消息,并保存消息ID、值及其是否已被确认。 这消息结果拥有一组静态工厂方法,可用于创建相应的消息结果实例。

基于消息的实际类型通量框架尝试推断出所用的模式。 如果它包含复类型,你仍然需要提供schemaType反应脉冲监听器.spring-doc.cadn.net.cn

以下听众使用春季消息消息具有复数类型的包络:spring-doc.cadn.net.cn

@ReactivePulsarListener(topics = "streaming-2", stream = true, schemaType = SchemaType.JSON)
Flux<MessageResult<Void>> listen2(Flux<org.springframework.messaging.Message<Foo>> messages) {
    return messages
        .doOnNext((msg) -> System.out.println("Received: " + msg.getPayload()))
        .map(MessageUtils::acknowledge);
}
监听器方法返回Flux<MessageResult<Void>>每个元素代表已处理中的消息,并保存消息ID、值及其是否已被确认。 SpringMessageUtils拥有一组静态工厂方法,可用于创建相应的消息结果来自一个春季消息的实例。 这MessageUtils为 Spring 消息提供了与 上工厂方法集相同的功能MessageagResult为脉冲星消息而做。
目前没有支持使用org.apache.pulsar.client.api.Messages<T>@ReactivePulsarListener

1.2. 配置 - 应用属性

听者依赖于响应式脉冲星消费者工厂创建和管理其用于消费消息的底层 Pulsar 消费者。 Spring Boot 提供了这个消费者工厂,你可以通过指定Spring.脉冲星.消费者。*应用属性。spring-doc.cadn.net.cn

1.3. 带有AUTO_CONSUME的通用记录

如果无法提前知道 Pulsar 主题的模式类型,你可以使用AUTO_CONSUME模式类型用于消费泛型记录。 在这种情况下,主题将消息反序列化为通用记录对象使用与主题关联的模式信息。spring-doc.cadn.net.cn

要消费通用记录,设置schemaType = SchemaType.AUTO_CONSUME在你的@ReactivePulsarListener并使用类型的脉冲星消息通用记录作为消息参数,如下所示。spring-doc.cadn.net.cn

@ReactivePulsarListener(topics = "my-generic-topic", schemaType = SchemaType.AUTO_CONSUME)
Mono<Void> listen(org.apache.pulsar.client.api.Message<GenericRecord> message) {
    GenericRecord record = message.getValue();
    record.getFields().forEach((f) ->
            System.out.printf("%s = %s%n", f.getName(), record.getField(f)));
	return Mono.empty();
}
通用记录API 允许访问字段及其相关值

1.4. 消费者定制

你可以指定一个ReactivePulsarListenerMessageConsumerBuilderCustomizer配置底层的Pulsar消费者构建器,最终构建听者用来接收消息的消费者。spring-doc.cadn.net.cn

使用时要谨慎,因为这样可以完全访问消费者构建者,并调用其一些方法(例如创造)可能出现意想不到的副作用。

例如,以下代码展示了如何将订阅的初始位置设置为该主题的最早消息。spring-doc.cadn.net.cn

@ReactivePulsarListener(topics = "hello-pulsar-topic", consumerCustomizer = "myConsumerCustomizer")
Mono<Void> listen(String message) {
    System.out.println(message);
    return Mono.empty();
}

@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> myConsumerCustomizer() {
    return b -> b.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
}
如果你的申请只有一个@ReactivePulsarListener以及一首单曲ReactivePulsarListenerMessageConsumerBuilderCustomizerBean注册后,自定义器会自动应用。

你也可以用定制器直接向消费级建造者提供Pulsar消费者属性。 如果你不想使用前面提到的启动配置属性或有多个配置,这很方便反应脉冲监听器其配置各异。spring-doc.cadn.net.cn

以下定制器示例直接使用了Pulsar消费者属性:spring-doc.cadn.net.cn

@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> directConsumerPropsCustomizer() {
    return b -> b.property("subscriptionName", "subscription-1").property("topicNames", "foo-1");
}
所用的属性是直接的Pulsar消费者特性,而不是spring.pulsar.consumerSpring Boot 配置属性

2. 规范模式信息

如前所述,对于 Java 原语,Spring for Apache Pulsar 框架可以推断出用于反应脉冲监听器. 对于非原语类型,如果注释中没有明确指定模式,Spring for Apache Pulsar 框架将尝试构建Schema.JSON从类型上看。spring-doc.cadn.net.cn

目前支持的复杂模式类型有JSON、AVRO、PROTOBUF、AUTO_CONSUME、KEY_VALUE带内联编码。

2.1. 自定义模式映射

作为指定模式的替代方案反应脉冲监听器对于复杂类型,模式解析器可以配置该类型的映射。 这样就无需在监听器上设置模式,因为框架会根据收到的消息类型访问解析器。spring-doc.cadn.net.cn

2.1.1. 配置属性

模式映射可以通过以下配置配置spring.pulsar.defaults.type-mappings财产。 以下示例使用application.yml为 添加映射用户地址复杂对象使用阿弗罗JSON分别是 schema:spring-doc.cadn.net.cn

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.acme.User
          schema-info:
            schema-type: AVRO
        - message-type: com.acme.Address
          schema-info:
            schema-type: JSON
消息类型是消息类的全限定名称。

2.1.2. Schema resolver customizer

添加映射的首选方法是上述属性。 不过,如果需要更多控制,你可以提供模式解析器自定义工具来添加映射。spring-doc.cadn.net.cn

以下示例使用模式解析器自定义工具来添加映射用户地址复杂对象使用阿弗罗JSON分别是 schema:spring-doc.cadn.net.cn

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
	return (schemaResolver) -> {
		schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
		schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
	}
}

2.1.3. 类型映射注释

另一种指定特定消息类型默认模式信息的选项是用@PulsarMessage注解。 模式信息可以通过以下方式指定schemaType注释上的属性。spring-doc.cadn.net.cn

以下示例配置系统在生成或使用类型消息时,使用 JSON 作为默认模式:spring-doc.cadn.net.cn

@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}

有了这种配置,就无需在监听者上设置模式,例如:spring-doc.cadn.net.cn

@ReactivePulsarListener(topics = "user-topic")
Mono<Void> listen(User user) {
    System.out.println(user);
    return Mono.empty();
}

3. 消息监听器容器基础设施

在大多数情况下,我们建议使用反应脉冲监听器直接用于从Pulsar主题中获取的注释,因为该模型涵盖了广泛的应用用例。 然而,理解其原理非常重要反应脉冲监听器内部工作。spring-doc.cadn.net.cn

消息监听器容器是使用 Apache Pulsar 时消息消费的核心。 这反应脉冲监听器在幕后使用消息监听器容器基础设施来创建和管理底层的 Pulsar 消费者。spring-doc.cadn.net.cn

3.1. 响应式脉冲星信息Listener容器

该消息监听器容器的合同通过以下方式提供ReactivePulsarMessageListenerContainer其默认实现创建响应式 Pulsar 消费者,并布线响应式消息流水线,使用该消费者。spring-doc.cadn.net.cn

3.2. 响应式消息流水线

流水线是底层 Apache Pulsar Reactive 客户端的一个功能,负责以响应式方式接收数据,然后将其交给所提供的消息处理程序。响应式消息监听器容器的实现要简单得多,因为流水线承担了大部分工作。spring-doc.cadn.net.cn

3.3. 反应脉冲星消息处理器

“听者”方面由反应脉冲星消息处理器其中有两种实现方式:spring-doc.cadn.net.cn

如果在直接使用监听器容器时未指定主题信息,则使用相同的主题解析过程反应脉冲监听器使用的唯一例外是省略了“消息类型默认”步骤。

3.4. 启动失败处理

消息监听器容器在应用上下文刷新时启动。 默认情况下,启动过程中遇到的任何失败都会被重新抛出,应用程序将无法启动。 你可以用StartupFailurePolicy在相应的容器属性上。spring-doc.cadn.net.cn

可选方案包括:spring-doc.cadn.net.cn

默认的重试行为是重试3次,间隔10秒 每一次尝试。 不过,可以在相应的容器属性上指定自定义重试模板。 如果容器在重试次数用尽后未能重启,则保持为非运行状态。spring-doc.cadn.net.cn

3.4.1. 配置

与Spring Boot合作

使用Spring Boot时,你可以注册一个PulsarContainerFactoryCustomizer<DefaultReactivePulsarListenerContainerFactory<?>>BEAN,负责设置容器启动属性。spring-doc.cadn.net.cn

无Spring靴

但是,如果你是手动配置组件,构建消息监听器容器工厂时就必须相应更新容器启动属性。spring-doc.cadn.net.cn

4. 并发

在流媒体模式下消费记录时(流 = 真)并发性通过客户端实现中的底层响应式支持自然实现。spring-doc.cadn.net.cn

然而,当逐条处理消息时,可以指定并发以提高处理吞吐量。 只需设置并发@ReactivePulsarListener. 此外,当并发> 1你可以通过设置确保消息按键排序,从而发送到同一处理器useKeyOrderedProcessing = “true”在注释上。spring-doc.cadn.net.cn

再说一次,响应式消息管道主要负责,我们只需在上面设置属性。spring-doc.cadn.net.cn

响应式与命令式

响应式容器中的并发与其命令式容器不同。 后者创建多个线程(每个线程都有一个 Pulsar 消费者),而前者则在响应式并行调度器上同时向多个处理实例分发消息。spring-doc.cadn.net.cn

响应式并发模型的一个优点是它可以用于独家订阅,而命令式并发模型则不能。spring-doc.cadn.net.cn

5. 脉冲星头部

Pulsar 消息元数据可以作为 Spring 消息头部使用。 可用头部列表可在PulsarHeaders.java中找到。spring-doc.cadn.net.cn

5.1. 在 OneByOne 监听器中访问

以下示例展示了使用逐一消息监听器时如何访问脉冲星头部:spring-doc.cadn.net.cn

@ReactivePulsarListener(topics = "some-topic")
Mono<Void> listen(String data,
        @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
        @Header("foo") String foo) {
    System.out.println("Received " + data + " w/ id=" + messageId + " w/ foo=" + foo);
    return Mono.empty();
}

在前面的例子中,我们访问了messageId(信息ID消息元数据以及一个名为. Spring@Header每个头字段都使用注释。spring-doc.cadn.net.cn

你也可以用脉冲星消息作为承载有效载荷的包层。 在此过程中,用户可以直接调用 Pulsar 消息中的相应方法来获取元数据。 不过,为了方便,你也可以通过以下方式取回页眉注解。 注意你也可以使用春季消息消息包络 以携带有效载荷,然后通过以下方式获取 Pulsar 头部@Header.spring-doc.cadn.net.cn

5.2. 在流媒体监听器中访问

使用流式消息监听器时,头部支持有限。 只有当通量包含Springorg.springframework.messaging.Message元素将被填充。 此外,Spring@Header注释无法用于检索数据。 你必须直接调用 Spring 消息中的相应方法来获取数据。spring-doc.cadn.net.cn

6. 消息确认

该框架会自动处理消息确认。 然而,监听者方法必须发送一个信号,表示消息是否已成功处理。 容器实现随后利用该信号执行 ack 或 nack作。 这与其命令式对应法略有不同,后者信号默认为正,除非方法抛出异常。spring-doc.cadn.net.cn

6.1. 一一听众

单消息(又称 OneByOne)消息监听器方法返回单<虚空>用以表示消息是否已成功处理。Mono.empty()表示成功(承认),且Mono.error()表示失败(负面确认)。spring-doc.cadn.net.cn

6.2. 流媒体听众

流式监听器方法返回Flux<MessageResult<Void>>其中每个消息结果元素表示已处理中的消息,包含消息ID、值以及是否已被确认。这消息结果有一组承认否定确认静态工厂方法可用于创建合适的消息结果实例。spring-doc.cadn.net.cn

7. 消息重发与错误处理

Apache Pulsar 提供了多种原生策略用于消息重发和错误处理。 我们将看看它们,并了解如何通过Spring for Apache Pulsar使用它们。spring-doc.cadn.net.cn

7.1. 确认超时

默认情况下,Pulsar 消费者不会重投消息,除非消费者崩溃,但你可以通过设置 Pulsar 消费者的 ack 超时来改变这种行为。 如果 ack 超时属性值大于零,且 Pulsar 用户在超时期间未确认消息,则消息会被重新传递。spring-doc.cadn.net.cn

@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {
    return b -> b.property("ackTimeoutMillis", "60000");
}

7.2. 负面确认重送延迟

当确认为负面时,Pulsar 消费者允许你指定应用希望如何重新传递消息。 默认情况下,消息需一分钟内重投,但你可以通过消费者自定义工具(如:spring-doc.cadn.net.cn

@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {
    return b -> b.property("negativeAckRedeliveryDelay", "10ms");
}

7.3. 死信话题

Apache Pulsar 允许应用程序在消费者上使用死符主题,且共享订阅类型。 对于独家故障切换订阅类型,此功能不可用。 基本思路是,如果消息被重试一定次数(可能是因为确认超时或 nack 重投),当重试次数用尽后,消息可以发送到一个称为死字母队列(DLQ)的特殊主题。 让我们通过检查一些代码片段来了解该功能的具体作细节:spring-doc.cadn.net.cn

@Configuration(proxyBeanMethods = false)
class DeadLetterPolicyConfig {

    @ReactivePulsarListener(
            topics = "topic-with-dlp",
            subscriptionType = SubscriptionType.Shared,
            deadLetterPolicy = "myDeadLetterPolicy",
            consumerCustomizer = "ackTimeoutCustomizer" )
    void listen(String msg) {
        throw new RuntimeException("fail " + msg);
    }

    @ReactivePulsarListener(topics = "my-dlq-topic")
    void listenDlq(String msg) {
        System.out.println("From DLQ: " + msg);
    }

    @Bean
    DeadLetterPolicy myDeadLetterPolicy() {
        return DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("my-dlq-topic").build();
    }

    @Bean
    ReactiveMessageConsumerBuilderCustomizer<String> ackTimeoutCustomizer() {
        return b -> b.property("ackTimeoutMillis", "1000");
    }
}

首先,我们有一种特殊的豆子DeadLetterPolicy,其名称为deadLetterPolicy(你可以随意取任何名字)。 这个豆子指定了许多内容,比如最大送达时间(这里是10)和死字母主题的名称——我的DLQ主题,在这种情况下。 如果你没有指定DLQ主题名称,默认为<topicname>-<subscriptionname>-DLQ在脉冲星。 接下来,我们将这个豆子命名为反应脉冲监听器通过设置deadLetterPolicy财产。 注意反应脉冲监听器拥有一种订阅类型共享,因为DLQ功能只适用于共享订阅。 本代码主要用于演示,因此我们提供ackTimeoutMillis价值1000。 其理念是代码抛出异常,如果 Pulsar 在 1 秒内未收到确认,则进行重试。 如果这个循环持续十次(因为这是我们最大的重送次数DeadLetterPolicy),Pulsar 消费者将消息发布到 DLQ 主题。 我们又来了反应脉冲监听器它监听 DLQ 主题,接收发布到该主题的数据。spring-doc.cadn.net.cn

关于使用分区主题时的DLQ主题的特别说明

如果主主题被分割,幕后每个分割都会被Pulsar视为独立主题。 脉冲星附录分区<n>哪里n代表主主题名称的分区号。 问题是,如果你没有指定DLQ主题(与我们上面做的相反),Pulsar会发布到默认主题名称中,该主题名称包含以下内容'分<n>其中的信息——例如:topic-with-dlp-partition-0-deadLetterPolicySubscription-DLQ. 解决这个问题的简单方法是始终提供一个DLQ主题名称。spring-doc.cadn.net.cn

8. 脉冲星读卡器支持

该框架支持以响应式方式使用 Pulsar Reader,通过响应式脉冲星读者工厂.spring-doc.cadn.net.cn

Spring Boot 提供了该读取器工厂,可以配置为任何Spring脉冲星读者。*应用属性。spring-doc.cadn.net.cn