Spring Cloud Stream Binder for Apache Pulsar

Spring for Apache Pulsar 提供了一个 Spring Cloud Stream 的绑定器,我们可以用它来构建基于事件驱动的微服务,利用发布-订阅范式。 在本节中,我们将介绍该活页夹的基本细节。spring-doc.cadn.net.cn

用法

我们需要包含以下依赖条件,以使用Apache Pulsar绑定器为Spring Cloud Stream使用。spring-doc.cadn.net.cn

<dependencies>
    <dependency>
        <groupId>org.springframework.pulsar</groupId>
        <artifactId>spring-pulsar-spring-cloud-stream-binder</artifactId>
    </dependency>
</dependencies>
dependencies {
    implementation 'org.springframework.pulsar:spring-pulsar-spring-cloud-stream-binder'
}

概述

Apache Pulsar 的 Spring Cloud Stream 绑定器允许应用程序专注于业务逻辑,而非处理管理和维护 Pulsar 的底层细节。 活页夹会为应用开发者处理所有这些细节。 Spring Cloud Stream 基于 Spring Cloud 函数,采用强大的编程模型,使应用开发者能够使用函数式风格编写复杂的事件驱动应用。 应用程序可以从中间件中立的方式开始,然后通过 Spring Boot 配置属性将 Pulsar 主题映射为 Spring Cloud Stream 中的目的地。 Spring Cloud Stream 是建立在 Spring Boot 之上的,当你用 Spring Cloud Stream 编写事件驱动的微服务时,实际上你是在编写一个 Boot 应用。 这里有一个简单的Spring Cloud Stream应用。spring-doc.cadn.net.cn

@SpringBootApplication
public class SpringPulsarBinderSampleApp {

	private final Logger logger = LoggerFactory.getLogger(this.getClass());

	public static void main(String[] args) {
		SpringApplication.run(SpringPulsarBinderSampleApp.class, args);
	}

	@Bean
	public Supplier<Time> timeSupplier() {
		return () -> new Time(String.valueOf(System.currentTimeMillis()));
	}

	@Bean
	public Function<Time, EnhancedTime> timeProcessor() {
		return (time) -> {
			EnhancedTime enhancedTime = new EnhancedTime(time, "5150");
			this.logger.info("PROCESSOR: {} --> {}", time, enhancedTime);
			return enhancedTime;
		};
	}

	@Bean
	public Consumer<EnhancedTime> timeLogger() {
		return (time) -> this.logger.info("SINK:      {}", time);
	}

	record Time(String time) {
	}

	record EnhancedTime(Time time, String extra) {
	}

}

上述示例应用是一个完整的 Spring Boot 应用,值得做一些解释。不过,初步看,你会发现这其实就是纯Java加上一些Spring和Spring Boot注释。 我们有三个这里的方法 - ajava.util.function.Supplier一个java.util.function.函数,最后,一个java.util.function.Consumer. 提供商以毫秒计时生成当前时间,函数利用这段时间,通过添加随机数据来增强时间,消费者再记录这些增强时间。spring-doc.cadn.net.cn

为了简洁,我们省略了所有导入,但整个应用中没有针对 Spring Cloud Stream 的导入。 它是如何成为一个与Apache Pulsar交互的Spring Cloud Stream应用? 你必须在申请中包含上述对活页夹的依赖性。 添加该依赖后,您必须提供以下配置属性。spring-doc.cadn.net.cn

spring:
  cloud:
    function:
      definition: timeSupplier;timeProcessor;timeLogger;
    stream:
      bindings:
        timeProcessor-in-0:
          destination: timeSupplier-out-0
        timeProcessor-out-0:
          destination: timeProcessor-out-0
        timeLogger-in-0:
          destination: timeProcessor-out-0

