跳到主要内容

消息消费

QWen Max 中英对照 Message Consumption

目录

1. Pulsar Listener

当谈到 Pulsar 消费者时,我们建议最终用户应用程序使用 PulsarListener 注解。要使用 PulsarListener,你需要使用 @EnablePulsar 注解。当你使用 Spring Boot 支持时,它会自动启用这个注解,并配置 PulsarListener 所需的所有组件,例如消息监听器基础设施(负责创建 Pulsar 消费者)。PulsarMessageListenerContainer 使用 PulsarConsumerFactory 来创建和管理 Pulsar 消费者,即它用来消费消息的底层 Pulsar 消费者。

Spring Boot 提供了这个消费者工厂,你可以通过指定 spring.pulsar.consumer.* 应用属性来进一步配置。

让我们回顾一下在快速入门部分看到的 PulsarListener 代码片段:

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
java

你可以进一步简化这个方法:

@PulsarListener
public void listen(String message) {
System.out.println("Message Received: " + message);
}
java

在这种最基本的形式中,当 @PulsarListener 注解上没有提供 subscriptionName 时,将使用自动生成的订阅名称。同样,当 topics 没有直接提供时,将使用 主题解析过程 来确定目标主题。

在前面展示的 PulsarListener 方法中,我们接收的数据是 String 类型,但我们没有指定任何模式类型。框架内部依赖 Pulsar 的模式机制将数据转换为所需类型。框架检测到你期望的是 String 类型,然后基于该信息推断模式类型,并将该模式提供给消费者。框架会对所有基本类型进行这种推断。对于所有非基本类型,默认假定模式为 JSON。如果复杂类型使用了 JSON 以外的其他类型(如 AVRO 或 KEY_VALUE),则必须在注解中使用 schemaType 属性提供模式类型。

以下示例展示了另一个 PulsarListener 方法,该方法接收一个 Integer

@PulsarListener(subscriptionName = "my-subscription-1", topics = "my-topic-1")
public void listen(Integer message) {
System.out.println(message);
}
java

以下 PulsarListener 方法展示了我们如何从主题中消费复杂类型:

@PulsarListener(subscriptionName = "my-subscription-2", topics = "my-topic-2", schemaType = SchemaType.JSON)
public void listen(Foo message) {
System.out.println(message);
}
java

让我们再看几种方法。

你可以直接消费 Pulsar 消息:

@PulsarListener(subscriptionName = "my-subscription", topics = "my-topic")
public void listen(org.apache.pulsar.client.api.Message<String> message) {
System.out.println(message.getValue());
}
java

以下示例使用 Spring 消息信封来消费记录:

@PulsarListener(subscriptionName = "my-subscription", topics = "my-topic")
public void listen(org.springframework.messaging.Message<String> message) {
System.out.println(message.getPayload());
}
java

现在让我们看看如何批量消费记录。下面的示例使用 PulsarListener 将记录作为 POJO 批量消费:

@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<Foo> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message));
}
java

请注意,在此示例中,我们接收记录作为对象的集合(List)。此外,要在 PulsarListener 级别启用批量消费,需要将注解上的 batch 属性设置为 true

根据 List 实际持有的类型,框架会尝试推断要使用的模式。如果 List 包含 JSON 以外的复杂类型,你仍然需要在 PulsarListener 上提供 schemaType

以下使用 Pulsar Java 客户端提供的 Message 信封:

@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<Message<Foo>> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message.getValue()));
}
java

以下示例使用 Spring 消息传递 Message 类型的信封来消费批记录:

@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(List<org.springframework.messaging.Message<Foo>> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message.getPayload()));
}
java

最后,您还可以使用 Pulsar 中的 Messages 容器对象来进行批处理监听:

@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen(org.apache.pulsar.client.api.Messages<Foo>> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message.getValue()));
}
java

当你使用 PulsarListener 时,你可以直接在注解本身上提供 Pulsar 消费者属性。如果你不想使用前面提到的 Boot 配置属性,或者有多个 PulsarListener 方法,这样做会很方便。

以下示例直接在 PulsarListener 上使用 Pulsar 消费者属性:

@PulsarListener(properties = { "subscriptionName=subscription-1", "topicNames=foo-1", "receiverQueueSize=5000" })
void listen(String message) {
}
java
提示

所使用的属性是直接的 Pulsar 消费者属性,而不是 spring.pulsar.consumer 应用配置属性

1.1. 通用记录与 AUTO_CONSUME

如果无法提前知道 Pulsar 主题的模式类型,可以使用 AUTO_CONSUME 模式类型来消费通用记录。在这种情况下,主题会使用与主题关联的模式信息将消息反序列化为 GenericRecord 对象。

要消费泛型记录,请在 @PulsarListener 上设置 schemaType = SchemaType.AUTO_CONSUME,并使用类型为 GenericRecord 的 Pulsar 消息作为消息参数,如下所示。

@PulsarListener(topics = "my-generic-topic", schemaType = SchemaType.AUTO_CONSUME)
void listen(org.apache.pulsar.client.api.Message<GenericRecord> message) {
GenericRecord record = message.getValue();
record.getFields().forEach((f) ->
System.out.printf("%s = %s%n", f.getName(), record.getField(f)));
}
java
提示

GenericRecord API 允许访问字段及其关联的值。

1.2. 自定义 ConsumerBuilder

你可以通过提供一个类型为 PulsarListenerConsumerBuilderCustomizer@Bean,并使用 PulsarListenerConsumerBuilderCustomizer 来自定义 ConsumerBuilder 中可用的任何字段,然后将其提供给 PulsarListener,如下所示。

@PulsarListener(topics = "hello-topic", consumerCustomizer = "myCustomizer")
public void listen(String message) {
System.out.println("Message Received: " + message);
}

@Bean
PulsarListenerConsumerBuilderCustomizer<String> myCustomizer() {
return (builder) -> builder.consumerName("myConsumer");
}
java
提示

