|
该版本仍在开发中,尚未被视为稳定。对于最新稳定版本,请使用 spring-cloud-stream 5.0.0! |
Spring Cloud Stream Schema Registry
介绍
当组织采用基于消息的发布/订阅架构,且多个生产者和消费者微服务相互通信时,通常需要所有这些微服务达成基于模式的合同。当此类模式需要演进以适应新的业务需求时,现有组件仍需继续工作。Spring Cloud Stream 支持独立的模式注册表服务器,应用程序可以通过该服务器注册并使用上述模式。Spring Cloud Stream 模式注册表支持还支持基于 avro 的模式注册表客户端,这些客户端本质上提供消息转换器,与模式注册表通信,以便在消息转换时对模式进行协调。Spring Cloud Stream 提供的模式演化支持既适用于上述独立模式注册表,也适用于 Confluent 提供的专门支持 Apache Kafka 的模式注册表。
Spring Cloud Stream Schema Registry 概述
Spring Cloud 流模式注册表支持模式演进,使数据可以随着时间演进,同时仍能与较老或更新的生产者和消费者合作,反之亦然。大多数序列化模型,尤其是那些旨在跨平台和语言移植的模型,依赖于描述数据如何在二进制负载中序列化的模式。为了序列化数据并解释数据,发送端和接收端都必须访问描述二进制格式的模式。在某些情况下,模式可以从序列化时的有效载荷类型或反序列化时的目标类型推断。然而,许多应用受益于能够访问描述二进制数据格式的显式模式。模式注册表允许您以文本格式(通常是 JSON)存储模式信息,并使这些信息供需要接收和发送二进制数据的各种应用程序访问。模式可作为一组引用,包括:
-
主体是该模式的逻辑名称
-
模式版本
-
模式格式,描述数据的二进制格式
Spring Cloud 流模式注册表提供以下组件
-
独立模式注册服务器
By default, it is using an H2 database, but server can be used with PostgreSQL or MySQL by providing appropriate datasource configuration.
-
模式注册表客户端能够通过与模式注册表通信实现消息编组。
Currently, the client can communicate to the standalone schema registry or the Confluent Schema Registry.
模式注册表客户端
客户端与模式注册表服务器交互的抽象是SchemaRegistryClient接口,其结构如下:
public interface SchemaRegistryClient {
SchemaRegistrationResponse register(String subject, String format, String schema);
String fetch(SchemaReference schemaReference);
String fetch(Integer id);
}
Spring Cloud Stream 提供了开箱即用的实现,用于与自身的模式服务器交互,以及与 Confluent 模式注册表交互。
Spring Cloud Stream 模式注册表的客户端可以通过以下方式配置@EnableSchemaRegistryClient如下:
@SpringBootApplication
@EnableSchemaRegistryClient
public class ConsumerApplication {
}
默认转换器优化为不仅缓存远程服务器的模式,还缓存解析(parse)和toString()而这些方法相当昂贵。
因此,它使用了一个DefaultSchemaRegistryClient那个不会缓存响应。
如果你打算更改默认行为,可以直接用客户端覆盖代码,并覆盖到想要的结果。
为此,你需要添加spring.cloud.stream.schemaRegistryClient.cached=true对你的应用属性来说。 |
模式注册表客户端属性
模式注册客户端支持以下属性:
spring.cloud.stream.schemaRegistryClient.endpoint-
模式服务器的位置。 设置时,请使用包含协议的完整URL(
http或https) 、端口和上下文路径。 - 默认值
spring.cloud.stream.schemaRegistryClient.cached-
客户端是否应该缓存模式服务器响应。 通常设置为
false,因为缓存发生在消息转换器中。 使用模式注册表客户端的客户端应将此设置为true. - 默认值
-
false
Avro Schema 注册表客户端消息转换器
对于在应用上下文中注册了 SchemaRegistryClient 豆的应用程序,Spring Cloud Stream 会自动配置 Apache Avro 消息转换器用于模式管理。 这简化了模式演进,因为接收消息的应用程序可以轻松访问写入模式,并与其自身的读卡模式进行调和。
对于外发消息,如果绑定的内容类型设置为application/*+avro这消息转换器激活了,如下示例所示:
spring.cloud.stream.stream.bindings.<output-binding-name>.contentType=application/*+avro
在出站转换过程中,消息转换器尝试根据每个出站消息的类型推断出其模式,并根据有效载荷类型将其注册到主体,通过使用SchemaRegistryClient.
如果已经找到相同的模式,则检索该模式的引用。
如果不符合,则会注册该模式,并提供新的版本号。
该消息通过内容类型通过以下方案实现报头:应用/[前缀]。[主语].v[版本]+avro哪里前缀是可配置的,且主题由有效载荷类型推导。
例如,一条类型的消息用户可能以内容类型application/vnd.user.v2+avro哪里用户是主语,且2是版本号。
接收消息时,转换器会从收到消息的头部推断模式引用并尝试检索。该模式作为反序列化过程中的写入模式使用。
Avro 模式注册表消息转换器属性
如果你启用了基于Avro的架构注册客户端,可以通过设置spring.cloud.stream.stream.bindings.<output-binding-name>.contentType=application/*+avro你可以通过设置以下属性来自定义注册的行为。
- spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled
-
如果你想让转换器用反射从POJO推断出Schema,可以启用。
违约:
false - spring.cloud.stream.schema.avro.readerSchema
-
Avro 通过比较写入模式(原始负载)和读取模式(你的应用负载)来比较模式版本。更多信息请参见Avro文档。 如果设置为,它会覆盖模式服务器上的任何查找,并使用本地模式作为读取模式。 违约:
零 - spring.cloud.stream.schema.avro.schemaLocations
-
寄存器 任意
.avsc与Schema Server一起列出的该属性文件。违约:
empty - spring.cloud.stream.schema.avro.prefix
-
该前缀将用于 Content-Type 头部。
违约:
VND - spring.cloud.stream.schema.avro.subjectNamingStrategy
-
确定用于在模式注册表中注册Avro模式的主题名称。有两种实现方式,
org.springframework.cloud.stream.schema.avro.DefaultSubjectNamingStrategy,其中主语是模式名称,且org.springframework.cloud.stream.schema.avro.QualifiedSubjectNamingStrategy该系统返回一个完全合格的主体,使用Avro模式的命名空间和名称。通过实施可以创建定制策略org.springframework.cloud.stream.schema.avro.SubjectNamingStrategy.违约:
org.springframework.cloud.stream.schema.avro.DefaultSubjectNamingStrategy - spring.cloud.stream.schema.avro.ignoreSchemaRegistryServer
-
忽略任何模式注册表的通信。它对测试很有用,这样在运行单元测试时,不会不必要地尝试连接到模式注册表服务器。
违约:
false
Apache Avro 消息转换器
Spring Cloud Stream 通过其Spring-cloud-stream-schema-registry-client模块。
目前,唯一开箱即用支持的基于模式的消息转换器序列化格式是Apache Avro,未来版本还会添加更多格式。
这Spring-cloud-stream-schema-registry-client模块包含两种可用于Apache Avro序列化的消息转换器:
-
转换器利用序列化或反序列化对象的类信息,或启动时已知位置的模式。
-
使用模式注册表的转换器。它们在运行时定位模式,并随着域对象的演变动态注册新的模式。
支持模式的转换器
这AvroSchemaMessageConverter支持通过使用预定义的模式或类中可用的模式信息(反射式或包含在具体记录).
如果你提供了自定义转换器,那么默认的 AvroSchemaMessageConverter 豆子就不会被创建。
以下示例展示了一个定制转换器:
使用自定义转换器时,你可以将其添加到应用上下文中,并可选地指定一个或多个哑剧类型与之联系起来。
默认模仿类型是应用/AVRO.
如果转换的目标类型是通用记录必须设置一个模式。
以下示例展示了如何在汇应用中配置转换器,通过注册Apache Avro来实现消息转换器没有预定义的模式。
在这个例子中,注意哑剧类型的值为Avro/Bytes,不是默认应用/AVRO.
@SpringBootApplication
public static class SinkApplication {
//...
@Bean
public MessageConverter userMessageConverter() {
return new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
}
}
相反,以下应用注册一个带有预定义模式(在类路径上)的转换器:
@SpringBootApplication
public static class SinkApplication {
//...
@Bean
public MessageConverter userMessageConverter() {
AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter(MimeType.valueOf("avro/bytes"));
converter.setSchemaLocation(new ClassPathResource("schemas/User.avro"));
return converter;
}
}
模式注册表服务器
Spring Cloud Stream 提供了模式注册表服务器实现。
使用时,你可以下载最新版Spring-cloud-stream-schema-registry-server发布并作为独立应用程序运行:
wget https://repo1.maven.org/maven2/org/springframework/cloud/spring-cloud-stream-schema-registry-server/4.0.3/spring-cloud-stream-schema-registry-server-4.0.3.jar
java -jar ./spring-cloud-stream-schema-registry-server-4.0.3.jar
|
你可以将模式注册表嵌入到你现有的 Spring Boot 网页应用中。
为此,添加
|
这spring.cloud.stream.schema.server.path属性可以用来控制模式服务器的根路径(尤其是嵌入在其他应用中时)。
这spring.cloud.stream.schema.server.allowSchemaDeletion布尔属性使得模式可以被删除。默认情况下,这个功能是被禁用的。
模式注册服务器使用关系数据库来存储模式。 默认情况下,它使用嵌入式数据库。 你可以通过使用 Spring Boot SQL 数据库和 JDBC 配置选项来自定义模式存储。
模式注册表服务器 API
模式注册表服务器 API 包含以下作:
-
发布/—— 参见注册新模式 -
获取 /{subject}/{format}/{version}—— 参见按主题、格式和版本检索现有的模式 -
获取 /{subject}/{format}—— 参见按主题和格式检索现有的模式 -
获取 /schemas/{id}—— 参见通过ID检索现有模式 -
删除 /{subject}/{format}/{version}—— 参见按主题、格式和版本删除模式 -
删除 /schemas/{id}—— 参见通过ID删除模式 -
删除 /{subject}—— 参见按主语删除图式
注册新模式
要注册新模式,发送发布请求到端点。/
该 接受包含以下字段的 JSON 负载:/
-
主题:模式主语 -
格式:模式格式 -
定义:模式定义
它的响应是一个 JSON 模式对象,包含以下字段:
-
身份证:模式ID -
主题:模式主语 -
格式:模式格式 -
版本:模式版本 -
定义:模式定义
按主题、格式和版本检索现有的模式
要按主题、格式和版本检索现有模式,发送获取向{subject}/{format}/{version}端点。
它的响应是一个 JSON 模式对象,包含以下字段:
-
身份证:模式ID -
主题:模式主语 -
格式:模式格式 -
版本:模式版本 -
定义:模式定义
按主题和格式检索现有的模式
要按主语和格式检索现有模式,发送获取向/主题/格式端点。
它的响应是包含每个模式对象的JSON模式列表,包含以下字段:
-
身份证:模式ID -
主题:模式主语 -
格式:模式格式 -
版本:模式版本 -
定义:模式定义
通过ID检索现有模式
要通过 ID 获取模式,发送获取向/schemas/{id}端点。
它的响应是一个 JSON 模式对象,包含以下字段:
-
身份证:模式ID -
主题:模式主语 -
格式:模式格式 -
版本:模式版本 -
定义:模式定义
按主语删除图式
删除 /{subject}
按主题删除现有的模式。
本说明仅适用于 Spring Cloud Stream 1.1.0.RELEASE 的用户。
Spring Cloud Stream 1.1.0.RELEASE 使用了表名,图式,用于存储图式对象。图式是许多数据库实现中的一个关键词。
为了避免未来发生冲突,从 1.1.1.RELEASE 开始,我们选择了该名称SCHEMA_REPOSITORY给储物桌用的。
任何升级的 Spring Cloud Stream 1.1.0.RELEASE 用户都应在升级前将现有模式迁移到新表。 |
使用 Confluent 的模式注册表
默认配置会生成DefaultSchemaRegistryClient豆。
如果你想使用 Confluent schema 注册表,你需要创建一个类型的 beanConfluentSchemaRegistryClient,取代了框架默认配置的版本。以下示例展示了如何制作这样的豆子:
@Bean
public SchemaRegistryClient schemaRegistryClient(@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint){
ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
client.setEndpoint(endpoint);
return client;
}
| ConfluentSchemaRegistryClient 在 Confluent 平台 4.0.0 版本的测试中。 |
模式注册过程(序列化)
注册过程的第一步是从通过信道发送的有效载荷中提取一个模式。
Avro类型,如具体记录或通用记录已经包含一个模式,可以立即从实例中检索。
对于POJO来说,如果spring.cloud.stream.schema.avro.dynamicSchemaGenerationEnabled属性设置为true(默认)。
一旦获得模式,转换器会从远程服务器加载其元数据(版本)。 首先,它查询本地缓存。如果没有发现结果,它会将数据提交给服务器,服务器会回复版本控制信息。 转换器总是缓存结果,以避免每次需要序列化的新消息都向模式服务器查询的开销。
利用模式版本信息,转换器设置内容类型用于传输版本信息的消息头——例如:application/vnd.user.v1+avro.
模式解析过程(反序列化)
当阅读包含版本信息的消息(即内容类型具有如下描述方案的头部模式注册过程(序列化),转换器查询模式服务器以获取消息的写入模式。
一旦找到正确的接收消息模式,它会检索读取模式,并通过 Avro 的模式解析支持,将其读入读取器定义(设置默认值和缺失属性)。
你应该明白写入模式(写消息的应用程序)和读取模式(接收应用)之间的区别。
我们建议花点时间阅读Avro的术语,了解整个流程。
Spring Cloud Stream 总是获取写入模式来判断如何读取消息。
如果你想让 Avro 的架构演化支持正常工作,你需要确保readerSchema已经为你的申请设置正确了。 |