发布和使用分区主题
在以下示例中,我们发布到名为hello-pulsar-partitioned
. 这是一个已分区的主题,对于此示例,我们假设该主题已创建了三个分区。
@SpringBootApplication
public class PulsarBootPartitioned {
public static void main(String[] args) {
SpringApplication.run(PulsarBootPartitioned.class, "--spring.pulsar.producer.message-routing-mode=CustomPartition");
}
@Bean
public ApplicationRunner runner(PulsarTemplate<String> pulsarTemplate) {
pulsarTemplate.setDefaultTopicName("hello-pulsar-partitioned");
return args -> {
for (int i = 0; i < 10; i++) {
pulsarTemplate.sendAsync("hello john doe 0 ", new FooRouter());
pulsarTemplate.sendAsync("hello alice doe 1", new BarRouter());
pulsarTemplate.sendAsync("hello buzz doe 2", new BuzzRouter());
}
};
}
@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
static class FooRouter implements MessageRouter {
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
return 0;
}
}
static class BarRouter implements MessageRouter {
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
return 1;
}
}
static class BuzzRouter implements MessageRouter {
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
return 2;
}
}
}
在前面的示例中,我们发布到一个分区主题,我们想将一些数据段发布到特定分区。如果你把它保留为 Pulsar 的默认值,它遵循分区分配的轮循机制,我们想覆盖它。为此,我们提供了一个消息路由器对象,其中包含send
方法。 考虑实现的三个消息路由器。FooRouter
始终将数据发送到分区0
,BarRouter
发送到分区1
和BuzzRouter
发送到分区2
. 另请注意,我们现在使用sendAsync
方法PulsarTemplate
返回一个CompletableFuture
. 在运行应用程序时,我们还需要将messageRoutingMode
在生产者上CustomPartition
(spring.pulsar.producer.message-routing-mode
).
在消费者方面,我们使用PulsarListener
替换为独占订阅类型。这意味着来自所有分区的数据最终位于同一个使用者中,并且没有排序保证。
如果我们希望每个分区都由单个不同的消费者使用,我们该怎么办?我们可以切换到failover
订阅模式,并添加三个单独的使用者:
@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen1(String foo) {
System.out.println("Message Received 1: " + foo);
}
@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen2(String foo) {
System.out.println("Message Received 2: " + foo);
}
@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen3(String foo) {
System.out.println("Message Received 3: " + foo);
}
当您遵循此方法时,单个分区始终由专用使用者使用。
同样,如果你想使用 Pulsar 的共享消费者类型,你可以使用shared
订阅类型。但是,当您使用shared
模式,则失去任何排序保证,因为单个使用者可能会在另一个使用者有机会之前从所有分区接收消息。
请考虑以下示例:
@PulsarListener(subscriptionName = "hello-pulsar-shared-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Shared)
public void listen1(String foo) {
System.out.println("Message Received 1: " + foo);
}
@PulsarListener(subscriptionName = "hello-pulsar-shared-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Shared)
public void listen2(String foo) {
System.out.println("Message Received 2: " + foo);
}