通过此,上述 Spring Boot 应用已成为基于 Spring Cloud Stream 的端到端事件驱动应用。 由于类路径上有 Pulsar 绑定器,应用程序与 Apache Pulsar 交互。 如果应用中只有一个函数,那么我们不需要告诉 Spring Cloud Stream 激活该函数执行,因为它默认会执行。 如果应用程序中有多个此类函数,如我们的示例,我们需要指示 Spring Cloud Stream 想要激活哪些函数。 在我们的情况下,我们需要所有这些设备都被激活,我们通过spring.cloud.function.definition财产。 豆名默认成为春云溪绑定名称的一部分。 绑定是 Spring Cloud Stream 中一个本质上抽象的概念,框架通过它与中间件目的地通信。 春云流几乎所有的动作都发生在混凝土绑定之上。 提供商只有输出绑定;函数有输入和输出绑定,而消费者只有输入绑定。 我们以提供商豆子为例——时间提供商。该提供商的默认绑定名称为timeSupplier-out-0. 同样,默认的绑定名称时间处理器函数为timeProcessor-in-0在进站和time处理器出0在出程路上。 请参阅Spring Cloud Stream参考文档,了解如何更改默认绑定名称。 在大多数情况下,使用默认的绑定名称就足够了。 我们在绑定名称上设置了目的地,如上所示。 如果未提供目的地,绑定名称即为目的地的值,如timeSupplier-out-0.spring-doc.cadn.net.cn

运行上述应用时,你应该看到提供商每秒执行一次,函数会消耗这些数据,从而增加记录器用户所耗时的时间。spring-doc.cadn.net.cn

基于Binder应用中的消息转换

在上述示例应用中,我们未提供消息转换的模式信息。 这是因为默认情况下,Spring Cloud Stream 使用其消息转换机制,通过 Spring Messaging 项目在 Spring Framework 中建立的消息支持。 除非特别说明,春云溪流使用application/json作为内容类型用于进站和出站绑定的消息转换。 在出站时,数据被序列化为字节[],而脉冲星结合器则使用Schema.BYTES通过电报发送到脉冲星主题。 同样,在入站时,数据被消耗为字节[]从脉冲星主题中读取,然后使用合适的消息转换器转换为目标类型。spring-doc.cadn.net.cn

在 Pulsar 中使用 Pulsar 模式进行原生转换

虽然默认使用框架提供的消息转换,但 Spring Cloud Stream 允许每个绑定器决定消息的转换方式。 假设应用程序选择了这条路径。在这种情况下,Spring Cloud Stream 避免使用任何 Spring 提供的消息转换功能,而是将接收或产生的数据传递出去。 Spring Cloud Stream 中的这一特性在生产者端称为原生编码,在消费者端称为原生解码。这意味着编码和解码在目标中间件上原生完成,我们是在Apache Pulsar上。 对于上述应用,我们可以采用以下配置绕过框架转换,采用原生编码和解码。spring-doc.cadn.net.cn

spring:
  cloud:
    stream:
      bindings:
        timeSupplier-out-0:
          producer:
            use-native-encoding: true
        timeProcessor-in-0:
          destination: timeSupplier-out-0
          consumer:
            use-native-decoding: true
        timeProcessor-out-0:
          destination: timeProcessor-out-0
          producer:
            use-native-encoding: true
        timeLogger-in-0:
          destination: timeProcessor-out-0
          consumer:
            use-native-decoding: true
      pulsar:
        bindings:
          timeSupplier-out-0:
            producer:
              schema-type: JSON
              message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.Time
          timeProcessor-in-0:
            consumer:
              schema-type: JSON
              message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.Time
          timeProcessor-out-0:
            producer:
              schema-type: AVRO
              message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.EnhancedTime
          timeLogger-in-0:
            consumer:
              schema-type: AVRO
              message-type: org.springframework.pulsar.sample.binder.SpringPulsarBinderSampleApp.EnhancedTime

在生产者端启用原生编码的属性是核心 Spring Cloud Stream 中的绑定层属性。 你把它放在制作人装订上——spring.cloud.stream.bindings.<binding-name>.producer.use-native-encoding并将此设为真。同样,使用——spring.cloud.stream.bindings.<binding-name>.consumer.user-native-decoding对于消费者绑定,并将其设置为真。如果我们决定使用原生编码和解码,比如 Pulsar,我们需要设置对应的模式和底层消息类型信息。 这些信息作为扩展结合性质提供。 如你在上图配置中所见,其性质如下——spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.schema-type用于模式信息和spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.message-type针对实际目标类型。 如果你消息里同时有键和值,可以用消息密钥类型消息值类型以指定目标类型。spring-doc.cadn.net.cn

任何配置的自定义模式映射都会在模式类型省略属性。

消息头转换

每个消息通常包含头部信息,在通过Spring Cloud Stream输入和输出绑定在Pulsar和Spring消息之间穿梭时需要携带。 为了支持这种遍历,框架负责必要的消息头转换。spring-doc.cadn.net.cn

自定义头部映射器

Pulsar 绑定器配置了一个默认的头部映射器,可以通过提供自己的标头来覆盖脉冲头图器豆。spring-doc.cadn.net.cn

