脉冲星函数

Spring for Apache Pulsar 为 Pulsar IO(连接器)和 Pulsar 函数提供基本支持,允许用户定义由以下内容组成的流处理管道sources,processorssinks. 这sourcessinksPulsar IO(连接器)建模,并且processors脉冲星函数表示。spring-doc.cadn.net.cn

因为连接器只是特殊功能,为了简单起见,我们将源、接收器和函数统称为“脉冲星函数”。
前提条件

熟悉度 - 预计观众对 Pulsar IOPulsar 函数有点熟悉。 如果不是这种情况,查看他们的入门指南可能会有所帮助。spring-doc.cadn.net.cn

启用功能 - 要使用这些功能,必须启用并配置 Apache Pulsar 中的函数支持(默认情况下处于禁用状态)。 内置连接器可能还需要安装在 Pulsar 集群上。spring-doc.cadn.net.cn

有关更多详细信息,请参阅 Pulsar IOPulsar Functions 文档。spring-doc.cadn.net.cn

1. 脉冲星功能管理

该框架提供了PulsarFunctionAdministration组件来管理 Pulsar 功能。 当您使用 Pulsar Spring Boot Starters时,您将获得PulsarFunctionAdministration自动配置。spring-doc.cadn.net.cn

默认情况下,应用程序会尝试连接到本地 Pulsar 实例localhost:8080. 但是,因为它利用了已经配置的PulsarAdministration,请参阅 Pulsar 管理客户端 了解可用的客户端选项(包括身份验证)。 其他配置选项可用于spring.pulsar.function.*应用程序属性。spring-doc.cadn.net.cn

2. 自动功能管理

在应用程序启动时,框架会找到所有PulsarFunction,PulsarSinkPulsarSourcebean 的 bean。 对于每个 bean,都会创建或更新相应的 Pulsar 函数。 根据函数类型、函数配置以及函数是否已经存在来调用正确的 API。spring-doc.cadn.net.cn

PulsarFunction,PulsarSinkPulsarSourcebean 是 Apache Pulsar 配置对象的简单包装器FunctionConfig,SinkConfigSourceConfig分别。 由于受支持的连接器数量众多(及其各种配置),该框架不会尝试创建配置属性层次结构来镜像各种 Apache Pulsar 连接器。 相反,提供完整配置对象的负担在于用户,然后框架使用提供的配置处理管理(创建/更新)。

在应用程序关闭时,在应用程序启动期间处理的所有函数都会强制执行其停止策略,并且要么单独保留,要么停止,要么从 Pulsar 服务器中删除。spring-doc.cadn.net.cn

3. 限制

3.1. 没有魔法脉冲星函数

Pulsar 函数和自定义连接器由自定义应用程序代码表示(例如java.util.Function). 没有自动注册自定义代码的魔术支持。 虽然这会很了不起,但它有一些技术挑战,尚未实施。 因此,用户应确保函数(或自定义连接器)在函数配置中指定的位置可用。 例如,如果函数配置具有jar./some/path/MyFunction.jar则函数 jar 文件必须存在于指定的路径中。spring-doc.cadn.net.cn

3.2. 名称标识符

name函数配置中的属性用作标识符,以确定函数是否已存在,以便决定是否执行更新或创建作。 因此,如果需要函数更新,则不应修改名称。spring-doc.cadn.net.cn

4. 配置

4.1. Pulsar 函数存档

每个 Pulsar 函数都由一个实际的存档(eg. jar 文件)表示。 存档的路径通过archive属性,以及jar函数的属性。spring-doc.cadn.net.cn

以下规则确定路径的“类型”:spring-doc.cadn.net.cn

在创建/更新作期间发生的作取决于路径“类型”,如下所示:spring-doc.cadn.net.cn

4.2. 内置源和接收器

Apache Pulsar 提供了许多开箱即用的源连接器和接收器连接器,也就是内置连接器。要使用内置连接器,只需将archivebuiltin://<connector-type>(例如builtin://rabbit).spring-doc.cadn.net.cn

5. 自定义功能

有关如何开发和打包自定义函数的详细信息,请参阅 Pulsar 文档。 但是,在高层次上,要求如下:spring-doc.cadn.net.cn

构建和打包函数后,有几种方法可以使其可用于函数注册。spring-doc.cadn.net.cn

5.1. file://

jar 文件可以上传到服务器,然后通过file://jar函数配置的属性spring-doc.cadn.net.cn

5.2. 本地

jar 文件可以保持本地状态,然后通过jar函数配置的属性。spring-doc.cadn.net.cn

5.3. http://

jar 文件可以通过 HTTP 服务器提供,然后通过http(s)://jar函数配置的属性spring-doc.cadn.net.cn

5.4. function://

jar 文件可以上传到 Pulsar 包管理器,然后通过function://jar函数配置的属性spring-doc.cadn.net.cn

6. 示例

下面是一些示例,展示了如何配置PulsarSourcebean 的 bean,这会导致PulsarFunctionAdministration自动创建后备 Pulsar 源连接器。spring-doc.cadn.net.cn

使用内置 Rabbit 连接器的 PulsarSource
@Bean
PulsarSource rabbitSource() {
    Map<String, Object> configs = new HashMap<>();
    configs.put("host", "my.rabbit.host");
    configs.put("port", 5672);
    configs.put("virtualHost", "/");
    configs.put("username", "guest");
    configs.put("password", "guest");
    configs.put("queueName", "test_rabbit");
    configs.put("connectionName", "test-connection");
    SourceConfig sourceConfig = SourceConfig.builder()
            .tenant("public")
            .namespace("default")
            .name("rabbit-test-source")
            .archive("builtin://rabbitmq")
            .topicName("incoming_rabbit")
            .configs(configs).build();
    return new PulsarSource(sourceConfig, null);
}

下一个示例与前面的示例相同,只是它使用自动配置的 Spring BootRabbitProperties以减轻配置负担。这当然需要应用程序使用启用了 Rabbit 自动配置的 Spring Boot。spring-doc.cadn.net.cn

使用内置 Rabbit 连接器和 Spring Boot RabbitProperties 的 PulsarSource
@Bean
PulsarSource rabbitSourceWithBootProps(RabbitProperties props) {
    Map<String, Object> configs = new HashMap<>();
    configs.put("host", props.determineHost());
    configs.put("port", props.determinePort());
    configs.put("virtualHost", props.determineVirtualHost());
    configs.put("username", props.determineUsername());
    configs.put("password", props.determinePassword());
    configs.put("queueName", "test_rabbit");
    configs.put("connectionName", "test-connection");
    SourceConfig sourceConfig = SourceConfig.builder()
            .tenant("public")
            .namespace("default")
            .name("rabbit-test-source")
            .archive("builtin://rabbitmq")
            .topicName("incoming_rabbit")
            .configs(configs).build();
    return new PulsarSource(sourceConfig, null);
}
有关更详细的示例,请参阅使用 Pulsar Functions 示例应用的示例流管线