消息消费
1. 脉冲星监听器
对于Pulsar消费者,我们建议终端用户应用使用脉冲星听者注解。
使用脉冲星听者,你需要使用@EnablePulsar注解。
当你使用Spring Boot支持时,它会自动启用这个注释,并配置所有必要的组件脉冲星听者,例如消息监听者基础设施(负责创建Pulsar消费者)。PulsarMessageListenerContainer使用脉冲星消费者工厂创建和管理 Pulsar 消费者,即其用于接收消息的底层 Pulsar 消费者。
Spring Boot 提供了这个消费者工厂,你可以通过指定Spring.脉冲星.消费者。*应用属性。
让我们重新审视脉冲星听者我们在快速参观区看到的代码片段:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
你还可以进一步简化这个方法:
@PulsarListener
public void listen(String message) {
System.out.println("Message Received: " + message);
}
在这种最基本的形式中,当订阅名称在@PulsarListener注释会使用自动生成的订阅名称。同样,当主题这些内容并未直接提供,而是通过主题解决过程来确定目标主题。
在脉冲星听者前面展示的方法,我们接收数据为字符串但我们不指定任何模式类型。框架内部依赖Pulsar的模式机制将数据转换为所需类型。框架检测到您期望字符串类型,然后根据该信息推断模式类型,并将该模式提供给消费者。框架对所有原语类型进行这种推断。对于所有非原始类型,默认模式假设为JSON。如果复杂类型使用了除JSON以外的任何形式(如AVRO或KEY_VALUE),你必须在注释中通过schemaType财产。
以下示例展示了另一个脉冲星听者方法,该方法具有整数:
@PulsarListener(subscriptionName = "my-subscription-1", topics = "my-topic-1")
public void listen(Integer message) {
System.out.println(message);
}
如下脉冲星听者方法说明我们如何从一个主题中获取复杂型:
@PulsarListener(subscriptionName = "my-subscription-2", topics = "my-topic-2", schemaType = SchemaType.JSON)
public void listen(Foo message) {
System.out.println(message);
}
让我们再看看几种方式。
你可以直接接收脉冲星消息:
@PulsarListener(subscriptionName = "my-subscription", topics = "my-topic")
public void listen(org.apache.pulsar.client.api.Message<String> message) {
System.out.println(message.getValue());
}
以下示例通过使用 Spring 消息封函来消耗记录:
@PulsarListener(subscriptionName = "my-subscription", topics = "my-topic")
public void listen(org.springframework.messaging.Message<String> message) {
System.out.println(message.getPayload());
}
现在让我们看看如何批量消费记录。以下示例使用脉冲星听者以POJO批量消费记录:
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<Foo> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message));
}
注意,在这个例子中,我们以集合的形式接收记录(列表)的对象。此外,为了在脉冲星听者你需要设置Batch注释上的属性为true.
根据实际类型列表成立时,框架尝试推断出要使用的模式。如果列表除了 JSON 之外,包含一个复杂类型,你仍然需要提供schemaType上脉冲星听者.
以下内容使用了消息由 Pulsar Java 客户端提供的包络:
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<Message<Foo>> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message.getValue()));
}
以下示例使用带有Spring消息封装的批处理记录消息类型:
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<org.springframework.messaging.Message<Foo>> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message.getPayload()));
}
最后,你还可以使用消息Pulsar 中批次监听器的 holder 对象:
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(org.apache.pulsar.client.api.Messages<Foo>> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message.getValue()));
}
当你使用脉冲星听者你可以直接在注释本身上提供Pulsar消费者属性。如果你不想使用前面提到的Boot配置属性或拥有多个属性,这非常方便脉冲星听者方法。
以下示例直接使用脉冲星消费者属性于脉冲星听者:
@PulsarListener(properties = { "subscriptionName=subscription-1", "topicNames=foo-1", "receiverQueueSize=5000" })
void listen(String message) {
}
所用的属性是直接的Pulsar消费者特性,而不是spring.pulsar.consumer应用配置属性 |
1.1. 带有 AUTO_CONSUME 的通用记录
如果无法提前知道 Pulsar 主题的模式类型,你可以使用AUTO_CONSUMEschema类型以消耗通用记录。在这种情况下,主题将消息反序列化为通用记录对象使用与主题关联的模式信息。
要消费通用记录,设置schemaType = SchemaType.AUTO_CONSUME在你的@PulsarListener并使用类型的脉冲星消息通用记录作为消息参数,如下所示。
@PulsarListener(topics = "my-generic-topic", schemaType = SchemaType.AUTO_CONSUME)
void listen(org.apache.pulsar.client.api.Message<GenericRecord> message) {
GenericRecord record = message.getValue();
record.getFields().forEach((f) ->
System.out.printf("%s = %s%n", f.getName(), record.getField(f)));
}
这通用记录API 允许访问字段及其相关值 |
1.2. 定制消费者构建器
你可以通过以下方式自定义任何可用的字段消费者建设者使用PulsarListenerConsumerBuilderCustomizer通过提供一个@Bean类型PulsarListenerConsumerBuilderCustomizer然后将它提供给脉冲星听者如下所示。
@PulsarListener(topics = "hello-topic", consumerCustomizer = "myCustomizer")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
@Bean
PulsarListenerConsumerBuilderCustomizer<String> myCustomizer() {
return (builder) -> builder.consumerName("myConsumer");
}
如果你的申请只有一个@PulsarListener以及一首单曲PulsarListenerConsumerBuilderCustomizerBean注册后,自定义器会自动应用。 |
2. 规范模式信息
如前所述,对于 Java 原语,Spring for Apache Pulsar 框架可以推断出用于脉冲星听者. 对于非原语类型,如果注释中没有明确指定模式,Spring for Apache Pulsar 框架将尝试构建Schema.JSON从类型上看。
| 目前支持的复杂模式类型有JSON、AVRO、PROTOBUF、AUTO_CONSUME、KEY_VALUE带内联编码。 |
2.1. 自定义模式映射
作为指定模式的替代方案脉冲星听者对于复杂类型,模式解析器可以配置为类型映射。这样就无需在框架通过接收消息类型咨询解析器时设置模式。
2.1.1. 配置属性
模式映射可以通过以下配置配置spring.pulsar.defaults.type-mappings财产。
以下示例使用application.yml为 添加映射用户和地址复杂对象使用阿弗罗和JSON分别是 schema:
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.acme.User
schema-info:
schema-type: AVRO
- message-type: com.acme.Address
schema-info:
schema-type: JSON
这消息类型是消息类的全限定名称。 |
2.1.2. Schema resolver customizer
添加映射的首选方法是上述属性。 不过,如果需要更多控制,你可以提供模式解析器自定义工具来添加映射。
以下示例使用模式解析器自定义工具来添加映射用户和地址复杂对象使用阿弗罗和JSON分别是 schema:
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
return (schemaResolver) -> {
schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
}
}
2.1.3. 类型映射注释
另一种指定特定消息类型默认模式信息的选项是用@PulsarMessage注解。
模式信息可以通过以下方式指定schemaType注释上的属性。
以下示例配置系统在生成或使用类型消息时,使用 JSON 作为默认模式福:
@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}
有了这种配置,就无需在监听者上设置模式,例如:
@PulsarListener(subscriptionName = "user-sub", topics = "user-topic")
public void listen(User user) {
System.out.println(user);
}
3. 访问脉冲星消费者对象
有时,你需要直接访问Pulsar消费者对象。 以下示例展示了如何获得它:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message, org.apache.pulsar.client.api.Consumer<String> consumer) {
System.out.println("Message Received: " + message);
ConsumerStats stats = consumer.getStats();
...
}
访问消费者通过调用接收方法,不要调用任何会改变消费者光标位置的作。
所有此类作都必须由容器完成。 |
4. 脉冲星消息监听器容器
现在我们看到了消费者端的基本交互脉冲星听者.现在让我们深入了解其内部运作脉冲星听者与底层的脉冲星消费者互动。
请记住,对于终端用户应用,在大多数情况下,我们建议使用脉冲星听者在使用 Spring for Apache Pulsar 时,直接从 Pulsar 主题中获取注释,因为该模型涵盖了广泛的应用场景。
然而,理解其原理非常重要脉冲星听者内部工作。本节将详细介绍这些细节。
如前所述,消息监听器容器是使用 Spring for Apache Pulsar 消息消费的核心。脉冲星听者在幕后使用消息监听器容器基础设施来创建和管理Pulsar消费者。
Spring for Apache Pulsar 通过以下方式为该消息监听器容器提供合同PulsarMessageListenerContainer.
该消息监听器容器的默认实现通过以下方式提供DefaultPulsarMessageListenerContainer.
顾名思义,PulsarMessageListenerContainer包含消息监听器。
容器创建 Pulsar 消费者,然后运行一个独立线程来接收和处理数据。
数据由提供的消息监听器实现处理。
消息监听器容器通过使用消费者的batchReceive方法。
数据接收后,会交给所选的消息监听器实现。
当你使用 Apache Pulsar 的 Spring 时,可以使用以下消息监听器类型。
我们将在接下来的章节中了解这些不同信息监听者的详细信息。
不过在此之前,让我们仔细看看这个容器本身。
4.1. DefaultPulsarMessageListenerContainer
这是一个基于消费者的单一消息监听器容器。 以下列表展示了其制造商:
public DefaultPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory,
PulsarContainerProperties pulsarContainerProperties)
}
它获得了脉冲星消费者工厂(它用来创建消费者)以及一个脉冲星容器属性对象(包含容器属性信息)。脉冲星容器属性具有以下构造子:
public PulsarContainerProperties(String... topics)
public PulsarContainerProperties(Pattern topicPattern)
你可以通过以下方式提供相关主题信息脉冲星容器属性或者作为提供给消费工厂的消费品。
以下示例使用了DefaultPulsarMessageListenerContainer:
Map<String, Object> config = new HashMap<>();
config.put("topics", "my-topic");
PulsarConsumerFactory<String> pulsarConsumerFactorY = DefaultPulsarConsumerFactory<>(pulsarClient, config);
PulsarContainerProperties pulsarContainerProperties = new PulsarContainerProperties();
pulsarContainerProperties.setMessageListener((PulsarRecordMessageListener<?>) (consumer, msg) -> {
});
DefaultPulsarMessageListenerContainer<String> pulsarListenerContainer = new DefaultPulsarMessageListenerContainer(pulsarConsumerFacotyr,
pulsarContainerProperties);
return pulsarListenerContainer;
如果在直接使用监听器容器时未指定主题信息,则使用相同的主题解析过程脉冲星听者使用的唯一例外是省略了“消息类型默认”步骤。 |
DefaultPulsarMessageListenerContainer只能创造一个消费者。
如果你想通过多个线程管理多个消费者,你需要使用ConcurrentPulsarMessageListenerContainer.
4.2. 并发脉冲星MessageListenerContainer
ConcurrentPulsarMessageListenerContainer具有如下构造子:
public ConcurrentPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory,
PulsarContainerProperties pulsarContainerProperties)
ConcurrentPulsarMessageListenerContainer你可以指定一个并发通过二传手获得财产。
并发于1仅允许在非独家订阅上使用(备援切换,共享和密钥共享).
你只能用默认的1当你有独家订阅模式时,这主要是为了并发。
以下示例使得并发通过脉冲星听者注释备援切换订阅。
@PulsarListener(topics = "my-topic", subscriptionName = "subscription-1",
subscriptionType = SubscriptionType.Failover, concurrency = "3")
void listen(String message, Consumer<String> consumer) {
...
System.out.println("Current Thread: " + Thread.currentThread().getName());
System.out.println("Current Consumer: " + consumer.getConsumerName());
}
在前一位听者中,假设主题为我的话题有三个分区。
如果是非分区主题,并发设置为3什么都没做。除了主要的活跃用户外,还有两个闲置用户。
如果主题有超过三个分区,则消息会在容器创建的消费者之间实现负载均衡。
如果你运行这个脉冲星听者你会看到来自不同分区的消息通过不同的消费者被消耗,正如前述示例中线程名称和消费者名打印出来的所示。
当你使用故障切换通过这种方式订阅分区主题,Pulsar保证消息顺序。 |
以下列表展示了另一个例子脉冲星听者,但共享订阅及并发启用。
@PulsarListener(topics = "my-topic", subscriptionName = "subscription-1",
subscriptionType = SubscriptionType.Shared, concurrency = "5")
void listen(String message) {
...
}
在前面的例子中,脉冲星听者创建五个不同的消费者(这次假设主题有五个分区)。
在此版本中,没有消息排序,因为共享订阅不保证在Pulsar中会有消息排序。 |
如果你需要消息排序,同时又想要共享订阅类型,你需要使用Key_Shared订阅类型。
4.3. 消费唱片
让我们来看看消息监听器容器如何支持单记录和批处理消息的使用。
单次创纪录消耗
让我们重新回顾一下基本情况脉冲星听者为了讨论方便:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
用这个脉冲星听者方法中,我们本质上要求 Spring for Apache Pulsar 每次调用一个记录的监听者方法。
我们提到消息监听器容器会通过batchReceive对消费者采取方法。
该框架检测到脉冲星听者在这种情况下,接收到一条记录。这意味着每次调用该方法时,都需要一个单记录。
虽然记录被消息监听器容器批量消耗,但它会遍历接收批次,并通过适配器调用监听器方法,脉冲星记录消息监听器.
正如你在上一节看到的,脉冲星记录消息监听器从消息监听器由 Pulsar Java 客户端提供,并且支持基础收到方法。
批量消耗
以下示例展示了脉冲星听者批量消费记录:
@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen4(List<Foo> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message));
}
当你使用这种类型的脉冲星听者框架检测你处于批处理模式。
因为它已经通过使用消费者的batchReceive方法,它通过适配器将整个批次交接给监听者方法,PulsarBatchMessageListener.
5. 脉冲星头部
Pulsar 消息元数据可以作为 Spring 消息头部使用。 可用头部列表可在PulsarHeaders.java中找到。
5.1. 基于单一记录的消费者访问
以下示例展示了如何在使用单记录消费模式的应用中访问各种脉冲星头部:
@PulsarListener(topics = "simpleListenerWithHeaders")
void simpleListenerWithHeaders(String data, @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
@Header(PulsarHeaders.RAW_DATA) byte[] rawData,
@Header("foo") String foo) {
}
在前面的例子中,我们访问了messageId(信息ID和原始数据消息元数据以及一个名为福.
Spring@Header每个头字段都使用注释。
你也可以用脉冲星消息作为承载有效载荷的包层。
在此过程中,用户可以直接调用 Pulsar 消息中的相应方法来获取元数据。
不过,为了方便,你也可以通过以下方式取回页眉注解。
注意你也可以使用春季消息消息包络 以携带有效载荷,然后通过以下方式获取 Pulsar 头部@Header.
5.2. 批处理记录访问基于消费者的应用
在本节中,我们将了解如何访问使用批处理消费者的应用程序中的各种脉冲星头:
@PulsarListener(topics = "simpleBatchListenerWithHeaders", batch = true)
void simpleBatchListenerWithHeaders(List<String> data,
@Header(PulsarHeaders.MESSAGE_ID) List<MessageId> messageIds,
@Header(PulsarHeaders.TOPIC_NAME) List<String> topicNames, @Header("foo") List<String> fooValues) {
}
在前面的例子中,我们以List<String>.
在提取各个头部时,我们会以名单<>也。
Spring for Apache Pulsar 确保头部列表对应于数据列表。
你也可以像用批处理监听器一样提取头部,接收有效载荷时,像List<org.apache.pulsar.client.api.Message<?>,org.apache.pulsar.client.api.Messages<?>或org.springframework.messaging.Messsge<?>.
6. 消息确认
当你使用 Spring 进行 Apache Pulsar 时,消息确认由框架处理,除非应用程序选择退出。 在本节中,我们将详细介绍该框架如何处理消息确认。
6.1. 消息确认模式
Apache Pulsar 的 Spring 提供了以下确认消息的模式:
-
Batch -
记录 -
手动
Batch确认模式是默认的,但你可以在消息监听器容器中更改。
在接下来的章节中,我们将展示当你同时使用单个版本和批量版本时,确认是如何工作的脉冲星听者以及它们如何转化为支持信息的听取器容器(最终转化为Pulsar消费者)。
6.2. 单记录模式下的自动消息确认
让我们重新审视基于单一信息的基本内容脉冲星听者:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
当你使用时,承认是如何运作的,这是很自然的脉冲星听者尤其是如果你熟悉直接使用Pulsar Consumer的话。
答案归结于消息监听器容器,因为这是Apache Pulsar春季中协调所有消费者相关活动的核心平台。
假设你没有覆盖默认行为,这就是你使用前述方法时幕后发生的事情脉冲星听者:
-
首先,监听器容器以批量方式接收来自 Pulsar 消费者的消息。
-
接收到的消息会传递给
脉冲星听者一条消息一条地来。 -
当所有记录都传递给监听器方法并成功处理后,容器会确认原始批次中的所有消息。
这是正常的流程。如果原始批次中的任何记录触发异常,Spring for Apache Pulsar会单独跟踪这些记录。
当批次中的所有记录都处理完毕后,Spring for Apache Pulsar会确认所有成功消息,并对所有失败消息负面确认(nack)。
换句话说,当使用单条记录时,使用脉冲星记录消息监听器以及默认的 ACK 模式Batch被使用时,框架等待从batchReceive调用 处理成功,然后调用承认在脉冲星消费者上进行方法。
如果某个特定记录在调用处理方法时抛出异常,Apache Pulsar 的 Spring 会跟踪这些记录并单独调用否定确认在整批处理完毕后,记录上会有记录。
如果应用程序希望每条记录都发生确认或负面确认,则记录可以启用ACK模式。
在这种情况下,处理完每条记录后,如果没有错误,消息会被确认;如果有错误,则会被否定确认。
以下示例使得记录Pulsar监听器的ack模式:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.RECORD)
public void listen(String message) {
System.out.println("Message Received: " + message);
}
6.3. 单记录模式下的手动消息确认
你可能不总是希望框架发送确认,而是直接从应用程序本身发送确认。 Spring for Apache Pulsar 提供了几种方式来启用手动消息确认。以下示例展示了其中一个:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.MANUAL)
public void listen(Message<String> message, Acknowledgment acknowledgment) {
System.out.println("Message Received: " + message.getValue());
acknowledgment.acknowledge();
}
这里有几点值得解释。首先,我们通过设置来启用手动获取模式ack模式上脉冲星听者.
启用手动 ack 模式时,Spring for Apache Pulsar 允许应用程序注入确认对象。
该框架通过选择兼容的消息监听器容器来实现这一目标:脉冲确认消息听众对于基于单条记录的消费,这让你能够访问确认对象。
这确认object 提供了以下 API 方法:
void acknowledge();
void acknowledge(MessageId messageId);
void acknowledge(List<MessageId> messageIds);
void nack();
void nack(MessageId messageId);
你可以注射这个确认物体进入你的脉冲星听者在使用手动然后调用相应的方法之一。
在前述中脉冲星听者例如,我们称 无参数承认方法。
这是因为框架知道具体情况消息目前正在运营。
呼唤时确认(),你不必通过消息但实际上,使用目标类型——字符串,在本例中。
你也可以调用另一个变体承认通过提供消息ID:acknowledge.acknowledge(message.getMessageId());当你使用acknowledge(messageId),你必须通过使用留言<?>信封。
类似于确认的可能范围,确认API 还提供负向确认的选项。
参见前面展示的nack方法。
你也可以打电话承认直接在Pulsar消费者端:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.MANUAL)
public void listen(Message<String> message, Consumer<String> consumer) {
System.out.println("Message Received: " + message.getValue());
try {
consumer.acknowledge(message);
}
catch (Exception e) {
....
}
}
呼唤时承认直接针对底层消费者,你需要自己处理错误。
使用确认不需要,因为框架可以帮你做到这一点。
因此,你应该使用确认使用手动确认时采用对象方式。
| 使用手动确认时,重要的是要明白该框架完全不包含任何确认。 因此,在设计应用程序时,思考正确的确认策略极为重要。 |
6.4. 批量消耗中的自动消息 Ack
当你批量消费记录(参见“消息确认模式”)并使用默认的确认模式Batch当整个批次成功处理时,整个批次都会被确认。
如果有任何记录抛出异常,整个批次都会被负面确认。
请注意,这可能不是生产方批次的同一批。而是那批刚从电话回来的batchReceive关于消费者
考虑以下批次监听器:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", batch = true)
public void batchListen(List<Foo> messages) {
for (Foo foo : messages) {
...
}
}
当所有消息都出现在来的集合中(消息在此示例中)被处理,框架承认所有这些。
批量使用时,记录不允许使用ACK模式。
这可能会引发问题,因为应用程序可能不希望整个批次再次被重新交付。
在这种情况下,你需要使用手动确认模式。
6.5. 批量使用中的手动消息 ack
如前一节所示,当手动ack 模式设置为消息监听器容器,框架不进行任何确认,无论是正面还是负面。
这些问题完全取决于申请方来处理。
什么时候手动ack 模式已设置,Spring for Apache Pulsar 选择一个兼容的消息监听器容器:PulsarBatchAcknowledgeledgingMessageListener批量使用,这能让你获得确认对象。
以下是可用的方法确认应用程序接口:
void acknowledge();
void acknowledge(MessageId messageId);
void acknowledge(List<MessageId> messageIds);
void nack();
void nack(MessageId messageId);
你可以注射这个确认物体进入你的脉冲星听者在使用手动ACK模式。
以下列表展示了基于批处理的监听器的基本示例:
@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(List<Message<String>> messgaes, Acknowlegement acknowledgment) {
for (Message<String> message : messages) {
try {
...
acknowledgment.acknowledge(message.getMessageId());
}
catch (Exception e) {
acknowledgment.nack(message.getMessageId());
}
}
}
使用批处理监听器时,消息监听器容器无法知道当前正在作哪个记录。
因此,要手动确认,你需要使用其中一个超载的承认方法MessageId(信息ID)或者List<MessageId>.
你也可以用MessageId(信息ID)给批次听众。
7. 消息重发与错误处理
既然我们已经看到了两者脉冲星听者以及消息监听器容器基础设施及其各种功能,现在让我们尝试理解消息重投和错误处理。
Apache Pulsar 提供了多种原生策略用于消息重发和错误处理。我们会看看这些工具,看看如何在Spring for Apache Pulsar中使用它们。
7.1. 指定消息重投的确认超时
默认情况下,Pulsar 消费者不会重投消息,除非消费者崩溃,但你可以通过设置 Pulsar 消费者的 ack 超时来改变这种行为。 如果 ack 超时属性值大于零,且 Pulsar 用户在超时期间未确认消息,则消息会被重新传递。
当你使用 Spring 为 Apache Pulsar 设计时,可以通过消费者定制工具或原生 Pulsar 来设置该属性ackTimeoutMillis财产在性能属性@PulsarListener:
@PulsarListener(subscriptionName = "subscription-1", topics = "topic-1"
properties = {"ackTimeoutMillis=60000"})
public void listen(String s) {
...
}
当你指定确认超时时,如果消费者在60秒内没有发送确认,Pulsar会将消息重新转达给消费者。
如果你想指定一些带有不同延迟的ack超时高级退回选项,可以这样做:
@EnablePulsar
@Configuration
class AckTimeoutRedeliveryConfig {
@PulsarListener(subscriptionName = "withAckTimeoutRedeliveryBackoffSubscription",
topics = "withAckTimeoutRedeliveryBackoff-test-topic",
ackTimeoutRedeliveryBackoff = "ackTimeoutRedeliveryBackoff",
properties = { "ackTimeoutMillis=60000" })
void listen(String msg) {
// some long-running process that may cause an ack timeout
}
@Bean
RedeliveryBackoff ackTimeoutRedeliveryBackoff() {
return MultiplierRedeliveryBackoff.builder().minDelayMs(1000).maxDelayMs(10 * 1000).multiplier(2)
.build();
}
}
在前面的例子中,我们指定一个豆子来表示脉冲星的重送退让最小延迟为1秒,最大延迟为10秒,退避乘数为2。
在初始确认超时后,消息的重送通过该退回豆进行控制。
我们会把后退豆提供给脉冲星听者通过设置ackTimeoutRedeliveryBackoff与实际Bean名称的财产——ackTimeoutRedeliveryBackoff,在这种情况下。
7.2. 指定负面确认重送
当确认为负面时,Pulsar 消费者允许你指定应用希望如何重新传递消息。
默认是一分钟内重新发送消息,但你可以通过消费者自定义工具或原生 Pulsar 来更改负面AckRedeliveryDelay财产在性能属性@PulsarListener:
@PulsarListener(subscriptionName = "subscription-1", topics = "topic-1"
properties = {"negativeAckRedeliveryDelay=10ms"})
public void listen(String s) {
...
}
你还可以通过提供重送退让豆子并提供豆子名称否定AckRedeliveryBackoffPulsarProducer 上的财产,具体如下:
@EnablePulsar
@Configuration
class NegativeAckRedeliveryConfig {
@PulsarListener(subscriptionName = "withNegRedeliveryBackoffSubscription",
topics = "withNegRedeliveryBackoff-test-topic", negativeAckRedeliveryBackoff = "redeliveryBackoff",
subscriptionType = SubscriptionType.Shared)
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}
@Bean
RedeliveryBackoff redeliveryBackoff() {
return MultiplierRedeliveryBackoff.builder().minDelayMs(1000).maxDelayMs(10 * 1000).multiplier(2)
.build();
}
}
7.3. 使用 Apache Pulsar 中的死字母主题进行消息重发和错误处理
Apache Pulsar 允许应用程序在消费者上使用死符主题,且共享订阅类型。
对于独家和故障切换订阅类型,此功能不可用。
基本思路是,如果消息被重试一定次数(可能是因为确认超时或 nack 重投),当重试次数用尽后,消息可以发送到一个称为死字母队列(DLQ)的特殊主题。
让我们通过检查一些代码片段来了解该功能的具体作细节:
@EnablePulsar
@Configuration
class DeadLetterPolicyConfig {
@PulsarListener(id = "deadLetterPolicyListener", subscriptionName = "deadLetterPolicySubscription",
topics = "topic-with-dlp", deadLetterPolicy = "deadLetterPolicy",
subscriptionType = SubscriptionType.Shared, properties = { "ackTimeoutMillis=1000" })
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}
@PulsarListener(id = "dlqListener", topics = "my-dlq-topic")
void listenDlq(String msg) {
System.out.println("From DLQ: " + msg);
}
@Bean
DeadLetterPolicy deadLetterPolicy() {
return DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("my-dlq-topic").build();
}
}
首先,我们有一种特殊的豆子DeadLetterPolicy,其名称为deadLetterPolicy(你可以随意取任何名字)。
这个豆子指定了许多内容,比如最大送达时间(这里是10)和死字母主题的名称——我的DLQ主题,在这种情况下。
如果你没有指定DLQ主题名称,默认为<topicname>-<subscriptionname>-DLQ在脉冲星。
接下来,我们将这个豆子命名为脉冲星听者通过设置deadLetterPolicy财产。
注意脉冲星听者拥有一种订阅类型共享,因为DLQ功能只适用于共享订阅。
本代码主要用于演示,因此我们提供ackTimeoutMillis价值1000。
其理念是代码抛出异常,如果 Pulsar 在 1 秒内未收到确认,则进行重试。
如果这个循环持续十次(因为这是我们最大的重送次数DeadLetterPolicy),Pulsar 消费者将消息发布到 DLQ 主题。
我们又来了脉冲星听者它监听 DLQ 主题,接收发布到该主题的数据。
7.4. Apache Pulsar 春季中的原生错误处理
正如我们之前提到的,Apache Pulsar 中的 DLQ 功能仅适用于共享订阅。
如果应用程序需要对非共享订阅使用类似功能,该怎么办?
Pulsar 不支持 DLQ 的独占和备用切换订阅的主要原因是这些订阅类型是订单保证的。
允许重投、DLQ等实际上是报纸顺序错乱。
但是,如果某个应用对此没问题,但更重要的是,非共享订阅需要这个DLQ功能呢?
为此,Spring for Apache Pulsar 提供了脉冲星消费者错误处理你可以在Pulsar中的任何订阅类型中使用:独家,故障切换,共享或Key_Shared.
当你使用脉冲星消费者错误处理来自Apache Pulsar的Spring,确保不要在监听器上设置ack超时属性。
让我们通过查看几个代码片段来了解一些细节:
@EnablePulsar
@Configuration
class PulsarConsumerErrorHandlerConfig {
@Bean
PulsarConsumerErrorHandler<String> pulsarConsumerErrorHandler(
PulsarTemplate<String> pulsarTemplate) {
return new DefaultPulsarConsumerErrorHandler<>(
new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, (c, m) -> "my-foo-dlt"), new FixedBackOff(100, 10));
}
@PulsarListener(id = "pulsarConsumerErrorHandler-id", subscriptionName = "pulsatConsumerErrorHandler-subscription",
topics = "pulsarConsumerErrorHandler-topic",
pulsarConsumerErrorHandler = "pulsarConsumerErrorHandler")
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}
@PulsarListener(id = "pceh-dltListener", topics = "my-foo-dlt")
void listenDlt(String msg) {
System.out.println("From DLT: " + msg);
}
}
考虑pulsarConsumerErrorHandler豆。
这就形成了 的脉冲星消费者错误处理并采用了 Spring 为 Apache Pulsar 默认提供的实现:默认脉冲消耗者错误处理.默认脉冲消耗者错误处理有一个构造子,使得脉冲星消息恢复工厂以及一个org.springframework.util.backoff.Backoff.脉冲星消息恢复工厂是一个功能接口,支持以下 API:
@FunctionalInterface
public interface PulsarMessageRecovererFactory<T> {
/**
* Provides a message recoverer {@link PulsarMessageRecoverer}.
* @param consumer Pulsar consumer
* @return {@link PulsarMessageRecoverer}.
*/
PulsarMessageRecoverer<T> recovererForConsumer(Consumer<T> consumer);
}
这recovererForConsumer(恢复者消费者)方法接收一个脉冲星消费者并返回脉冲星消息恢复器,这是另一个功能接口。
以下是 的 API脉冲星消息恢复器:
public interface PulsarMessageRecoverer<T> {
/**
* Recover a failed message, for e.g. send the message to a DLT.
* @param message Pulsar message
* @param exception exception from failed message
*/
void recoverMessage(Message<T> message, Exception exception);
}
Apache Pulsar 的 Spring 提供了一个实现脉冲星消息恢复工厂叫脉冲星死信出版恢复者该系统提供默认实现,可以通过发送消息到死信主题(DLT)来恢复消息。
我们向前述构造子提供该实现默认脉冲消耗者错误处理.
作为第二个参数,我们给出一个固定退后.
你也可以提供指数退缩来自Spring,用于高级退场功能。
然后我们给出这个豆名脉冲星消费者错误处理作为脉冲星听者.
该性质称为pulsarConsumerErrorHandler.
每当脉冲星听者如果该方法对消息失败,则会重新尝试。
重试次数由退避提供实现值。在我们的例子中,我们做了10次重试(总共11次——第一次再10次)。
当所有重试次数用尽后,消息会发送到DLT主题。
这脉冲星死信出版恢复者我们提供的实现是脉冲星模板用于向DLT发布消息。
在大多数情况下,同样的自动配置脉冲星模板仅仅针对分区主题有个前提。
使用分区主题并对主主题使用自定义消息路由时,必须使用不同的脉冲星模板这不需要自动配置脉冲星生产工厂该 值为自定义分区为消息路由模式.
你可以用脉冲星消费者错误处理并附有以下蓝图:
@Bean
PulsarConsumerErrorHandler<Integer> pulsarConsumerErrorHandler(PulsarClient pulsarClient) {
PulsarProducerFactory<Integer> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, Map.of());
PulsarTemplate<Integer> pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory);
BiFunction<Consumer<?>, Message<?>, String> destinationResolver =
(c, m) -> "my-foo-dlt";
PulsarDeadLetterPublishingRecoverer<Integer> pulsarDeadLetterPublishingRecoverer =
new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, destinationResolver);
return new DefaultPulsarConsumerErrorHandler<>(pulsarDeadLetterPublishingRecoverer,
new FixedBackOff(100, 5));
}
注意,我们为脉冲星死信出版恢复者作为第二个构造子论证。
如果没有提供,脉冲星死信出版恢复者使用<订阅名>-<topic-name>-DLT>作为DLT主题名称。
使用此功能时,你应该通过设置目标解析器来使用正确的目标名称,而不是默认名称。
使用单记录消息监听器时,就像我们之前做的那样PulsarConsumerErrorHnadler如果你使用手动确认,确保在异常出现时不要负面确认该消息。
而是将异常重新抛回容器。否则,容器会认为消息被单独处理,错误处理不会被触发。
最后,我们有第二个脉冲星听者该机构接收来自DLT主题的消息。
在本节迄今为止提供的示例中,我们只看到如何使用脉冲星消费者错误处理使用单一记录消息监听器。
接下来,我们会看看如何将它用于批量监听器。
7.5. 带有 PulsarConsumerErrorHandler 的批处理监听器
首先,让我们来看一批脉冲星听者方法:
@PulsarListener(subscriptionName = "batch-demo-5-sub", topics = "batch-demo-4", batch = true, concurrency = "3",
subscriptionType = SubscriptionType.Failover,
pulsarConsumerErrorHandler = "pulsarConsumerErrorHandler", ackMode = AckMode.MANUAL)
void listen(List<Message<Integer>> data, Consumer<Integer> consumer, Acknowledgment acknowledgment) {
for (Message<Integer> datum : data) {
if (datum.getValue() == 5) {
throw new PulsarBatchListenerFailedException("failed", datum);
}
acknowledgement.acknowledge(datum.getMessageId());
}
}
@Bean
PulsarConsumerErrorHandler<String> pulsarConsumerErrorHandler(
PulsarTemplate<String> pulsarTemplate) {
return new DefaultPulsarConsumerErrorHandler<>(
new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, (c, m) -> "my-foo-dlt"), new FixedBackOff(100, 10));
}
@PulsarListener(subscriptionName = "my-dlt-subscription", topics = "my-foo-dlt")
void dltReceiver(Message<Integer> message) {
System.out.println("DLT - RECEIVED: " + message.getValue());
}
我们再次提供pulsarConsumerErrorHandler具有脉冲星消费者错误处理豆子的名字。
当你使用批处理监听器(如前例所示)并想使用脉冲星消费者错误处理Apache Pulsar 的 Spring 需要手动确认。
这样,你可以确认所有成功的个人信息。
对于失败的,你必须掷出一个PulsarBatchListenerFailedException以及它失败的信息。
没有这个例外,框架就不知道该如何处理失败。
重试时,容器会从失败消息开始向监听者发送新一批消息。
如果再次失败,则会重试,直到重试次数用尽,消息才发送给DLT。
此时,容器确认消息,监听者会与原始批次中的后续消息一同交接。
8. PulsarListener上的消费者定制
Apache Pulsar 的 Spring 提供了一种方便的方式,可以自定义由所用容器创建的消费者脉冲星听者.
应用程序可以提供一个PulsarListenerConsumerBuilderCustomizer.
这里有一个例子。
@Bean
public PulsarListenerConsumerBuilderCustomizer<String> myCustomizer() {
return cb -> {
cb.subscriptionName("modified-subscription-name");
};
}
然后,这个定制豆名可以作为属性在PuslarListener(普斯拉听者)注释如下所示。
@PulsarListener(subscriptionName = "my-subscription",
topics = "my-topic", consumerCustomizer = "myCustomizer")
void listen(String message) {
}
该框架通过脉冲星听者并在创建脉冲星消费者之前,将该定制器应用到消费者构建者身上。
如果你有多重脉冲星听者方法,每种都有不同的自定义规则,你应该创建多个自定义豆,并在每个豆子上安装合适的定制器脉冲星听者.
9. 消息监听器容器生命周期
9.1. 暂停与继续
在某些情况下,应用程序可能希望暂时暂停消息的使用,然后稍后恢复。 Apache Pulsar 的 Spring 提供了暂停和恢复底层消息监听器容器的功能。 当Pulsar消息监听器容器暂停时,容器为接收Pulsar消费者数据所做的任何轮询都会被暂停。 同样,当容器恢复时,下一次轮询会开始返回暂停期间新增的记录。
要暂停或恢复监听器容器,首先通过PulsarListenerEndpointRegistry然后在容器实例上调用暂停/恢复API——如下图所示:
@Autowired
private PulsarListenerEndpointRegistry registry;
void someMethod() {
PulsarMessageListenerContainer container = registry.getListenerContainer("my-listener-id");
container.pause();
}
id参数传递给getListenerContainer是容器ID——将是@PulsarListener暂停/继续时的ID属性@PulsarListener. |
10. 脉冲星读卡器支持
该框架支持通过脉冲星读者工厂.
Spring Boot 提供了这个读取器工厂,你可以通过指定任意Spring脉冲星读者。*应用属性。
10.1. 脉冲星读者注释
虽然可以使用脉冲星读者工厂直接来说,Spring for Apache Pulsar 提供了脉冲星读者注释,你可以快速阅读主题,无需自己搭建任何阅读器工厂。
这与背后的理念相似脉冲听众。这里有一个简短的例子。
@PulsarReader(id = "reader-demo-id", topics = "reader-demo-topic", startMessageId = "earliest")
void read(String message) {
//...
}
这身份证属性是可选的,但提供对你的应用有意义的值是最佳实践。
未指定时,将使用自动生成的ID。
另一方面,主题和startMessageId(启动消息ID)属性是必修的。
这主题属性可以是单个主题,也可以是逗号分隔的主题列表。
这startMessageId(启动消息ID)属性指示读者从主题中的某个特定信息开始。
有效值为startMessageId(启动消息ID)是最早或最近的。假设你希望读者从一个主题中任意开始阅读消息,而非最早或最新的可用消息。在这种情况下,你需要使用ReaderBuilderCustomizer以定制读者构建器所以它知道正确的权利MessageId(信息ID)从这里开始。
10.2. 定制读者构建器
你可以通过以下方式自定义任何可用的字段读者构建器使用脉冲读者读者构建器定制器春季则为Apache Pulsar制作。
你可以提供@Bean类型脉冲读者读者构建器定制器然后将它提供给脉冲星读者如下所示。
@PulsarReader(id = "reader-customizer-demo-id", topics = "reader-customizer-demo-topic",
readerCustomizer = "myCustomizer")
void read(String message) {
//...
}
@Bean
public PulsarReaderReaderBuilderCustomizer<String> myCustomizer() {
return readerBuilder -> {
readerBuilder.startMessageId(messageId); // the first message read is after this message id.
// Any other customizations on the readerBuilder
};
}
如果你的申请只有一个@PulsarReader以及一首单曲脉冲读者读者构建器定制器Bean注册后,自定义器会自动应用。 |