Spring Cloud Stream Schema Registry

介绍

当组织采用基于消息的发布/订阅架构,且多个生产者和消费者微服务相互通信时,通常需要所有这些微服务就基于模式达成协议。 当这种模式需要演进以适应新的业务需求时,现有组件仍然需要继续工作。 Spring Cloud Stream 支持独立的模式注册服务器,应用程序可以在此注册并使用上述模式。 Spring Cloud Stream 模式注册表支持还支持 avro 模式注册表客户端,这些客户端本质上是消息转换器,与模式注册表通信,用于在消息转换时对模式进行协调。 Spring Cloud Stream 提供的模式演化支持既适用于上述独立模式注册表,也支持专门支持 Apache Kafka 的 Confluent 模式注册表。spring-doc.cadn.net.cn

Spring Cloud Stream Schema Registry 概述

Spring Cloud Stream Schema Registry 支持模式演化,使数据能够随时间演进,同时仍能与较老的生产者和消费者合作,反之亦然。 大多数序列化模型,尤其是那些旨在跨平台和语言移植的模型,依赖于描述数据如何在二进制负载中序列化的模式。 为了序列化数据并解释数据,发送端和接收端都必须访问描述二进制格式的模式。 在某些情况下,模式可以从序列化时的有效载荷类型或反序列化时的目标类型推断。 然而,许多应用程序受益于能够访问描述二进制数据格式的显式模式。 模式注册表允许你以文本格式(通常是JSON)存储模式信息,并使这些信息对需要接收和发送二进制数据的各种应用程序开放。 模式可作为由以下组成的元组引用:spring-doc.cadn.net.cn

Spring Cloud 流模式注册表提供以下组件spring-doc.cadn.net.cn

  • 独立模式注册服务器spring-doc.cadn.net.cn

    By default, it is using an H2 database, but server can be used with PostgreSQL or MySQL by providing appropriate datasource configuration.
  • 模式注册表客户端能够通过与模式注册表通信实现消息编组。spring-doc.cadn.net.cn

    Currently, the client can communicate to the standalone schema registry or the Confluent Schema Registry.

模式注册表客户端

客户端与模式注册表服务器交互的抽象是SchemaRegistryClient接口,其结构如下:spring-doc.cadn.net.cn

public interface SchemaRegistryClient {

    SchemaRegistrationResponse register(String subject, String format, String schema);

    String fetch(SchemaReference schemaReference);

    String fetch(Integer id);

}

Spring Cloud Stream 提供了开箱即用的实现,用于与自身的模式服务器交互,以及与 Confluent 模式注册表交互。spring-doc.cadn.net.cn

Spring Cloud Stream 模式注册表的客户端可以通过以下方式配置@EnableSchemaRegistryClient如下:spring-doc.cadn.net.cn

@SpringBootApplication
@EnableSchemaRegistryClient
public class ConsumerApplication {

}
默认转换器优化为不仅缓存远程服务器的模式,还缓存解析(parse)toString()而这些方法相当昂贵。 因此,它使用了一个DefaultSchemaRegistryClient那个不会缓存响应。 如果你打算更改默认行为,可以直接用客户端覆盖代码,并覆盖到想要的结果。 为此,你需要添加spring.cloud.stream.schemaRegistryClient.cached=true对你的应用属性来说。

模式注册表客户端属性

模式注册客户端支持以下属性:spring-doc.cadn.net.cn

spring.cloud.stream.schemaRegistryClient.endpoint

模式服务器的位置。 设置时,请使用包含协议的完整URL(httphttps) 、端口和上下文路径。spring-doc.cadn.net.cn

默认值

localhost:8990/spring-doc.cadn.net.cn

spring.cloud.stream.schemaRegistryClient.cached

客户端是否应该缓存模式服务器响应。 通常设置为false,因为缓存发生在消息转换器中。 使用模式注册表客户端的客户端应将此设置为true.spring-doc.cadn.net.cn

默认值