如果你的应用程序只有一个 @PulsarListener 和一个 PulsarListenerConsumerBuilderCustomizer bean 注册,那么定制器将被自动应用。

2. 指定模式信息

如前所述,对于 Java 基本类型,Spring for Apache Pulsar 框架可以推断出在 PulsarListener 上使用的正确 Schema。对于非基本类型,如果在注解上没有显式指定 Schema,Spring for Apache Pulsar 框架将尝试从类型构建一个 Schema.JSON

important

目前支持的复杂 Schema 类型有 JSON、AVRO、PROTOBUF、AUTO_CONSUME、带有 INLINE 编码的 KEY_VALUE。

2.1. 自定义 Schema 映射

作为为复杂类型在 PulsarListener 上指定模式的替代方法,可以配置模式解析器以映射这些类型。这样就不需要在监听器上设置模式了,因为框架会根据传入的消息类型查询解析器。

2.1.1. 配置属性

Schema 映射可以通过 spring.pulsar.defaults.type-mappings 属性进行配置。以下示例使用 application.ymlUserAddress 复杂对象分别使用 AVROJSON 模式添加映射:

spring:
pulsar:
defaults:
type-mappings:
- message-type: com.acme.User
schema-info:
schema-type: AVRO
- message-type: com.acme.Address
schema-info:
schema-type: JSON
yaml
备注

message-type 是消息类的完全限定名。

2.1.2. Schema resolver customizer

添加映射的首选方法是通过上面提到的属性。但是,如果需要更多的控制,您可以提供一个模式解析器定制器来添加映射。

以下示例使用模式解析器定制器分别为 UserAddress 复杂对象添加使用 AVROJSON 模式的映射:

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
return (schemaResolver) -> {
schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
}
}
java

2.1.3. 类型映射注解

另一种为特定消息类型指定默认模式信息的方法是在消息类上标记 @PulsarMessage 注解。可以通过注解上的 schemaType 属性指定模式信息。

以下示例配置系统在生产和消费类型为 Foo 的消息时使用 JSON 作为默认模式:

@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}
java

有了这样的配置,就无需在监听器上设置模式了,例如:

@PulsarListener(subscriptionName = "user-sub", topics = "user-topic")
public void listen(User user) {
System.out.println(user);
}
java

3. 访问 Pulsar 消费者对象

有时,你需要直接访问 Pulsar Consumer 对象。下面的示例展示了如何获取它:

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message, org.apache.pulsar.client.api.Consumer<String> consumer) {
System.out.println("Message Received: " + message);
ConsumerStats stats = consumer.getStats();
...
}
java
警告

当以这种方式访问 Consumer 对象时,不要调用任何会通过调用接收方法来改变 Consumer 的游标位置的操作。所有此类操作必须由容器完成。

4. Pulsar Message Listener Container

现在我们已经看到了通过 PulsarListener 在消费者端的基本交互。现在让我们深入探讨 PulsarListener 如何与底层的 Pulsar 消费者进行交互。请记住,对于终端用户应用程序,在大多数场景下,当我们使用 Spring for Apache Pulsar 从 Pulsar 主题消费消息时,我们推荐直接使用 PulsarListener 注解,因为该模型涵盖了广泛的应用程序用例。然而,理解 PulsarListener 的内部工作原理是很重要的。本节将详细介绍这些内容。

正如前面简要提到的,当你使用 Spring for Apache Pulsar 时,消息监听器容器是消息消费的核心。PulsarListener 在幕后使用消息监听器容器基础设施来创建和管理 Pulsar 消费者。Spring for Apache Pulsar 通过 PulsarMessageListenerContainer 提供了这个消息监听器容器的契约。该消息监听器容器的默认实现是通过 DefaultPulsarMessageListenerContainer 提供的。顾名思义,PulsarMessageListenerContainer 包含了消息监听器。容器创建 Pulsar 消费者,然后运行一个单独的线程来接收和处理数据。数据由所提供的消息监听器实现来处理。

消息监听容器通过使用消费者的 batchReceive 方法批量消费数据。一旦接收到数据,它就会被传递给选定的消息监听器实现。

当你使用 Spring for Apache Pulsar 时,可以使用以下消息监听器类型。

我们在以下章节中了解有关这些各种消息监听器的详细信息。

不过,在此之前,让我们先仔细看看容器本身。

4.1. DefaultPulsarMessageListenerContainer

这是一个基于单个消费者的 message listener container。以下清单展示了它的构造函数:

public DefaultPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory,
PulsarContainerProperties pulsarContainerProperties)
}
java

它接收一个 PulsarConsumerFactory(用于创建消费者)和一个 PulsarContainerProperties 对象(包含关于容器属性的信息)。PulsarContainerProperties 有以下构造函数:

public PulsarContainerProperties(String... topics)

public PulsarContainerProperties(Pattern topicPattern)
java

你可以通过 PulsarContainerProperties 或作为提供给消费者工厂的消费者属性来提供主题信息。以下示例使用了 DefaultPulsarMessageListenerContainer

Map<String, Object> config = new HashMap<>();
config.put("topics", "my-topic");
PulsarConsumerFactory<String> pulsarConsumerFactorY = DefaultPulsarConsumerFactory<>(pulsarClient, config);

PulsarContainerProperties pulsarContainerProperties = new PulsarContainerProperties();

pulsarContainerProperties.setMessageListener((PulsarRecordMessageListener<?>) (consumer, msg) -> {
});

DefaultPulsarMessageListenerContainer<String> pulsarListenerContainer = new DefaultPulsarMessageListenerContainer(pulsarConsumerFacotyr,
pulsarContainerProperties);

return pulsarListenerContainer;
java
备注

如果在直接使用监听器容器时未指定主题信息,则会使用与 PulsarListener 相同的主题解析过程,唯一的例外是“消息类型默认”步骤被省略

DefaultPulsarMessageListenerContainer 只创建一个消费者。如果你希望通过多个线程管理多个消费者,你需要使用 ConcurrentPulsarMessageListenerContainer

