|
该版本仍在开发中,尚未被视为稳定。对于最新稳定版本,请使用 spring-cloud-stream 5.0.0! |
RabbitMQ 绑定器的分区
RabbitMQ原生不支持分区。
有时,将数据发送到特定分区是有利的——例如,当你想严格订购消息处理时,特定客户的所有消息都应发送到同一个分区。
这兔子消息频道绑定器通过为每个分区绑定一个队列到目的交换机,实现分区。
以下 Java 和 YAML 示例展示了如何配置生产者:
制作人
@SpringBootApplication
public class RabbitPartitionProducerApplication {
private static final Random RANDOM = new Random(System.currentTimeMillis());
private static final String[] data = new String[] {
"abc1", "def1", "qux1",
"abc2", "def2", "qux2",
"abc3", "def3", "qux3",
"abc4", "def4", "qux4",
};
public static void main(String[] args) {
new SpringApplicationBuilder(RabbitPartitionProducerApplication.class)
.web(false)
.run(args);
}
@Bean
public Supplier<Message<?>> generate() {
return () -> {
String value = data[RANDOM.nextInt(data.length)];
System.out.println("Sending: " + value);
return MessageBuilder.withPayload(value)
.setHeader("partitionKey", value)
.build();
};
}
}
application.yml
spring:
cloud:
stream:
bindings:
generate-out-0:
destination: partitioned.destination
producer:
partitioned: true
partition-key-expression: headers['partitionKey']
partition-count: 2
required-groups:
- myGroup
|
前述示例中的配置使用默认的分区( 这 |
以下配置提供了主题交换功能:
以下队列绑定到该交换:
以下绑定将队列关联到交换:
以下 Java 和 YAML 示例延续了之前的示例,展示了如何配置消费者:
消费者
@SpringBootApplication
public class RabbitPartitionConsumerApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(RabbitPartitionConsumerApplication.class)
.web(false)
.run(args);
}
@Bean
public Consumer<Message<String>> listen() {
return message -> {
String queue =- message.getHeaders().get(AmqpHeaders.CONSUMER_QUEUE);
System.out.println(in + " received from queue " + queue);
};
}
}
application.yml
spring:
cloud:
stream:
bindings:
listen-in-0:
destination: partitioned.destination
group: myGroup
consumer:
partitioned: true
instance-index: 0
这兔子消息频道绑定器不支持动态缩放。
每个分区至少必须有一个消费者。
消费者的实例索引用于表示被消耗的分区。
像Cloud Foundry这样的平台只能有一个实例,且实例索引. |