跳到主要内容

消息消费

QWen Max 中英对照 Message Consumption

目录

1. @ReactivePulsarListener

当涉及到 Pulsar 消费者时,我们建议最终用户应用程序使用 ReactivePulsarListener 注解。要使用 ReactivePulsarListener,你需要使用 @EnableReactivePulsar 注解。当你使用 Spring Boot 支持时,它会自动启用该注解并配置所有必要的组件,例如消息监听器基础设施(负责创建底层的 Pulsar 消费者)。

让我们回顾一下在快速入门部分看到的 ReactivePulsarListener 代码片段:

@ReactivePulsarListener(subscriptionName = "hello-pulsar-sub", topics = "hello-pulsar-topic")
Mono<Void> listen(String message) {
System.out.println(message);
return Mono.empty();
}
java
备注

监听器方法返回一个 Mono<Void> 来表示消息是否成功处理。Mono.empty() 表示成功(确认),而 Mono.error() 表示失败(否定确认)。

您还可以进一步简化此方法:

@ReactivePulsarListener
Mono<Void> listen(String message) {
System.out.println(message);
return Mono.empty();
}
java

在这种最基本的形式中,当 topics 没有直接提供时,将使用 主题解析过程 来确定目标主题。同样地,当 @ReactivePulsarListener 注解上没有提供 subscriptionName 时,将使用自动生成的订阅名称。

在前面展示的 ReactivePulsarListener 方法中,我们接收的数据类型为 String,但没有指定任何模式类型。框架内部依赖 Pulsar 的模式机制将数据转换为所需类型。

框架检测到你期望的是 String 类型,然后根据该信息推断模式类型,并将该模式提供给使用者。框架对所有基本类型进行这种推断。对于所有非基本类型,默认假定模式为 JSON。如果复杂类型使用了 JSON 以外的其他格式(例如 AVRO 或 KEY_VALUE),则必须在注解中使用 schemaType 属性提供模式类型。

这个示例展示了我们如何从主题中消费复杂类型:

@ReactivePulsarListener(topics = "my-topic-2", schemaType = SchemaType.JSON)
Mono<Void> listen(Foo message) {
System.out.println(message);
return Mono.empty();
}
java

让我们再看一些我们可以使用的方法。

此示例直接消费 Pulsar 消息:

@ReactivePulsarListener(topics = "my-topic")
Mono<Void> listen(org.apache.pulsar.client.api.Message<String> message) {
System.out.println(message.getValue());
return Mono.empty();
}
java

这个示例消费了用 Spring 消息信封包裹的记录:

@ReactivePulsarListener(topics = "my-topic")
Mono<Void> listen(org.springframework.messaging.Message<String> message) {
System.out.println(message.getPayload());
return Mono.empty();
}
java

1.1. 流式处理

以上所有示例都是一次消费一条记录。然而,使用 Reactive 的一个令人信服的原因是其具有支持背压的流处理能力。

下面的示例使用 ReactivePulsarListener 来消费 POJO 流:

@ReactivePulsarListener(topics = "streaming-1", stream = true)
Flux<MessageResult<Void>> listen(Flux<org.apache.pulsar.client.api.Message<String>> messages) {
return messages
.doOnNext((msg) -> System.out.println("Received: " + msg.getValue()))
.map(MessageResult::acknowledge);
}
java

在这里,我们接收记录作为 Pulsar 消息的 Flux。此外,为了在 ReactivePulsarListener 级别启用流消费,你需要将注解上的 stream 属性设置为 true

备注

监听器方法返回一个 Flux<MessageResult<Void>>,其中每个元素代表一个已处理的消息,并持有消息 ID、值以及是否已被确认。MessageResult 有一组静态工厂方法,可以用来创建适当的 MessageResult 实例。

根据 Flux 中消息的实际类型,框架会尝试推断要使用的模式。如果它包含复杂类型,您仍然需要在 ReactivePulsarListener 上提供 schemaType

以下监听器使用 Spring 消息传递的 Message 信封与一个复杂类型:

@ReactivePulsarListener(topics = "streaming-2", stream = true, schemaType = SchemaType.JSON)
Flux<MessageResult<Void>> listen2(Flux<org.springframework.messaging.Message<Foo>> messages) {
return messages
.doOnNext((msg) -> System.out.println("Received: " + msg.getPayload()))
.map(MessageUtils::acknowledge);
}
java
备注

监听器方法返回一个 Flux<MessageResult<Void>>,其中每个元素表示一个已处理的消息,并持有消息 ID、值以及是否已被确认。Spring 的 MessageUtils 提供了一组静态工厂方法,可以用来从 Spring 消息创建适当的 MessageResult 实例。MessageUtils 为 Spring 消息提供了与 MessagResult 上的一组工厂方法对 Pulsar 消息相同的功能。

备注

不支持在 @ReactivePulsarListener 中使用 org.apache.pulsar.client.api.Messages<T>

1.2. 配置 - 应用属性

监听器依赖于 ReactivePulsarConsumerFactory 来创建和管理它用于消费消息的底层 Pulsar 消费者。Spring Boot 提供了这个消费者工厂,你可以通过指定 spring.pulsar.consumer.* 应用属性来进一步配置它。

1.3. 通用记录与 AUTO_CONSUME

如果无法提前知道 Pulsar 主题的模式类型,可以使用 AUTO_CONSUME 模式类型来消费通用记录。在这种情况下,主题会使用与主题关联的模式信息将消息反序列化为 GenericRecord 对象。

要消费泛型记录,请在 @ReactivePulsarListener 上设置 schemaType = SchemaType.AUTO_CONSUME,并将 Pulsar 消息类型设置为 GenericRecord 作为消息参数,如下所示。

@ReactivePulsarListener(topics = "my-generic-topic", schemaType = SchemaType.AUTO_CONSUME)
Mono<Void> listen(org.apache.pulsar.client.api.Message<GenericRecord> message) {
GenericRecord record = message.getValue();
record.getFields().forEach((f) ->
System.out.printf("%s = %s%n", f.getName(), record.getField(f)));
return Mono.empty();
}
java
提示

GenericRecord API 允许访问字段及其关联的值

1.4. 消费者自定义

你可以指定一个 ReactivePulsarListenerMessageConsumerBuilderCustomizer 来配置底层的 Pulsar 消费者构建器,该构建器最终会构造出监听器用来接收消息的消费者。

注意

谨慎使用,因为这将提供对 consumer 构建器的完全访问权限,调用其某些方法(例如 create)可能会产生意外的副作用。

例如,下面的代码展示了如何将订阅的初始位置设置为该主题的最早消息。

// 代码示例
java
@ReactivePulsarListener(topics = "hello-pulsar-topic", consumerCustomizer = "myConsumerCustomizer")
Mono<Void> listen(String message) {
System.out.println(message);
return Mono.empty();
}

@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> myConsumerCustomizer() {
return b -> b.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
}
java
提示

如果您的应用程序只有一个 @ReactivePulsarListener 和一个 ReactivePulsarListenerMessageConsumerBuilderCustomizer bean 注册,那么该定制器将被自动应用。

你也可以使用定制器为消费者构建器提供直接的 Pulsar 消费者属性。如果你不想使用前面提到的 Boot 配置属性,或者有多个配置各异的 ReactivePulsarListener 方法,这样做会很方便。

以下定制器示例使用直接的 Pulsar 消费者属性:

@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> directConsumerPropsCustomizer() {
return b -> b.property("subscriptionName", "subscription-1").property("topicNames", "foo-1");
}
java
警告

使用的属性是直接的 Pulsar 消费者属性,而不是 spring.pulsar.consumer Spring Boot 配置属性

2. 指定模式信息

如前所述,对于 Java 基本类型,Spring for Apache Pulsar 框架可以推断出在 ReactivePulsarListener 上使用的正确 Schema。对于非基本类型,如果注解上没有明确指定 Schema,Spring for Apache Pulsar 框架将尝试从类型构建一个 Schema.JSON

