脉冲星功能
目录
1. 脉冲星功能管理
该框架提供了脉冲功能管理管理脉冲星功能的组件。
当你使用脉冲星Spring Boot器时,你会获得脉冲功能管理自动配置。
默认情况下,应用程序尝试连接到本地的 Pulsar 实例本地主持人:8080.
然而,因为它利用了已经配置好的脉冲星管理,请参阅Pulsar管理客户端以了解可用的客户端选项(包括认证)。
还有其他配置选项可选spring.pulsar.function.*应用属性。
2. 自动功能管理
在应用程序启动时,框架会找到所有脉冲星功能,脉冲星沉没和脉冲星源在应用语境中。
每个豆子都会创建或更新相应的脉冲星功能。
正确的 API 是基于函数类型、函数配置以及该函数是否已经存在来调用的。
这脉冲星功能,脉冲星沉没和脉冲星源豆子是Apache Pulsar配置对象的简单包装工具FunctionConfig,SinkConfig和SourceConfig分别。
由于支持的连接器数量众多(且配置多样),该框架并未尝试创建一个配置属性层级来镜像不同的Apache Pulsar连接器。
相反,用户需要提供完整的配置对象,然后框架根据提供的配置来管理(创建/更新)。 |
应用关闭时,所有在应用启动过程中处理的功能都会被强制执行停止策略,这些功能要么被保留,要么停止,要么从Pulsar服务器上删除。
4. 配置
5. 自定义函数
关于如何开发和打包自定义函数的详细信息可以在 Pulsar 文档中找到。不过,从高层次看,要求如下:
-
代码使用 Java8
-
代码实现了以下
java.util.函数或org.apache.pulsar.functions.api.function -
包装成超级罐
函数构建和打包后,有多种方式可以使其可用于函数注册。
6. 示例
以下是一些示例,展示了如何配置脉冲星源豆子,结果为脉冲功能管理自动创建支持的Pulsar源连接器。
使用内置兔子连接器的PulsarSource
@Bean
PulsarSource rabbitSource() {
Map<String, Object> configs = new HashMap<>();
configs.put("host", "my.rabbit.host");
configs.put("port", 5672);
configs.put("virtualHost", "/");
configs.put("username", "guest");
configs.put("password", "guest");
configs.put("queueName", "test_rabbit");
configs.put("connectionName", "test-connection");
SourceConfig sourceConfig = SourceConfig.builder()
.tenant("public")
.namespace("default")
.name("rabbit-test-source")
.archive("builtin://rabbitmq")
.topicName("incoming_rabbit")
.configs(configs).build();
return new PulsarSource(sourceConfig, null);
}
下一个例子和之前的相同,只是它使用了自动配置的 Spring Boot兔子属性以减轻配置负担。这当然需要应用程序使用 Spring Boot 并启用 Rabbit自动配置。
使用内置兔子连接器和Spring BootRabbitProperties的PulsarSource
@Bean
PulsarSource rabbitSourceWithBootProps(RabbitProperties props) {
Map<String, Object> configs = new HashMap<>();
configs.put("host", props.determineHost());
configs.put("port", props.determinePort());
configs.put("virtualHost", props.determineVirtualHost());
configs.put("username", props.determineUsername());
configs.put("password", props.determinePassword());
configs.put("queueName", "test_rabbit");
configs.put("connectionName", "test-connection");
SourceConfig sourceConfig = SourceConfig.builder()
.tenant("public")
.namespace("default")
.name("rabbit-test-source")
.archive("builtin://rabbitmq")
.topicName("incoming_rabbit")
.configs(configs).build();
return new PulsarSource(sourceConfig, null);
}
| 更详细的示例请参见带脉冲星函数的示例流水线示例应用 |