4.2. ConcurrentPulsarMessageListenerContainer

ConcurrentPulsarMessageListenerContainer 有以下构造函数:

public ConcurrentPulsarMessageListenerContainer(PulsarConsumerFactory<? super T> pulsarConsumerFactory,
PulsarContainerProperties pulsarContainerProperties)
java

ConcurrentPulsarMessageListenerContainer 允许你通过 setter 指定一个 concurrency 属性。仅当订阅是非独占订阅(failoversharedkey-shared)时,才允许 concurrency 大于 1。在独占订阅模式下,你只能将 concurrency 设置为默认值 1

以下示例通过 PulsarListener 注解为 failover 订阅启用 concurrency

@PulsarListener(topics = "my-topic", subscriptionName = "subscription-1",
subscriptionType = SubscriptionType.Failover, concurrency = "3")
void listen(String message, Consumer<String> consumer) {
...
System.out.println("Current Thread: " + Thread.currentThread().getName());
System.out.println("Current Consumer: " + consumer.getConsumerName());
}
java

在前面的监听器中,假设主题 my-topic 有三个分区。如果它是一个非分区主题,将并发设置为 3 是没有作用的。除了主活动消费者外,你还会得到两个空闲的消费者。如果主题有超过三个分区,消息会在容器创建的消费者之间进行负载均衡。如果你运行这个 PulsarListener,你会看到来自不同分区的消息通过不同的消费者被消费,正如前面示例中的线程名称和消费者名称打印所示。

备注

当你在分区主题上以这种方式使用 Failover 订阅时,Pulsar 保证消息顺序。

以下清单展示了另一个 PulsarListener 的示例,但使用了 Shared 订阅并启用了 concurrency

@PulsarListener(topics = "my-topic", subscriptionName = "subscription-1",
subscriptionType = SubscriptionType.Shared, concurrency = "5")
void listen(String message) {
...
}
java

在前面的示例中,PulsarListener 创建了五个不同的消费者(这次,我们假设该主题有五个分区)。

备注

在此版本中,没有消息排序,因为 Shared 订阅在 Pulsar 中不保证任何消息排序。

如果你需要消息顺序并且仍然想要使用共享订阅类型,你需要使用 Key_Shared 订阅类型。

4.3. 消费记录

让我们来看看消息监听容器是如何实现单条记录和批量消息消费的。

单条记录消费

让我们重新回顾一下我们之前讨论的基本 PulsarListener

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
java

通过这个 PulsarListener 方法,我们实际上是要求 Spring for Apache Pulsar 每次调用监听器方法时只传递一条记录。我们提到消息监听容器使用消费者上的 batchReceive 方法批量消费数据。框架检测到在这种情况下 PulsarListener 只接收一条记录。这意味着每次调用该方法时,它只需要一条记录。尽管记录是由消息监听容器批量消费的,但它会遍历接收到的批次,并通过 PulsarRecordMessageListener 的适配器调用监听器方法。正如您在上一节中看到的,PulsarRecordMessageListener 继承自 Pulsar Java 客户端提供的 MessageListener,并且它支持基本的 received 方法。

批量消费

以下示例展示了 PulsarListener 批量消费记录:

@PulsarListener(subscriptionName = "hello-batch-subscription", topics = "hello-batch", schemaType = SchemaType.JSON, batch = true)
public void listen4(List<Foo> messages) {
System.out.println("records received :" + messages.size());
messages.forEach((message) -> System.out.println("record : " + message));
}
java

当你使用这种类型的 PulsarListener 时,框架会检测到你处于批处理模式。由于它已经通过使用 Consumer 的 batchReceive 方法以批处理方式接收了数据,因此它会通过一个适配器将整个批次传递给 PulsarBatchMessageListener 的监听器方法。

5. Pulsar Headers

Pulsar 消息元数据可以作为 Spring 消息头被消费。可用的消息头列表可以在 PulsarHeaders.java 中找到。

5.1. 访问基于单条记录的消费者

以下示例展示了如何在使用单条记录模式消费的应用程序中访问各种 Pulsar Headers:

// 代码示例
java
@PulsarListener(topics = "simpleListenerWithHeaders")
void simpleListenerWithHeaders(String data, @Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
@Header(PulsarHeaders.RAW_DATA) byte[] rawData,
@Header("foo") String foo) {

}
java

在前面的示例中,我们访问了 messageIdrawData 消息元数据的值以及一个名为 foo 的自定义消息属性。每个头部字段都使用了 Spring 的 @Header 注解。

你也可以使用 Pulsar 的 Message 作为信封来携带负载。这样做时,用户可以直接调用 Pulsar 消息上的相应方法来检索元数据。然而,为了方便起见,你也可以通过使用 Header 注解来检索它。请注意,你还可以使用 Spring messaging Message 信封来携带负载,然后通过使用 @Header 来检索 Pulsar 头信息。

5.2. 批量记录基于的消费者中访问

在本节中,我们将了解如何在使用批量消费者的应用程序中访问各种 Pulsar Headers:

@PulsarListener(topics = "simpleBatchListenerWithHeaders", batch = true)
void simpleBatchListenerWithHeaders(List<String> data,
@Header(PulsarHeaders.MESSAGE_ID) List<MessageId> messageIds,
@Header(PulsarHeaders.TOPIC_NAME) List<String> topicNames, @Header("foo") List<String> fooValues) {

}
java

在前面的示例中,我们以 List<String> 的形式消费数据。在提取各种头部信息时,我们也以 List<> 的形式进行。Spring for Apache Pulsar 确保头部列表与数据列表相对应。

您也可以在使用批处理监听器并以 List<org.apache.pulsar.client.api.Message<?>>org.apache.pulsar.client.api.Messages<?>org.springframework.messaging.Message<?> 形式接收负载时,以相同的方式提取头部信息。

6. 消息确认

