脉冲星功能

Apache Pulsar 的 Spring 为 Pulsar IO(连接器)和 Pulsar 函数提供了基本支持,允许用户定义由以下组成的流处理管道来源,处理器. 这来源Pulsar IO(连接器)和处理器脉冲星函数表示。spring-doc.cadn.net.cn

因为连接器只是特殊功能,为了简化起见,我们将源、汇和功能统称为“脉冲星函数”。
前提条件

熟悉度——观众对脉冲星IO脉冲星功能有一定了解。 如果不是这样,看看他们的入门指南可能会有帮助。spring-doc.cadn.net.cn

启用功能——要使用这些功能,必须启用并配置Apache Pulsar支持的功能(默认为禁用)。 内置连接器也可能需要安装在脉冲星集群上。spring-doc.cadn.net.cn

更多详情请参见Pulsar IOPulsar Functions文档。spring-doc.cadn.net.cn

1. 脉冲星功能管理

该框架提供了脉冲功能管理管理脉冲星功能的组件。 当你使用脉冲星Spring Boot器时,你会获得脉冲功能管理自动配置。spring-doc.cadn.net.cn

默认情况下,应用程序尝试连接到本地的 Pulsar 实例本地主持人:8080. 然而,因为它利用了已经配置好的脉冲星管理,请参阅Pulsar管理客户端以了解可用的客户端选项(包括认证)。 还有其他配置选项可选spring.pulsar.function.*应用属性。spring-doc.cadn.net.cn

2. 自动功能管理

在应用程序启动时,框架会找到所有脉冲星功能,脉冲星沉没脉冲星源在应用语境中。 每个豆子都会创建或更新相应的脉冲星功能。 正确的 API 是基于函数类型、函数配置以及该函数是否已经存在来调用的。spring-doc.cadn.net.cn

脉冲星功能,脉冲星沉没脉冲星源豆子是Apache Pulsar配置对象的简单包装工具FunctionConfig,SinkConfigSourceConfig分别。 由于支持的连接器数量众多(且配置多样),该框架并未尝试创建一个配置属性层级来镜像不同的Apache Pulsar连接器。 相反,用户需要提供完整的配置对象,然后框架根据提供的配置来管理(创建/更新)。

应用关闭时,所有在应用启动过程中处理的功能都会被强制执行停止策略,这些功能要么被保留,要么停止,要么从Pulsar服务器上删除。spring-doc.cadn.net.cn

3. 限制

3.1. 无魔法脉冲星功能

脉冲星功能和自定义连接器由自定义应用代码表示(例如 ajava.util.函数). 没有魔法支持自动注册自定义代码。 虽然这会很棒,但它存在一些技术挑战,且尚未实现。 因此,用户需确保功能(或自定义连接器)在功能配置指定的位置可用。 例如,如果函数配置有./some/path/MyFunction.jar那么函数 jar 文件必须存在于指定的路径上。spring-doc.cadn.net.cn

3.2. 名称标识符

名称函数配置中的属性作为标识符,用于判断函数是否已存在,从而决定是否执行更新或创建作。 因此,如果需要函数更新,名称不应被修改。spring-doc.cadn.net.cn

4. 配置

4.1. 脉冲星功能档案

每个脉冲星功能都由一个实际的归档(eg. jar文件)表示。 归档的路径通过档案源和汇的属性,以及函数的性质。spring-doc.cadn.net.cn

以下规则决定路径的“类型”:spring-doc.cadn.net.cn

创建/更新作过程中发生的动作取决于路径“类型”,具体如下:spring-doc.cadn.net.cn

4.2. 内置源与汇

Apache Pulsar 开箱即用地提供了许多源和汇连接器,也就是内置连接器。使用内置连接器只需设置档案builtin://<连接器类型>(例如:builtin://rabbit).spring-doc.cadn.net.cn

5. 自定义函数

关于如何开发和打包自定义函数的详细信息可以在Pulsar文档中找到。 但从高层次来看,要求如下:spring-doc.cadn.net.cn

函数构建和打包后,有多种方式可以使其可用于函数注册。spring-doc.cadn.net.cn

5.1. file://

jar 文件可以上传到服务器,然后通过以下方式引用文件://函数配置的性质spring-doc.cadn.net.cn

5.2. 本地

jar 文件可以保持本地状态,然后通过本地路径在函数配置的属性。spring-doc.cadn.net.cn

5.3. http://

jar 文件可以通过 HTTP 服务器提供,然后通过以下方式引用http(s)@函数配置的性质spring-doc.cadn.net.cn

5.4. function://

jar 文件可以上传到 Pulsar 包管理器,然后通过以下方式引用功能://函数配置的性质spring-doc.cadn.net.cn

6. 示例

以下是一些示例,展示了如何配置脉冲星源豆子,结果为脉冲功能管理自动创建支持的Pulsar源连接器。spring-doc.cadn.net.cn

使用内置兔子连接器的PulsarSource
@Bean
PulsarSource rabbitSource() {
    Map<String, Object> configs = new HashMap<>();
    configs.put("host", "my.rabbit.host");
    configs.put("port", 5672);
    configs.put("virtualHost", "/");
    configs.put("username", "guest");
    configs.put("password", "guest");
    configs.put("queueName", "test_rabbit");
    configs.put("connectionName", "test-connection");
    SourceConfig sourceConfig = SourceConfig.builder()
            .tenant("public")
            .namespace("default")
            .name("rabbit-test-source")
            .archive("builtin://rabbitmq")
            .topicName("incoming_rabbit")
            .configs(configs).build();
    return new PulsarSource(sourceConfig, null);
}

下一个例子和之前的相同,只是它使用了自动配置的 Spring Boot兔子属性以减轻配置负担。这当然需要应用程序使用 Spring Boot 并启用 Rabbit 自动配置。spring-doc.cadn.net.cn

使用内置兔子连接器和Spring BootRabbitProperties的PulsarSource
@Bean
PulsarSource rabbitSourceWithBootProps(RabbitProperties props) {
    Map<String, Object> configs = new HashMap<>();
    configs.put("host", props.determineHost());
    configs.put("port", props.determinePort());
    configs.put("virtualHost", props.determineVirtualHost());
    configs.put("username", props.determineUsername());
    configs.put("password", props.determinePassword());
    configs.put("queueName", "test_rabbit");
    configs.put("connectionName", "test-connection");
    SourceConfig sourceConfig = SourceConfig.builder()
            .tenant("public")
            .namespace("default")
            .name("rabbit-test-source")
            .archive("builtin://rabbitmq")
            .topicName("incoming_rabbit")
            .configs(configs).build();
    return new PulsarSource(sourceConfig, null);
}
更详细的示例请参见带脉冲星函数的示例流水线示例应用