在以下示例中,JSON头映射器的配置是:spring-doc.cadn.net.cn

@Bean
public PulsarHeaderMapper customPulsarHeaderMapper() {
    return JsonPulsarHeaderMapper.builder()
            .inboundPatterns("!top", "!secret", "*")
            .outboundPatterns("!id", "!timestamp", "!userId", "*")
            .trustedPackages("com.acme")
            .toStringClasses("com.acme.Money")
            .build();
}

结合器中脉冲星特性的应用

该绑定器使用Spring for Apache Pulsar框架中的基本组件来构建其生产者和消费者绑定。 由于基于Binder的应用程序是Spring Boot应用程序,Binder默认使用Spring Boot自动配置。 因此,核心框架层面所有 Pulsar Spring Boot 属性也可以通过绑定器访问。 例如,你可以用带有前缀的属性Spring。脉冲星。制作人......,Spring.脉冲星.消费者......等。 此外,你也可以在绑定器层面设置这些脉冲星属性。 例如,这个方法也可行——spring.cloud.stream.pulsar.binder.producer...spring.cloud.stream.pulsar.binder.consumer....spring-doc.cadn.net.cn

上述两种方法都可以,但如果使用这些特性,则会应用于整个应用。 如果应用里有多个函数,它们都会获得相同的属性。 你也可以在扩展绑定属性层面设置这些 Pulsar 属性来解决这个问题。 扩展的结合性质是在绑定本身应用的。 例如,如果你有输入和输出绑定,且它们都需要不同的脉冲星属性集合,你必须将它们设置为扩展绑定。 生产者装订的模式为spring.cloud.stream.pulsar.bindings.<output-binding-name>.producer.... 同样,消费者装订的图案为spring.cloud.stream.pulsar.bindings.<input-binding-name>.consumer.... 这样,你可以在同一应用中为不同的绑定应用单独一套脉冲星属性。spring-doc.cadn.net.cn

最高优先级用于扩展结合性质。 结合器中应用性质的先后顺序为扩展绑定属性→结合器属性→Spring Boot属性。(从最高到最低)。spring-doc.cadn.net.cn

以下是一些可依赖的资源,帮助你了解通过Pulsar结合器提供的特性。spring-doc.cadn.net.cn

冲星产生器绑定配置。 这些属性需要spring.cloud.stream.bindings.<binding-name>.producer前缀。 所有Spring Boot提供的Pulsar产生器属性也可通过该配置类别获得。spring-doc.cadn.net.cn

Pulsar消费者绑定配置。 这些属性需要spring.cloud.stream.bindings.<binding-name>.consumer前缀。 Spring Boot 提供的所有 Pulsar 消费者属性也可以通过该配置类别获得。spring-doc.cadn.net.cn

关于常见的Pulsar结合剂特有配置特性,请参见此文。这些属性需要一个前缀Spring.云.stream.pulsar.binder. 上述指定的生产者和消费者属性(包括Spring Boot的)可以在活页夹上使用,spring.cloud.stream.pulsar.binder.producerspring.cloud.stream.pulsar.binder.consumer前缀。spring-doc.cadn.net.cn

脉冲星主题配置器

Apache Pulsar 的 Spring Cloud Stream 绑定器自带了 Pulsar 主题的开箱即用配置器。 运行应用程序时,如果缺少必要的主题,Pulsar 会帮你创建相关主题。 不过,这是一个基础的非分区主题,如果你想要创建分区主题等高级功能,可以依赖活板夹中的主题提供器。 脉冲星主题分配器的用途脉冲星管理来自该框架,该框架使用脉冲管理员建造者。因此,你需要设置Spring.Pulsar.Administration.Service-URL除非你在默认服务器和端口上运行 Pulsar,否则属性。spring-doc.cadn.net.cn

创建主题时指定分区计数

创建主题时,你可以用两种方式设置分区数量。 首先,你可以用 属性在绑定器层面设置Spring.cloud.stream.pulsar.binder.partition-count. 如上所述,这样做会让应用程序创建的所有主题继承该属性。 假设你想在绑定层面实现细致的控制来设置分区。 在这种情况下,你可以设置分区计数使用以下格式的每个绑定属性spring.cloud.stream.pulsar.bindings.<binding-name>.producer|consumer.partition-count. 这样,同一应用程序中不同函数创建的各种主题会根据应用需求拥有不同的分区。spring-doc.cadn.net.cn