当你使用 Spring for Apache Pulsar 时,消息确认由框架处理,除非应用程序选择不使用该功能。在本节中,我们将详细介绍框架如何处理消息确认。

6.1. 消息 ACK 模式

Spring for Apache Pulsar 提供了以下模式来确认消息:

  • BATCH

  • RECORD

  • MANUAL

BATCH 确认模式是默认的,但你可以在消息监听器容器上更改它。在接下来的部分中,我们将看到当你使用 PulsarListener 的单个版本和批量版本时确认是如何工作的,以及它们如何转换为底层的消息监听器容器(最终转换为 Pulsar 消费者)。

6.2. 自动消息确认在单条记录模式下

让我们重新回顾一下基于单个消息的基本 PulsarListener

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(String message) {
System.out.println("Message Received: " + message);
}
java

很自然会想知道,当你使用 PulsarListener 时,确认机制是如何工作的,特别是如果你熟悉直接使用 Pulsar 消费者。答案在于消息监听容器,因为这是 Spring for Apache Pulsar 中协调所有与消费者相关活动的中心位置。

假设你没有重写默认行为,当你使用前面的 PulsarListener 时,幕后会发生以下事情:

  1. 首先,监听器容器从 Pulsar 消费者那里以批量形式接收消息。

  2. 接收到的消息会逐条传递给 PulsarListener

  3. 当所有记录都被传递给监听器方法并成功处理后,容器会确认原始批次中的所有消息。

这是正常的流程。如果原始批次中的任何记录抛出异常,Spring for Apache Pulsar 会单独跟踪这些记录。当批次中的所有记录都处理完毕后,Spring for Apache Pulsar 会确认所有成功的消息,并对所有失败的消息进行否定确认(nack)。换句话说,当使用 PulsarRecordMessageListener 消费单个记录并使用默认的 BATCH 确认模式时,框架会等待从 batchReceive 调用接收到的所有记录成功处理,然后在 Pulsar 消费者上调用 acknowledge 方法。如果在调用处理程序方法时有任何特定记录抛出异常,Spring for Apache Pulsar 会跟踪这些记录,并在处理完整个批次后对这些记录单独调用 negativeAcknowledge

如果应用程序希望针对每条记录进行确认或否定确认,可以启用 RECORD 确认模式。在这种情况下,在处理每条记录后,如果没有错误则确认消息,如果有错误则否定确认消息。以下示例在 Pulsar 监听器上启用 RECORD 确认模式:

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.RECORD)
public void listen(String message) {
System.out.println("Message Received: " + message);
}
java

6.3. 单条记录模式下的手动消息确认

你可能并不总是希望框架发送确认,而是直接从应用程序本身进行确认。Spring for Apache Pulsar 提供了几种方式来启用手动消息确认。以下示例展示了其中一种方式:

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.MANUAL)
public void listen(Message<String> message, Acknowledgment acknowledgment) {
System.out.println("Message Received: " + message.getValue());
acknowledgment.acknowledge();
}
java

有几件事需要在这里解释一下。首先,我们通过在 PulsarListener 上设置 ackMode 来启用手动确认模式。当启用手动确认模式时,Spring for Apache Pulsar 允许应用程序注入一个 Acknowledgment 对象。框架通过选择一个兼容的消息监听容器来实现这一点:对于基于单个记录的消费,使用 PulsarAcknowledgingMessageListener,这使你可以访问一个 Acknowledgment 对象。

Acknowledgment 对象提供以下 API 方法:

void acknowledge();

void acknowledge(MessageId messageId);

void acknowledge(List<MessageId> messageIds);

void nack();

void nack(MessageId messageId);
java

你可以将这个 Acknowledgment 对象注入到你的 PulsarListener 中,同时使用 MANUAL 确认模式,然后调用相应的方法。

在前面的 PulsarListener 示例中,我们调用了一个无参数的 acknowledge 方法。这是因为框架知道它当前正在处理哪个 Message。当调用 acknowledge() 时,你不需要使用 Message 封装来接收负载,而是可以使用目标类型 — 在这个示例中是 String。你还可以通过提供消息 ID 来调用 acknowledge 的不同变体:acknowledge.acknowledge(message.getMessageId()); 当你使用 acknowledge(messageId) 时,必须使用 Message<?> 封装来接收负载。

类似于确认操作,Acknowledgment API 也提供了否定确认的选项。请参见前面展示的 nack 方法。

您也可以直接在 Pulsar 消费者上调用 acknowledge

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", ackMode = AckMode.MANUAL)
public void listen(Message<String> message, Consumer<String> consumer) {
System.out.println("Message Received: " + message.getValue());
try {
consumer.acknowledge(message);
}
catch (Exception e) {
....
}
}
java

当直接在底层消费者上调用 acknowledge 时,你需要自己进行错误处理。使用 Acknowledgment 则不需要这样做,因为框架可以为你处理。因此,在使用手动确认时,你应该使用 Acknowledgment 对象方法。

important

当使用手动确认时,重要的是要明白框架完全不参与任何确认过程。因此,在设计应用程序时仔细考虑正确的确认策略是非常重要的。

6.4. 批量消费中的自动消息确认

当你批量消费记录(见“消息 ACK 模式”)并且你使用默认的 BATCH 确认模式时,如果整个批次成功处理,则整个批次将被确认。如果有任何记录抛出异常,则整个批次将被否定确认。请注意,这可能与生产者端批量发送的批次不同。确切地说,这是从消费者调用 batchReceive 返回的批次。

考虑以下批量监听器:

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", batch = true)
public void batchListen(List<Foo> messages) {
for (Foo foo : messages) {
...
}
}
java

当传入集合(本例中为 messages)中的所有消息都被处理后,框架会确认所有这些消息。

在批量消费模式下,RECORD 不是允许的确认模式。这可能会导致一个问题,因为应用程序可能不希望整个批次被重新传递。在这种情况下,你需要使用 MANUAL 确认模式。

6.5. 手动消息确认在批量消费中

