发布和消费分区主题
在以下示例中,我们发布到一个名为 hello-pulsar-partitioned
的主题。这是一个分区主题,并且在本示例中,我们假设该主题已经创建并具有三个分区。
@SpringBootApplication
public class PulsarBootPartitioned {
public static void main(String[] args) {
SpringApplication.run(PulsarBootPartitioned.class, "--spring.pulsar.producer.message-routing-mode=CustomPartition");
}
@Bean
public ApplicationRunner runner(PulsarTemplate<String> pulsarTemplate) {
pulsarTemplate.setDefaultTopicName("hello-pulsar-partitioned");
return args -> {
for (int i = 0; i < 10; i++) {
pulsarTemplate.sendAsync("hello john doe 0 ", new FooRouter());
pulsarTemplate.sendAsync("hello alice doe 1", new BarRouter());
pulsarTemplate.sendAsync("hello buzz doe 2", new BuzzRouter());
}
};
}
@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
static class FooRouter implements MessageRouter {
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
return 0;
}
}
static class BarRouter implements MessageRouter {
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
return 1;
}
}
static class BuzzRouter implements MessageRouter {
@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
return 2;
}
}
}
在前面的示例中,我们发布到一个分区主题,并且我们希望将某些数据段发布到特定的分区。如果你将其留给 Pulsar 的默认设置,它会遵循轮询模式的分区分配,而我们希望覆盖这种行为。为此,我们在 send
方法中提供了一个消息路由器对象。考虑以下三种实现的消息路由器。FooRouter
总是将数据发送到分区 0
,BarRouter
发送到分区 1
,而 BuzzRouter
发送到分区 2
。还需要注意的是,我们现在使用了 PulsarTemplate
的 sendAsync
方法,该方法返回一个 CompletableFuture
。在运行应用程序时,我们还需要将生产者的 messageRoutingMode
设置为 CustomPartition
(spring.pulsar.producer.message-routing-mode
)。
在消费者端,我们使用了一个带有独占订阅类型的 PulsarListener
。这意味着来自所有分区的数据最终都会进入同一个消费者,并且没有顺序保证。
如果我们希望每个分区都由一个独立的消费者来消费,我们可以切换到 failover
订阅模式并添加三个独立的消费者:
@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen1(String foo) {
System.out.println("Message Received 1: " + foo);
}
@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen2(String foo) {
System.out.println("Message Received 2: " + foo);
}
@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen3(String foo) {
System.out.println("Message Received 3: " + foo);
}
当你采用这种方法时,一个分区总是由一个专用的消费者消费。
同样地,如果你想使用 Pulsar 的共享消费者类型,你可以使用 shared
订阅类型。然而,当你使用 shared
模式时,你将失去任何顺序保证,因为单个消费者可能会在所有分区中接收消息,而另一个消费者还没有机会接收消息。
考虑以下示例:
@PulsarListener(subscriptionName = "hello-pulsar-shared-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Shared)
public void listen1(String foo) {
System.out.println("Message Received 1: " + foo);
}
@PulsarListener(subscriptionName = "hello-pulsar-shared-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Shared)
public void listen2(String foo) {
System.out.println("Message Received 2: " + foo);
}