消息生产

1. 脉冲星模板

在Pulsar生产者端,Spring套自动配置提供了脉冲星模板用于发布记录。该模板实现了一个称为脉冲星运营并通过合同提供发布记录的方法。spring-doc.cadn.net.cn

这些发送API方法分为两类:发送sendAsync. 这发送方法通过使用Pulsar生产器的同步发送功能阻断调用。它们返回MessageId(信息ID)消息一旦在经纪人上持久化后发布。 这sendAsync方法调用是异步调用,且不阻塞。它们返回完成未来,你可以用它在消息发布后异步接收消息ID。spring-doc.cadn.net.cn

对于不包含主题参数的 API 变体,采用主题解析过程来确定目标主题。

1.1. 简单 API

该模板提供了少量方法(前缀为“send”)用于简单的发送请求。对于更复杂的发送请求,流畅的API允许你配置更多选项。spring-doc.cadn.net.cn

1.2. 流畅 API

该模板提供了一个流畅的构建工具,可以处理更复杂的发送请求。spring-doc.cadn.net.cn

1.3. 消息定制

你可以指定一个TypedMessageBuilderCustomizer配置外发消息。例如,以下代码展示了如何发送带密钥的消息:spring-doc.cadn.net.cn

template.newMessage(msg)
    .withMessageCustomizer((mb) -> mb.key("foo-msg-key"))
    .send();

1.4. 生产者定制

你可以指定一个制作者构建定制器配置底层的Pulsar生产器构建器,最终构建用于发送消息的生产者。spring-doc.cadn.net.cn

使用时要谨慎,因为这样可以完全访问生产者构建器并调用其一些方法(例如创造)可能出现意想不到的副作用。

例如,以下代码展示了如何禁用批处理并启用分块:spring-doc.cadn.net.cn

template.newMessage(msg)
    .withProducerCustomizer((pb) -> pb.enableChunking(true).enableBatching(false))
    .send();

另一个例子展示了如何在将记录发布到分区主题时使用自定义路由。 请指定你的自定义消息路由器制作人建造者如:spring-doc.cadn.net.cn

template.newMessage(msg)
    .withProducerCustomizer((pb) -> pb.messageRouter(messageRouter))
    .send();
注意,当使用消息路由器,唯一有效的设置是spring.pulsar.producer.message-routing-mode习惯.

另一个例子展示了如何添加制片人拦截者它会拦截并变异制作者在发布给经纪人之前收到的消息:spring-doc.cadn.net.cn

template.newMessage(msg)
    .withProducerCustomizer((pb) -> pb.intercept(interceptor))
    .send();

定制工具只适用于用于发送作的生产商。 如果你想对所有生产者应用定制器,必须按照全球生产者定制描述,向生产者工厂提供定制器。spring-doc.cadn.net.cn

使用Lambda定制器时必须遵守《注意Lambda定制器》中描述的规则。

2. 规范模式信息

如果你使用 Java 原语类型,框架会自动检测该模式,发布数据时无需指定任何模式类型。 对于非原始类型,如果在调用发送作时未明确指定模式,脉冲星模板Spring for Apache Pulsar 框架将尝试构建Schema.JSON从类型上看。spring-doc.cadn.net.cn

目前支持的复杂模式类型包括JSON、AVRO、PROTOBUF、AUTO_PRODUCE_BYTES和带内联编码的KEY_VALUE。

2.1. 自定义模式映射

作为调用发送作时指定模式的替代方案脉冲星模板对于复杂类型,模式解析器可以配置该类型的映射。 这样就无需指定模式,因为框架通过发送消息类型访问解析器。spring-doc.cadn.net.cn

2.1.1. 配置属性

模式映射可以通过以下配置配置spring.pulsar.defaults.type-mappings财产。 以下示例使用application.yml为 添加映射用户地址复杂对象使用阿弗罗JSON分别是 schema:spring-doc.cadn.net.cn

spring:
  pulsar:
    defaults:
      type-mappings:
        - message-type: com.acme.User
          schema-info:
            schema-type: AVRO
        - message-type: com.acme.Address
          schema-info:
            schema-type: JSON
消息类型是消息类的全限定名称。

2.1.2. Schema resolver customizer

添加映射的首选方法是上述属性。 不过,如果需要更多控制,你可以提供模式解析器自定义工具来添加映射。spring-doc.cadn.net.cn

以下示例使用模式解析器自定义工具来添加映射用户地址复杂对象使用阿弗罗JSON分别是 schema:spring-doc.cadn.net.cn

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
	return (schemaResolver) -> {
		schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
		schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
	}
}

2.1.3. 类型映射注释

另一种指定特定消息类型默认模式信息的选项是用@PulsarMessage注解。 模式信息可以通过以下方式指定schemaType注释上的属性。spring-doc.cadn.net.cn

以下示例配置系统在生成或使用类型消息时,使用 JSON 作为默认模式:spring-doc.cadn.net.cn

@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}

有了这个配置,发送作时就无需设置指定模式。spring-doc.cadn.net.cn

2.2. 与AUTO_SCHEMA制作

如果事先无法知道 Pulsar 主题的模式类型,你可以用 AUTO_PRODUCE 模式发布原始 JSON 或 Avro 负载作为字节[]牢牢。spring-doc.cadn.net.cn