如前一节所述,当在消息监听器容器上设置 MANUAL 确认模式时,框架不会进行任何确认,无论是正面的还是负面的。完全由应用程序来处理这些问题。当设置 MANUAL 确认模式时,Spring for Apache Pulsar 会选择一个兼容的消息监听器容器:对于批量消费,选择 PulsarBatchAcknowledgingMessageListener,这使您可以访问 Acknowledgment 对象。以下是 Acknowledgment API 中可用的方法:

void acknowledge();

void acknowledge(MessageId messageId);

void acknowledge(List<MessageId> messageIds);

void nack();

void nack(MessageId messageId);
java

你可以将这个 Acknowledgment 对象注入到你的 PulsarListener 中,同时使用 MANUAL 确认模式。以下是一个基于批处理的监听器的基本示例:

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar")
public void listen(List<Message<String>> messgaes, Acknowlegement acknowledgment) {
for (Message<String> message : messages) {
try {
...
acknowledgment.acknowledge(message.getMessageId());
}
catch (Exception e) {
acknowledgment.nack(message.getMessageId());
}
}
}
java

当你使用批处理监听器时,消息监听容器无法知道它当前正在操作哪个记录。因此,要手动确认,你需要使用接受 MessageIdList<MessageId> 的重载 acknowledge 方法之一。你也可以使用批处理监听器的 MessageId 进行否定确认。

7. 消息重发和错误处理

现在我们已经了解了 PulsarListener 以及消息监听器容器基础设施及其各种功能,让我们现在尝试理解消息重发和错误处理。Apache Pulsar 提供了多种原生的消息重发和错误处理策略。我们将看看这些策略,并了解如何通过 Spring for Apache Pulsar 使用它们。

7.1. 指定消息重发的确认超时

默认情况下,Pulsar 消费者不会重新发送消息,除非消费者崩溃,但你可以通过在 Pulsar 消费者上设置一个 ack 超时来改变这种行为。如果 ack 超时属性的值大于零,并且 Pulsar 消费者在该超时时间内未确认消息,则该消息将被重新发送。

当你使用 Spring for Apache Pulsar 时,你可以通过消费者定制器或在 @PulsarListenerproperties 属性中使用原生的 Pulsar ackTimeoutMillis 属性来设置这个属性:

@PulsarListener(subscriptionName = "subscription-1", topics = "topic-1"
properties = {"ackTimeoutMillis=60000"})
public void listen(String s) {
...
}
java

当你指定 ack 超时时间时,如果消费者在 60 秒内没有发送确认,Pulsar 会将消息重新发送给消费者。

如果你想要为 ack 超时指定一些具有不同延迟的高级退避选项,你可以执行以下操作:

@EnablePulsar
@Configuration
class AckTimeoutRedeliveryConfig {

@PulsarListener(subscriptionName = "withAckTimeoutRedeliveryBackoffSubscription",
topics = "withAckTimeoutRedeliveryBackoff-test-topic",
ackTimeoutRedeliveryBackoff = "ackTimeoutRedeliveryBackoff",
properties = { "ackTimeoutMillis=60000" })
void listen(String msg) {
// some long-running process that may cause an ack timeout
}

@Bean
RedeliveryBackoff ackTimeoutRedeliveryBackoff() {
return MultiplierRedeliveryBackoff.builder().minDelayMs(1000).maxDelayMs(10 * 1000).multiplier(2)
.build();
}

}
java

在前面的示例中,我们为 Pulsar 的 RedeliveryBackoff 指定了一个 bean,其最小延迟为 1 秒,最大延迟为 10 秒,退避倍数为 2。在初始 ack 超时发生后,消息的重新传递通过此退避 bean 控制。我们通过将 ackTimeoutRedeliveryBackoff 属性设置为实际的 bean 名称(在本例中为 ackTimeoutRedeliveryBackoff)来提供给 PulsarListener 注解。

7.2. 指定否定确认重传

当否定确认时,Pulsar 消费者允许你指定应用程序希望如何重新传递消息。默认是在一分钟内重新传递消息,但你可以通过消费者定制器或在 @PulsarListenerproperties 属性中使用原生 Pulsar 的 negativeAckRedeliveryDelay 属性来更改它:

@PulsarListener(subscriptionName = "subscription-1", topics = "topic-1"
properties = {"negativeAckRedeliveryDelay=10ms"})
public void listen(String s) {
...
}
java

您还可以通过提供一个 RedeliveryBackoff bean 并将该 bean 的名称作为 PulsarProducer 上的 negativeAckRedeliveryBackoff 属性来指定不同的延迟和带有乘数的退避机制,如下所示:

@EnablePulsar
@Configuration
class NegativeAckRedeliveryConfig {

@PulsarListener(subscriptionName = "withNegRedeliveryBackoffSubscription",
topics = "withNegRedeliveryBackoff-test-topic", negativeAckRedeliveryBackoff = "redeliveryBackoff",
subscriptionType = SubscriptionType.Shared)
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}

@Bean
RedeliveryBackoff redeliveryBackoff() {
return MultiplierRedeliveryBackoff.builder().minDelayMs(1000).maxDelayMs(10 * 1000).multiplier(2)
.build();
}

}
java

7.3. 使用 Apache Pulsar 的死信主题进行消息重发和错误处理

Apache Pulsar 允许应用程序在使用 Shared 订阅类型的情况下使用死信主题。对于 ExclusiveFailover 订阅类型,此功能不可用。基本思想是,如果一条消息被重试了一定次数(可能由于 ack 超时或 nack 重发),一旦重试次数用尽,该消息可以被发送到一个称为死信队列(DLQ)的特殊主题。让我们通过检查一些代码片段来看看此功能的实际细节:

@EnablePulsar
@Configuration
class DeadLetterPolicyConfig {

@PulsarListener(id = "deadLetterPolicyListener", subscriptionName = "deadLetterPolicySubscription",
topics = "topic-with-dlp", deadLetterPolicy = "deadLetterPolicy",
subscriptionType = SubscriptionType.Shared, properties = { "ackTimeoutMillis=1000" })
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}