important

目前支持的复杂 Schema 类型有 JSON、AVRO、PROTOBUF、AUTO_CONSUME、带 INLINE 编码的 KEY_VALUE。

2.1. 自定义模式映射

作为在 ReactivePulsarListener 上为复杂类型指定模式的替代方法,可以配置模式解析器来映射这些类型。这样就无需在监听器上设置模式,因为框架会根据传入的消息类型查询解析器。

2.1.1. 配置属性

Schema 映射可以通过 spring.pulsar.defaults.type-mappings 属性进行配置。以下示例使用 application.ymlUserAddress 复杂对象分别使用 AVROJSON 模式添加映射:

spring:
pulsar:
defaults:
type-mappings:
- message-type: com.acme.User
schema-info:
schema-type: AVRO
- message-type: com.acme.Address
schema-info:
schema-type: JSON
yaml
备注

message-type 是消息类的完全限定名。

2.1.2. Schema resolver customizer

添加映射的首选方法是通过上面提到的属性。但是,如果需要更多的控制,您可以提供一个模式解析器定制器来添加映射。

以下示例使用 schema 解析器定制器为 UserAddress 复杂对象分别使用 AVROJSON 模式添加映射:

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
return (schemaResolver) -> {
schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
}
}
java

2.1.3. 类型映射注解

另一种为特定消息类型指定默认 schema 信息的方法是使用 @PulsarMessage 注解标记消息类。可以通过注解上的 schemaType 属性指定 schema 信息。

以下示例配置系统在生产和消费 Foo 类型的消息时使用 JSON 作为默认 schema:

@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}
java

有了这样的配置,就不需要在监听器上设置模式了,例如:

@ReactivePulsarListener(topics = "user-topic")
Mono<Void> listen(User user) {
System.out.println(user);
return Mono.empty();
}
java

3. 消息监听器容器基础设施

在大多数场景中,我们建议直接使用 ReactivePulsarListener 注解来消费 Pulsar 主题,因为该模型涵盖了广泛的应用用例。然而,了解 ReactivePulsarListener 的内部工作原理是很重要的。

消息监听器容器是使用 Spring for Apache Pulsar 进行消息消费的核心。ReactivePulsarListener 在幕后使用消息监听器容器基础设施来创建和管理底层的 Pulsar 消费者。

3.1. ReactivePulsarMessageListenerContainer

此消息监听器容器的契约通过 ReactivePulsarMessageListenerContainer 提供,其默认实现创建一个反应式的 Pulsar 消费者,并连接一个使用所创建消费者的反应式消息管道。

3.2. ReactiveMessagePipeline

管道是底层 Apache Pulsar Reactive 客户端的一个特性,它负责以反应式方式接收数据,然后将其传递给提供的消息处理器。反应式消息监听容器的实现要简单得多,因为管道处理了大部分工作。

3.3. ReactivePulsarMessageHandler

"监听器"方面由 ReactivePulsarMessageHandler 提供,其中有两种提供的实现:

  • ReactivePulsarOneByOneMessageHandler - 一次处理一条消息

  • ReactivePulsarStreamingHandler - 通过 Flux 处理多条消息

备注

如果在直接使用监听器容器时未指定主题信息,则会使用与 ReactivePulsarListener 相同的主题解析过程,唯一的例外是“消息类型默认”步骤被省略

3.4. 处理启动失败

消息监听器容器在应用程序上下文刷新时启动。默认情况下,启动过程中遇到的任何故障都会被重新抛出,应用程序将无法启动。您可以通过相应容器属性上的 StartupFailurePolicy 来调整此行为。

可用的选项有:

  • Stop(默认)- 记录并重新抛出异常,实际上停止应用程序

  • Continue - 记录异常,使容器处于非运行状态,但不停止应用程序

  • Retry - 记录异常,异步重试启动容器,但不停止应用程序。

默认的重试行为是重试 3 次,每次尝试之间间隔 10 秒。然而,可以在相应的容器属性上指定自定义重试模板。如果容器在重试次数用尽后仍无法重新启动,则会保持在非运行状态。

