消息生成
1. 脉冲星模板
在 Pulsar 生产者端,Spring Boot 自动配置提供了一个PulsarTemplate
用于发布记录。该模板实现了一个名为PulsarOperations
并提供通过其合同发布记录的方法。
这些发送 API 方法分为两类:send
和sendAsync
.
这send
方法使用 Pulsar 生产者上的同步发送功能来阻止调用。
他们返回MessageId
在消息在代理上持久化后发布的消息。
这sendAsync
方法调用是非阻塞的异步调用。
他们返回一个CompletableFuture
,您可以在消息发布后使用它异步接收消息 ID。
对于不包含主题参数的 API 变体,主题解析过程用于确定目标主题。 |
1.1. 简单的API
该模板为简单的发送请求提供了一些方法(以“send”为前缀)。对于更复杂的发送请求,Fluent API 允许您配置更多选项。
1.2. 流畅的 API
该模板提供了一个流畅的构建器来处理更复杂的发送请求。
1.3. 消息自定义
您可以指定一个TypedMessageBuilderCustomizer
以配置传出邮件。例如,以下代码演示如何发送密钥消息:
template.newMessage(msg)
.withMessageCustomizer((mb) -> mb.key("foo-msg-key"))
.send();
1.4. 生产者定制
您可以指定一个ProducerBuilderCustomizer
以配置底层 Pulsar 生产者构建器,该构建器最终构建用于发送传出消息的生产者。
请谨慎使用,因为这可以完全访问生产者构建器并调用其某些方法(例如create )可能会产生意想不到的副作用。 |
例如,以下代码演示如何禁用批处理和启用分块:
template.newMessage(msg)
.withProducerCustomizer((pb) -> pb.enableChunking(true).enableBatching(false))
.send();
另一个示例演示如何在将记录发布到分区主题时使用自定义路由。
指定您的自定义MessageRouter
在Producer
构建器,例如:
template.newMessage(msg)
.withProducerCustomizer((pb) -> pb.messageRouter(messageRouter))
.send();
请注意,当使用MessageRouter ,这是spring.pulsar.producer.message-routing-mode 是custom . |
另一个示例演示了如何添加ProducerInterceptor
这将拦截和更改生产者收到的消息,然后再发布到代理:
template.newMessage(msg)
.withProducerCustomizer((pb) -> pb.intercept(interceptor))
.send();
定制器仅适用于用于发送作的生产者。 如果要将定制器应用于所有生产者,则必须按照全局生产者定制中所述将其提供给生产者工厂。
使用 Lambda 定制器时,必须遵守“Lambda 定制器的注意事项”中描述的规则。 |
2. 指定模式信息
如果您使用 Java 基元类型,则框架会自动为您检测模式,并且您无需指定任何模式类型来发布数据。
对于非原始类型,如果在调用PulsarTemplate
,Spring for Apache Pulsar 框架将尝试构建一个Schema.JSON
从类型。
当前支持的复杂架构类型包括 JSON、AVRO、PROTOBUF、AUTO_PRODUCE_BYTES 和带内联编码的 KEY_VALUE。 |
2.1. 自定义模式映射
作为在调用发送作时指定模式的替代方法。PulsarTemplate
对于复杂类型,可以使用类型的映射来配置模式解析器。这样就无需指定模式,因为框架使用传出消息类型咨询解析器。
2.1.1. 配置属性
架构映射可以使用spring.pulsar.defaults.type-mappings
财产。 以下示例使用application.yml
为User
和Address
复杂对象使用AVRO
和JSON
schemas,分别:
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.acme.User
schema-info:
schema-type: AVRO
- message-type: com.acme.Address
schema-info:
schema-type: JSON
这message-type 是消息类的完全限定名称。 |
2.1.2. 模式解析器定制器
添加映射的首选方法是通过上述属性。但是,如果需要更多控制,可以提供架构解析器定制器来添加映射 () 。
以下示例使用架构解析器定制器为User
和Address
复杂对象使用AVRO
和JSON
schemas,分别:
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
return (schemaResolver) -> {
schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
}
}
2.2. 使用 AUTO_SCHEMA 进行生产
如果没有机会提前知道 Pulsar 主题的模式类型,您可以使用AUTO_PRODUCE模式将原始 JSON 或 Avro 有效负载发布为byte[]
牢牢。
在这种情况下,生产者将验证出站字节是否与目标主题的模式兼容。
只需指定Schema.AUTO_PRODUCE_BYTES()
在模板上发送作,如下例所示:
void sendUserAsBytes(PulsarTemplate<byte[]> template, byte[] userAsBytes) {
template.send("user-topic", userAsBytes, Schema.AUTO_PRODUCE_BYTES());
}
这仅支持 Avro 和 JSON 架构类型。 |
3. 脉冲星生产商工厂
这PulsarTemplate
依赖于PulsarProducerFactory
实际创建底层生产者。
Spring Boot 自动配置还提供了这个生产者工厂,您可以通过指定任何spring.pulsar.producer.*
应用程序属性。
3.1. 全球生产者定制
该框架提供了ProducerBuilderCustomizer
合约,允许您配置用于构建每个生产者的底层构建器。
要自定义所有生产者,您可以将定制器列表传递到PulsarProducerFactory
构造 函数。
使用多个定制器时,它们将按照它们在列表中的显示顺序应用。
如果您使用 Spring Boot 自动配置,您可以将定制器指定为 bean,它们将自动传递给PulsarProducerFactory ,根据他们的@Order 注解。 |
如果只想将定制器应用于单个生产者,可以使用 Fluent API 并在发送时指定定制器。
4. Pulsar Producer 缓存
每个底层 Pulsar 生产者都会消耗资源。 为了提高性能并避免持续创建生产者,生产者工厂会缓存它创建的生产者。 它们以 LRU 方式缓存,并在配置的时间段内未使用时逐出。 缓存密钥由足够的信息组成,以确保在后续创建请求中返回调用者相同的生产者。
此外,您可以通过指定任何spring.pulsar.producer.cache.*
应用程序属性。
4.1. Lambda定制器的注意事项
任何用户提供的生产者定制器也包含在缓存键中。
因为缓存键依赖于有效的实现equals/hashCode
,使用 Lambda 定制器时必须小心。
统治:作为 Lambda 实现的两个定制器将在equals/hashCode 当且仅当它们使用相同的 Lambda 实例并且不需要在其闭包之外定义任何变量时。 |
为了澄清上述规则,我们将看几个例子。
在以下示例中,定制器被定义为内联 Lambda,这意味着每次调用sendUser
使用相同的 Lambda 实例。此外,它不需要其闭包之外的变量。因此,它将作为缓存键匹配。
void sendUser() {
var user = randomUser();
template.newMessage(user)
.withTopic("user-topic")
.withProducerCustomizer((b) -> b.producerName("user"))
.send();
}
在下一个情况下,定制器被定义为内联 Lambda,这意味着每次调用sendUser
使用相同的 Lambda 实例。但是,它需要在其闭包之外有一个变量。因此,它不会匹配为缓存键。
void sendUser() {
var user = randomUser();
var name = randomName();
template.newMessage(user)
.withTopic("user-topic")
.withProducerCustomizer((b) -> b.producerName(name))
.send();
}
在最后一个示例中,定制器被定义为内联 Lambda,这意味着每次调用sendUser
使用相同的 Lambda 实例。虽然它确实使用变量名,但它不会源自其闭包之外,因此将匹配为缓存键。
这说明变量可以在 Lambda 闭包中使用,甚至可以调用静态方法。
void sendUser() {
var user = randomUser();
template.newMessage(user)
.withTopic("user-topic")
.withProducerCustomizer((b) -> {
var name = SomeHelper.someStaticMethod();
b.producerName(name);
})
.send();
}
统治:如果您的 Lambda 定制器未定义一次且仅定义一次(在后续调用中使用同一实例),或者它需要在其闭包之外定义变量,则您必须提供具有有效equals/hashCode 实现。 |
如果不遵守这些规则,则生产者缓存将始终丢失,并且您的应用程序性能将受到负面影响。 |
5. 拦截生产者上的消息
添加一个ProducerInterceptor
允许您在将消息发布到代理之前拦截和更改生产者收到的消息。
为此,您可以将拦截器列表传递到PulsarTemplate
构造 函数。
使用多个拦截器时,它们的应用顺序是它们在列表中的显示顺序。
如果您使用 Spring Boot 自动配置,则可以将拦截器指定为 Bean。
它们会自动传递给PulsarTemplate
.
拦截器的排序是通过使用@Order
注释如下:
@Bean
@Order(100)
ProducerInterceptor firstInterceptor() {
...
}
@Bean
@Order(200)
ProducerInterceptor secondInterceptor() {
...
}
如果您不使用Starters,则需要自行配置和注册上述组件。 |