该版本仍在开发中,尚未被视为稳定。对于最新稳定版本,请使用 spring-cloud-stream 5.0.0!spring-doc.cadn.net.cn

卡夫卡装订器划分

Apache Kafka 原生支持主题分区。spring-doc.cadn.net.cn

有时将数据发送到特定分区是有利的——例如,当你想严格订购消息处理时(特定客户的所有消息都应发送到同一分区)。spring-doc.cadn.net.cn

以下示例展示了如何配置生产者和消费者端:spring-doc.cadn.net.cn

@SpringBootApplication
public class KafkaPartitionProducerApplication {

    private static final Random RANDOM = new Random(System.currentTimeMillis());

    private static final String[] data = new String[] {
            "foo1", "bar1", "qux1",
            "foo2", "bar2", "qux2",
            "foo3", "bar3", "qux3",
            "foo4", "bar4", "qux4",
            };

    public static void main(String[] args) {
        new SpringApplicationBuilder(KafkaPartitionProducerApplication.class)
            .web(false)
            .run(args);
    }

    @Bean
    public Supplier<Message<?>> generate() {
        return () -> {
            String value = data[RANDOM.nextInt(data.length)];
            System.out.println("Sending: " + value);
            return MessageBuilder.withPayload(value)
                    .setHeader("partitionKey", value)
                    .build();
        };
    }

}
application.yml
spring:
  cloud:
    stream:
      bindings:
        generate-out-0:
          destination: partitioned.topic
          producer:
            partition-key-expression: headers['partitionKey']
            partition-count: 12
需要注意的是,由于Apache Kafka原生支持分区,除非你使用了示例中的自定义分区键或涉及payload本身的表达式,否则无需依赖上述绑定器分区。 绑定器提供的分区选择本应适用于不支持原生分区的中间件技术。 注意我们使用的是一个名为partitionKey在上述例子中,该划分将成为划分的决定因素,因此在此情况下使用束缚器划分是合适的。 当使用原生Kafka分区时,即当你不提供分区键表达式然后Apache Kafka会选择一个分区,默认为记录键的哈希值除以可用分区数。 要向出站记录添加密钥,设置KafkaHeaders.KEY在 Spring-Messaging 中,将目标键值的头部留言<?>. 默认情况下,当没有提供记录键时,Apache Kafka 会根据 Apache Kafka 文档中描述的逻辑选择一个分区。
主题必须配置为拥有足够的分区,以实现所有消费者组所需的并发性。 上述配置支持最多12个消费者实例(如果是6个,则支持)并发是2,如果它们的并发是3,则为4,依此类推)。 通常最好“过度配置”分区,以便未来消费者或并发增加。
之前的配置使用默认的分区(key.hashCode() % partitionCount). 这可能提供或不合适的平衡算法,取决于关键值的不同。特别注意,这种分区策略不同于独立 Kafka 生产者的默认方式——如 Kafka Streams 所采用的,这意味着同一键值在客户端生成时可能在不同分区间的平衡不同。 你可以通过使用partitionSelectorExpressionpartitionSelectorClass性能。

由于分区由Kafka原生处理,消费者端无需特殊配置。 Kafka 在实例之间分配分区。spring-doc.cadn.net.cn

kafka 主题的 partitionCount 可能在运行时发生变化(例如因管理任务)。 计算后的划分会有所不同(例如,此时会使用新的划分)。 自 Spring Cloud Stream 4.0.3 版本起,将支持分区计数的更改。 另见参数“spring.kafka.producer.properties.metadata.max.age.ms”以配置更新间隔。 由于某些限制,无法使用引用消息“有效载荷”的“分区密钥表达式”,此时该机制将被禁用。 整体行为默认被禁用,可以通过配置参数 'producer.dynamicPartitionUpdatesEnabled=true' 来启用。

以下 Spring Boot 应用程序监听 Kafka 流,并打印(向控制台)每个消息所指向的分区 ID:spring-doc.cadn.net.cn

@SpringBootApplication
public class KafkaPartitionConsumerApplication {

    public static void main(String[] args) {
        new SpringApplicationBuilder(KafkaPartitionConsumerApplication.class)
            .web(WebApplicationType.NONE)
            .run(args);
    }

    @Bean
    public Consumer<Message<String>> listen() {
        return message -> {
            int partition = (int) message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION);
            System.out.println(message + " received from partition " + partition);
        };
    }

}
application.yml
spring:
  cloud:
    stream:
      bindings:
        listen-in-0:
          destination: partitioned.topic
          group: myGroup

你可以根据需要添加实例。 卡夫卡会重新平衡分区分配。 如果实例计数(或实例计数 * 并发)超过分区数,部分消费者处于空闲状态。spring-doc.cadn.net.cn