Apache 脉冲星支持
Apache Pulsar 通过自动配置 Spring for Apache Pulsar 项目得到支持。
Spring Boot 会在 Spring Pulsar 组件的运行时自动配置并注册 Spring 的组件org.springframework.pulsar:spring-pulsar在阶级路径上。
有Spring靴Starters脉冲星方便地收集依赖以供使用。
连接脉冲星
当你使用PulsarStarters时,Spring Boot会自动配置并注册一个脉冲星客户端豆。
默认情况下,应用程序尝试连接到本地的 Pulsar 实例pulsar://localhost:6650.
这可以通过设置Spring.pulsar.client.service-url财产价值变为不同。
| 该值必须是有效的脉冲星协议URL。 |
你可以通过指定任意Spring.Pulsar.client.*预置应用属性。
如果你需要对配置有更多控制,可以考虑注册一个或多个PulsarClientBuilderCustomizer豆。
认证
要连接到需要认证的 Pulsar 集群,你需要通过设置pluginClassName以及插件所需的参数。
你可以把参数设置成参数名称到参数值的映射。
以下示例展示了如何配置AuthenticationOAuth2插件。
-
Properties
-
YAML
spring.pulsar.client.authentication.plugin-class-name=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
spring.pulsar.client.authentication.param.issuerUrl=https://auth.server.cloud/
spring.pulsar.client.authentication.param.privateKey=file:///Users/some-key.json
spring.pulsar.client.authentication.param.audience=urn:sn:acme:dev:my-instance
spring:
pulsar:
client:
authentication:
plugin-class-name: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
param:
issuerUrl: https://auth.server.cloud/
privateKey: file:///Users/some-key.json
audience: urn:sn:acme:dev:my-instance
|
你需要确保定义在以下 例如,如果你想配置发行者的URL, 这种缺乏宽松绑定也使得使用环境变量作为认证参数变得困难,因为翻译过程中大小写敏感性会丢失。 如果你用环境变量作为参数,那么你需要按照 Spring for Apache Pulsar 参考文档中的这些步骤作,才能正常工作。 |
连接脉冲星管理
阿帕奇脉冲星的Spring脉冲星管理客户端也是自动配置的。
默认情况下,应用程序尝试连接到本地的 Pulsar 实例http://localhost:8080.
这可以通过设置Spring.pulsar.admin.service-url性质变为不同的取值,形式为(http|https)://<host>:<port>.
如果你需要对配置有更多控制,可以考虑注册一个或多个PulsarAdminBuilderCustomizer豆。
发送消息
斯普林斯脉冲星模板是自动配置的,你可以用它发送消息,如下示例所示:
-
Java
-
Kotlin
import org.springframework.pulsar.core.PulsarTemplate;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final PulsarTemplate<String> pulsarTemplate;
public MyBean(PulsarTemplate<String> pulsarTemplate) {
this.pulsarTemplate = pulsarTemplate;
}
public void someMethod() {
this.pulsarTemplate.send("someTopic", "Hello");
}
}
import org.apache.pulsar.client.api.PulsarClientException
import org.springframework.pulsar.core.PulsarTemplate
import org.springframework.stereotype.Component
@Component
class MyBean(private val pulsarTemplate: PulsarTemplate<String>) {
@Throws(PulsarClientException::class)
fun someMethod() {
pulsarTemplate.send("someTopic", "Hello")
}
}
这脉冲星模板依赖于脉冲星生产工厂以创建底层的脉冲星产生器。
Spring Boot 自动配置还提供了生产者工厂,默认缓存其所创建的生产者。
你可以通过指定任何一个Spring。脉冲星。制作人。*和spring.pulsar.producer.cache.*预置应用属性。
如果你需要对发送的消息有更多控制,可以传入TypedMessageBuilderCustomizer发送消息时。
接收消息
当Apache Pulsar基础设施存在时,任何豆子都可以被注释为@PulsarListener创建监听端点。
以下组件在某个话题主题:
-
Java
-
Kotlin
import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@PulsarListener(topics = "someTopic")
public void processMessage(String content) {
// ...
}
}
import org.springframework.pulsar.annotation.PulsarListener
import org.springframework.stereotype.Component
@Component
class MyBean {
@PulsarListener(topics = ["someTopic"])
fun processMessage(content: String?) {
// ...
}
}
Spring Boot 自动配置提供所有必要的组件脉冲星听者,例如脉冲星Listener容器工厂以及它用来制造底层脉冲星消费者的消费工厂。
你可以通过指定任意spring.pulsar.listener.*和Spring.脉冲星.消费者。*预置应用属性。
如果你需要对消费工厂配置有更多控制,可以考虑注册一个或多个消费者架构定制器豆。
这些定制工具适用于工厂生产的所有消费者,因此也适用于所有@PulsarListener实例。
你也可以通过设置consumerCustomizer属性@PulsarListener注解。
如果你需要对实际容器工厂配置有更多控制权,可以考虑注册一个或多个PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>>豆。
阅读信息
Pulsar 读取器接口使应用程序能够手动管理光标。 当你使用阅读器连接某个主题时,需要指定读者从哪条消息开始阅读,连接到某个主题。
当Apache Pulsar基础设施存在时,任何豆子都可以被注释为@PulsarReader使用阅读器接收消息。
接下来的组件创建了一个读取端点,从消息的起点开始读取某个话题主题:
-
Java
-
Kotlin
import org.springframework.pulsar.annotation.PulsarReader;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@PulsarReader(topics = "someTopic", startMessageId = "earliest")
public void processMessage(String content) {
// ...
}
}
import org.springframework.pulsar.annotation.PulsarReader
import org.springframework.stereotype.Component
@Component
class MyBean {
@PulsarReader(topics = ["someTopic"], startMessageId = "earliest")
fun processMessage(content: String?) {
// ...
}
}
这@PulsarReader依赖于脉冲星读者工厂以创建底层的脉冲星读取器。
Spring Boot 自动配置提供了该读卡器工厂,可以通过设置任意Spring脉冲星读者。*预置应用属性。
如果你需要对读卡器工厂配置有更多控制,可以考虑注册一个或多个ReaderBuilderCustomizer豆。
这些自定义器应用到工厂创建的所有读卡器,因此也适用于所有@PulsarReader实例。
你也可以通过设置阅读自定义器属性@PulsarReader注解。
如果你需要对实际容器工厂配置有更多控制权,可以考虑注册一个或多个PulsarContainerFactoryCustomizer<DefaultPulsarReaderContainerFactory<?>>豆。
| 如需了解上述组件的更多信息及发现其他可用功能,请参阅 Spring for Apache Pulsar 参考文档。 |
交易支持
Spring for Apache Pulsar 支持在使用脉冲星模板和@PulsarListener.
设置spring.pulsar.transaction.enabled属性到true将:
-
启用事务支持
脉冲星模板 -
启用事务支持
@PulsarListener方法
这事务属性@PulsarListener可用于微调何时应与监听者使用交易。
如果你想更好地控制 Spring for Apache Pulsar 的交易功能,你应该自己定义脉冲星模板和/或ConcurrentPulsarListenerContainerFactory豆。
你也可以定义一个PulsarAwareTransactionManager如果默认是自动配置的,则是 BEANPulsarTransactionManager不合适。