@PulsarListener(id = "dlqListener", topics = "my-dlq-topic")
void listenDlq(String msg) {
System.out.println("From DLQ: " + msg);
}

@Bean
DeadLetterPolicy deadLetterPolicy() {
return DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("my-dlq-topic").build();
}

}
java

首先,我们有一个特殊的 DeadLetterPolicy 的 bean,并将其命名为 deadLetterPolicy(你可以根据需要命名)。这个 bean 指定了许多内容,例如最大投递次数(在这个例子中是 10 次)和死信主题的名称 — 在这个例子中是 my-dlq-topic。如果你没有指定一个死信队列(DLQ)主题名称,它在 Pulsar 中默认为 <topicname>-<subscriptionname>-DLQ。接下来,我们通过设置 deadLetterPolicy 属性将这个 bean 名称提供给 PulsarListener。请注意,PulsarListener 的订阅类型是 Shared,因为 DLQ 功能仅适用于共享订阅。这段代码主要用于演示目的,所以我们提供了 ackTimeoutMillis 值为 1000。其思路是,代码抛出异常,如果 Pulsar 在 1 秒内没有收到确认,则进行重试。如果该循环持续十次(因为我们在 DeadLetterPolicy 中的最大重投递次数是 10 次),Pulsar 消费者会将消息发布到死信队列主题。我们还有另一个 PulsarListener 监听死信队列主题,以便在消息发布到死信队列主题时接收数据。

使用分区主题时关于 DLQ 主题的特别说明

如果主主题是分区的,Pulsar 在幕后会将每个分区视为一个单独的主题。Pulsar 会在主主题名称后追加 partition-<n>,其中 <n> 代表分区号。问题是,如果你不指定一个 DLQ 主题(与我们在上面所做的相反),Pulsar 会发布到一个默认的主题名称,该名称中包含此 partition-<n> 信息 — 例如:topic-with-dlp-partition-0-deadLetterPolicySubscription-DLQ。解决这个问题的一个简单方法是始终提供一个 DLQ 主题名称。

7.4. Spring for Apache Pulsar 中的原生错误处理

正如我们之前提到的,Apache Pulsar 中的 DLQ 功能仅适用于共享订阅。如果应用程序需要对非共享订阅使用类似的功能,该怎么办?Pulsar 不支持独占和故障转移订阅的 DLQ 的主要原因是因为这些订阅类型保证了顺序。允许重发、DLQ 等实际上会导致消息乱序接收。然而,如果应用程序可以接受这一点,并且更重要的是,需要在非共享订阅中使用此 DLQ 功能,该怎么办?为此,Spring for Apache Pulsar 提供了一个 PulsarConsumerErrorHandler,你可以在 Pulsar 的任何订阅类型中使用它:ExclusiveFailoverSharedKey_Shared

当你使用 Spring for Apache Pulsar 提供的 PulsarConsumerErrorHandler 时,确保不要在监听器上设置 ack 超时属性。

让我们通过查看一些代码片段来了解一些细节:

@EnablePulsar
@Configuration
class PulsarConsumerErrorHandlerConfig {

@Bean
PulsarConsumerErrorHandler<String> pulsarConsumerErrorHandler(
PulsarTemplate<String> pulsarTemplate) {
return new DefaultPulsarConsumerErrorHandler<>(
new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, (c, m) -> "my-foo-dlt"), new FixedBackOff(100, 10));
}

@PulsarListener(id = "pulsarConsumerErrorHandler-id", subscriptionName = "pulsatConsumerErrorHandler-subscription",
topics = "pulsarConsumerErrorHandler-topic",
pulsarConsumerErrorHandler = "pulsarConsumerErrorHandler")
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}

@PulsarListener(id = "pceh-dltListener", topics = "my-foo-dlt")
void listenDlt(String msg) {
System.out.println("From DLT: " + msg);
}

}
java

考虑 pulsarConsumerErrorHandler bean。这会创建一个类型为 PulsarConsumerErrorHandler 的 bean,并使用 Spring for Apache Pulsar 提供的默认实现:DefaultPulsarConsumerErrorHandlerDefaultPulsarConsumerErrorHandler 有一个构造函数,该构造函数接受一个 PulsarMessageRecovererFactory 和一个 org.springframework.util.backoff.BackoffPulsarMessageRecovererFactory 是一个函数式接口,具有以下 API:

@FunctionalInterface
public interface PulsarMessageRecovererFactory<T> {

/**
* Provides a message recoverer {@link PulsarMessageRecoverer}.
* @param consumer Pulsar consumer
* @return {@link PulsarMessageRecoverer}.
*/
PulsarMessageRecoverer<T> recovererForConsumer(Consumer<T> consumer);

}
java

recovererForConsumer 方法接收一个 Pulsar 消费者并返回一个 PulsarMessageRecoverer,这是另一个函数式接口。以下是 PulsarMessageRecoverer 的 API:

public interface PulsarMessageRecoverer<T> {

/**
* Recover a failed message, for e.g. send the message to a DLT.
* @param message Pulsar message
* @param exception exception from failed message
*/
void recoverMessage(Message<T> message, Exception exception);

}
java

Spring for Apache Pulsar 提供了一个名为 PulsarDeadLetterPublishingRecovererPulsarMessageRecovererFactory 实现,该实现提供了一个默认实现,可以通过将消息发送到死信主题(DLT)来恢复消息。我们将此实现提供给前面的 DefaultPulsarConsumerErrorHandler 构造函数。作为第二个参数,我们提供了一个 FixedBackOff。您还可以提供来自 Spring 的 ExponentialBackoff 以获得高级退避功能。然后,我们将此 bean 名称作为属性提供给 PulsarListener。该属性名为 pulsarConsumerErrorHandler。每次 PulsarListener 方法对消息失败时,都会进行重试。重试次数由提供的 Backoff 实现值控制。在我们的示例中,我们进行 10 次重试(总共 11 次尝试 — 第一次尝试和随后的 10 次重试)。一旦所有重试都用尽,消息就会被发送到 DLT 主题。

