|
该版本仍在开发中,尚未被视为稳定。对于最新稳定版本,请使用 spring-cloud-stream 5.0.0! |
分区
Spring Cloud Stream 支持在给定应用程序的多个实例之间分区数据。 在分区场景中,物理通信介质(如代理主题)被视为结构化为多个分区。 一个或多个生产者应用实例将数据发送给多个消费者应用实例,并确保具有共同特征的数据被同一消费者实例处理。
Spring Cloud Stream 提供了一种通用的抽象,用于统一实现分区处理用例。 因此,无论代理本身是否自然分区(例如Kafka),都可以使用分区(例如RabbitMQ)。
分区是有状态处理中的关键概念,确保所有相关数据同时处理(无论是出于性能还是一致性的原因)至关重要。 例如,在时间窗平均计算中,任何给定传感器的所有测量值都必须由同一应用实例处理,这一点非常重要。
| 要建立分区处理场景,必须配置数据产生端和数据消耗端。 |
Spring Cloud Stream 中的分区包括两个任务:
配置分区的输出绑定
你可以配置输出绑定发送分区数据,只需设置其中一个partitionKeyExpression或partitionKeyExtractorName性质以及其partitionCount财产。
例如,以下是一个有效且典型的配置:
spring.cloud.stream.bindings.func-out-0.producer.partitionKeyExpression=headers.id spring.cloud.stream.bindings.func-out-0.producer.partitionCount=5
基于该示例配置,数据通过以下逻辑发送到目标分区。
分区键的值是根据partitionKeyExpression.
这partitionKeyExpression是针对外发消息进行评估的SpEL表达式(在前例中,它是身份证来自消息头部)用于提取分区密钥。
如果 SpEL 表达式不足以满足需求,你可以通过提供 的实现来计算分区键值org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy并将其配置为豆子(通过使用@Bean注释)。
如果你有不止一种类型的豆子org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy在应用上下文中可用,你可以通过指定其名称来进一步筛选partitionKeyExtractorName性质,如下例所示:
--spring.cloud.stream.bindings.func-out-0.producer.partitionKeyExtractorName=customPartitionKeyExtractor
--spring.cloud.stream.bindings.func-out-0.producer.partitionCount=5
. . .
@Bean
public CustomPartitionKeyExtractorClass customPartitionKeyExtractor() {
return new CustomPartitionKeyExtractorClass();
}
在之前的 Spring Cloud Stream 版本中,你可以指定实现org.springframework.cloud.stream.binder.PartitionKeyExtractorStrategy通过设置spring.cloud.stream.bindings.output.producer.partitionKeyExtractorClass财产。
自3.0版本起,该属性被移除。 |
一旦消息密钥计算完成,分区选择过程会确定目标分区为0和partitionCount - 1.
默认计算适用于大多数场景,基于以下公式:key.hashCode() % partitionCount.
这可以在绑定上进行自定义,比如设置一个 SpEL 表达式,使其对“密钥”进行评估(通过partitionSelectorExpression属性)或通过配置 的实现org.springframework.cloud.stream.binder.PartitionSelectorStrategy作为豆子(通过使用@Bean注释)。
类似于PartitionKeyExtractorStrategy,你可以通过使用spring.cloud.stream.bindings.output.producer.partitionSelectorName当应用上下文中有多个此类豆子时,属性,如下示例所示:
--spring.cloud.stream.bindings.func-out-0.producer.partitionSelectorName=customPartitionSelector
. . .
@Bean
public CustomPartitionSelectorClass customPartitionSelector() {
return new CustomPartitionSelectorClass();
}
在之前的 Spring Cloud Stream 版本中,你可以指定实现org.springframework.cloud.stream.binder.PartitionSelectorStrategy通过设置spring.cloud.stream.bindings.output.producer.partitionSelectorClass财产。
自3.0版本起,该属性被移除。 |
配置用于分区的输入绑定
输入绑定(绑定名为0大写)通过设置其来接收分区数据分区财产,以及实例索引和实例计数应用程序本身的属性,如下示例所示:
spring.cloud.stream.bindings.uppercase-in-0.consumer.partitioned=true spring.cloud.stream.instanceIndex=3 spring.cloud.stream.instanceCount=5
这实例计数value 表示数据应被分割的应用实例总数。
这实例索引必须是多个实例中唯一的值,且值介于0和实例计数 - 1.
实例索引帮助每个应用实例识别其接收数据的唯一分区。
对于使用不原生支持分区技术的活页夹来说,这是必须的。
例如,RabbitMQ为每个分区都有一个队列,队列名称包含实例索引。
如果卡夫卡,如果自动重新平衡启用是true(默认情况下),Kafka 负责分区在实例间的分配,这些属性并非必需。
如果自动重新平衡启用设置为假,则实例计数和实例索引绑定器用来确定实例所订阅的分区(你必须拥有与实例数量相当的分区数量)。
装订员分配了分区,而不是卡夫卡。
如果你希望某个分区的消息总是发送到同一个实例,这可能很有用。
当绑定器配置需要时,正确设置这两个值非常重要,以确保所有数据被消耗,并且应用程序实例接收互斥的数据集。
虽然在单独情况下,使用多个实例进行分区数据处理可能较为复杂,但 Spring Cloud Dataflow 可以通过正确填充输入和输出值,并让你依赖运行时基础设施提供实例索引和实例计数信息,大大简化了流程。