消息生产
1. 反应脉冲星模板
在Pulsar生产者端,Spring套自动配置提供了反应脉冲星模板用于发布记录。该模板实现了一个称为反应脉冲星运算并通过合同提供发布记录的方法。
该模板提供了发送方法,可以接受单条消息并返回单声<MessageId>. 它还提供了接收多条消息的发送方法(以ReactiveStreams的形式)发行人类型)并返回Flux<MessageId>.
| 对于不包含主题参数的 API 变体,采用主题解析过程来确定目标主题。 |
1.1. 流畅 API
该模板提供了一个流畅的构建工具,可以处理更复杂的发送请求。
1.2. 消息定制
你可以指定一个MessageSpecBuilderCustomizer配置外发消息。例如,以下代码展示了如何发送带密钥的消息:
template.newMessage(msg)
.withMessageCustomizer((mc) -> mc.key("foo-msg-key"))
.send();
1.3. 发送者定制
你可以指定一个ReactiveMessageSenderBuilderCustomizer配置底层的Pulsar发送器构建器,最终构建用于发送消息的发送者。
使用时要谨慎,因为这可以完全访问发送器构建器并调用其一些方法(例如创造)可能出现意想不到的副作用。 |
例如,以下代码展示了如何禁用批处理并启用分块:
template.newMessage(msg)
.withSenderCustomizer((sc) -> sc.enableChunking(true).enableBatching(false))
.send();
另一个例子展示了如何在将记录发布到分区主题时使用自定义路由。
请指定你的自定义消息路由器在发送器构建器上的实现,例如:
template.newMessage(msg)
.withSenderCustomizer((sc) -> sc.messageRouter(messageRouter))
.send();
注意,当使用消息路由器,唯一有效的设置是spring.pulsar.producer.message-routing-mode是习惯. |
2. 规范模式信息
如果你使用 Java 原语类型,框架会自动检测该模式,发布数据时无需指定任何模式类型。
对于非原始类型,如果在调用发送作时未明确指定模式,反应脉冲星模板Spring for Apache Pulsar 框架将尝试构建Schema.JSON从类型上看。
| 目前支持的复杂模式类型包括JSON、AVRO、PROTOBUF、AUTO_PRODUCE_BYTES和带内联编码的KEY_VALUE。 |
2.1. 自定义模式映射
作为调用发送作时指定模式的替代方案反应脉冲星模板对于复杂类型,模式解析器可以配置该类型的映射。
这样就无需指定模式,因为框架通过发送消息类型访问解析器。
2.1.1. 配置属性
模式映射可以通过以下配置配置spring.pulsar.defaults.type-mappings财产。
以下示例使用application.yml为 添加映射用户和地址复杂对象使用阿弗罗和JSON分别是 schema:
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.acme.User
schema-info:
schema-type: AVRO
- message-type: com.acme.Address
schema-info:
schema-type: JSON
这消息类型是消息类的全限定名称。 |
2.1.2. Schema resolver customizer
添加映射的首选方法是上述属性。 不过,如果需要更多控制,你可以提供模式解析器自定义工具来添加映射。
以下示例使用模式解析器自定义工具来添加映射用户和地址复杂对象使用阿弗罗和JSON分别是 schema:
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
return (schemaResolver) -> {
schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
}
}
2.2. 与AUTO_SCHEMA制作
如果事先无法知道 Pulsar 主题的模式类型,你可以用 AUTO_PRODUCE 模式发布原始 JSON 或 Avro 负载作为字节[]牢牢。
在这种情况下,生产者会验证出界字节是否与目标主题的模式兼容。
只需指定一个模式Schema.AUTO_PRODUCE_BYTES()在你的模板上,发送作如下示例所示:
void sendUserAsBytes(ReactivePulsarTemplate<byte[]> template, byte[] userAsBytes) {
template.send("user-topic", userAsBytes, Schema.AUTO_PRODUCE_BYTES()).subscribe();
}
| 这只支持Avro和JSON模式类型。 |
3. 响应式脉冲星发射工厂
这反应脉冲星模板依赖于反应脉冲星发射工厂实际上是创建底层发送者。
Spring Boot 提供了该发送器工厂,可以配置为任何Spring。脉冲星。制作人。*应用属性。
如果在直接使用发送方工厂API时未指定主题信息,则使用相同的主题解析过程反应脉冲星模板使用的唯一例外是省略了“消息类型默认”步骤。 |
3.1. 生产者缓存
每个底层脉冲星生产者都会消耗资源。
为了提升性能并避免持续生产者,响应式消息发送缓存在底层的Apache中,Pulsar Reactive 客户端缓存了它创建的生产者。
它们以LRU方式缓存,并在未在配置时间内被使用时被驱逐。
你可以通过指定任意一个来配置缓存设置spring.pulsar.producer.cache.*应用属性。