发布与消费分割主题

在下面的例子中,我们将发布到一个名为你好-脉冲星-分割. 这是一个被划分的主题,对于这个示例,我们假设该主题已经由三个划分创建。spring-doc.cadn.net.cn

@SpringBootApplication
public class PulsarBootPartitioned {

	public static void main(String[] args) {
		SpringApplication.run(PulsarBootPartitioned.class, "--spring.pulsar.producer.message-routing-mode=CustomPartition");
	}

	@Bean
	public ApplicationRunner runner(PulsarTemplate<String> pulsarTemplate) {
		pulsarTemplate.setDefaultTopicName("hello-pulsar-partitioned");
		return args -> {
			for (int i = 0; i < 10; i++) {
				pulsarTemplate.sendAsync("hello john doe 0 ", new FooRouter());
				pulsarTemplate.sendAsync("hello alice doe 1", new BarRouter());
				pulsarTemplate.sendAsync("hello buzz doe 2", new BuzzRouter());
			}
		};
	}

	@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned")
	public void listen(String message) {
		System.out.println("Message Received: " + message);
	}

    static class FooRouter implements MessageRouter {

		@Override
		public int choosePartition(Message<?> msg, TopicMetadata metadata) {
			return 0;
		}
	}

	static class BarRouter implements MessageRouter {

		@Override
		public int choosePartition(Message<?> msg, TopicMetadata metadata) {
			return 1;
		}
	}

	static class BuzzRouter implements MessageRouter {

		@Override
		public int choosePartition(Message<?> msg, TopicMetadata metadata) {
			return 2;
		}
	}

}

在前面的例子中,我们发布到一个分区主题,并且希望将某个数据段发布到特定的分区。如果你保持 Pulsar 默认设置,它遵循一种轮转分区分配模式,我们希望覆盖这一点。为此,我们提供了一个带有发送方法。 考虑实现的三条消息路由器。FooRouter总是将数据发送到分区0,条形路由器发送到分区1嗡嗡路由器发送到分区2. 还要注意,我们现在使用sendAsync方法脉冲星模板返回完成未来. 运行应用程序时,我们还需要设置消息路由模式制作人自定义分区 (spring.pulsar.producer.message-routing-mode).spring-doc.cadn.net.cn

在消费者端,我们使用脉冲星听者采用独占订阅类型。这意味着所有分区的数据最终都集中在同一个消费者手中,且没有排序保证。spring-doc.cadn.net.cn

如果我们希望每个分区都被一个不同的消费者消耗,我们该怎么办?我们可以切换到备援切换订阅模式并添加三个独立的消费者:spring-doc.cadn.net.cn

@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen1(String foo) {
    System.out.println("Message Received 1: " + foo);
}

@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen2(String foo) {
    System.out.println("Message Received 2: " + foo);
}

@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription",  topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen3(String foo) {
    System.out.println("Message Received 3: " + foo);
}

采用这种方法时,单个分区总是被专用消费者消耗。spring-doc.cadn.net.cn

类似地,如果你想用Pulsar的共享消费者类型,可以用共享订阅类型。然而,当你使用共享模式中,你失去了任何排序保证,因为单个消费者可能会在另一个消费者之前收到来自所有分区的消息。spring-doc.cadn.net.cn

请考虑以下例子:spring-doc.cadn.net.cn

@PulsarListener(subscriptionName = "hello-pulsar-shared-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Shared)
public void listen1(String foo) {
    System.out.println("Message Received 1: " + foo);
}

@PulsarListener(subscriptionName = "hello-pulsar-shared-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Shared)
public void listen2(String foo) {
    System.out.println("Message Received 2: " + foo);
}