消息消费
1. 脉冲星监听器
对于 Pulsar 消费者,我们建议最终用户应用程序使用PulsarListener
注解。 使用PulsarListener
,您需要使用@EnablePulsar
注解。 当您使用 Spring Boot 支持时,它会自动启用此注释并配置所有必要的组件PulsarListener
,例如消息侦听器基础设施(负责创建 Pulsar 消费者)。PulsarMessageListenerContainer
使用PulsarConsumerFactory
创建和管理 Pulsar 消费者,即它用于消费消息的底层 Pulsar 消费者。
Spring Boot 提供了这个消费者工厂,您可以通过指定spring.pulsar.consumer.*
应用程序属性。
让我们重新审视一下PulsarListener
我们在快速浏览部分看到的代码片段:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
您可以进一步简化此方法:
@PulsarListener
public void listen(String message) {
System.out.println("Message Received: " + message);
}
在这种最基本的形式中,当subscriptionName
上未提供@PulsarListener
注释将使用自动生成的订阅名称。同样,当topics
未直接提供,则使用主题解析过程来确定目标主题。
在PulsarListener
方法,我们收到的数据为String
,但我们没有指定任何模式类型。在内部,框架依赖于 Pulsar 的模式机制将数据转换为所需的类型。框架检测到您期望的String
类型,然后根据该信息推断架构类型,并将该架构提供给使用者。框架对所有原始类型执行此推理。对于所有非原始类型,默认架构假定为 JSON。如果复杂类型使用 JSON 以外的任何内容(例如 AVRO 或 KEY_VALUE),则必须使用schemaType
财产。
以下示例显示了另一个PulsarListener
方法,该方法采用Integer
:
@PulsarListener(subscriptionName = "my-subscription-1", topics = "my-topic-1")
public void listen(Integer message) {
System.out.println(message);
}
以下内容PulsarListener
方法展示了我们如何从主题中使用复杂类型:
@PulsarListener(subscriptionName = "my-subscription-2", topics = "my-topic-2", schemaType = SchemaType.JSON)
public void listen(Foo message) {
System.out.println(message);
}
让我们再看看几种方法。
您可以直接使用 Pulsar 消息:
@PulsarListener(subscriptionName = "my-subscription", topics = "my-topic")
public void listen(org.apache.pulsar.client.api.Message<String> message) {
System.out.println(message.getValue());
}
以下示例使用 Spring 消息信封使用记录:
@PulsarListener(subscriptionName = "my-subscription", topics = "my-topic")
public void listen(org.springframework.messaging.Message<String> message) {
System.out.println(message.getPayload());
}
现在让我们看看如何批量使用记录。
以下示例使用PulsarListener
要将批量记录作为 POJO 使用:
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<Foo> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message));
}
请注意,在此示例中,我们将记录作为集合 (List
) 的对象。
此外,要在PulsarListener
级别,您需要将batch
属性设置为true
.
根据实际类型,该List
holds,则框架会尝试推断要使用的模式。
如果List
除了 JSON 之外,还包含复杂类型,您仍然需要提供schemaType
上PulsarListener
.
以下使用Message
Pulsar Java 客户端提供的信封:
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<Message<Foo>> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message.getValue()));
}
以下示例使用带有 Spring 消息传递信封的批处理记录Message
类型:
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<org.springframework.messaging.Message<Foo>> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message.getPayload()));
}
最后,您还可以使用Messages
holder 对象:
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(org.apache.pulsar.client.api.Messages<Foo>> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message.getValue()));
}
当您使用PulsarListener
,您可以直接在注释本身上提供 Pulsar 消费者属性。
如果您不想使用前面提到的启动配置属性或有多个PulsarListener
方法。
以下示例直接在PulsarListener
:
@PulsarListener(properties = { "subscriptionName=subscription-1", "topicNames=foo-1", "receiverQueueSize=5000" })
void listen(String message) {
}
使用的属性是直接的 Pulsar 使用者属性,而不是spring.pulsar.consumer 应用程序配置属性 |
1.1. 带有AUTO_CONSUME的通用记录
如果没有机会提前知道 Pulsar 主题的 schema 类型,可以使用AUTO_CONSUME
schema 类型来使用通用记录。
在这种情况下,主题将消息反序列化为GenericRecord
对象使用与主题关联的架构信息。
要使用通用记录,请将schemaType = SchemaType.AUTO_CONSUME
在您的@PulsarListener
并使用GenericRecord
作为 message 参数,如下所示。
@PulsarListener(topics = "my-generic-topic", schemaType = SchemaType.AUTO_CONSUME)
void listen(org.apache.pulsar.client.api.Message<GenericRecord> message) {
GenericRecord record = message.getValue();
record.getFields().forEach((f) ->
System.out.printf("%s = %s%n", f.getName(), record.getField(f)));
}
这GenericRecord API 允许访问字段及其关联值 |
1.2. 自定义 ConsumerBuilder
您可以通过以下方式自定义任何可用字段ConsumerBuilder
使用PulsarListenerConsumerBuilderCustomizer
通过提供@Bean
类型PulsarListenerConsumerBuilderCustomizer
然后将其提供给PulsarListener
如下图所示。
@PulsarListener(topics = "hello-topic", consumerCustomizer = "myCustomizer")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
@Bean
PulsarListenerConsumerBuilderCustomizer<String> myCustomizer() {
return (builder) -> builder.consumerName("myConsumer");
}
如果您的应用程序只有一个@PulsarListener 和单个PulsarListenerConsumerBuilderCustomizer bean 注册,则定制器将自动应用。 |
2. 指定模式信息
如前所述,对于 Java 原语,Spring for Apache Pulsar 框架可以推断出要在PulsarListener
.
对于非原始类型,如果未在 Comments 上显式指定 Schema,则 Spring for Apache Pulsar 框架将尝试构建一个Schema.JSON
从类型。
当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF、AUTO_CONSUME KEY_VALUE 带内联编码。 |
2.1. 自定义模式映射
作为在PulsarListener
对于复杂类型,可以使用类型的映射来配置模式解析器。
这样就无需在侦听器上设置模式,因为框架使用传入消息类型咨询解析器。
2.1.1. 配置属性
架构映射可以使用spring.pulsar.defaults.type-mappings
财产。
以下示例使用application.yml
为User
和Address
复杂对象使用AVRO
和JSON
schemas,分别:
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.acme.User
schema-info:
schema-type: AVRO
- message-type: com.acme.Address
schema-info:
schema-type: JSON
这message-type 是消息类的完全限定名称。 |
2.1.2. 模式解析器定制器
添加映射的首选方法是通过上述属性。 但是,如果需要更多控制,可以提供架构解析器定制器来添加映射。
以下示例使用架构解析器定制器为User
和Address
复杂对象使用AVRO
和JSON
schemas,分别:
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
return (schemaResolver) -> {
schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
}
}
2.1.3. 类型映射注解
指定要用于特定消息类型的默认模式信息的另一个选项是使用@PulsarMessage
注解。
架构信息可以通过schemaType
属性。
以下示例将系统配置为在生成或使用Foo
:
@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}
使用此配置后,无需在侦听器上设置架构,例如:
@PulsarListener(subscriptionName = "user-sub", topics = "user-topic")
public void listen(User user) {
System.out.println(user);
}
3. 访问 Pulsar 消费者对象
有时,您需要直接访问 Pulsar Consumer 对象。以下示例显示了如何获取它:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message, org.apache.pulsar.client.api.Consumer<String> consumer) {
System.out.println("Message Received: " + message);
ConsumerStats stats = consumer.getStats();
...
}
当访问Consumer 对象,不要调用任何会通过调用任何接收方法来更改使用者光标位置的作。所有此类作都必须由容器完成。 |
4. Pulsar 消息监听器容器
现在我们看到了消费者端的基本交互PulsarListener
.现在让我们深入了解如何PulsarListener
与底层 Pulsar 使用者交互。
请记住,对于最终用户应用程序,在大多数情况下,我们建议使用PulsarListener
注释,以便在使用 Spring for Apache Pulsar 时直接从 Pulsar 主题中使用,因为该模型涵盖了广泛的应用程序用例。
但是,重要的是要了解如何PulsarListener
在内部工作。本节介绍这些细节。
如前所述,当您使用 Spring for Apache Pulsar 时,消息监听器容器是消息消费的核心。PulsarListener
在幕后使用消息侦听器容器基础设施来创建和管理 Pulsar 消费者。
Spring for Apache Pulsar 通过PulsarMessageListenerContainer
.
此消息侦听器容器的默认实现是通过DefaultPulsarMessageListenerContainer
.
顾名思义,PulsarMessageListenerContainer
包含消息侦听器。
容器创建 Pulsar 使用者,然后运行单独的线程来接收和处理数据。
数据由提供的消息侦听器实现处理。
消息侦听器容器使用消费者的batchReceive
方法。 收到数据后,它将移交给选定的消息侦听器实现。
当您使用 Spring for Apache Pulsar 时,可以使用以下消息侦听器类型。
我们将在以下部分中看到有关这些不同消息侦听器的详细信息。
然而,在此之前,让我们仔细看看容器本身。
4.1. 默认 PulsarMessageListenerContainer
这是一个基于消费者的单个消息侦听器容器。以下列表显示了其构造函数:
public DefaultPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory,
PulsarContainerProperties pulsarContainerProperties)
}
它收到一个PulsarConsumerFactory
(它用来创建消费者)和一个PulsarContainerProperties
对象(包含有关容器属性的信息)。PulsarContainerProperties
具有以下构造函数:
public PulsarContainerProperties(String... topics)
public PulsarContainerProperties(Pattern topicPattern)
您可以通过以下方式提供主题信息PulsarContainerProperties
或作为提供给使用者工厂的使用者属性。以下示例使用DefaultPulsarMessageListenerContainer
:
Map<String, Object> config = new HashMap<>();
config.put("topics", "my-topic");
PulsarConsumerFactory<String> pulsarConsumerFactorY = DefaultPulsarConsumerFactory<>(pulsarClient, config);
PulsarContainerProperties pulsarContainerProperties = new PulsarContainerProperties();
pulsarContainerProperties.setMessageListener((PulsarRecordMessageListener<?>) (consumer, msg) -> {
});
DefaultPulsarMessageListenerContainer<String> pulsarListenerContainer = new DefaultPulsarMessageListenerContainer(pulsarConsumerFacotyr,
pulsarContainerProperties);
return pulsarListenerContainer;
DefaultPulsarMessageListenerContainer
仅创建单个使用者。
如果要通过多个线程管理多个消费者,则需要使用ConcurrentPulsarMessageListenerContainer
.
4.2. ConcurrentPulsarMessageListenerContainer
ConcurrentPulsarMessageListenerContainer
具有以下构造函数:
public ConcurrentPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory,
PulsarContainerProperties pulsarContainerProperties)
ConcurrentPulsarMessageListenerContainer
允许您指定concurrency
属性。
并发性超过1
仅允许非独占订阅 (failover
,shared
和key-shared
).
您只能拥有默认的1
当您具有独占订阅模式时,用于并发。
以下示例启用concurrency
通过PulsarListener
注释failover
订阅。
@PulsarListener(topics = "my-topic", subscriptionName = "subscription-1",
subscriptionType = SubscriptionType.Failover, concurrency = "3")
void listen(String message, Consumer<String> consumer) {
...
System.out.println("Current Thread: " + Thread.currentThread().getName());
System.out.println("Current Consumer: " + consumer.getConsumerName());
}
在前面的侦听器中,假定主题my-topic
有三个分区。
如果是未分区的主题,则将并发设置为3
什么都不做。除了主要的活动消费者之外,您还会得到两个空闲消费者。
如果主题具有三个以上的分区,则消息将在容器创建的使用者之间进行负载均衡。
如果运行此PulsarListener
,则您会看到来自不同分区的消息通过不同的使用者使用,如前面示例中的线程名称和使用者名称打印输出所暗示的那样。
当您使用Failover 订阅,Pulsar 保证消息排序。 |
以下列表显示了另一个示例PulsarListener
,但与Shared
subscription 和concurrency
启用。
@PulsarListener(topics = "my-topic", subscriptionName = "subscription-1",
subscriptionType = SubscriptionType.Shared, concurrency = "5")
void listen(String message) {
...
}
在前面的示例中,PulsarListener
创建五个不同的使用者(这一次,我们假设主题有五个分区)。
在此版本中,没有消息排序,因为Shared 订阅不保证 Pulsar 中的任何消息排序。 |
如果您需要消息排序,并且仍然需要共享订阅类型,则需要使用Key_Shared
订阅类型。
4.3. 消费记录
让我们看看消息侦听器容器如何实现单记录和基于批处理的消息消费。
单条记录消耗
让我们重新审视我们的基本PulsarListener
为了这个讨论:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
有了这个PulsarListener
方法,我们必须要求 Spring for Apache Pulsar 每次调用带有一条记录的监听器方法。
我们提到消息侦听器容器使用batchReceive
方法。
框架检测到PulsarListener
在这种情况下,接收一条记录。这意味着,在每次调用该方法时,它都需要一个单记录。
尽管消息侦听器容器会批量使用记录,但它会循环访问接收到的批处理,并通过适配器调用侦听器方法PulsarRecordMessageListener
.
正如您在上一节中看到的,PulsarRecordMessageListener
从MessageListener
由 Pulsar Java 客户端提供,它支持基本的received
方法。
批量消耗
以下示例显示了PulsarListener
批量消费记录:
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen4(List<Foo> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message));
}
当您使用这种类型的PulsarListener
,则框架检测到您处于批处理模式。
由于它已经使用消费者的batchReceive
方法,它通过适配器将整个批次交给监听器方法,用于PulsarBatchMessageListener
.
5. 脉冲星接头
Pulsar 消息元数据可以作为 Spring 消息头使用。可用头的列表可以在 PulsarHeaders.java 中找到。
5.1. 在基于单记录的消费者中访问
以下示例显示了如何在使用单记录消费模式的应用程序中访问各种 Pulsar 标头:
@PulsarListener(topics = "simpleListenerWithHeaders")
void simpleListenerWithHeaders(String data, @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
@Header(PulsarHeaders.RAW_DATA) byte[] rawData,
@Header("foo") String foo) {
}
在前面的示例中,我们访问messageId
和rawData
消息元数据以及名为foo
. Spring@Header
注释用于每个标题字段。
您还可以使用 Pulsar 的Message
作为信封来承载有效负载。这样做时,用户可以直接调用 Pulsar 消息上的相应方法来检索元数据。但是,为了方便起见,您也可以使用Header
注解。 请注意,您还可以使用 Spring 消息传递Message
信封来承载有效负载,然后使用@Header
.
5.2. 在基于批处理记录的消费者中访问
在本节中,我们将了解如何在使用批处理消费者的应用程序中访问各种 Pulsar 标头:
@PulsarListener(topics = "simpleBatchListenerWithHeaders", batch = true)
void simpleBatchListenerWithHeaders(List<String> data,
@Header(PulsarHeaders.MESSAGE_ID) List<MessageId> messageIds,
@Header(PulsarHeaders.TOPIC_NAME) List<String> topicNames, @Header("foo") List<String> fooValues) {
}
在前面的示例中,我们将数据用作List<String>
. 在提取各种标头时,我们作为List<>
也。 Spring for Apache Pulsar 确保标头列表与数据列表相对应。
当您使用批处理侦听器时,您还可以以相同的方式提取标头,并接收有效负载作为List<org.apache.pulsar.client.api.Message<?>
,org.apache.pulsar.client.api.Messages<?>
或org.springframework.messaging.Messsge<?>
.
6. 消息确认
当您使用 Spring for Apache Pulsar 时,消息确认由框架处理,除非应用程序选择退出。在本节中,我们将详细介绍框架如何处理消息确认。
6.1. 消息确认模式
Spring for Apache Pulsar 提供了以下用于确认消息的模式:
-
BATCH
-
RECORD
-
MANUAL
BATCH
确认模式是默认模式,但您可以在消息侦听器容器上更改它。
在以下部分中,我们将了解同时使用PulsarListener
以及它们如何转换为支持消息侦听器容器(并最终转换为 Pulsar 消费者)。
6.2. 单记录模式下的自动消息确认
让我们重新审视我们基于基本的单一消息PulsarListener
:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
很自然地想知道,当您使用PulsarListener
,特别是如果您熟悉直接使用 Pulsar 消费者。
答案归结为消息侦听器容器,因为它是 Spring for Apache Pulsar 中的中心位置,用于协调所有与消费者相关的活动。
假设您没有覆盖默认行为,那么当您使用上述内容时,这就是在后台发生的情况PulsarListener
:
-
首先,侦听器容器以批处理形式接收来自 Pulsar 消费者的消息。
-
收到的消息将传递给
PulsarListener
一次一条消息。 -
当所有记录都传递给侦听器方法并成功处理时,容器将确认原始批处理中的所有消息。
这是正常流程。如果原始批次中的任何记录抛出异常,Spring for Apache Pulsar 会单独跟踪这些记录。
当处理批处理中的所有记录时,Spring for Apache Pulsar 会确认所有成功的消息,并否定确认(nack)所有失败的消息。
换句话说,当使用PulsarRecordMessageListener
默认的 ack 模式为BATCH
,则框架会等待从batchReceive
调用 process 成功,然后调用acknowledge
方法。
如果任何特定记录在调用处理程序方法时抛出异常,Spring for Apache Pulsar 会跟踪这些记录并单独调用negativeAcknowledge
在处理整个批次后,在这些记录上。
如果应用程序希望每条记录发生确认或否定确认,则RECORD
可以启用 ACK 模式。
在这种情况下,在处理每条记录后,如果没有错误,则确认消息,如果出现错误,则否定确认。
以下示例启用RECORD
ack 模式:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.RECORD)
public void listen(String message) {
System.out.println("Message Received: " + message);
}
6.3. 单记录模式下的手动消息确认
您可能并不总是希望框架发送确认,而是直接从应用程序本身执行此作。 Spring for Apache Pulsar 提供了几种启用手动消息确认的方法。以下示例显示了其中之一:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.MANUAL)
public void listen(Message<String> message, Acknowledgment acknowledgment) {
System.out.println("Message Received: " + message.getValue());
acknowledgment.acknowledge();
}
这里有几件事值得解释。首先,我们通过设置ackMode
上PulsarListener
.
启用手动确认模式时,Spring for Apache Pulsar 允许应用程序注入Acknowledgment
对象。 该框架通过选择兼容的消息侦听器容器来实现这一点:PulsarAcknowledgingMessageListener
对于基于单条记录的使用,这使您可以访问Acknowledgment
对象。
这Acknowledgment
object 提供了以下 API 方法:
void acknowledge();
void acknowledge(MessageId messageId);
void acknowledge(List<MessageId> messageIds);
void nack();
void nack(MessageId messageId);
您可以注入此Acknowledgment
object 输入到您的PulsarListener
使用MANUAL
ack 模式,然后调用相应的方法之一。
在前面的PulsarListener
例如,我们调用一个无参数acknowledge
方法。 这是因为框架知道哪个Message
它目前在下运行。调用时acknowledge()
,则无需接收带有Message
enveloper',而是使用目标类型——String
,在此示例中。您还可以调用acknowledge
通过提供消息 ID:acknowledge.acknowledge(message.getMessageId());
当您使用acknowledge(messageId)
,您必须使用Message<?>
信封。
与可能的确认类似,确认的Acknowledgment
API 还提供了否定确认的选项。
请参阅前面显示的 nack 方法。
您也可以调用acknowledge
直接在 Pulsar 消费者身上:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.MANUAL)
public void listen(Message<String> message, Consumer<String> consumer) {
System.out.println("Message Received: " + message.getValue());
try {
consumer.acknowledge(message);
}
catch (Exception e) {
....
}
}
调用时acknowledge
直接在底层消费者上,需要自己做错误处理。
使用Acknowledgment
不需要这样做,因为框架可以为您做到这一点。
因此,您应该使用Acknowledgment
使用手动确认时的对象方法。
使用手动确认时,重要的是要了解该框架完全不进行任何确认。 因此,在设计应用程序时考虑正确的确认策略非常重要。 |
6.4. 批量消费中的自动消息确认
当您批量使用记录时(请参阅“消息确认模式”),并且使用默认的确认模式BATCH
,当整个批处理成功时,确认整个批次。
如果任何记录引发异常,则整个批处理将被否定确认。
请注意,这可能与在生产者端批处理的批次不同。相反,这是从调用batchReceive
对消费者
考虑以下批处理侦听器:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", batch = true)
public void batchListen(List<Foo> messages) {
for (Foo foo : messages) {
...
}
}
当传入集合中的所有消息 (messages
在此示例中),框架会确认所有这些。
在批量模式下消费时,RECORD
不允许的确认模式。
这可能会导致问题,因为应用程序可能不希望再次重新传递整个批次。
在这种情况下,您需要使用MANUAL
确认模式。
6.5. 批量消费中的手动消息确认
如上一节所示,当MANUAL
ack 模式,则框架不执行任何确认,无论是正面还是负面。
完全取决于应用程序来处理此类问题。
什么时候MANUAL
ack 模式,则 Spring for Apache Pulsar 选择一个兼容的消息监听器容器:PulsarBatchAcknowledgingMessageListener
用于批量使用,这使您可以访问Acknowledgment
对象。 以下是Acknowledgment
应用程序接口:
void acknowledge();
void acknowledge(MessageId messageId);
void acknowledge(List<MessageId> messageIds);
void nack();
void nack(MessageId messageId);
您可以注入此Acknowledgment
object 输入到您的PulsarListener
使用MANUAL
ack 模式。以下列表显示了基于批处理的侦听器的基本示例:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(List<Message<String>> messgaes, Acknowlegement acknowledgment) {
for (Message<String> message : messages) {
try {
...
acknowledgment.acknowledge(message.getMessageId());
}
catch (Exception e) {
acknowledgment.nack(message.getMessageId());
}
}
}
使用批处理侦听器时,消息侦听器容器无法知道它当前正在哪个记录上运行。因此,要手动确认,您需要使用重载的acknowledge
采用MessageId
或List<MessageId>
. 您还可以使用MessageId
对于批处理侦听器。
7. 消息重新传递和错误处理
现在我们已经看到了两者PulsarListener
以及消息侦听器容器基础设施及其各种功能,现在让我们尝试了解消息重新传递和错误处理。Apache Pulsar 为消息重新传递和错误处理提供了各种本机策略。我们来看看它们,看看我们如何通过 Spring for Apache Pulsar 使用它们。
7.1. 指定消息重新传递的确认超时
默认情况下,除非使用者崩溃,否则 Pulsar 使用者不会重新传递消息,但您可以通过在 Pulsar 使用者上设置 ack 超时来更改此行为。如果 ack timeout 属性的值大于零,并且 Pulsar 使用者在该超时期限内未确认消息,则会重新传递该消息。
当您使用 Spring for Apache Pulsar 时,您可以通过消费者定制器或使用本机 Pulsar 设置此属性ackTimeoutMillis
属性中的properties
属性@PulsarListener
:
@PulsarListener(subscriptionName = "subscription-1", topics = "topic-1"
properties = {"ackTimeoutMillis=60000"})
public void listen(String s) {
...
}
指定 ack 超时时,如果消费者在 60 秒内未发送确认,则 Pulsar 将消息重新传递给消费者。
如果要为具有不同延迟的确认超时指定一些高级回退选项,可以执行以下作:
@EnablePulsar
@Configuration
class AckTimeoutRedeliveryConfig {
@PulsarListener(subscriptionName = "withAckTimeoutRedeliveryBackoffSubscription",
topics = "withAckTimeoutRedeliveryBackoff-test-topic",
ackTimeoutRedeliveryBackoff = "ackTimeoutRedeliveryBackoff",
properties = { "ackTimeoutMillis=60000" })
void listen(String msg) {
// some long-running process that may cause an ack timeout
}
@Bean
RedeliveryBackoff ackTimeoutRedeliveryBackoff() {
return MultiplierRedeliveryBackoff.builder().minDelayMs(1000).maxDelayMs(10 * 1000).multiplier(2)
.build();
}
}
在前面的示例中,我们为 Pulsar 的RedeliveryBackoff
最小延迟为 1 秒,最大延迟为 10 秒,回退乘数为 2。
在初始确认超时发生后,消息重新传递通过此退避 Bean 进行控制。
我们将退避 bean 提供给PulsarListener
通过设置ackTimeoutRedeliveryBackoff
属性添加到实际的 bean 名称 —ackTimeoutRedeliveryBackoff
,在这种情况下。
7.2. 指定否定确认重新传递
当确认否定时,Pulsar 消费者允许您指定应用程序希望如何重新传递消息。
默认设置是在一分钟内重新传递消息,但您可以通过消费者定制器或使用本机 Pulsar 进行更改negativeAckRedeliveryDelay
属性中的properties
属性@PulsarListener
:
@PulsarListener(subscriptionName = "subscription-1", topics = "topic-1"
properties = {"negativeAckRedeliveryDelay=10ms"})
public void listen(String s) {
...
}
您还可以通过提供RedeliveryBackoff
bean 并将 bean 名称作为negativeAckRedeliveryBackoff
PulsarProducer 上的属性,如下所示:
@EnablePulsar
@Configuration
class NegativeAckRedeliveryConfig {
@PulsarListener(subscriptionName = "withNegRedeliveryBackoffSubscription",
topics = "withNegRedeliveryBackoff-test-topic", negativeAckRedeliveryBackoff = "redeliveryBackoff",
subscriptionType = SubscriptionType.Shared)
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}
@Bean
RedeliveryBackoff redeliveryBackoff() {
return MultiplierRedeliveryBackoff.builder().minDelayMs(1000).maxDelayMs(10 * 1000).multiplier(2)
.build();
}
}
7.3. 使用 Apache Pulsar 的死信主题进行消息重新传递和错误处理
Apache Pulsar 允许应用程序在消费者上使用死信主题,并使用Shared
订阅类型。对于Exclusive
和Failover
订阅类型,此功能不可用。基本思想是,如果消息重试了一定次数(可能是由于 ack 超时或 nack 重新传递),一旦重试次数用完,就可以将消息发送到称为死信队列 (DLQ) 的特殊主题。让我们通过检查一些代码片段来了解此功能的一些细节:
@EnablePulsar
@Configuration
class DeadLetterPolicyConfig {
@PulsarListener(id = "deadLetterPolicyListener", subscriptionName = "deadLetterPolicySubscription",
topics = "topic-with-dlp", deadLetterPolicy = "deadLetterPolicy",
subscriptionType = SubscriptionType.Shared, properties = { "ackTimeoutMillis=1000" })
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}
@PulsarListener(id = "dlqListener", topics = "my-dlq-topic")
void listenDlq(String msg) {
System.out.println("From DLQ: " + msg);
}
@Bean
DeadLetterPolicy deadLetterPolicy() {
return DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("my-dlq-topic").build();
}
}
首先,我们有一个特殊的豆子DeadLetterPolicy
,并将其命名为deadLetterPolicy
(它可以是任何名称)。这个 bean 指定了许多内容,例如最大交付量(在本例中为 10)和死信主题的名称 —my-dlq-topic
,在本例中。如果未指定 DLQ 主题名称,则默认为<topicname>-<subscriptionname>-DLQ
在 Pulsar 中。接下来,我们将此 bean 名称提供给PulsarListener
通过将deadLetterPolicy
财产。 请注意,PulsarListener
订阅类型为Shared
,因为 DLQ 功能仅适用于共享订阅。此代码主要用于演示目的,因此我们提供了ackTimeoutMillis
值为 1000。这个想法是代码抛出异常,如果 Pulsar 在 1 秒内没有收到 ack,它就会进行重试。如果该循环持续十次(因为这是我们在DeadLetterPolicy
),Pulsar 消费者将消息发布到 DLQ 主题。我们还有另一个PulsarListener
侦听 DLQ 主题以接收发布到 DLQ 主题的数据。
7.4. Apache Pulsar 的 Spring 中的本机错误处理
正如我们之前提到的,Apache Pulsar 中的 DLQ 功能仅适用于共享订阅。
如果应用程序需要对非共享订阅使用一些类似的功能,该怎么办?
Pulsar 不支持独占和故障转移订阅的 DLQ 的主要原因是因为这些订阅类型是订单保证的。
允许重新传递、DLQ 等有效地接收无序消息。
但是,如果应用程序可以接受,但更重要的是,非共享订阅需要此 DLQ 功能怎么办?
为此,Spring for Apache Pulsar 提供了一个PulsarConsumerErrorHandler
,您可以在 Pulsar 中的任何订阅类型中使用:Exclusive
,Failover
,Shared
或Key_Shared
.
当您使用PulsarConsumerErrorHandler
从 Spring for Apache Pulsar 中,请确保不要在侦听器上设置 ack 超时属性。
让我们通过检查一些代码片段来了解一些细节:
@EnablePulsar
@Configuration
class PulsarConsumerErrorHandlerConfig {
@Bean
PulsarConsumerErrorHandler<String> pulsarConsumerErrorHandler(
PulsarTemplate<String> pulsarTemplate) {
return new DefaultPulsarConsumerErrorHandler<>(
new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, (c, m) -> "my-foo-dlt"), new FixedBackOff(100, 10));
}
@PulsarListener(id = "pulsarConsumerErrorHandler-id", subscriptionName = "pulsatConsumerErrorHandler-subscription",
topics = "pulsarConsumerErrorHandler-topic",
pulsarConsumerErrorHandler = "pulsarConsumerErrorHandler")
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}
@PulsarListener(id = "pceh-dltListener", topics = "my-foo-dlt")
void listenDlt(String msg) {
System.out.println("From DLT: " + msg);
}
}
考虑一下pulsarConsumerErrorHandler
豆。
这将创建一个类型为PulsarConsumerErrorHandler
并使用 Spring 为 Apache Pulsar 提供的开箱即用的默认实现:DefaultPulsarConsumerErrorHandler
.DefaultPulsarConsumerErrorHandler
有一个构造函数,该构造函数采用PulsarMessageRecovererFactory
和org.springframework.util.backoff.Backoff
.PulsarMessageRecovererFactory
是一个具有以下 API 的功能接口:
@FunctionalInterface
public interface PulsarMessageRecovererFactory<T> {
/**
* Provides a message recoverer {@link PulsarMessageRecoverer}.
* @param consumer Pulsar consumer
* @return {@link PulsarMessageRecoverer}.
*/
PulsarMessageRecoverer<T> recovererForConsumer(Consumer<T> consumer);
}
这recovererForConsumer
方法采用 Pulsar 消费者并返回PulsarMessageRecoverer
,这是另一个功能接口。
这是PulsarMessageRecoverer
:
public interface PulsarMessageRecoverer<T> {
/**
* Recover a failed message, for e.g. send the message to a DLT.
* @param message Pulsar message
* @param exception exception from failed message
*/
void recoverMessage(Message<T> message, Exception exception);
}
Spring for Apache Pulsar 提供了一个实现PulsarMessageRecovererFactory
叫PulsarDeadLetterPublishingRecoverer
提供默认实现,可以通过将消息发送到死信主题 (DLT) 来恢复消息。
我们将此实现提供给前面的构造函数DefaultPulsarConsumerErrorHandler
.
作为第二个参数,我们提供了一个FixedBackOff
.
您还可以提供ExponentialBackoff
来自 Spring 的高级退避功能。
然后我们为PulsarConsumerErrorHandler
作为属性添加到PulsarListener
.
该属性称为pulsarConsumerErrorHandler
.
每次PulsarListener
方法失败,则重试。
重试次数由Backoff
提供的实现值。在我们的示例中,我们进行了 10 次重试(总共 11 次尝试 - 第一次尝试,然后是 10 次重试)。
一旦所有重试都用完,消息将发送到 DLT 主题。
这PulsarDeadLetterPublishingRecoverer
我们提供的实现使用PulsarTemplate
用于将消息发布到 DLT。
在大多数情况下,相同的自动配置PulsarTemplate
来自 Spring Boot 就足够了,但对分区主题有警告。
使用分区主题并对主主题使用自定义消息路由时,必须使用不同的PulsarTemplate
不采用自动配置的PulsarProducerFactory
填充了值custompartition
为message-routing-mode
.
您可以使用PulsarConsumerErrorHandler
使用以下蓝图:
@Bean
PulsarConsumerErrorHandler<Integer> pulsarConsumerErrorHandler(PulsarClient pulsarClient) {
PulsarProducerFactory<Integer> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, Map.of());
PulsarTemplate<Integer> pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory);
BiFunction<Consumer<?>, Message<?>, String> destinationResolver =
(c, m) -> "my-foo-dlt";
PulsarDeadLetterPublishingRecoverer<Integer> pulsarDeadLetterPublishingRecoverer =
new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, destinationResolver);
return new DefaultPulsarConsumerErrorHandler<>(pulsarDeadLetterPublishingRecoverer,
new FixedBackOff(100, 5));
}
请注意,我们为PulsarDeadLetterPublishingRecoverer
作为第二个构造函数参数。
如果未提供,PulsarDeadLetterPublishingRecoverer
使用<subscription-name>-<topic-name>-DLT>
作为 DLT 主题名称。
使用此功能时,应通过设置目标解析器来使用正确的目标名称,而不是使用默认值。
当使用单个记录消息侦听器时,就像我们所做的那样PulsarConsumerErrorHnadler
,如果使用手动确认,请确保在抛出异常时不要负向确认消息。
相反,将异常重新抛回容器。否则,容器会认为消息是单独处理的,并且不会触发错误处理。
最后,我们有了第二个PulsarListener
接收来自 DLT 主题的消息。
在本节提供的示例中,到目前为止,我们只看到了如何使用PulsarConsumerErrorHandler
使用单个记录消息侦听器。
接下来,我们看看如何在批量侦听器上使用它。
7.5. 使用 PulsarConsumerErrorHandler 的批量监听器
首先,让我们看看一批PulsarListener
方法:
@PulsarListener(subscriptionName = "batch-demo-5-sub", topics = "batch-demo-4", batch = true, concurrency = "3",
subscriptionType = SubscriptionType.Failover,
pulsarConsumerErrorHandler = "pulsarConsumerErrorHandler", ackMode = AckMode.MANUAL)
void listen(List<Message<Integer>> data, Consumer<Integer> consumer, Acknowledgment acknowledgment) {
for (Message<Integer> datum : data) {
if (datum.getValue() == 5) {
throw new PulsarBatchListenerFailedException("failed", datum);
}
acknowledgement.acknowledge(datum.getMessageId());
}
}
@Bean
PulsarConsumerErrorHandler<String> pulsarConsumerErrorHandler(
PulsarTemplate<String> pulsarTemplate) {
return new DefaultPulsarConsumerErrorHandler<>(
new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, (c, m) -> "my-foo-dlt"), new FixedBackOff(100, 10));
}
@PulsarListener(subscriptionName = "my-dlt-subscription", topics = "my-foo-dlt")
void dltReceiver(Message<Integer> message) {
System.out.println("DLT - RECEIVED: " + message.getValue());
}
再一次,我们提供了pulsarConsumerErrorHandler
属性替换为PulsarConsumerErrorHandler
bean 名称。
当您使用批处理侦听器(如前面的示例所示)并希望使用PulsarConsumerErrorHandler
从 Spring for Apache Pulsar 开始,您需要使用手动确认。
这样,您就可以确认所有成功的单个消息。
对于失败的,您必须抛出一个PulsarBatchListenerFailedException
与失败的消息。
如果没有这个例外,框架就不知道该如何处理故障。
重试时,容器会向侦听器发送一批新的消息,从失败的消息开始。
如果再次失败,则重试,直到重试用尽,此时消息将发送到 DLT。
此时,容器确认消息,侦听器将与原始批处理中的后续消息一起移交。
8. PulsarListener 上的消费者定制
Spring for Apache Pulsar 提供了一种方便的方法来自定义由PulsarListener
.
应用程序可以提供一个 beanPulsarListenerConsumerBuilderCustomizer
.
这是一个例子。
@Bean
public PulsarListenerConsumerBuilderCustomizer<String> myCustomizer() {
return cb -> {
cb.subscriptionName("modified-subscription-name");
};
}
然后,可以将此定制器 bean 名称作为属性提供PuslarListener
注释,如下所示。
@PulsarListener(subscriptionName = "my-subscription",
topics = "my-topic", consumerCustomizer = "myCustomizer")
void listen(String message) {
}
框架通过PulsarListener
并在创建 Pulsar 消费者之前在消费者构建器上应用此定制器。
如果您有多个PulsarListener
方法,并且每个方法都有不同的自定义规则,您应该创建多个定制器 bean 并在每个 bean 上附加适当的定制器PulsarListener
.
9. 消息监听器容器生命周期
9.1. 暂停和恢复
在某些情况下,应用程序可能希望暂时暂停消息使用,然后稍后恢复。 Spring for Apache Pulsar 提供了暂停和恢复底层消息侦听器容器的功能。 当 Pulsar 消息侦听器容器暂停时,容器为从 Pulsar 消费者接收数据而执行的任何轮询都将暂停。 同样,当容器恢复时,如果主题在暂停时添加了任何新记录,则下一次轮询将开始返回数据。
要暂停或恢复监听器容器,请首先通过PulsarListenerEndpointRegistry
bean,然后在容器实例上调用暂停/恢复 API - 如下面的代码片段所示:
@Autowired
private PulsarListenerEndpointRegistry registry;
void someMethod() {
PulsarMessageListenerContainer container = registry.getListenerContainer("my-listener-id");
container.pause();
}
传递给getListenerContainer 是容器 ID - 这将是@PulsarListener id 属性,暂停/恢复@PulsarListener . |
10. Pulsar 读卡器支持
Spring Boot 提供了这个读取器工厂,您可以通过指定任何spring.pulsar.reader.*
应用程序属性。
10.1. PulsarReader 注解
虽然可以使用PulsarReaderFactory
直接,Spring for Apache Pulsar 提供了PulsarReader
注释,您可以使用它来快速阅读主题,而无需自己设置任何阅读器工厂。这类似于背后的相同想法PulsarListener.
这是一个简单的例子。
@PulsarReader(id = "reader-demo-id", topics = "reader-demo-topic", startMessageId = "earliest")
void read(String message) {
//...
}
这id
属性是可选的,但最佳实践是提供对应用程序有意义的值。如果未指定,将使用自动生成的 ID。另一方面,topics
和startMessageId
属性是强制性的。 这topics
属性可以是单个主题,也可以是逗号分隔的主题列表。 这startMessageId
属性指示读者从主题中的特定消息开始。的有效值startMessageId
是earliest
或latest.
假设您希望读者从最早或最新的可用消息以外的主题开始任意读取消息。在这种情况下,您需要使用ReaderBuilderCustomizer
自定义ReaderBuilder
所以它知道正确的MessageId
从开始。
10.2. 自定义 ReaderBuilder
您可以通过以下方式自定义任何可用字段ReaderBuilder
使用PulsarReaderReaderBuilderCustomizer
在 Apache Pulsar 的 Spring 中。
您可以提供一个@Bean
类型PulsarReaderReaderBuilderCustomizer
,然后将其提供给PulsarReader
如下。
@PulsarReader(id = "reader-customizer-demo-id", topics = "reader-customizer-demo-topic",
readerCustomizer = "myCustomizer")
void read(String message) {
//...
}
@Bean
public PulsarReaderReaderBuilderCustomizer<String> myCustomizer() {
return readerBuilder -> {
readerBuilder.startMessageId(messageId); // the first message read is after this message id.
// Any other customizations on the readerBuilder
};
}
如果您的应用程序只有一个@PulsarReader 和单个PulsarReaderReaderBuilderCustomizer bean 注册,则定制器将自动应用。 |