falsespring-doc.cadn.net.cn

Avro Schema 注册表客户端消息转换器

对于在应用上下文中注册了 SchemaRegistryClient 豆的应用程序,Spring Cloud Stream 会自动配置 Apache Avro 消息转换器用于模式管理。 这简化了模式演进,因为接收消息的应用程序可以轻松访问写入模式,并与其自身的读卡模式进行调和。spring-doc.cadn.net.cn

对于外发消息,如果绑定的内容类型设置为application/*+avro消息转换器激活了,如下示例所示:spring-doc.cadn.net.cn

spring.cloud.stream.stream.bindings.<output-binding-name>.contentType=application/*+avro

在出站转换过程中,消息转换器尝试根据每个出站消息的类型推断出其模式,并根据有效载荷类型将其注册到主体,通过使用SchemaRegistryClient. 如果已经找到相同的模式,则检索该模式的引用。 如果不符合,则会注册该模式,并提供新的版本号。 该消息通过内容类型通过以下方案实现报头:应用/[前缀]。[主语].v[版本]+avro哪里前缀是可配置的,且主题由有效载荷类型推导。spring-doc.cadn.net.cn

例如,一条类型的消息用户可能以内容类型application/vnd.user.v2+avro哪里用户是主语,且2是版本号。spring-doc.cadn.net.cn

接收消息时,转换器会从收到消息的头部推断模式引用并尝试检索。该模式作为反序列化过程中的写入模式使用。spring-doc.cadn.net.cn

Avro 模式注册表消息转换器属性

如果你启用了基于Avro的架构注册客户端,可以通过设置spring.cloud.stream.stream.bindings.<output-binding-name>.contentType=application/*+avro你可以通过设置以下属性来自定义注册的行为。spring-doc.cadn.net.cn

spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled

如果你想让转换器用反射从POJO推断出Schema,可以启用。spring-doc.cadn.net.cn

违约:falsespring-doc.cadn.net.cn

spring.cloud.stream.schema.avro.readerSchema

Avro 通过比较写入模式(原始负载)和读取模式(你的应用负载)来比较模式版本。更多信息请参见Avro文档。 如果设置为,它会覆盖模式服务器上的任何查找,并使用本地模式作为读取模式。 违约:spring-doc.cadn.net.cn

spring.cloud.stream.schema.avro.schemaLocations

寄存器 任意.avsc与Schema Server一起列出的该属性文件。spring-doc.cadn.net.cn

违约:emptyspring-doc.cadn.net.cn

spring.cloud.stream.schema.avro.prefix

该前缀将用于 Content-Type 头部。spring-doc.cadn.net.cn

spring.cloud.stream.schema.avro.subjectNamingStrategy

确定用于在模式注册表中注册Avro模式的主题名称。有两种实现方式,org.springframework.cloud.stream.schema.avro.DefaultSubjectNamingStrategy,其中主语是模式名称,且org.springframework.cloud.stream.schema.avro.QualifiedSubjectNamingStrategy该系统返回一个完全合格的主体,使用Avro模式的命名空间和名称。通过实施可以创建定制策略org.springframework.cloud.stream.schema.avro.SubjectNamingStrategy.spring-doc.cadn.net.cn

违约:org.springframework.cloud.stream.schema.avro.DefaultSubjectNamingStrategyspring-doc.cadn.net.cn

spring.cloud.stream.schema.avro.ignoreSchemaRegistryServer

忽略任何模式注册表的通信。它对测试很有用,这样在运行单元测试时,不会不必要地尝试连接到模式注册表服务器。spring-doc.cadn.net.cn

违约:falsespring-doc.cadn.net.cn

Apache Avro 消息转换器

Spring Cloud Stream 通过其Spring-cloud-stream-schema-registry-client模块。 目前,唯一开箱即用支持的基于模式的消息转换器序列化格式是Apache Avro,未来版本还会添加更多格式。spring-doc.cadn.net.cn

Spring-cloud-stream-schema-registry-client模块包含两种可用于Apache Avro序列化的消息转换器:spring-doc.cadn.net.cn

  • 转换器利用序列化或反序列化对象的类信息,或启动时已知位置的模式。spring-doc.cadn.net.cn

  • 使用模式注册表的转换器。它们在运行时定位模式,并随着域对象的演变动态注册新的模式。spring-doc.cadn.net.cn

支持模式的转换器

AvroSchemaMessageConverter支持通过使用预定义的模式或类中可用的模式信息(反射式或包含在具体记录). 如果你提供了自定义转换器,那么默认的 AvroSchemaMessageConverter 豆子就不会被创建。 以下示例展示了一个定制转换器:spring-doc.cadn.net.cn

使用自定义转换器时,你可以将其添加到应用上下文中,并可选地指定一个或多个哑剧类型与之联系起来。 默认模仿类型应用/AVRO.spring-doc.cadn.net.cn

如果转换的目标类型是通用记录必须设置一个模式。spring-doc.cadn.net.cn

以下示例展示了如何在汇应用中配置转换器,通过注册Apache Avro来实现消息转换器没有预定义的模式。 在这个例子中,注意哑剧类型的值为Avro/Bytes,不是默认应用/AVRO.spring-doc.cadn.net.cn

@SpringBootApplication
public static class SinkApplication {

  //...

  @Bean
  public MessageConverter userMessageConverter() {
      return new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
  }
}

相反,以下应用注册一个带有预定义模式(在类路径上)的转换器:spring-doc.cadn.net.cn

@SpringBootApplication
public static class SinkApplication {

  //...

  @Bean
  public MessageConverter userMessageConverter() {
      AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
      converter.setSchemaLocation(new ClassPathResource("schemas/User.avro"));
      return converter;
  }
}

模式注册表服务器

Spring Cloud Stream 提供了模式注册表服务器实现。 使用时,你可以下载最新版Spring-cloud-stream-schema-registry-server发布并作为独立应用程序运行:spring-doc.cadn.net.cn

wget https://repo1.maven.org/maven2/org/springframework/cloud/spring-cloud-stream-schema-registry-server/4.0.3/spring-cloud-stream-schema-registry-server-4.0.3.jar
java -jar ./spring-cloud-stream-schema-registry-server-4.0.3.jar

你可以将模式注册表嵌入到你现有的 Spring Boot 网页应用中。 为此,添加Spring-cloud-stream-schema-registry-core你的项目中出现了工件,并使用@EnableSchemaRegistryServer注释,它会将模式注册表服务器 REST 控制器添加到你的应用程序中。 以下示例展示了一个启用模式注册表的 Spring Boot 应用程序:spring-doc.cadn.net.cn

@SpringBootApplication
@EnableSchemaRegistryServer
public class SchemaRegistryServerApplication {
public static void main(String[] args) {
SpringApplication.run(SchemaRegistryServerApplication.class, args);
}
}

spring.cloud.stream.schema.server.path属性可以用来控制模式服务器的根路径(尤其是嵌入在其他应用中时)。 这spring.cloud.stream.schema.server.allowSchemaDeletion布尔属性使得模式可以被删除。默认情况下,这个功能是被禁用的。spring-doc.cadn.net.cn

模式注册服务器使用关系数据库来存储模式。 默认情况下,它使用嵌入式数据库。 你可以通过使用 Spring Boot SQL 数据库和 JDBC 配置选项来自定义模式存储。spring-doc.cadn.net.cn

模式注册表服务器 API

模式注册表服务器 API 包含以下作:spring-doc.cadn.net.cn

注册新模式

要注册新模式,发送发布请求到端点。/spring-doc.cadn.net.cn

该 接受包含以下字段的 JSON 负载:/spring-doc.cadn.net.cn

它的响应是一个 JSON 模式对象,包含以下字段:spring-doc.cadn.net.cn

按主题、格式和版本检索现有的模式

要按主题、格式和版本检索现有模式,发送获取{subject}/{format}/{version}端点。spring-doc.cadn.net.cn

它的响应是一个 JSON 模式对象,包含以下字段:spring-doc.cadn.net.cn

按主题和格式检索现有的模式

要按主语和格式检索现有模式,发送获取/主题/格式端点。spring-doc.cadn.net.cn

它的响应是包含每个模式对象的JSON模式列表,包含以下字段:spring-doc.cadn.net.cn

通过ID检索现有模式

要通过 ID 获取模式,发送获取/schemas/{id}端点。spring-doc.cadn.net.cn

它的响应是一个 JSON 模式对象,包含以下字段:spring-doc.cadn.net.cn

按主题、格式和版本删除模式

要删除根据主题、格式和版本识别的模式,发送删除{subject}/{format}/{version}端点。spring-doc.cadn.net.cn

通过ID删除模式

要按 ID 删除模式,发送删除/schemas/{id}端点。spring-doc.cadn.net.cn

按主语删除图式

删除 /{subject}spring-doc.cadn.net.cn

按主题删除现有的模式。spring-doc.cadn.net.cn

本说明仅适用于 Spring Cloud Stream 1.1.0.RELEASE 的用户。 Spring Cloud Stream 1.1.0.RELEASE 使用了表名,图式,用于存储图式对象。图式是许多数据库实现中的一个关键词。 为了避免未来发生冲突,从 1.1.1.RELEASE 开始,我们选择了该名称SCHEMA_REPOSITORY给储物桌用的。 任何升级的 Spring Cloud Stream 1.1.0.RELEASE 用户都应在升级前将现有模式迁移到新表。

使用 Confluent 的模式注册表

默认配置会生成DefaultSchemaRegistryClient豆。 如果你想使用 Confluent schema 注册表,你需要创建一个类型的 beanConfluentSchemaRegistryClient,取代了框架默认配置的版本。以下示例展示了如何制作这样的豆子:spring-doc.cadn.net.cn

@Bean
public SchemaRegistryClient schemaRegistryClient(@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint){
  ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
  client.setEndpoint(endpoint);
  return client;
}
ConfluentSchemaRegistryClient 在 Confluent 平台 4.0.0 版本的测试中。

模式注册与解析

为了更好地理解 Spring Cloud Stream 如何注册和解析新模式及其对 Avro 模式比较功能的应用,我们提供了两个独立的子板块:spring-doc.cadn.net.cn

模式注册过程(序列化)

注册过程的第一步是从通过信道发送的有效载荷中提取一个模式。 Avro类型,如具体记录通用记录已经包含一个模式,可以立即从实例中检索。 对于POJO来说,如果spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled属性设置为true(默认)。spring-doc.cadn.net.cn

一旦获得模式,转换器会从远程服务器加载其元数据(版本)。 首先,它查询本地缓存。如果没有发现结果,它会将数据提交给服务器,服务器会回复版本控制信息。 转换器总是缓存结果,以避免每次需要序列化的新消息都向模式服务器查询的开销。spring-doc.cadn.net.cn

利用模式版本信息,转换器设置内容类型用于传输版本信息的消息头——例如:application/vnd.user.v1+avro.spring-doc.cadn.net.cn

模式解析过程(反序列化)

当阅读包含版本信息的消息(即内容类型具有如下描述方案的头部模式注册过程(序列化),转换器查询模式服务器以获取消息的写入模式。 一旦找到正确的接收消息模式,它会检索读取模式,并通过 Avro 的模式解析支持,将其读入读取器定义(设置默认值和缺失属性)。spring-doc.cadn.net.cn

你应该明白写入模式(写消息的应用程序)和读取模式(接收应用)之间的区别。 我们建议花点时间阅读Avro的术语,了解整个流程。 Spring Cloud Stream 总是获取写入模式来判断如何读取消息。 如果你想让 Avro 的架构演化支持正常工作,你需要确保readerSchema已经为你的申请设置正确了。