消息消费
1. Pulsar 监听器
对于 Pulsar 消费者,我们建议最终用户应用程序使用PulsarListener注解。
要使用PulsarListener,您需要使用@EnablePulsar注解。
当您使用 Spring Boot 支持时,它会自动启用此 Comments 并配置所有必要的组件PulsarListener,例如消息侦听器基础设施(负责创建 Pulsar 消费者)。PulsarMessageListenerContainer使用PulsarConsumerFactory创建和管理 Pulsar 消费者,即用于消费消息的底层 Pulsar 消费者。
Spring Boot 提供了这个消费者工厂,您可以通过指定spring.pulsar.consumer.*应用程序属性。工厂上的大多数已配置属性将在侦听器中得到遵守,但以下情况除外:
这spring.pulsar.consumer.subscription.name属性将被忽略,而是在未在 Comments 上指定时生成。 |
这spring.pulsar.consumer.subscription-typeproperty 被忽略,而是取自 annotation 上的值。但是,您可以将subscriptionType = {}以改用 Property 值作为默认值。 |
让我们重新审视一下PulsarListener我们在 Quick-Tour 部分看到的代码片段:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
您可以进一步简化此方法:
@PulsarListener
public void listen(String message) {
System.out.println("Message Received: " + message);
}
在这种最基本的形式中,当subscriptionName未在@PulsarListenerannotation 将使用自动生成的订阅名称。
同样,当topics未直接提供,则使用主题解析过程来确定目标主题。
在PulsarListener方法,我们接收数据为String,但我们没有指定任何 schema 类型。
在内部,该框架依赖于 Pulsar 的 schema 机制将数据转换为所需的类型。
框架检测到您希望Stringtype 的 Schema 类型,然后根据该信息推断 schema 类型,并将该 schema 提供给使用者。
框架对所有基元类型执行此推理。
对于所有非基元类型,默认架构假定为 JSON。
如果复杂类型使用 JSON 以外的任何内容(例如 AVRO 或 KEY_VALUE),则必须使用schemaType财产。
以下示例显示了另一个PulsarListener方法,该方法采用Integer:
@PulsarListener(subscriptionName = "my-subscription-1", topics = "my-topic-1")
public void listen(Integer message) {
System.out.println(message);
}
以下内容PulsarListenermethod 展示了我们如何从 topic 中使用复杂类型:
@PulsarListener(subscriptionName = "my-subscription-2", topics = "my-topic-2", schemaType = SchemaType.JSON)
public void listen(Foo message) {
System.out.println(message);
}
让我们看看更多方法。
你可以直接消费 Pulsar 消息:
@PulsarListener(subscriptionName = "my-subscription", topics = "my-topic")
public void listen(org.apache.pulsar.client.api.Message<String> message) {
System.out.println(message.getValue());
}
下面的示例使用 Spring 消息传递信封来使用记录:
@PulsarListener(subscriptionName = "my-subscription", topics = "my-topic")
public void listen(org.springframework.messaging.Message<String> message) {
System.out.println(message.getPayload());
}
现在让我们看看如何批量使用记录。
以下示例使用PulsarListener要批量使用记录作为 POJO:
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<Foo> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message));
}
请注意,在此示例中,我们将记录作为集合 (List) 的对象。
此外,要在PulsarListenerlevel 中,您需要设置batch属性设置为true.
根据实际类型Listholds,则框架会尝试推断要使用的 schema。
如果List包含除 JSON 之外的复杂类型,您仍然需要提供schemaType上PulsarListener.
下面使用Messageenvelope 的 Pulsar Java 客户端提供的 Paypal 中:
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<Message<Foo>> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message.getValue()));
}
以下示例使用带有 Spring 消息传递信封的批处理记录Message类型:
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<org.springframework.messaging.Message<Foo>> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message.getPayload()));
}
最后,您还可以使用Messagesholder 对象用于批处理监听器:
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(org.apache.pulsar.client.api.Messages<Foo>> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message.getValue()));
}
当您使用PulsarListener,你可以直接在 Comments 本身上提供 Pulsar consumer 属性。
如果您不想使用前面提到的 Boot 配置属性或具有多个PulsarListener方法。
以下示例直接在PulsarListener:
@PulsarListener(properties = { "subscriptionName=subscription-1", "topicNames=foo-1", "receiverQueueSize=5000" })
void listen(String message) {
}
使用的属性是直接的 Pulsar 消费者属性,而不是spring.pulsar.consumer应用程序配置属性 |
1.1. 使用 AUTO_CONSUME 的通用记录
如果无法提前知道某个 Pulsar 主题的 schema 类型,可以使用AUTO_CONSUMEschema 类型来使用通用记录。
在这种情况下,主题将消息反序列化为GenericRecord对象使用与主题关联的架构信息。
要使用通用记录,请将schemaType = SchemaType.AUTO_CONSUME在您的@PulsarListener并使用GenericRecord作为 message 参数,如下所示。
@PulsarListener(topics = "my-generic-topic", schemaType = SchemaType.AUTO_CONSUME)
void listen(org.apache.pulsar.client.api.Message<GenericRecord> message) {
GenericRecord record = message.getValue();
record.getFields().forEach((f) ->
System.out.printf("%s = %s%n", f.getName(), record.getField(f)));
}
这GenericRecordAPI 允许访问字段及其关联值 |
1.2. 自定义 ConsumerBuilder
您可以通过ConsumerBuilder使用PulsarListenerConsumerBuilderCustomizer通过提供@Bean的类型PulsarListenerConsumerBuilderCustomizer然后将其提供给PulsarListener如下所示。
@PulsarListener(topics = "hello-topic", consumerCustomizer = "myCustomizer")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
@Bean
PulsarListenerConsumerBuilderCustomizer<String> myCustomizer() {
return (builder) -> builder.consumerName("myConsumer");
}
如果您的应用程序只有一个@PulsarListener和一个PulsarListenerConsumerBuilderCustomizerbean 已注册,则将自动应用定制器。 |
2. 指定 Schema 信息
如前所述,对于 Java 原语,Spring for Apache Pulsar 框架可以推断出要在PulsarListener.
对于非原始类型,如果未在注解上显式指定 Schema,则 Spring for Apache Pulsar 框架将尝试构建一个Schema.JSON从 type.
| 当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF、AUTO_CONSUME KEY_VALUE w/ INLINE 编码。 |
2.1. 自定义 Schema 映射
作为在PulsarListener对于复杂类型,可以使用类型的映射配置 Schema Resolver。
这样就无需在侦听器上设置 schema,因为框架使用传入消息类型咨询解析程序。
2.1.1. 配置属性
架构映射可以使用spring.pulsar.defaults.type-mappings财产。
以下示例使用application.yml要为User和Address复杂对象 usingAVRO和JSONschemas 的 Schemas:
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.acme.User
schema-info:
schema-type: AVRO
- message-type: com.acme.Address
schema-info:
schema-type: JSON
这message-type是 Message 类的完全限定名称。 |
2.1.2. Schema 解析器定制器
添加映射的首选方法是通过上述属性。 但是,如果需要更多控制,则可以提供架构解析程序定制器来添加映射。
以下示例使用架构解析程序定制器为User和Address复杂对象 usingAVRO和JSONschemas 的 Schemas:
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
return (schemaResolver) -> {
schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
}
}
2.1.3. 类型映射注解
指定要用于特定消息类型的默认架构信息的另一个选项是使用@PulsarMessage注解。
架构信息可以通过schemaType属性。
以下示例将系统配置为在生成或使用 JSON 类型的消息时使用 JSON 作为默认架构Foo:
@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}
有了这个配置,就不需要在侦听器上设置 schema,例如:
@PulsarListener(subscriptionName = "user-sub", topics = "user-topic")
public void listen(User user) {
System.out.println(user);
}
3. 访问 Pulsar Consumer Object
有时,你需要直接访问 Pulsar Consumer 对象。 以下示例显示了如何获取它:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message, org.apache.pulsar.client.api.Consumer<String> consumer) {
System.out.println("Message Received: " + message);
ConsumerStats stats = consumer.getStats();
...
}
访问Consumer对象,请不要调用任何会通过调用任何 receive 方法来改变 Consumer 光标位置的作。
所有此类作都必须由容器完成。 |
4. Pulsar 消息监听器容器
现在我们已经看到了消费者端的基本交互PulsarListener.现在让我们深入了解一下PulsarListener与底层 Pulsar 消费者交互。
请记住,对于最终用户应用程序,在大多数情况下,我们建议使用PulsarListener注解,以便在使用 Spring for Apache Pulsar 时直接从 Pulsar 主题中使用,因为该模型涵盖了广泛的应用程序用例。
但是,了解如何作非常重要PulsarListener在内部工作。本节将介绍这些详细信息。
如前所述,当您使用 Spring for Apache Pulsar 时,消息侦听器容器是消息消费的核心。PulsarListener在幕后使用消息侦听器容器基础设施来创建和管理 Pulsar 消费者。
Spring for Apache Pulsar 通过以下方式提供此消息侦听器容器的 ContractPulsarMessageListenerContainer.
此消息侦听器容器的默认实现是通过DefaultPulsarMessageListenerContainer.
顾名思义,PulsarMessageListenerContainer包含消息侦听器。
容器创建 Pulsar 消费者,然后运行一个单独的线程来接收和处理数据。
数据由提供的消息侦听器实现处理。
消息侦听器容器使用使用者的batchReceive方法。
收到数据后,数据将移交给选定的消息侦听器实现。
使用 Spring for Apache Pulsar 时,可以使用以下消息侦听器类型。
我们将在以下部分中看到有关这些不同消息侦听器的详细信息。
但是,在这样做之前,让我们仔细看看容器本身。
4.1. 默认 PulsarMessageListenerContainer
这是一个基于使用者的消息侦听器容器。 下面的清单显示了它的构造函数:
public DefaultPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory,
PulsarContainerProperties pulsarContainerProperties)
}
它接收一个PulsarConsumerFactory(用于创建使用者)和PulsarContainerProperties对象(包含有关容器属性的信息)。PulsarContainerProperties具有以下构造函数:
public PulsarContainerProperties(String... topics)
public PulsarContainerProperties(Pattern topicPattern)
您可以通过以下方式提供主题信息PulsarContainerProperties或作为提供给 Consumer Factory 的 Consumer Property 的 Consumer 属性。
以下示例使用DefaultPulsarMessageListenerContainer:
Map<String, Object> config = new HashMap<>();
config.put("topics", "my-topic");
PulsarConsumerFactory<String> pulsarConsumerFactorY = DefaultPulsarConsumerFactory<>(pulsarClient, config);
PulsarContainerProperties pulsarContainerProperties = new PulsarContainerProperties();
pulsarContainerProperties.setMessageListener((PulsarRecordMessageListener<?>) (consumer, msg) -> {
});
DefaultPulsarMessageListenerContainer<String> pulsarListenerContainer = new DefaultPulsarMessageListenerContainer(pulsarConsumerFacotyr,
pulsarContainerProperties);
return pulsarListenerContainer;
DefaultPulsarMessageListenerContainer仅创建一个使用者。
如果要通过多个线程管理多个使用者,则需要使用ConcurrentPulsarMessageListenerContainer.
4.2. ConcurrentPulsarMessageListenerContainer
ConcurrentPulsarMessageListenerContainer具有以下构造函数:
public ConcurrentPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory,
PulsarContainerProperties pulsarContainerProperties)
ConcurrentPulsarMessageListenerContainer允许您指定concurrencyproperty 的 setter 实现。
并发性超过1仅允许在非独占订阅 (failover,shared和key-shared).
您只能使用默认的1用于并发。
以下示例启用concurrency通过PulsarListener注解failover订阅。
@PulsarListener(topics = "my-topic", subscriptionName = "subscription-1",
subscriptionType = SubscriptionType.Failover, concurrency = "3")
void listen(String message, Consumer<String> consumer) {
...
System.out.println("Current Thread: " + Thread.currentThread().getName());
System.out.println("Current Consumer: " + consumer.getConsumerName());
}
在前面的监听器中,假设 topicmy-topic有三个分区。
如果是非分区主题,请将并发设置为3什么都不做。除了主要的活跃使用者之外,您还会获得两个空闲使用者。
如果主题具有三个以上的分区,则消息将在容器创建的使用者之间进行负载均衡。
如果您运行此PulsarListener,您会看到来自不同分区的消息通过不同的使用者使用,如前面示例中的线程名称和使用者名称打印输出所暗示的那样。
当您使用Failoversubscription 这样,Pulsar 保证消息排序。 |
下面的清单显示了PulsarListener,但使用Sharedsubscription 和concurrency启用。
@PulsarListener(topics = "my-topic", subscriptionName = "subscription-1",
subscriptionType = SubscriptionType.Shared, concurrency = "5")
void listen(String message) {
...
}
在前面的示例中,PulsarListener创建五个不同的使用者(这一次,我们假设主题有 5 个分区)。
在这个版本中,没有消息排序,因为Shared订阅不保证 Pulsar 中的任何消息排序。 |
如果您需要消息排序,但仍需要共享订阅类型,则需要使用Key_Shared订阅类型。
4.3. 使用记录
让我们看看消息侦听器容器如何实现基于单记录和基于批处理的消息使用。
单记录消耗
让我们重新审视一下我们的基本PulsarListener为了这次讨论:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
有了这个PulsarListener方法,我们必须要求 Spring for Apache Pulsar 每次调用具有单个记录的监听器方法。
我们提到了消息侦听器容器使用batchReceive方法。
框架检测到PulsarListener,在本例中,接收单个记录。这意味着,在每次调用该方法时,它都需要一个 singe 记录。
尽管消息侦听器容器会批量使用记录,但它会遍历收到的批处理,并通过适配器调用PulsarRecordMessageListener.
正如您在上一节中所看到的,PulsarRecordMessageListener从MessageListener由 Pulsar Java 客户端提供,并且支持基本的received方法。
批量消耗
以下示例显示了PulsarListener批量消费记录:
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen4(List<Foo> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message));
}
当您使用这种类型的PulsarListener,框架会检测到您处于批处理模式。
由于它已经使用 Consumer 的batchReceive方法,它会通过适配器将整个 Batch 移交给 listener 方法PulsarBatchMessageListener.
5. Pulsar 标头
Pulsar 消息元数据可以作为 Spring 消息头使用。 可用标头的列表可以在 PulsarHeaders.java 中找到。
5.1. 在基于单记录的 Consumer 中访问
以下示例展示了如何在使用单记录消费模式的应用程序中访问各种 Pulsar Headers:
@PulsarListener(topics = "simpleListenerWithHeaders")
void simpleListenerWithHeaders(String data, @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
@Header(PulsarHeaders.RAW_DATA) byte[] rawData,
@Header("foo") String foo) {
}
在前面的示例中,我们访问messageId和rawDatamessage 元数据以及名为foo.
Spring@Headerannotation 用于每个 Header 字段。
您还可以使用 Pulsar 的Message作为 envelope 来承载负载。
这样做时,用户可以直接调用 Pulsar 消息上的相应方法来检索元数据。
但是,为方便起见,您也可以使用Header注解。
请注意,您还可以使用 Spring 消息传递Messageenvelope 来携带 payload,然后使用@Header.
5.2. 基于批量记录的 Consumer 访问
在本节中,我们将了解如何在使用批处理 consumer 的应用程序中访问各种 Pulsar Headers:
@PulsarListener(topics = "simpleBatchListenerWithHeaders", batch = true)
void simpleBatchListenerWithHeaders(List<String> data,
@Header(PulsarHeaders.MESSAGE_ID) List<MessageId> messageIds,
@Header(PulsarHeaders.TOPIC_NAME) List<String> topicNames, @Header("foo") List<String> fooValues) {
}
在前面的示例中,我们将数据作为List<String>.
在提取各种 header 时,我们以List<>也。
Spring for Apache Pulsar 确保 header 列表与 data 列表相对应。
当您使用批处理侦听器并接收有效负载时,您还可以以相同的方式提取标头List<org.apache.pulsar.client.api.Message<?>,org.apache.pulsar.client.api.Messages<?>或org.springframework.messaging.Messsge<?>.
6. 消息鸣谢
当您使用 Spring for Apache Pulsar 时,消息确认由框架处理,除非应用程序选择退出。 在本节中,我们将详细介绍框架如何处理消息确认。
6.1. 消息 ACK 模式
Spring for Apache Pulsar 提供了以下确认消息的模式:
-
BATCH -
RECORD -
MANUAL
BATCH确认模式是默认模式,但您可以在 Message Listener Container (消息侦听器) 容器上更改它。
在以下部分中,我们将了解在同时使用PulsarListener以及它们如何转换为后备消息侦听器容器(并最终转换为 Pulsar 消费者)。
6.2. 单记录模式下的自动消息确认
让我们重新审视一下我们基本的基于PulsarListener:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
很自然地想知道,当您使用PulsarListener,特别是如果你熟悉直接使用 Pulsar consumer 的话。
答案归结为消息侦听器容器,因为这是 Spring for Apache Pulsar 中协调所有与消费者相关的活动的中心位置。
假设您没有覆盖默认行为,那么当您使用前面的PulsarListener:
-
首先,侦听器容器从 Pulsar 消费者批量接收消息。
-
收到的消息将传递给
PulsarListener一次一条消息。 -
当所有记录都传递给侦听器方法并成功处理时,容器将确认来自原始批处理的所有消息。
这是正常流程。如果原始批处理中的任何记录引发异常, Spring for Apache Pulsar 会单独跟踪这些记录。
当批处理中的所有记录都被处理完时, Spring for Apache Pulsar 会确认所有成功的消息,并否定地确认 (nack) 所有失败的消息。
换句话说,当使用PulsarRecordMessageListener和默认的 ACK 模式BATCH时,框架会等待从batchReceive调用以成功处理,然后调用acknowledge方法。
如果任何特定记录在调用处理程序方法时引发异常,Spring for Apache Pulsar 会跟踪这些记录并单独调用negativeAcknowledge在处理整个批次之后的这些记录上。
如果应用程序希望每条记录发生确认或否定确认,则RECORD可以启用 ACK 模式。
在这种情况下,在处理每条记录后,如果没有错误,则确认消息,如果有错误,则否定确认消息。
以下示例启用RECORDack 模式:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.RECORD)
public void listen(String message) {
System.out.println("Message Received: " + message);
}
6.3. 单记录模式下的手动消息确认
您可能并不总是希望框架发送确认,而是直接从应用程序本身发送。 Spring for Apache Pulsar 提供了几种启用手动消息确认的方法。以下示例显示了其中之一:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.MANUAL)
public void listen(Message<String> message, Acknowledgment acknowledgment) {
System.out.println("Message Received: " + message.getValue());
acknowledgment.acknowledge();
}
有几件事值得在这里解释。首先,我们通过设置ackMode上PulsarListener.
启用手动 ack 模式时, Spring for Apache Pulsar 允许应用程序注入Acknowledgment对象。
框架通过选择兼容的消息侦听器容器来实现此目的:PulsarAcknowledgingMessageListener对于基于单个记录的消费,它允许您访问Acknowledgment对象。
这Acknowledgmentobject 提供了以下 API 方法:
void acknowledge();
void acknowledge(MessageId messageId);
void acknowledge(List<MessageId> messageIds);
void nack();
void nack(MessageId messageId);
你可以注入这个Acknowledgmentobject 导入到PulsarListener使用MANUALack 模式,然后调用相应的方法之一。
在前面的PulsarListener例如,我们将无参数acknowledge方法。
这是因为框架知道哪个Message它目前正在 Under.
调用acknowledge(),则无需接收带有Messageenveloper' 的 URL,而是使用 target 类型 —String,在此示例中。
您还可以调用acknowledge通过提供消息 ID:acknowledge.acknowledge(message.getMessageId());当您使用acknowledge(messageId),您必须使用Message<?>信封。
与可能的确认类似,AcknowledgmentAPI 还提供了用于否定确认的选项。
请参阅前面显示的 nack 方法。
您还可以调用acknowledge直接在 Pulsar 消费者上:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.MANUAL)
public void listen(Message<String> message, Consumer<String> consumer) {
System.out.println("Message Received: " + message.getValue());
try {
consumer.acknowledge(message);
}
catch (Exception e) {
....
}
}
调用acknowledge直接在底层 Consumer 上,你需要自己做错误处理。
使用Acknowledgment不需要这样做,因为框架可以为您执行此作。
因此,您应该使用AcknowledgmentObject 方法。
| 使用手动确认时,请务必了解框架完全不执行任何确认。 因此,在设计应用程序时考虑正确的确认策略非常重要。 |
6.4. 批量消费自动确认消息
当您批量使用记录时(请参阅“消息 ACK 模式”),并且使用默认的 ack 模式BATCH,则当成功处理整个批处理时,将确认整个批处理。
如果任何记录引发异常,则对整个批处理进行否定确认。
请注意,这可能不是在生产者端批处理的同一批次。相反,这是从调用batchReceive在消费者身上
请考虑以下批处理侦听器:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", batch = true)
public void batchListen(List<Foo> messages) {
for (Foo foo : messages) {
...
}
}
当传入集合中的所有邮件 (messages在此示例中)进行处理,则框架会确认所有这些 ID。
在批处理模式下使用时,RECORD不是允许的 ACK 模式。
这可能会导致问题,因为应用程序可能不希望再次重新交付整个批处理。
在这种情况下,您需要使用MANUAL确认模式。
6.5. 批量消费时手动消息确认
如上一节所示,当MANUALack 模式,框架不做任何确认,无论是肯定的还是否定的。
处理此类问题完全取决于应用程序。
什么时候MANUALack 模式,则 Spring for Apache Pulsar 会选择兼容的消息侦听器容器:PulsarBatchAcknowledgingMessageListener用于批量使用,这使您可以访问Acknowledgment对象。
以下是Acknowledgment应用程序接口:
void acknowledge();
void acknowledge(MessageId messageId);
void acknowledge(List<MessageId> messageIds);
void nack();
void nack(MessageId messageId);
你可以注入这个Acknowledgmentobject 导入到PulsarListener使用MANUALACK 模式。
下面的清单显示了基于批处理的侦听器的基本示例:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(List<Message<String>> messgaes, Acknowlegement acknowledgment) {
for (Message<String> message : messages) {
try {
...
acknowledgment.acknowledge(message.getMessageId());
}
catch (Exception e) {
acknowledgment.nack(message.getMessageId());
}
}
}
使用批处理侦听器时,消息侦听器容器无法知道它当前正在处理哪个记录。
因此,要手动确认,您需要使用一个重载的acknowledge方法,该方法采用MessageId或List<MessageId>.
您也可以使用MessageId对于批处理侦听器。
7. 消息重新投递和错误处理
现在我们已经看到了两者PulsarListener以及消息侦听器容器基础设施及其各种功能,现在让我们尝试了解消息重投和错误处理。
Apache Pulsar 提供了各种用于消息重新传递和错误处理的原生策略。我们来看看它们,看看如何通过 Spring for Apache Pulsar 使用它们。
7.1. 指定消息重新投递的确认超时
默认情况下,除非 Consumer 崩溃,否则 Pulsar Consumer 不会重新传递消息,但你可以通过在 Pulsar Consumer 上设置 ack 超时来更改此行为。 如果 ack timeout 属性的值大于零,并且 Pulsar 消费者在该超时时间内没有确认消息,则重新传递该消息。
使用 Spring for Apache Pulsar 时,可以通过消费者定制器或本机 Pulsar 设置此属性ackTimeoutMillis属性在properties属性@PulsarListener:
@PulsarListener(subscriptionName = "subscription-1", topics = "topic-1"
properties = {"ackTimeoutMillis=60000"})
public void listen(String s) {
...
}
当你指定 ack 超时时,如果 Consumer 在 60 秒内没有发送确认,则 Pulsar 会将消息重新投递给 Consumer。
如果要为具有不同延迟的 ack timeout 指定一些高级回退选项,可以执行以下作:
@EnablePulsar
@Configuration
class AckTimeoutRedeliveryConfig {
@PulsarListener(subscriptionName = "withAckTimeoutRedeliveryBackoffSubscription",
topics = "withAckTimeoutRedeliveryBackoff-test-topic",
ackTimeoutRedeliveryBackoff = "ackTimeoutRedeliveryBackoff",
properties = { "ackTimeoutMillis=60000" })
void listen(String msg) {
// some long-running process that may cause an ack timeout
}
@Bean
RedeliveryBackoff ackTimeoutRedeliveryBackoff() {
return MultiplierRedeliveryBackoff.builder().minDelayMs(1000).maxDelayMs(10 * 1000).multiplier(2)
.build();
}
}
在前面的示例中,我们为 Pulsar 的RedeliveryBackoff最小延迟为 1 秒,最大延迟为 10 秒,回退乘数为 2。
在发生初始 ack 超时后,通过此回退 Bean 控制消息重新传递。
我们将 backoff bean 提供给PulsarListener注解,方法是将ackTimeoutRedeliveryBackoffproperty 设置为实际的 bean 名称 —ackTimeoutRedeliveryBackoff,在本例中。
7.2. 指定否定确认重新传递
当否定确认时,Pulsar consumer 允许你指定应用程序希望如何重新传递消息。
默认设置是在一分钟内重新传递消息,但您可以通过消费者定制器或使用本机 Pulsar 进行更改negativeAckRedeliveryDelay属性在properties属性@PulsarListener:
@PulsarListener(subscriptionName = "subscription-1", topics = "topic-1"
properties = {"negativeAckRedeliveryDelay=10ms"})
public void listen(String s) {
...
}
您还可以通过提供RedeliveryBackoffBean 并提供 Bean 名称作为negativeAckRedeliveryBackoffproperty 属性,如下所示:
@EnablePulsar
@Configuration
class NegativeAckRedeliveryConfig {
@PulsarListener(subscriptionName = "withNegRedeliveryBackoffSubscription",
topics = "withNegRedeliveryBackoff-test-topic", negativeAckRedeliveryBackoff = "redeliveryBackoff",
subscriptionType = SubscriptionType.Shared)
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}
@Bean
RedeliveryBackoff redeliveryBackoff() {
return MultiplierRedeliveryBackoff.builder().minDelayMs(1000).maxDelayMs(10 * 1000).multiplier(2)
.build();
}
}
7.3. 使用 Apache Pulsar 的死信主题进行消息重投和错误处理
Apache Pulsar 允许应用程序在消费者上使用死信主题,并带有Shared订阅类型。
对于Exclusive和Failover订阅类型,则此功能不可用。
基本思想是,如果消息重试一定次数(可能是由于 ack 超时或 nack 重新传递),一旦重试次数用完,就可以将消息发送到称为死信队列 (DLQ) 的特殊主题。
让我们通过检查一些代码片段来了解有关此功能的一些细节:
@EnablePulsar
@Configuration
class DeadLetterPolicyConfig {
@PulsarListener(id = "deadLetterPolicyListener", subscriptionName = "deadLetterPolicySubscription",
topics = "topic-with-dlp", deadLetterPolicy = "deadLetterPolicy",
subscriptionType = SubscriptionType.Shared, properties = { "ackTimeoutMillis=1000" })
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}
@PulsarListener(id = "dlqListener", topics = "my-dlq-topic")
void listenDlq(String msg) {
System.out.println("From DLQ: " + msg);
}
@Bean
DeadLetterPolicy deadLetterPolicy() {
return DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("my-dlq-topic").build();
}
}
首先,我们有一个特殊的 beanDeadLetterPolicy,它被命名为deadLetterPolicy(您可以根据需要使用任何名称)。
这个 bean 指定了许多内容,例如最大交付量(在本例中为 10)和死信主题的名称 —my-dlq-topic,在本例中。
如果未指定 DLQ 主题名称,则默认为<topicname>-<subscriptionname>-DLQ在 Pulsar 中。
接下来,我们将此 bean 名称提供给PulsarListener通过设置deadLetterPolicy财产。
请注意,PulsarListener订阅类型的值为Shared,因为 DLQ 功能仅适用于共享订阅。
此代码主要用于演示目的,因此我们提供了一个ackTimeoutMillis值为 1000。
这个想法是代码抛出异常,如果 Pulsar 在 1 秒内没有收到 ack,它会重试。
如果该周期持续 10 次(因为这是我们在DeadLetterPolicy),则 Pulsar consumer 会将消息发布到 DLQ 主题。
我们还有另一个PulsarListener侦听 DLQ 主题以在发布到 DLQ 主题时接收数据。
7.4. Spring for Apache Pulsar 中的原生错误处理
正如我们之前提到的,Apache Pulsar 中的 DLQ 功能仅适用于共享订阅。
如果应用程序需要对非共享订阅使用一些类似的功能,该怎么办?
Pulsar 不支持独占订阅和故障转移订阅的 DLQ 的主要原因是因为这些订阅类型是有订单保证的。
允许重新投递、DLQ 等实际上会不按顺序接收消息。
但是,如果应用程序对此没有问题,但更重要的是,需要此 DLQ 功能用于非共享订阅,该怎么办?
为此,Spring for Apache Pulsar 提供了一个PulsarConsumerErrorHandler,您可以在 Pulsar 中的任何订阅类型中使用它:Exclusive,Failover,Shared或Key_Shared.
当您使用PulsarConsumerErrorHandler从 Spring for Apache Pulsar 开始,请确保不要在侦听器上设置 ack timeout 属性。
让我们通过检查一些代码片段来了解一些细节:
@EnablePulsar
@Configuration
class PulsarConsumerErrorHandlerConfig {
@Bean
PulsarConsumerErrorHandler<String> pulsarConsumerErrorHandler(
PulsarTemplate<String> pulsarTemplate) {
return new DefaultPulsarConsumerErrorHandler<>(
new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, (c, m) -> "my-foo-dlt"), new FixedBackOff(100, 10));
}
@PulsarListener(id = "pulsarConsumerErrorHandler-id", subscriptionName = "pulsatConsumerErrorHandler-subscription",
topics = "pulsarConsumerErrorHandler-topic",
pulsarConsumerErrorHandler = "pulsarConsumerErrorHandler")
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}
@PulsarListener(id = "pceh-dltListener", topics = "my-foo-dlt")
void listenDlt(String msg) {
System.out.println("From DLT: " + msg);
}
}
考虑一下pulsarConsumerErrorHandler豆。
这将创建一个PulsarConsumerErrorHandler并使用 Spring for Apache Pulsar 提供的开箱即用的默认实现:DefaultPulsarConsumerErrorHandler.DefaultPulsarConsumerErrorHandler有一个构造函数,该构造函数采用PulsarMessageRecovererFactory以及org.springframework.util.backoff.Backoff.PulsarMessageRecovererFactory是具有以下 API 的功能性接口:
@FunctionalInterface
public interface PulsarMessageRecovererFactory<T> {
/**
* Provides a message recoverer {@link PulsarMessageRecoverer}.
* @param consumer Pulsar consumer
* @return {@link PulsarMessageRecoverer}.
*/
PulsarMessageRecoverer<T> recovererForConsumer(Consumer<T> consumer);
}
这recovererForConsumermethod 接受一个 Pulsar Consumer 并返回一个PulsarMessageRecoverer,这是另一个功能接口。
下面是 APIPulsarMessageRecoverer:
public interface PulsarMessageRecoverer<T> {
/**
* Recover a failed message, for e.g. send the message to a DLT.
* @param message Pulsar message
* @param exception exception from failed message
*/
void recoverMessage(Message<T> message, Exception exception);
}
Spring for Apache Pulsar 为PulsarMessageRecovererFactory叫PulsarDeadLetterPublishingRecoverer,它提供了一个默认实现,可以通过将消息发送到死信主题 (DLT) 来恢复消息。
我们将此实现提供给前面的构造函数DefaultPulsarConsumerErrorHandler.
作为第二个参数,我们提供了一个FixedBackOff.
您还可以提供ExponentialBackoff来自 Spring 的高级回退功能。
然后我们为PulsarConsumerErrorHandler作为属性添加到PulsarListener.
该属性称为pulsarConsumerErrorHandler.
每次使用PulsarListener方法失败,则会重试。
重试次数由Backoff提供的实现值。在我们的示例中,我们执行 10 次重试(总共 11 次尝试 — 第一次,然后是 10 次重试)。
用尽所有重试后,消息将发送到 DLT 主题。
这PulsarDeadLetterPublishingRecovererimplementation 我们使用PulsarTemplate用于将消息发布到 DLT。
在大多数情况下,相同的自动配置PulsarTemplate从 Spring Boot 就足够了,但需要注意分区主题。
使用分区主题并为主主题使用自定义消息路由时,您必须使用不同的PulsarTemplate那不采用自动配置的PulsarProducerFactory,该值填充为custompartition为message-routing-mode.
您可以使用PulsarConsumerErrorHandler使用以下蓝图:
@Bean
PulsarConsumerErrorHandler<Integer> pulsarConsumerErrorHandler(PulsarClient pulsarClient) {
PulsarProducerFactory<Integer> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, Map.of());
PulsarTemplate<Integer> pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory);
BiFunction<Consumer<?>, Message<?>, String> destinationResolver =
(c, m) -> "my-foo-dlt";
PulsarDeadLetterPublishingRecoverer<Integer> pulsarDeadLetterPublishingRecoverer =
new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, destinationResolver);
return new DefaultPulsarConsumerErrorHandler<>(pulsarDeadLetterPublishingRecoverer,
new FixedBackOff(100, 5));
}
请注意,我们为PulsarDeadLetterPublishingRecoverer作为第二个 constructor 参数。
如果未提供,则PulsarDeadLetterPublishingRecoverer使用<subscription-name>-<topic-name>-DLT>作为 DLT 主题名称。
使用此功能时,您应该通过设置目标解析程序而不是使用默认值来使用正确的目标名称。
当使用单记录消息侦听器时,就像我们对PulsarConsumerErrorHnadler,如果使用手动确认,请确保在引发异常时不要否定确认消息。
相反,将异常重新引发回容器。否则,容器会认为消息是单独处理的,不会触发错误处理。
最后,我们有第二个PulsarListener接收来自 DLT 主题的消息。
到目前为止,在本节提供的示例中,我们只看到了如何使用PulsarConsumerErrorHandler使用单个记录消息侦听器。
接下来,我们看看如何在批处理侦听器上使用它。
7.5. 使用 PulsarConsumerErrorHandler 的批处理监听器
首先,让我们看一个批次PulsarListener方法:
@PulsarListener(subscriptionName = "batch-demo-5-sub", topics = "batch-demo-4", batch = true, concurrency = "3",
subscriptionType = SubscriptionType.Failover,
pulsarConsumerErrorHandler = "pulsarConsumerErrorHandler", ackMode = AckMode.MANUAL)
void listen(List<Message<Integer>> data, Consumer<Integer> consumer, Acknowledgment acknowledgment) {
for (Message<Integer> datum : data) {
if (datum.getValue() == 5) {
throw new PulsarBatchListenerFailedException("failed", datum);
}
acknowledgement.acknowledge(datum.getMessageId());
}
}
@Bean
PulsarConsumerErrorHandler<String> pulsarConsumerErrorHandler(
PulsarTemplate<String> pulsarTemplate) {
return new DefaultPulsarConsumerErrorHandler<>(
new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, (c, m) -> "my-foo-dlt"), new FixedBackOff(100, 10));
}
@PulsarListener(subscriptionName = "my-dlt-subscription", topics = "my-foo-dlt")
void dltReceiver(Message<Integer> message) {
System.out.println("DLT - RECEIVED: " + message.getValue());
}
我们再次提供pulsarConsumerErrorHandler属性替换为PulsarConsumerErrorHandlerBean 名称。
当您使用批处理侦听器(如前面的示例所示)并希望使用PulsarConsumerErrorHandler从 Spring for Apache Pulsar 开始,您需要使用手动确认。
这样,您可以确认所有成功的单个消息。
对于失败的 API,您必须抛出一个PulsarBatchListenerFailedException替换为失败的消息。
如果没有此异常,框架不知道如何处理失败。
重试时,容器会向侦听器发送一批新的消息,从失败的消息开始。
如果再次失败,则重试,直到重试次数用完,此时消息将发送到 DLT。
此时,容器会确认消息,并将原始批处理中的后续消息移交给侦听器。
8. PulsarListener 上的消费者定制
Spring for Apache Pulsar 提供了一种便捷的方式来自定义由PulsarListener.
应用程序可以为PulsarListenerConsumerBuilderCustomizer.
下面是一个示例。
@Bean
public PulsarListenerConsumerBuilderCustomizer<String> myCustomizer() {
return cb -> {
cb.subscriptionName("modified-subscription-name");
};
}
然后,可以将此定制器 Bean 名称作为PuslarListenerannotation 中,如下所示。
@PulsarListener(subscriptionName = "my-subscription",
topics = "my-topic", consumerCustomizer = "myCustomizer")
void listen(String message) {
}
框架通过PulsarListener并在创建 Pulsar Consumer 之前将此定制器应用于 Consumer 构建器。
如果您有多个PulsarListener方法,并且每个方法都有不同的自定义规则,则应创建多个定制器 bean 并在每个 bean 上附加适当的定制器PulsarListener.
9. 暂停和恢复消息侦听器容器
在某些情况下,应用程序可能希望暂时暂停消息使用,然后稍后恢复。 Spring for Apache Pulsar 提供了暂停和恢复底层消息侦听器容器的能力。 当 Pulsar 消息侦听器容器暂停时,容器为从 Pulsar 消费者接收数据而进行的任何轮询都将被暂停。 同样,当容器恢复时,如果主题在暂停时添加了任何新记录,则下一次轮询将开始返回数据。
要暂停或恢复侦听器容器,请首先通过PulsarListenerEndpointRegistrybean,然后在容器实例上调用 pause/resume API - 如下面的代码段所示:
@Autowired
private PulsarListenerEndpointRegistry registry;
void someMethod() {
PulsarMessageListenerContainer container = registry.getListenerContainer("my-listener-id");
container.pause();
}
传递给getListenerContainer是容器 ID - 这将是@PulsarListenerid 属性@PulsarListener. |
10. Pulsar Reader 支持
该框架支持通过PulsarReaderFactory.
Spring Boot 提供了这个读取器工厂,您可以通过指定任何spring.pulsar.reader.*应用程序属性。
10.1. PulsarReader 注解
虽然可以使用PulsarReaderFactory直接,Spring for Apache Pulsar 提供了PulsarReader注解,你可以用它来快速读取主题,而无需自己设置任何 Reader 工厂。
这与背后的相同想法类似PulsarListener.下面是一个简短的示例。
@PulsarReader(id = "reader-demo-id", topics = "reader-demo-topic", startMessageId = "earliest")
void read(String message) {
//...
}
这id属性是可选的,但最佳实践是提供对应用程序有意义的值。
如果未指定,则将使用自动生成的 ID。
另一方面,topics和startMessageId属性是必需的。
这topicsattribute 可以是单个主题或逗号分隔的主题列表。
这startMessageId属性指示读者从主题中的特定消息开始。
的有效值startMessageId是earliest或latest.假设您希望读者从最早或最新的可用消息以外的主题开始任意读取消息。在这种情况下,您需要使用ReaderBuilderCustomizer要自定义ReaderBuilder所以它知道正确的MessageId开始。
10.2. 自定义 ReaderBuilder
您可以通过ReaderBuilder使用PulsarReaderReaderBuilderCustomizer在 Spring 中用于 Apache Pulsar。
您可以提供@Bean的类型PulsarReaderReaderBuilderCustomizer,然后将其提供给PulsarReader如下。
@PulsarReader(id = "reader-customizer-demo-id", topics = "reader-customizer-demo-topic",
readerCustomizer = "myCustomizer")
void read(String message) {
//...
}
@Bean
public PulsarReaderReaderBuilderCustomizer<String> myCustomizer() {
return readerBuilder -> {
readerBuilder.startMessageId(messageId); // the first message read is after this message id.
// Any other customizations on the readerBuilder
};
}
如果您的应用程序只有一个@PulsarReader和一个PulsarReaderReaderBuilderCustomizerbean 已注册,则将自动应用定制器。 |