3.4.1. 配置

使用 Spring Boot

在使用 Spring Boot 时,您可以注册一个 PulsarContainerFactoryCustomizer<DefaultReactivePulsarListenerContainerFactory<?>> bean 来设置容器启动属性。

不使用 Spring Boot

但是,如果你是手动配置组件,则在构建消息监听容器工厂时,需要相应地更新容器启动属性。

4. 并发

在流模式(stream = true)下消费记录时,并发性通过客户端实现中的底层 Reactive 支持自然地实现。

然而,在逐个处理消息时,可以通过指定并发来增加处理吞吐量。只需在 @ReactivePulsarListener 上设置 concurrency 属性即可。此外,当 concurrency > 1 时,你可以通过在注解上设置 useKeyOrderedProcessing = "true" 来确保按 key 排序的消息被发送到同一个处理器。

同样,ReactiveMessagePipeline 负责主要的工作,我们只需要设置它的属性即可。

Reactive 与 Imperative

反应式容器中的并发与命令式容器中的并发不同。后者创建多个线程(每个线程都有一个 Pulsar 消费者),而前者则在反应式并行调度器上将消息分派给多个处理程序实例。

反应式并发模型的一个优点是它可以与 Exclusive 订阅一起使用,而命令式并发模型则不能。

5. Pulsar Headers

Pulsar 消息元数据可以作为 Spring 消息头来使用。可用的消息头列表可以在 PulsarHeaders.java 中找到。

5.1. 在 OneByOne 监听器中访问

以下示例展示了如何在使用逐条消息监听器时访问 Pulsar Headers:

@ReactivePulsarListener(topics = "some-topic")
Mono<Void> listen(String data,
@Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
@Header("foo") String foo) {
System.out.println("Received " + data + " w/ id=" + messageId + " w/ foo=" + foo);
return Mono.empty();
}
java

在前面的示例中,我们访问了 messageId 消息元数据的值以及一个名为 foo 的自定义消息属性。每个头部字段都使用了 Spring 的 @Header 注解。

你也可以使用 Pulsar 的 Message 作为信封来携带负载。这样做时,用户可以直接调用 Pulsar 消息上的相应方法来检索元数据。但是,为了方便起见,你也可以通过使用 Header 注解来检索它。请注意,你也可以使用 Spring 消息传递的 Message 信封来携带负载,然后通过使用 @Header 来检索 Pulsar 头信息。

5.2. 在流式监听器中访问

在使用流式消息监听器时,对头部的支持是有限的。只有当 Flux 包含 Spring org.springframework.messaging.Message 元素时,才会填充头部。此外,Spring 的 @Header 注解不能用于检索数据。您必须直接调用 Spring 消息上的相应方法来检索数据。

6. 消息确认

框架会自动处理消息确认。但是,监听器方法必须发送一个信号,指示消息是否已成功处理。然后,容器实现将使用该信号来执行 ack 或 nack 操作。这与它的命令式对应方式略有不同,在命令式方式中,除非方法抛出异常,否则信号被默认为是正向的。

6.1. OneByOne Listener

单个消息(又称 OneByOne)消息监听器方法返回一个 Mono<Void>,以指示消息是否成功处理。Mono.empty() 表示成功(确认),而 Mono.error() 表示失败(否定确认)。

6.2. 流式监听器

流式监听器方法返回一个 Flux<MessageResult<Void>>,其中每个 MessageResult 元素代表一个已处理的消息,并持有消息 id、值以及它是否被确认。MessageResult 有一组 acknowledgenegativeAcknowledge 静态工厂方法,可以用来创建适当的 MessageResult 实例。

7. 消息重发和错误处理

Apache Pulsar 提供了多种原生的消息重传和错误处理策略。我们将了解一下这些策略,并学习如何通过 Spring for Apache Pulsar 使用它们。

7.1. Acknowledgment Timeout

