Apache Pulsar 支持
通过提供 Spring for Apache Pulsar 项目的自动配置来支持 Apache Pulsar。
Spring Boot 将在org.springframework.pulsar:spring-pulsar位于 Classpath 上。
当org.springframework.pulsar:spring-pulsar-reactive位于 Classpath 上。
有spring-boot-starter-pulsar和spring-boot-starter-pulsar-reactivestarters 分别方便地收集用于命令式和响应式使用的依赖项。
连接到 Pulsar
当您使用 Pulsar Starters时,Spring Boot 将自动配置并注册一个PulsarClient豆。
默认情况下,应用程序会尝试连接到位于pulsar://localhost:6650.
这可以通过设置spring.pulsar.client.service-url属性设置为不同的值。
| 该值必须是有效的 Pulsar 协议 URL |
您可以通过指定任何spring.pulsar.client.*前缀 application 属性。
如果您需要对配置进行更多控制,请考虑注册一个或多个PulsarClientBuilderCustomizer豆。
认证
要连接到需要身份验证的 Pulsar 集群,您需要通过设置pluginClassName以及插件所需的任何参数。
您可以将参数设置为参数名称到参数值的映射。
以下示例显示如何配置AuthenticationOAuth2插件。
-
Properties
-
YAML
spring.pulsar.client.authentication.plugin-class-name=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
spring.pulsar.client.authentication.param.issuerUrl=https://auth.server.cloud/
spring.pulsar.client.authentication.param.privateKey=file:///Users/some-key.json
spring.pulsar.client.authentication.param.audience=urn:sn:acme:dev:my-instance
spring:
pulsar:
client:
authentication:
plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
param:
issuerUrl: https://auth.server.cloud/
privateKey: file:///Users/some-key.json
audience: urn:sn:acme:dev:my-instance
|
您需要确保在 例如,如果要为 这种缺乏松散的绑定还使得对身份验证参数使用环境变量成问题,因为在转换过程中会丢失区分大小写。 如果你对参数使用环境变量,则需要按照 Spring for Apache Pulsar 参考文档中的这些步骤进行作,使其正常工作。 |
连接到 Pulsar Reactors
当 Reactive 自动配置被激活时, Spring Boot 将自动配置并注册一个ReactivePulsarClient豆。
这ReactivePulsarClient适配前面描述的PulsarClient.
因此,请按照上一节配置PulsarClient由ReactivePulsarClient.
连接到 Pulsar Administration
适用于 Apache Pulsar 的 SpringPulsarAdministration客户端也会自动配置。
默认情况下,应用程序会尝试连接到位于http://localhost:8080.
这可以通过设置spring.pulsar.admin.service-urlproperty 设置为表单中的其他值(http|https)://<host>:<port>.
如果您需要对配置进行更多控制,请考虑注册一个或多个PulsarAdminBuilderCustomizer豆。
认证
访问需要身份验证的 Pulsar 集群时,admin 客户端需要与常规 Pulsar 客户端相同的安全配置。
您可以通过将spring.pulsar.client.authentication跟spring.pulsar.admin.authentication.
要在启动时创建主题,请添加PulsarTopic.
如果主题已存在,则忽略该 Bean。 |
发送消息
Spring的PulsarTemplate是自动配置的,您可以使用它来发送消息,如以下示例所示:
-
Java
-
Kotlin
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final PulsarTemplate<String> pulsarTemplate;
public MyBean(PulsarTemplate<String> pulsarTemplate) {
this.pulsarTemplate = pulsarTemplate;
}
public void someMethod() {
this.pulsarTemplate.send("someTopic", "Hello");
}
}
import org.apache.pulsar.client.api.PulsarClientException
import org.springframework.pulsar.core.PulsarTemplate
import org.springframework.stereotype.Component
@Component
class MyBean(private val pulsarTemplate: PulsarTemplate<String>) {
@Throws(PulsarClientException::class)
fun someMethod() {
pulsarTemplate.send("someTopic", "Hello")
}
}
这PulsarTemplate依赖于PulsarProducerFactory创建底层 Pulsar 生产者。
Spring Boot 自动配置还提供了这个 producer 工厂,默认情况下,它会缓存它创建的 producer。
您可以通过指定任何spring.pulsar.producer.*和spring.pulsar.producer.cache.*前缀 application 属性。
如果您需要对 producer 出厂配置进行更多控制,请考虑注册一个或多个ProducerBuilderCustomizer豆。
这些定制器将应用于所有创建的生产者。
您还可以传入ProducerBuilderCustomizer发送消息时只影响当前生产者。
如果您需要对正在发送的消息进行更多控制,可以传入TypedMessageBuilderCustomizer发送消息时。
响应式发送消息
当 Reactive 自动配置被激活时, Spring 的ReactivePulsarTemplate是自动配置的,您可以使用它来发送消息,如以下示例所示:
-
Java
-
Kotlin
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final ReactivePulsarTemplate<String> pulsarTemplate;
public MyBean(ReactivePulsarTemplate<String> pulsarTemplate) {
this.pulsarTemplate = pulsarTemplate;
}
public void someMethod() {
this.pulsarTemplate.send("someTopic", "Hello").subscribe();
}
}
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate
import org.springframework.stereotype.Component
@Component
class MyBean(private val pulsarTemplate: ReactivePulsarTemplate<String>) {
fun someMethod() {
pulsarTemplate.send("someTopic", "Hello").subscribe()
}
}
这ReactivePulsarTemplate依赖于ReactivePulsarSenderFactory以实际创建基础发件人。
Spring Boot 自动配置还提供了这个发送者工厂,默认情况下,它会缓存它创建的生产者。
您可以通过指定任何spring.pulsar.producer.*和spring.pulsar.producer.cache.*前缀 application 属性。
如果您需要对 sender factory configuration 进行更多控制,请考虑注册一个或多个ReactiveMessageSenderBuilderCustomizer豆。
这些定制器将应用于所有创建的发件人。
您还可以传入ReactiveMessageSenderBuilderCustomizer发送消息时仅影响当前发件人。
如果您需要对正在发送的消息进行更多控制,可以传入MessageSpecBuilderCustomizer发送消息时。
接收消息
当 Apache Pulsar 基础设施存在时,任何 bean 都可以用@PulsarListener以创建侦听器终端节点。
以下组件在someTopic主题:
-
Java
-
Kotlin
import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@PulsarListener(topics = "someTopic")
public void processMessage(String content) {
// ...
}
}
import org.springframework.pulsar.annotation.PulsarListener
import org.springframework.stereotype.Component
@Component
class MyBean {
@PulsarListener(topics = ["someTopic"])
fun processMessage(content: String?) {
// ...
}
}
Spring Boot 自动配置提供了所有必要的组件PulsarListener,例如PulsarListenerContainerFactory以及它用来构建底层 Pulsar 消费者的 consumer factory。
您可以通过指定任何spring.pulsar.listener.*和spring.pulsar.consumer.*前缀 application 属性。
如果您需要对 Consumer Factory 的配置进行更多控制,请考虑注册一个或多个ConsumerBuilderCustomizer豆。
这些定制器应用于工厂创建的所有使用者,因此所有@PulsarListener实例。
您还可以通过设置consumerCustomizer属性的@PulsarListener注解。
如果您需要对实际容器工厂配置进行更多控制,请考虑注册一个或多个PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>>豆。
响应式接收消息
当 Apache Pulsar 基础设施存在并激活 Reactive 自动配置时,任何 bean 都可以使用@ReactivePulsarListener创建响应式侦听器终端节点。
以下组件在someTopic主题:
-
Java
-
Kotlin
import reactor.core.publisher.Mono;
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@ReactivePulsarListener(topics = "someTopic")
public Mono<Void> processMessage(String content) {
// ...
return Mono.empty();
}
}
import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener
import org.springframework.stereotype.Component
import reactor.core.publisher.Mono
@Component
class MyBean {
@ReactivePulsarListener(topics = ["someTopic"])
fun processMessage(content: String?): Mono<Void> {
// ...
return Mono.empty()
}
}
Spring Boot 自动配置提供了所有必要的组件ReactivePulsarListener,例如ReactivePulsarListenerContainerFactory以及它用来构建底层响应式 Pulsar 消费者的 consumer factory。
您可以通过指定任何spring.pulsar.listener.*和spring.pulsar.consumer.*前缀 application 属性。
如果您需要对 Consumer Factory 的配置进行更多控制,请考虑注册一个或多个ReactiveMessageConsumerBuilderCustomizer豆。
这些定制器应用于工厂创建的所有使用者,因此所有@ReactivePulsarListener实例。
您还可以通过设置consumerCustomizer属性的@ReactivePulsarListener注解。
如果您需要对实际容器工厂配置进行更多控制,请考虑注册一个或多个PulsarContainerFactoryCustomizer<DefaultReactivePulsarListenerContainerFactory<?>>豆。
阅读消息
Pulsar 读取器接口使应用程序能够手动管理游标。 当您使用读取器连接到主题时,您需要指定读取器在连接到主题时从哪条消息开始读取。
当 Apache Pulsar 基础设施存在时,任何 bean 都可以用@PulsarReader以使用 Reader 使用消息。
以下组件创建一个读取器终端节点,该终端节点从someTopic主题:
-
Java
-
Kotlin
import org.springframework.pulsar.annotation.PulsarReader;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@PulsarReader(topics = "someTopic", startMessageId = "earliest")
public void processMessage(String content) {
// ...
}
}
import org.springframework.pulsar.annotation.PulsarReader
import org.springframework.stereotype.Component
@Component
class MyBean {
@PulsarReader(topics = ["someTopic"], startMessageId = "earliest")
fun processMessage(content: String?) {
// ...
}
}
这@PulsarReader依赖于PulsarReaderFactory创建底层 Pulsar 读取器。
Spring Boot 自动配置提供了这个 reader 工厂,可以通过设置任何spring.pulsar.reader.*前缀 application 属性。
如果您需要对 Reader Factory 的配置进行更多控制,请考虑注册一个或多个ReaderBuilderCustomizer豆。
这些定制器应用于工厂创建的所有读取器,因此所有@PulsarReader实例。
您还可以通过设置readerCustomizer属性的@PulsarReader注解。
如果您需要对实际容器工厂配置进行更多控制,请考虑注册一个或多个PulsarContainerFactoryCustomizer<DefaultPulsarReaderContainerFactory<?>>豆。
响应式读取消息
当 Apache Pulsar 基础设施存在并激活 Reactive 自动配置时, Spring 的ReactivePulsarReaderFactory,您可以使用它来创建一个 Reader,以便以响应式方式读取消息。
以下组件使用提供的工厂创建一个读取器,并从someTopic主题:
-
Java
-
Kotlin
import java.time.Instant;
import java.util.List;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.reactive.client.api.StartAtSpec;
import reactor.core.publisher.Mono;
import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer;
import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final ReactivePulsarReaderFactory<String> pulsarReaderFactory;
public MyBean(ReactivePulsarReaderFactory<String> pulsarReaderFactory) {
this.pulsarReaderFactory = pulsarReaderFactory;
}
public void someMethod() {
ReactiveMessageReaderBuilderCustomizer<String> readerBuilderCustomizer = (readerBuilder) -> readerBuilder
.topic("someTopic")
.startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)));
Mono<Message<String>> message = this.pulsarReaderFactory
.createReader(Schema.STRING, List.of(readerBuilderCustomizer))
.readOne();
// ...
}
}
import org.apache.pulsar.client.api.Schema
import org.apache.pulsar.reactive.client.api.ReactiveMessageReaderBuilder
import org.apache.pulsar.reactive.client.api.StartAtSpec
import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer
import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory
import org.springframework.stereotype.Component
import java.time.Instant
@Component
class MyBean(private val pulsarReaderFactory: ReactivePulsarReaderFactory<String>) {
fun someMethod() {
val readerBuilderCustomizer = ReactiveMessageReaderBuilderCustomizer {
readerBuilder: ReactiveMessageReaderBuilder<String> ->
readerBuilder
.topic("someTopic")
.startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5)))
}
val message = pulsarReaderFactory
.createReader(Schema.STRING, listOf(readerBuilderCustomizer))
.readOne()
// ...
}
}
Spring Boot 自动配置提供了这个 reader 工厂,可以通过设置任何spring.pulsar.reader.*前缀 application 属性。
如果您需要对读取器出厂配置进行更多控制,请考虑传入一个或多个ReactiveMessageReaderBuilderCustomizer实例。
如果您需要对读卡器出厂配置进行更多控制,请考虑注册一个或多个ReactiveMessageReaderBuilderCustomizer豆。
这些定制器将应用于所有创建的读取器。
您还可以传递一个或多个ReactiveMessageReaderBuilderCustomizer在创建 Reader 时,仅将自定义项应用于创建的 Reader。
| 有关上述任何组件的更多详细信息以及发现其他可用功能,请参阅 Spring for Apache Pulsar 参考文档。 |
事务支持
Spring for Apache Pulsar 支持在使用PulsarTemplate和@PulsarListener.
| 使用 reactive 变体时,当前不支持 Transactions。 |
设置spring.pulsar.transaction.enabledproperty 设置为true将:
-
启用事务支持
PulsarTemplate -
启用事务支持
@PulsarListener方法
这transactional属性@PulsarListener可用于微调何时应将事务与侦听器一起使用。
为了更好地控制 Spring for Apache Pulsar 事务功能,您应该定义自己的PulsarTemplate和/或ConcurrentPulsarListenerContainerFactory豆。
您还可以定义PulsarAwareTransactionManagerbean(如果默认的 auto-configuredPulsarTransactionManager不合适。
其他 Pulsar 属性
自动配置支持的属性显示在附录的 Integration Properties 部分中。 请注意,在大多数情况下,这些属性(带 hyphened 或 camelCase)直接映射到 Apache Pulsar 配置属性。 有关详细信息,请参阅 Apache Pulsar 文档。
只有 Pulsar 支持的属性子集可以直接通过PulsarProperties类。
如果您希望使用不直接支持的其他属性来优化自动配置的组件,则可以使用上述每个组件支持的定制器。