消息消费
1. @ReactivePulsarListener
对于 Pulsar 消费者,我们建议最终用户应用程序使用ReactivePulsarListener
注解。
使用ReactivePulsarListener
,您需要使用@EnableReactivePulsar
注解。
当您使用 Spring Boot 支持时,它会自动启用此 Comments 并配置所有必要的组件,例如消息侦听器基础设施(负责创建底层 Pulsar 消费者)。
让我们重新审视一下ReactivePulsarListener
我们在快速浏览部分看到的代码片段:
@ReactivePulsarListener(subscriptionName = "hello-pulsar-sub", topics = "hello-pulsar-topic")
Mono<Void> listen(String message) {
System.out.println(message);
return Mono.empty();
}
监听器方法返回一个Mono<Void> 以指示消息是否已成功处理。Mono.empty() 表示成功(确认),并Mono.error() 表示失败(否定确认)。 |
您还可以进一步简化此方法:
@ReactivePulsarListener
Mono<Void> listen(String message) {
System.out.println(message);
return Mono.empty();
}
在这种最基本的形式中,当topics
未直接提供,则使用主题解析过程来确定目标主题。
同样,当subscriptionName
上未提供@ReactivePulsarListener
注释将使用自动生成的订阅名称。
在ReactivePulsarListener
方法,我们收到的数据为String
,但我们没有指定任何模式类型。
在内部,该框架依赖于 Pulsar 的模式机制将数据转换为所需的类型。
框架检测到您期望String
类型,然后根据该信息推断架构类型,并将该架构提供给使用者。
框架对所有基元类型进行此推断。
对于所有非原始类型,默认模式假定为 JSON。
如果复杂类型使用 JSON 以外的任何内容(例如 AVRO 或 KEY_VALUE),则必须使用schemaType
财产。
此示例显示了我们如何从主题中使用复杂类型:
@ReactivePulsarListener(topics = "my-topic-2", schemaType = SchemaType.JSON)
Mono<Void> listen(Foo message) {
System.out.println(message);
return Mono.empty();
}
让我们看看更多我们可以消费的方式。
此示例直接使用 Pulsar 消息:
@ReactivePulsarListener(topics = "my-topic")
Mono<Void> listen(org.apache.pulsar.client.api.Message<String> message) {
System.out.println(message.getValue());
return Mono.empty();
}
此示例使用包装在 Spring 消息信封中的记录:
@ReactivePulsarListener(topics = "my-topic")
Mono<Void> listen(org.springframework.messaging.Message<String> message) {
System.out.println(message.getPayload());
return Mono.empty();
}
1.1. 流媒体
以上都是逐条消费单条记录的示例。 然而,使用 Reactive 的令人信服的原因之一是支持背压的流功能。
以下示例使用ReactivePulsarListener
要使用POJO流:
@ReactivePulsarListener(topics = "streaming-1", stream = true)
Flux<MessageResult<Void>> listen(Flux<org.apache.pulsar.client.api.Message<String>> messages) {
return messages
.doOnNext((msg) -> System.out.println("Received: " + msg.getValue()))
.map(MessageResult::acknowledge);
}
在这里,我们将记录作为Flux
脉冲星消息。
此外,要在ReactivePulsarListener
级别,您需要将stream
属性设置为true
.
监听器方法返回一个Flux<MessageResult<Void>> 其中每个元素表示已处理的消息,并保存消息 ID、值以及是否已确认。
这MessageResult 具有一组静态工厂方法,可用于创建适当的MessageResult 实例。 |
根据Flux
,框架会尝试推断要使用的模式。
如果它包含复杂类型,您仍然需要提供schemaType
上ReactivePulsarListener
.
以下监听器使用 Spring 消息传递Message
具有复杂类型的信封:
@ReactivePulsarListener(topics = "streaming-2", stream = true, schemaType = SchemaType.JSON)
Flux<MessageResult<Void>> listen2(Flux<org.springframework.messaging.Message<Foo>> messages) {
return messages
.doOnNext((msg) -> System.out.println("Received: " + msg.getPayload()))
.map(MessageUtils::acknowledge);
}
监听器方法返回一个Flux<MessageResult<Void>> 其中每个元素表示已处理的消息,并保存消息 ID、值以及是否已确认。
SpringMessageUtils 具有一组静态工厂方法,可用于创建适当的MessageResult 实例。
这MessageUtils 为 Spring 消息提供与MessagResult 对 Pulsar 消息执行。 |
不支持使用org.apache.pulsar.client.api.Messages<T> 在@ReactivePulsarListener
|
1.2. 配置 - 应用程序属性
监听器依赖于ReactivePulsarConsumerFactory
创建和管理用于使用消息的底层 Pulsar 使用者。
Spring Boot 提供了这个消费者工厂,您可以通过指定spring.pulsar.consumer.*
应用程序属性。
1.3. 带有AUTO_CONSUME的通用记录
如果没有机会提前知道 Pulsar 主题的 schema 类型,可以使用AUTO_CONSUME
schema 类型来使用通用记录。
在这种情况下,主题将消息反序列化为GenericRecord
对象使用与主题关联的架构信息。
要使用通用记录,请将schemaType = SchemaType.AUTO_CONSUME
在您的@ReactivePulsarListener
并使用GenericRecord
作为 message 参数,如下所示。
@ReactivePulsarListener(topics = "my-generic-topic", schemaType = SchemaType.AUTO_CONSUME)
Mono<Void> listen(org.apache.pulsar.client.api.Message<GenericRecord> message) {
GenericRecord record = message.getValue();
record.getFields().forEach((f) ->
System.out.printf("%s = %s%n", f.getName(), record.getField(f)));
return Mono.empty();
}
这GenericRecord API 允许访问字段及其关联值 |
1.4. 消费者定制
您可以指定一个ReactivePulsarListenerMessageConsumerBuilderCustomizer
配置底层 Pulsar 消费者构建器,该构建器最终构造监听器用于接收消息的消费者。
请谨慎使用,因为这可以完全访问消费者构建器并调用其某些方法(例如create )可能会产生意想不到的副作用。 |
例如,以下代码演示如何将订阅的初始位置设置为主题上最早的消息。
@ReactivePulsarListener(topics = "hello-pulsar-topic", consumerCustomizer = "myConsumerCustomizer")
Mono<Void> listen(String message) {
System.out.println(message);
return Mono.empty();
}
@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> myConsumerCustomizer() {
return b -> b.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
}
如果您的应用程序只有一个@ReactivePulsarListener 和单个ReactivePulsarListenerMessageConsumerBuilderCustomizer bean 注册,则定制器将自动应用。 |
您还可以使用定制器向消费者构建器提供直接的 Pulsar 消费者属性。
如果您不想使用前面提到的启动配置属性或有多个ReactivePulsarListener
配置不同的方法。
以下定制器示例使用直接的 Pulsar 使用者属性:
@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> directConsumerPropsCustomizer() {
return b -> b.property("subscriptionName", "subscription-1").property("topicNames", "foo-1");
}
使用的属性是直接的 Pulsar 使用者属性,而不是spring.pulsar.consumer Spring Boot 配置属性 |
2. 指定模式信息
如前所述,对于 Java 原语,Spring for Apache Pulsar 框架可以推断出要在ReactivePulsarListener
.
对于非原始类型,如果未在 Comments 上显式指定 Schema,则 Spring for Apache Pulsar 框架将尝试构建一个Schema.JSON
从类型。
当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF、AUTO_CONSUME KEY_VALUE 带内联编码。 |
2.1. 自定义模式映射
作为在ReactivePulsarListener
对于复杂类型,可以使用类型的映射来配置模式解析器。
这样就无需在侦听器上设置模式,因为框架使用传入消息类型咨询解析器。
2.1.1. 配置属性
架构映射可以使用spring.pulsar.defaults.type-mappings
财产。
以下示例使用application.yml
为User
和Address
复杂对象使用AVRO
和JSON
schemas,分别:
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.acme.User
schema-info:
schema-type: AVRO
- message-type: com.acme.Address
schema-info:
schema-type: JSON
这message-type 是消息类的完全限定名称。 |
2.1.2. 模式解析器定制器
添加映射的首选方法是通过上述属性。 但是,如果需要更多控制,可以提供架构解析器定制器来添加映射。
以下示例使用架构解析器定制器为User
和Address
复杂对象使用AVRO
和JSON
schemas,分别:
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
return (schemaResolver) -> {
schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
}
}
2.1.3. 类型映射注解
指定要用于特定消息类型的默认模式信息的另一个选项是使用@PulsarMessage
注解。
架构信息可以通过schemaType
属性。
以下示例将系统配置为在生成或使用Foo
:
@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}
使用此配置后,无需在侦听器上设置架构,例如:
@ReactivePulsarListener(topics = "user-topic")
Mono<Void> listen(User user) {
System.out.println(user);
return Mono.empty();
}
3. 消息监听器容器基础设施
在大多数情况下,我们建议使用ReactivePulsarListener
注释,以便直接从 Pulsar 主题中使用,因为该模型涵盖了广泛的应用程序用例。
但是,重要的是要了解如何ReactivePulsarListener
在内部工作。
当您使用 Spring for Apache Pulsar 时,消息侦听器容器是消息消费的核心。
这ReactivePulsarListener
在幕后使用消息监听器容器基础设施来创建和管理底层 Pulsar 消费者。
3.1. ReactivePulsarMessageListenerContainer
此消息侦听器容器的合约通过ReactivePulsarMessageListenerContainer
其默认实现创建一个响应式 Pulsar 使用者,并连接使用创建的使用者的响应式消息管道。
3.2. 响应式消息管道
管道是底层 Apache Pulsar Reactive 客户端的一项功能,它以响应方式接收数据,然后将其移交给提供的消息处理程序。响应式消息侦听器容器实现要简单得多,因为管道处理了大部分工作。
3.3. ReactivePulsarMessageHandler
“监听器”方面由ReactivePulsarMessageHandler
其中有两个提供的实现:
-
ReactivePulsarOneByOneMessageHandler
- 逐条处理单个消息 -
ReactivePulsarStreamingHandler
- 通过Flux
4. 并发
在流式处理模式下使用记录时 (stream = true
) 并发性通过客户端实现中的底层 Reactive 支持自然而然地实现。
但是,在逐个处理消息时,可以指定并发以提高处理吞吐量。
只需将concurrency
属性@ReactivePulsarListener
.
此外,当concurrency > 1
您可以确保消息按键排序,因此通过设置useKeyOrderedProcessing = "true"
在注释上。
同样,ReactiveMessagePipeline
做繁重的工作,我们只需在其上设置属性即可。
5. 脉冲星接头
Pulsar 消息元数据可以作为 Spring 消息头使用。 可用标头列表可以在PulsarHeaders.java中找到。
5.1. 在OneByOne监听器中访问
以下示例显示了在使用逐个消息监听器时如何访问 Pulsar 标头:
@ReactivePulsarListener(topics = "some-topic")
Mono<Void> listen(String data,
@Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
@Header("foo") String foo) {
System.out.println("Received " + data + " w/ id=" + messageId + " w/ foo=" + foo);
return Mono.empty();
}
在前面的示例中,我们访问messageId
消息元数据以及名为foo
.
Spring@Header
注释用于每个标题字段。
您还可以使用 Pulsar 的Message
作为承载有效载荷的信封。
这样做时,用户可以直接调用 Pulsar 消息上的相应方法来检索元数据。
但是,为了方便起见,您也可以使用Header
注解。
请注意,您还可以使用 Spring 消息传递Message
信封来承载有效负载,然后使用@Header
.
6. 消息确认
7. 消息重新传递和错误处理
Apache Pulsar 为消息重新传递和错误处理提供了各种本机策略。我们将看看它们,并了解如何通过 Spring for Apache Pulsar 使用它们。
7.1. 确认超时
默认情况下,除非使用者崩溃,否则 Pulsar 使用者不会重新传递消息,但您可以通过在 Pulsar 使用者上设置 ack 超时来更改此行为。如果 ack timeout 属性的值大于零,并且 Pulsar 使用者在该超时期限内未确认消息,则会重新传递该消息。
您可以通过消费者定制器将此属性直接指定为 Pulsar 消费者属性,例如:
@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {
return b -> b.property("ackTimeoutMillis", "60000");
}
7.2. 否定确认重新传递延迟
当确认为否定时,Pulsar 消费者允许您指定应用程序希望如何重新传递消息。默认设置是在一分钟内重新传递消息,但您可以通过消费者定制器进行更改,例如:
@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {
return b -> b.property("negativeAckRedeliveryDelay", "10ms");
}
7.3. 死信主题
Apache Pulsar 允许应用程序在消费者上使用死信主题,并使用Shared
订阅类型。
对于Exclusive
和Failover
订阅类型,此功能不可用。
基本思想是,如果消息重试了一定次数(可能是由于 ack 超时或 nack 重新传递),一旦重试次数用尽,就可以将消息发送到称为死信队列 (DLQ) 的特殊主题。
让我们通过检查一些代码片段来了解有关此功能的一些细节:
@Configuration(proxyBeanMethods = false)
class DeadLetterPolicyConfig {
@ReactivePulsarListener(
topics = "topic-with-dlp",
subscriptionType = SubscriptionType.Shared,
deadLetterPolicy = "myDeadLetterPolicy",
consumerCustomizer = "ackTimeoutCustomizer" )
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}
@ReactivePulsarListener(topics = "my-dlq-topic")
void listenDlq(String msg) {
System.out.println("From DLQ: " + msg);
}
@Bean
DeadLetterPolicy myDeadLetterPolicy() {
return DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("my-dlq-topic").build();
}
@Bean
ReactiveMessageConsumerBuilderCustomizer<String> ackTimeoutCustomizer() {
return b -> b.property("ackTimeoutMillis", "1000");
}
}
首先,我们有一个特殊的豆子DeadLetterPolicy
,并将其命名为deadLetterPolicy
(它可以是任何你想要的名称)。
此 bean 指定了许多内容,例如最大传递量(在本例中为 10)和死信主题的名称 —my-dlq-topic
,在这种情况下。
如果未指定 DLQ 主题名称,则默认为<topicname>-<subscriptionname>-DLQ
在脉冲星中。
接下来,我们将此 bean 名称提供给ReactivePulsarListener
通过将deadLetterPolicy
财产。
请注意,ReactivePulsarListener
订阅类型为Shared
,因为 DLQ 功能仅适用于共享订阅。
此代码主要用于演示目的,因此我们提供了一个ackTimeoutMillis
值为 1000。
这个想法是代码抛出异常,如果 Pulsar 在 1 秒内没有收到 ack,它就会重试。
如果该循环持续十次(因为这是我们在DeadLetterPolicy
),Pulsar 消费者将消息发布到 DLQ 主题。
我们还有另一个ReactivePulsarListener
侦听 DLQ 主题以接收发布到 DLQ 主题的数据。
8. Pulsar 读卡器支持
Spring Boot 提供了这个读取器工厂,可以配置任何spring.pulsar.reader.*
应用程序属性。