消息生成
1. 反应性脉冲星模板
在 Pulsar 生产者端,Spring Boot 自动配置提供了一个ReactivePulsarTemplate
用于发布记录。该模板实现了一个名为ReactivePulsarOperations
并提供通过其合同发布记录的方法。
该模板提供了接受单个消息并返回Mono<MessageId>
. 它还提供了接受多个消息的发送方法(以 ReactiveStreams 的形式Publisher
类型)并返回一个Flux<MessageId>
.
对于不包含主题参数的 API 变体,主题解析过程用于确定目标主题。 |
1.1. 流畅的 API
该模板提供了一个流畅的构建器来处理更复杂的发送请求。
1.2. 消息定制
您可以指定一个MessageSpecBuilderCustomizer
以配置传出消息。例如,以下代码演示如何发送密钥消息:
template.newMessage(msg)
.withMessageCustomizer((mc) -> mc.key("foo-msg-key"))
.send();
1.3. 发送方自定义
您可以指定一个ReactiveMessageSenderBuilderCustomizer
以配置底层 Pulsar 发送器构建器,该构建器最终构建用于发送外发消息的发送者。
请谨慎使用,因为这提供了对发送方构建器的完全访问权限并调用其某些方法(例如create )可能会产生意想不到的副作用。 |
例如,以下代码演示如何禁用批处理和启用分块:
template.newMessage(msg)
.withSenderCustomizer((sc) -> sc.enableChunking(true).enableBatching(false))
.send();
另一个示例演示如何在将记录发布到分区主题时使用自定义路由。指定您的自定义MessageRouter
在发送方构建器上实现,例如:
template.newMessage(msg)
.withSenderCustomizer((sc) -> sc.messageRouter(messageRouter))
.send();
请注意,当使用MessageRouter ,这是spring.pulsar.producer.message-routing-mode 是custom . |
2. 指定模式信息
如果您使用 Java 基元类型,则框架会自动为您检测模式,并且您无需指定任何模式类型来发布数据。对于非基元类型,如果在调用ReactivePulsarTemplate
,Spring for Apache Pulsar 框架将尝试构建一个Schema.JSON
从类型。
当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF、AUTO_PRODUCE_BYTES 和带内联编码的 KEY_VALUE。 |
2.1. 自定义模式映射
作为在调用发送作时指定模式的替代方法。ReactivePulsarTemplate
对于复杂类型,可以使用类型的映射来配置模式解析器。这样就无需指定模式,因为框架使用传出消息类型咨询解析器。
2.1.1. 配置属性
架构映射可以使用spring.pulsar.defaults.type-mappings
财产。
以下示例使用application.yml
为User
和Address
复杂对象使用AVRO
和JSON
schemas,分别:
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.acme.User
schema-info:
schema-type: AVRO
- message-type: com.acme.Address
schema-info:
schema-type: JSON
这message-type 是消息类的完全限定名称。 |
2.1.2. 模式解析器定制器
添加映射的首选方法是通过上述属性。 但是,如果需要更多控制,可以提供架构解析器定制器来添加映射。
以下示例使用架构解析器定制器为User
和Address
复杂对象使用AVRO
和JSON
schemas,分别:
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
return (schemaResolver) -> {
schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
}
}
2.2. 使用 AUTO_SCHEMA 进行生产
如果没有机会提前知道 Pulsar 主题的模式类型,您可以使用AUTO_PRODUCE模式将原始 JSON 或 Avro 有效负载发布为byte[]
牢牢。
在这种情况下,生产者将验证出站字节是否与目标主题的模式兼容。
只需指定Schema.AUTO_PRODUCE_BYTES()
在模板上发送作,如下例所示:
void sendUserAsBytes(ReactivePulsarTemplate<byte[]> template, byte[] userAsBytes) {
template.send("user-topic", userAsBytes, Schema.AUTO_PRODUCE_BYTES()).subscribe();
}
这仅支持 Avro 和 JSON 架构类型。 |
3. 反应性脉冲星发送器工厂
这ReactivePulsarTemplate
依赖于ReactivePulsarSenderFactory
实际创建基础发件人。
Spring Boot 提供了这个发送方工厂,它可以配置任何spring.pulsar.producer.*
应用程序属性。
3.1. 生产者缓存
每个底层 Pulsar 生产者都会消耗资源。
为了提高性能并避免持续创建生产者,ReactiveMessageSenderCache
在底层 Apache Pulsar Reactive 客户端中缓存它创建的生产者。
它们以 LRU 方式缓存,并在配置的时间段内未使用时逐出。
您可以通过指定任何spring.pulsar.producer.cache.*
应用程序属性。