脉冲星函数
1. 脉冲星功能管理
该框架提供了PulsarFunctionAdministration
组件来管理 Pulsar 功能。
当您使用 Pulsar Spring Boot Starters时,您将获得PulsarFunctionAdministration
自动配置。
默认情况下,应用程序会尝试连接到本地 Pulsar 实例localhost:8080
.
但是,因为它利用了已经配置的PulsarAdministration
,请参阅 Pulsar 管理客户端 了解可用的客户端选项(包括身份验证)。
其他配置选项可用于spring.pulsar.function.*
应用程序属性。
2. 自动功能管理
在应用程序启动时,框架会找到所有PulsarFunction
,PulsarSink
和PulsarSource
bean 的 bean。
对于每个 bean,都会创建或更新相应的 Pulsar 函数。
根据函数类型、函数配置以及函数是否已经存在来调用正确的 API。
这PulsarFunction ,PulsarSink 和PulsarSource bean 是 Apache Pulsar 配置对象的简单包装器FunctionConfig ,SinkConfig 和SourceConfig 分别。
由于受支持的连接器数量众多(及其各种配置),该框架不会尝试创建配置属性层次结构来镜像各种 Apache Pulsar 连接器。
相反,提供完整配置对象的负担在于用户,然后框架使用提供的配置处理管理(创建/更新)。 |
在应用程序关闭时,在应用程序启动期间处理的所有函数都会强制执行其停止策略,并且要么单独保留,要么停止,要么从 Pulsar 服务器中删除。
4. 配置
5. 自定义功能
有关如何开发和打包自定义函数的详细信息,请参阅 Pulsar 文档。 但是,在高层次上,要求如下:
-
代码使用 Java8
-
代码实现
java.util.Function
或org.apache.pulsar.functions.api.Function
-
包装为 uber jar
构建和打包函数后,有几种方法可以使其可用于函数注册。
6. 示例
下面是一些示例,展示了如何配置PulsarSource
bean 的 bean,这会导致PulsarFunctionAdministration
自动创建后备 Pulsar 源连接器。
@Bean
PulsarSource rabbitSource() {
Map<String, Object> configs = new HashMap<>();
configs.put("host", "my.rabbit.host");
configs.put("port", 5672);
configs.put("virtualHost", "/");
configs.put("username", "guest");
configs.put("password", "guest");
configs.put("queueName", "test_rabbit");
configs.put("connectionName", "test-connection");
SourceConfig sourceConfig = SourceConfig.builder()
.tenant("public")
.namespace("default")
.name("rabbit-test-source")
.archive("builtin://rabbitmq")
.topicName("incoming_rabbit")
.configs(configs).build();
return new PulsarSource(sourceConfig, null);
}
下一个示例与前面的示例相同,只是它使用自动配置的 Spring BootRabbitProperties
以减轻配置负担。这当然需要应用程序使用启用了 Rabbit 自动配置的 Spring Boot。
@Bean
PulsarSource rabbitSourceWithBootProps(RabbitProperties props) {
Map<String, Object> configs = new HashMap<>();
configs.put("host", props.determineHost());
configs.put("port", props.determinePort());
configs.put("virtualHost", props.determineVirtualHost());
configs.put("username", props.determineUsername());
configs.put("password", props.determinePassword());
configs.put("queueName", "test_rabbit");
configs.put("connectionName", "test-connection");
SourceConfig sourceConfig = SourceConfig.builder()
.tenant("public")
.namespace("default")
.name("rabbit-test-source")
.archive("builtin://rabbitmq")
.topicName("incoming_rabbit")
.configs(configs).build();
return new PulsarSource(sourceConfig, null);
}
有关更详细的示例,请参阅使用 Pulsar Functions 示例应用的示例流管线 |