在这种情况下,生产者会验证出界字节是否与目标主题的模式兼容。spring-doc.cadn.net.cn

只需指定一个模式Schema.AUTO_PRODUCE_BYTES()在你的模板上,发送作如下示例所示:spring-doc.cadn.net.cn

void sendUserAsBytes(PulsarTemplate<byte[]> template, byte[] userAsBytes) {
	template.send("user-topic", userAsBytes, Schema.AUTO_PRODUCE_BYTES());
}
这只支持Avro和JSON模式类型。

3. 脉冲星生产工厂

脉冲星模板依赖于脉冲星生产工厂真正创造出潜在的生产者。 Spring Boot 自动配置还提供生产者工厂,您可以通过指定任意Spring。脉冲星。制作人。*应用属性。spring-doc.cadn.net.cn

如果在直接使用生产者工厂 API 时未指定主题信息,则使用脉冲星模板使用的唯一例外是省略了“消息类型默认”步骤。

3.1. 全球生产者定制化

该框架提供了制作者构建定制器合同允许你配置用于建造每个生产商的底层架构。 要自定义所有生产者,你可以将定制器列表传递到脉冲星生产工厂构造 函数。 使用多个自定义工具时,按列表中出现的顺序应用。spring-doc.cadn.net.cn

如果你用 Spring Boot 自动配置,你可以把自定义器指定为豆子,它们会自动传递给脉冲星生产工厂根据他们的顺序排列@Order注解。

如果你只想对单个生产者应用定制器,可以使用 Fluent API,并在发送时指定定制器spring-doc.cadn.net.cn

4. 脉冲星生产者缓存

每个底层脉冲星生产者都会消耗资源。 为了提升性能并避免持续生产者,生产者工厂会缓存其所创造的生产者。 它们以LRU方式缓存,并在未在配置时间内被使用时被驱逐。 缓存密钥包含足够的信息,确保调用者在后续创建请求中返回同一个生产者。spring-doc.cadn.net.cn

此外,你还可以通过指定任意一个spring.pulsar.producer.cache.*应用属性。spring-doc.cadn.net.cn

4.1. 对 Lambda 定制器的注意

任何用户提供的生产者自定义工具也包含在缓存键中。 因为缓存密钥依赖于有效的实现等值/哈希码使用Lambda定制器时必须谨慎。spring-doc.cadn.net.cn

统治:两个自定义器作为 Lambda 实现,会匹配于等值/哈希码 当且仅当它们使用相同的 Lambda 实例,且不要求定义在闭包之外的任何变量。

为了澄清上述规则,我们将看几个例子。 在以下示例中,自定义器被定义为内联 Lambda,这意味着每次调用sendUser使用相同的 Lambda 实例。此外,它不要求闭包外的变量。因此,它作为缓存密钥匹配。spring-doc.cadn.net.cn

void sendUser() {
    var user = randomUser();
    template.newMessage(user)
        .withTopic("user-topic")
        .withProducerCustomizer((b) -> b.producerName("user"))
        .send();
}

在接下来的情况下,定制器被定义为内联 Lambda,这意味着每次调用sendUser使用相同的 Lambda 实例。然而,它需要一个闭包外的变量。因此,它无法匹配为缓存密钥。spring-doc.cadn.net.cn

void sendUser() {
    var user = randomUser();
    var name = randomName();
    template.newMessage(user)
        .withTopic("user-topic")
        .withProducerCustomizer((b) -> b.producerName(name))
        .send();
}

在这个最后一个例子中,自定义器被定义为内联 Lambda,这意味着每个调用sendUser使用相同的 Lambda 实例。虽然它确实使用变量名,但该变量名称不源自其闭包,因此匹配为缓存键。 这说明变量可以在Lambda闭包中使用,甚至可以调用静态方法。spring-doc.cadn.net.cn

void sendUser() {
    var user = randomUser();
    template.newMessage(user)
        .withTopic("user-topic")
        .withProducerCustomizer((b) -> {
           var name = SomeHelper.someStaticMethod();
           b.producerName(name);
        })
        .send();
}
统治:如果你的 Lambda 自定义器没有定义一次且仅定义一次(后续调用中仍使用同一个实例),或者它需要在闭包之外定义变量,那么你必须提供一个有效的等值/哈希码实现。
如果不遵守这些规则,生产者缓存总是会失败,应用性能也会受到负面影响。

5. 截获生产者的消息

添加一个制片人拦截者它允许你在发送给经纪人之前拦截并变异制作人收到的消息。 为此,你可以将拦截器列表传递到脉冲星模板构造 函数。 使用多个拦截器时,应用顺序即为它们在列表中的出现顺序。spring-doc.cadn.net.cn

如果你用 Spring Boot 自动配置,可以把拦截器指定为 Beans。 这些数据会自动传递给脉冲星模板. 拦截器的排序通过使用@Order注释如下:spring-doc.cadn.net.cn

@Bean
@Order(100)
ProducerInterceptor firstInterceptor() {
  ...
}

@Bean
@Order(200)
ProducerInterceptor secondInterceptor() {
  ...
}
如果你不使用起动机,你需要自己配置并注册上述组件。