跳到主要内容

发布和消费分区主题

DeepSeek V3 中英对照 Publishing and Consuming Partitioned Topics

在以下示例中,我们发布到一个名为 hello-pulsar-partitioned 的主题。这是一个分区主题,并且在本示例中,我们假设该主题已经创建并具有三个分区。

@SpringBootApplication
public class PulsarBootPartitioned {

public static void main(String[] args) {
SpringApplication.run(PulsarBootPartitioned.class, "--spring.pulsar.producer.message-routing-mode=CustomPartition");
}

@Bean
public ApplicationRunner runner(PulsarTemplate<String> pulsarTemplate) {
pulsarTemplate.setDefaultTopicName("hello-pulsar-partitioned");
return args -> {
for (int i = 0; i < 10; i++) {
pulsarTemplate.sendAsync("hello john doe 0 ", new FooRouter());
pulsarTemplate.sendAsync("hello alice doe 1", new BarRouter());
pulsarTemplate.sendAsync("hello buzz doe 2", new BuzzRouter());
}
};
}

@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned")
public void listen(String message) {
System.out.println("Message Received: " + message);
}

static class FooRouter implements MessageRouter {

@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
return 0;
}
}

static class BarRouter implements MessageRouter {

@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
return 1;
}
}

static class BuzzRouter implements MessageRouter {

@Override
public int choosePartition(Message<?> msg, TopicMetadata metadata) {
return 2;
}
}

}
java

在前面的示例中,我们发布到一个分区主题,并且我们希望将某些数据段发布到特定的分区。如果你将其留给 Pulsar 的默认设置,它会遵循轮询模式的分区分配,而我们希望覆盖这种行为。为此,我们在 send 方法中提供了一个消息路由器对象。考虑以下三种实现的消息路由器。FooRouter 总是将数据发送到分区 0BarRouter 发送到分区 1,而 BuzzRouter 发送到分区 2。还需要注意的是,我们现在使用了 PulsarTemplatesendAsync 方法,该方法返回一个 CompletableFuture。在运行应用程序时,我们还需要将生产者的 messageRoutingMode 设置为 CustomPartitionspring.pulsar.producer.message-routing-mode)。

在消费者端,我们使用了一个带有独占订阅类型的 PulsarListener。这意味着来自所有分区的数据最终都会进入同一个消费者,并且没有顺序保证。

如果我们希望每个分区都由一个独立的消费者来消费,我们可以切换到 failover 订阅模式并添加三个独立的消费者:

@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen1(String foo) {
System.out.println("Message Received 1: " + foo);
}

@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen2(String foo) {
System.out.println("Message Received 2: " + foo);
}

@PulsarListener(subscriptionName = "hello-pulsar-partitioned-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Failover)
public void listen3(String foo) {
System.out.println("Message Received 3: " + foo);
}
java

当你采用这种方法时,一个分区总是由一个专用的消费者消费。

同样地,如果你想使用 Pulsar 的共享消费者类型,你可以使用 shared 订阅类型。然而,当你使用 shared 模式时,你将失去任何顺序保证,因为单个消费者可能会在所有分区中接收消息,而另一个消费者还没有机会接收消息。

考虑以下示例:

@PulsarListener(subscriptionName = "hello-pulsar-shared-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Shared)
public void listen1(String foo) {
System.out.println("Message Received 1: " + foo);
}

@PulsarListener(subscriptionName = "hello-pulsar-shared-subscription", topics = "hello-pulsar-partitioned", subscriptionType = SubscriptionType.Shared)
public void listen2(String foo) {
System.out.println("Message Received 2: " + foo);
}
java