我们提供的 PulsarDeadLetterPublishingRecoverer 实现使用了 PulsarTemplate 来将消息发布到死信队列(DLT)。在大多数情况下,Spring Boot 自动配置的 PulsarTemplate 就足够了,但对于分区主题有一个例外。当使用分区主题并且为主主题使用自定义消息路由时,你必须使用一个不同的 PulsarTemplate,它不使用自动配置的 PulsarProducerFactory,该工厂被设置为 message-routing-mode 的值为 custompartition。你可以使用以下蓝图中的 PulsarConsumerErrorHandler

@Bean
public PulsarConsumerErrorHandler pulsarConsumerErrorHandler(
PulsarClient pulsarClient,
Schema<?> schema) {

PulsarTemplate<?> template = new PulsarTemplate<>(
new PulsarProducerFactory<Object>(pulsarClient, schema));
PulsarDeadLetterPublishingRecoverer recoverer = new PulsarDeadLetterPublishingRecoverer(template);
return (consumer, msg, ex) -> {
recoverer.accept(msg, ex);
consumer.negativeAcknowledge(msg);
};
}
java
@Bean
PulsarConsumerErrorHandler<Integer> pulsarConsumerErrorHandler(PulsarClient pulsarClient) {
PulsarProducerFactory<Integer> pulsarProducerFactory = new DefaultPulsarProducerFactory<>(pulsarClient, Map.of());
PulsarTemplate<Integer> pulsarTemplate = new PulsarTemplate<>(pulsarProducerFactory);

BiFunction<Consumer<?>, Message<?>, String> destinationResolver =
(c, m) -> "my-foo-dlt";

PulsarDeadLetterPublishingRecoverer<Integer> pulsarDeadLetterPublishingRecoverer =
new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, destinationResolver);

return new DefaultPulsarConsumerErrorHandler<>(pulsarDeadLetterPublishingRecoverer,
new FixedBackOff(100, 5));
}
java

请注意,我们为 PulsarDeadLetterPublishingRecoverer 提供了一个目标解析器作为第二个构造函数参数。如果没有提供,PulsarDeadLetterPublishingRecoverer 将使用 <subscription-name>-<topic-name>-DLT> 作为 DLT 主题名称。在使用此功能时,您应该通过设置目标解析器来使用适当的目标名称,而不是使用默认值。

当使用单条记录消息监听器时,就像我们使用 PulsarConsumerErrorHnadler 一样,如果你使用手动确认,请确保在抛出异常时不要对消息进行负面确认。相反,应该将异常重新抛回给容器。否则,容器会认为消息已被单独处理,而不会触发错误处理。

最后,我们有一个接收来自DLT主题的消息的第二个 PulsarListener

在本节迄今为止提供的示例中,我们仅看到了如何将 PulsarConsumerErrorHandler 与单个记录消息监听器一起使用。接下来,我们看看如何在批量监听器上使用它。

7.5. 使用 PulsarConsumerErrorHandler 的批处理监听器

首先,让我们看一下批量 PulsarListener 方法:

@PulsarListener(subscriptionName = "batch-demo-5-sub", topics = "batch-demo-4", batch = true, concurrency = "3",
subscriptionType = SubscriptionType.Failover,
pulsarConsumerErrorHandler = "pulsarConsumerErrorHandler", ackMode = AckMode.MANUAL)
void listen(List<Message<Integer>> data, Consumer<Integer> consumer, Acknowledgment acknowledgment) {
for (Message<Integer> datum : data) {
if (datum.getValue() == 5) {
throw new PulsarBatchListenerFailedException("failed", datum);
}
acknowledgement.acknowledge(datum.getMessageId());
}
}

@Bean
PulsarConsumerErrorHandler<String> pulsarConsumerErrorHandler(
PulsarTemplate<String> pulsarTemplate) {
return new DefaultPulsarConsumerErrorHandler<>(
new PulsarDeadLetterPublishingRecoverer<>(pulsarTemplate, (c, m) -> "my-foo-dlt"), new FixedBackOff(100, 10));
}

@PulsarListener(subscriptionName = "my-dlt-subscription", topics = "my-foo-dlt")
void dltReceiver(Message<Integer> message) {
System.out.println("DLT - RECEIVED: " + message.getValue());
}
java

再次,我们提供 pulsarConsumerErrorHandler 属性,并设置为 PulsarConsumerErrorHandler bean 的名称。当你使用批处理监听器(如前面示例所示)并希望使用 Spring for Apache Pulsar 提供的 PulsarConsumerErrorHandler 时,你需要使用手动确认。这样,你可以确认所有成功的单个消息。对于失败的消息,你必须抛出带有失败消息的 PulsarBatchListenerFailedException。没有这个异常,框架将不知道如何处理失败。在重试时,容器会从失败的消息开始,向监听器发送一批新的消息。如果再次失败,它将被重试,直到重试次数用尽,此时消息将被发送到 DLT。此时,消息由容器确认,并将原始批次中的后续消息交给监听器处理。

8. 在 PulsarListener 上的消费者自定义

Spring for Apache Pulsar 提供了一种方便的方式来定制由 PulsarListener 使用的容器创建的消费者。应用程序可以提供一个 PulsarListenerConsumerBuilderCustomizer 的 bean。以下是一个示例。

@Bean
public PulsarListenerConsumerBuilderCustomizer<String> myCustomizer() {
return cb -> {
cb.subscriptionName("modified-subscription-name");
};
}
java

然后可以将这个定制器 bean 名称作为 PuslarListener 注解的属性提供,如下所示。

@PulsarListener(subscriptionName = "my-subscription",
topics = "my-topic", consumerCustomizer = "myCustomizer")
void listen(String message) {

}
java