默认情况下,Pulsar 消费者在消费者崩溃之前不会重新发送消息,但你可以通过在 Pulsar 消费者上设置一个 ack 超时来改变这种行为。如果 ack 超时属性的值大于零,并且 Pulsar 消费者在该超时时间内未确认消息,则会重新发送该消息。

你可以通过诸如消费者自定义器这样的方式直接将此属性指定为 Pulsar 消费者属性。

@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {
return b -> b.property("ackTimeoutMillis", "60000");
}
java

7.2. Negative Acknowledgment Redelivery Delay

当否定确认时,Pulsar 消费者让你指定应用程序希望如何重新传递消息。默认是在一分钟内重新传递消息,但你可以通过消费者定制器来更改它,例如:

@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {
return b -> b.property("negativeAckRedeliveryDelay", "10ms");
}
java

7.3. Dead Letter Topic

Apache Pulsar 允许应用程序在使用 Shared 订阅类型的情况下使用死信主题。对于 ExclusiveFailover 订阅类型,此功能不可用。基本思路是,如果一条消息被重试了一定次数(可能是由于 ack 超时或 nack 重新传递),一旦重试次数用尽,该消息可以被发送到一个称为死信队列(DLQ)的特殊主题。让我们通过检查一些代码片段来看看此功能的实际应用细节:

@Configuration(proxyBeanMethods = false)
class DeadLetterPolicyConfig {

@ReactivePulsarListener(
topics = "topic-with-dlp",
subscriptionType = SubscriptionType.Shared,
deadLetterPolicy = "myDeadLetterPolicy",
consumerCustomizer = "ackTimeoutCustomizer" )
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}

@ReactivePulsarListener(topics = "my-dlq-topic")
void listenDlq(String msg) {
System.out.println("From DLQ: " + msg);
}

@Bean
DeadLetterPolicy myDeadLetterPolicy() {
return DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("my-dlq-topic").build();
}

@Bean
ReactiveMessageConsumerBuilderCustomizer<String> ackTimeoutCustomizer() {
return b -> b.property("ackTimeoutMillis", "1000");
}
}
java

首先,我们有一个特殊的 DeadLetterPolicy 的 bean,它被命名为 deadLetterPolicy(你可以根据需要命名)。这个 bean 指定了许多事项,例如最大投递次数(本例中为 10)和死信主题的名称 — 在本例中为 my-dlq-topic。如果你不指定一个 DLQ 主题名称,在 Pulsar 中默认名称为 <topicname>-<subscriptionname>-DLQ。接下来,我们通过设置 deadLetterPolicy 属性将这个 bean 名称提供给 ReactivePulsarListener。请注意,ReactivePulsarListener 的订阅类型为 Shared,因为 DLQ 功能仅适用于共享订阅。这段代码主要是为了演示目的,所以我们提供了 1000 的 ackTimeoutMillis 值。其想法是,代码抛出异常,如果 Pulsar 在 1 秒内没有收到确认消息,则进行重试。如果该循环持续十次(因为这是我们在 DeadLetterPolicy 中的最大重投递计数),Pulsar 消费者会将消息发布到 DLQ 主题。我们还有一个 ReactivePulsarListener 监听 DLQ 主题,以接收发布到 DLQ 主题的数据。

使用分区主题时关于 DLQ 主题的特别说明

如果主主题是分区的,在幕后,Pulsar 会将每个分区视为单独的主题。Pulsar 在主主题名称后面追加 partition-<n>,其中 <n> 代表分区编号。问题是,如果你不指定一个 DLQ 主题(与我们上面所做的相反),Pulsar 会发布到一个默认的主题名称中,该名称包含此 partition-<n> 信息 — 例如:topic-with-dlp-partition-0-deadLetterPolicySubscription-DLQ。解决这个问题的简单方法是始终提供一个 DLQ 主题名称。

8. Pulsar Reader 支持

框架通过 ReactivePulsarReaderFactory 以反应式的方式提供了对 Pulsar Reader 的支持。

Spring Boot 提供了这个读取器工厂,可以使用任何 spring.pulsar.reader.* 应用属性进行配置。