框架通过 PulsarListener 检测提供的 bean,并在创建 Pulsar Consumer 之前将此定制器应用到 Consumer 构造器上。

如果你有多个 PulsarListener 方法,并且每个方法都有不同的自定义规则,你应该创建多个自定义器 bean,并将适当的自定义器附加到每个 PulsarListener 上。

9. 消息监听器容器生命周期

9.1. 暂停和恢复

有些情况下,应用程序可能希望暂时暂停消息消费,然后稍后再恢复。Spring for Apache Pulsar 提供了暂停和恢复底层消息监听器容器的能力。当 Pulsar 消息监听器容器被暂停时,容器从 Pulsar 消费者接收数据的任何轮询都会被暂停。同样地,当容器被恢复时,如果在暂停期间主题有任何新记录添加,下一次轮询将开始返回数据。

要暂停或恢复监听器容器,首先通过 PulsarListenerEndpointRegistry bean 获取容器实例,然后在容器实例上调用暂停/恢复 API —— 如下面的代码片段所示:

@Autowired
private PulsarListenerEndpointRegistry registry;

void someMethod() {
PulsarMessageListenerContainer container = registry.getListenerContainer("my-listener-id");
container.pause();
}
java
提示

传递给 getListenerContainer 的 id 参数是容器的 id - 在暂停/恢复 @PulsarListener 时,它将是 @PulsarListener 的 id 属性的值。

9.2. 处理启动失败

消息监听容器在应用上下文刷新时启动。默认情况下,启动过程中遇到的任何故障都会被重新抛出,应用程序将无法启动。你可以通过相应容器属性上的 StartupFailurePolicy 来调整此行为。

可用的选项有:

  • Stop(默认)- 记录并重新抛出异常,实际上停止应用程序

  • Continue - 记录异常,使容器处于非运行状态,但不停止应用程序

  • Retry - 记录异常,异步重试启动容器,但不停止应用程序。

默认的重试行为是重试 3 次,每次尝试之间间隔 10 秒。然而,可以在相应的容器属性上指定一个自定义的重试模板。如果容器在重试次数用尽后仍无法重启,则会保持在非运行状态。

9.2.1. 配置

使用 Spring Boot

在使用 Spring Boot 时,你可以注册一个 PulsarContainerFactoryCustomizer<ConcurrentPulsarListenerContainerFactory<?>> bean 来设置容器启动属性。

不使用 Spring Boot

但是,如果你是手动配置组件,则在构建消息监听容器工厂时,必须相应地更新容器启动属性。

10. Pulsar Reader 支持

框架通过 PulsarReaderFactory 提供了使用 Pulsar Reader 的支持。

Spring Boot 提供了这个读取器工厂,你可以通过指定任何 spring.pulsar.reader.* 应用属性来进一步配置它。

10.1. PulsarReader 注解

虽然可以直接使用 PulsarReaderFactory,但 Spring for Apache Pulsar 提供了 PulsarReader 注解,你可以使用它来快速读取主题而无需自己设置任何读取器工厂。这与 PulsarListener 背后的理念相似。以下是一个简单的示例。

@PulsarReader(id = "reader-demo-id", topics = "reader-demo-topic", startMessageId = "earliest")
void read(String message) {
//...
}
java

id 属性是可选的,但提供一个对你的应用程序有意义的值是一种最佳实践。如果没有指定,则会使用自动生成的 id。另一方面,topicsstartMessageId 属性是必填的。topics 属性可以是一个单独的主题或以逗号分隔的主题列表。startMessageId 属性指示读取器从主题中的特定消息开始。startMessageId 的有效值为 earliestlatest。假设你希望读取器从主题中任意位置(而不是最早或最新的可用消息)开始读取消息。在这种情况下,你需要使用 ReaderBuilderCustomizer 来自定义 ReaderBuilder,以便它知道要从哪个 MessageId 开始。

10.2. 自定义 ReaderBuilder

你可以使用 Spring for Apache Pulsar 中的 PulsarReaderReaderBuilderCustomizer 来自定义 ReaderBuilder 中可用的任何字段。你可以提供一个类型为 PulsarReaderReaderBuilderCustomizer@Bean,然后如下所示使其对 PulsarReader 可用。

@PulsarReader(id = "reader-customizer-demo-id", topics = "reader-customizer-demo-topic",
readerCustomizer = "myCustomizer")
void read(String message) {
//...
}

@Bean
public PulsarReaderReaderBuilderCustomizer<String> myCustomizer() {
return readerBuilder -> {
readerBuilder.startMessageId(messageId); // the first message read is after this message id.
// Any other customizations on the readerBuilder
};
}
java
提示

如果你的应用程序只有一个 @PulsarReader 和一个 PulsarReaderReaderBuilderCustomizer bean 注册,那么该定制器将被自动应用。

10.3. 处理启动失败

消息监听容器在应用程序上下文刷新时启动。默认情况下,启动过程中遇到的任何故障都会被重新抛出,应用程序将无法启动。您可以通过相应容器属性上的 StartupFailurePolicy 来调整此行为。

可用的选项有:

  • Stop(默认)- 记录并重新抛出异常,实际上停止应用程序

  • Continue - 记录异常,使容器处于非运行状态,但不停止应用程序

  • Retry - 记录异常,异步重试启动容器,但不停止应用程序。

默认的重试行为是重试 3 次,每次尝试之间间隔 10 秒。然而,可以在相应的容器属性上指定一个自定义的重试模板。如果容器在重试次数用尽后仍无法重新启动,则会保持在非运行状态。

10.3.1. 配置

使用 Spring Boot

在使用 Spring Boot 时,你可以注册一个 PulsarContainerFactoryCustomizer<DefaultPulsarReaderContainerFactory<?>> bean 来设置容器启动属性。

不使用 Spring Boot

但是,如果你是手动配置组件,则在构建消息监听容器工厂时,需要相应地更新容器启动属性。