消息消费
目录
1. @ReactivePulsarListener
当涉及到 Pulsar 消费者时,我们建议最终用户应用程序使用 ReactivePulsarListener
注解。要使用 ReactivePulsarListener
,你需要使用 @EnableReactivePulsar
注解。当你使用 Spring Boot 支持时,它会自动启用该注解并配置所有必要的组件,例如消息监听器基础设施(负责创建底层的 Pulsar 消费者)。
让我们回顾一下在快速入门部分看到的 ReactivePulsarListener
代码片段:
@ReactivePulsarListener(subscriptionName = "hello-pulsar-sub", topics = "hello-pulsar-topic")
Mono<Void> listen(String message) {
System.out.println(message);
return Mono.empty();
}
监听器方法返回一个 Mono<Void>
来表示消息是否成功处理。Mono.empty()
表示成功(确认),而 Mono.error()
表示失败(否定确认)。
您还可以进一步简化此方法:
@ReactivePulsarListener
Mono<Void> listen(String message) {
System.out.println(message);
return Mono.empty();
}
在这种最基本的形式中,当 topics
没有直接提供时,将使用 主题解析过程 来确定目标主题。同样地,当 @ReactivePulsarListener
注解上没有提供 subscriptionName
时,将使用自动生成的订阅名称。
在前面展示的 ReactivePulsarListener
方法中,我们接收的数据类型为 String
,但没有指定任何模式类型。框架内部依赖 Pulsar 的模式机制将数据转换为所需类型。
框架检测到你期望的是 String
类型,然后根据该信息推断模式类型,并将该模式提供给使用者。框架对所有基本类型进行这种推断。对于所有非基本类型,默认假定模式为 JSON。如果复杂类型使用了 JSON 以外的其他格式(例如 AVRO 或 KEY_VALUE),则必须在注解中使用 schemaType
属性提供模式类型。
这个示例展示了我们如何从主题中消费复杂类型:
@ReactivePulsarListener(topics = "my-topic-2", schemaType = SchemaType.JSON)
Mono<Void> listen(Foo message) {
System.out.println(message);
return Mono.empty();
}
让我们再看一些我们可以使用的方法。
此示例直接消费 Pulsar 消息:
@ReactivePulsarListener(topics = "my-topic")
Mono<Void> listen(org.apache.pulsar.client.api.Message<String> message) {
System.out.println(message.getValue());
return Mono.empty();
}
这个示例消费了用 Spring 消息信封包裹的记录:
@ReactivePulsarListener(topics = "my-topic")
Mono<Void> listen(org.springframework.messaging.Message<String> message) {
System.out.println(message.getPayload());
return Mono.empty();
}
1.1. 流式处理
以上所有示例都是一次消费一条记录。然而,使用 Reactive 的一个令人信服的原因是其具有支持背压的流处理能力。
下面的示例使用 ReactivePulsarListener
来消费 POJO 流:
@ReactivePulsarListener(topics = "streaming-1", stream = true)
Flux<MessageResult<Void>> listen(Flux<org.apache.pulsar.client.api.Message<String>> messages) {
return messages
.doOnNext((msg) -> System.out.println("Received: " + msg.getValue()))
.map(MessageResult::acknowledge);
}
在这里,我们接收记录作为 Pulsar 消息的 Flux
。此外,为了在 ReactivePulsarListener
级别启用流消费,你需要将注解上的 stream
属性设置为 true
。
监听器方法返回一个 Flux<MessageResult<Void>>
,其中每个元素代表一个已处理的消息,并持有消息 ID、值以及是否已被确认。MessageResult
有一组静态工厂方法,可以用来创建适当的 MessageResult
实例。
根据 Flux
中消息的实际类型,框架会尝试推断要使用的模式。如果它包含复杂类型,您仍然需要在 ReactivePulsarListener
上提供 schemaType
。
以下监听器使用 Spring 消息传递的 Message
信封与一个复杂类型:
@ReactivePulsarListener(topics = "streaming-2", stream = true, schemaType = SchemaType.JSON)
Flux<MessageResult<Void>> listen2(Flux<org.springframework.messaging.Message<Foo>> messages) {
return messages
.doOnNext((msg) -> System.out.println("Received: " + msg.getPayload()))
.map(MessageUtils::acknowledge);
}
监听器方法返回一个 Flux<MessageResult<Void>>
,其中每个元素表示一个已处理的消息,并持有消息 ID、值以及是否已被确认。Spring 的 MessageUtils
提供了一组静态工厂方法,可以用来从 Spring 消息创建适当的 MessageResult
实例。MessageUtils
为 Spring 消息提供了与 MessagResult
上的一组工厂方法对 Pulsar 消息相同的功能。
不支持在 @ReactivePulsarListener
中使用 org.apache.pulsar.client.api.Messages<T>
。
1.2. 配置 - 应用属性
监听器依赖于 ReactivePulsarConsumerFactory
来创建和管理它用于消费消息的底层 Pulsar 消费者。Spring Boot 提供了这个消费者工厂,你可以通过指定 spring.pulsar.consumer.* 应用属性来进一步配置它。
1.3. 通用记录与 AUTO_CONSUME
如果无法提前知道 Pulsar 主题的模式类型,可以使用 AUTO_CONSUME
模式类型来消费通用记录。在这种情况下,主题会使用与主题关联的模式信息将消息反序列化为 GenericRecord
对象。
要消费泛型记录,请在 @ReactivePulsarListener
上设置 schemaType = SchemaType.AUTO_CONSUME
,并将 Pulsar 消息类型设置为 GenericRecord
作为消息参数,如下所示。
@ReactivePulsarListener(topics = "my-generic-topic", schemaType = SchemaType.AUTO_CONSUME)
Mono<Void> listen(org.apache.pulsar.client.api.Message<GenericRecord> message) {
GenericRecord record = message.getValue();
record.getFields().forEach((f) ->
System.out.printf("%s = %s%n", f.getName(), record.getField(f)));
return Mono.empty();
}
GenericRecord
API 允许访问字段及其关联的值
1.4. 消费者自定义
你可以指定一个 ReactivePulsarListenerMessageConsumerBuilderCustomizer
来配置底层的 Pulsar 消费者构建器,该构建器最终会构造出监听器用来接收消息的消费者。
谨慎使用,因为这将提供对 consumer 构建器的完全访问权限,调用其某些方法(例如 create
)可能会产生意外的副作用。
例如,下面的代码展示了如何将订阅的初始位置设置为该主题的最早消息。
// 代码示例
@ReactivePulsarListener(topics = "hello-pulsar-topic", consumerCustomizer = "myConsumerCustomizer")
Mono<Void> listen(String message) {
System.out.println(message);
return Mono.empty();
}
@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> myConsumerCustomizer() {
return b -> b.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
}
如果您的应用程序只有一个 @ReactivePulsarListener
和一个 ReactivePulsarListenerMessageConsumerBuilderCustomizer
bean 注册,那么该定制器将被自动应用。
你也可以使用定制器为消费者构建器提供直接的 Pulsar 消费者属性。如果你不想使用前面提到的 Boot 配置属性,或者有多个配置各异的 ReactivePulsarListener
方法,这样做会很方便。
以下定制器示例使用直接的 Pulsar 消费者属性:
@Bean
ReactivePulsarListenerMessageConsumerBuilderCustomizer<String> directConsumerPropsCustomizer() {
return b -> b.property("subscriptionName", "subscription-1").property("topicNames", "foo-1");
}
使用的属性是直接的 Pulsar 消费者属性,而不是 spring.pulsar.consumer
Spring Boot 配置属性
2. 指定模式信息
如前所述,对于 Java 基本类型,Spring for Apache Pulsar 框架可以推断出在 ReactivePulsarListener
上使用的正确 Schema。对于非基本类型,如果注解上没有明确指定 Schema,Spring for Apache Pulsar 框架将尝试从类型构建一个 Schema.JSON
。
目前支持的复杂 Schema 类型有 JSON、AVRO、PROTOBUF、AUTO_CONSUME、带 INLINE 编码的 KEY_VALUE。
2.1. 自定义模式映射
作为在 ReactivePulsarListener
上为复杂类型指定模式的替代方法,可以配置模式解析器来映射这些类型。这样就无需在监听器上设置模式,因为框架会根据传入的消息类型查询解析器。
2.1.1. 配置属性
Schema 映射可以通过 spring.pulsar.defaults.type-mappings
属性进行配置。以下示例使用 application.yml
为 User
和 Address
复杂对象分别使用 AVRO
和 JSON
模式添加映射:
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.acme.User
schema-info:
schema-type: AVRO
- message-type: com.acme.Address
schema-info:
schema-type: JSON
message-type
是消息类的完全限定名。
2.1.2. Schema resolver customizer
添加映射的首选方法是通过上面提到的属性。但是,如果需要更多的控制,您可以提供一个模式解析器定制器来添加映射。
以下示例使用 schema 解析器定制器为 User
和 Address
复杂对象分别使用 AVRO
和 JSON
模式添加映射:
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
return (schemaResolver) -> {
schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
}
}
2.1.3. 类型映射注解
另一种为特定消息类型指定默认 schema 信息的方法是使用 @PulsarMessage
注解标记消息类。可以通过注解上的 schemaType
属性指定 schema 信息。
以下示例配置系统在生产和消费 Foo
类型的消息时使用 JSON 作为默认 schema:
@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}
有了这样的配置,就不需要在监听器上设置模式了,例如:
@ReactivePulsarListener(topics = "user-topic")
Mono<Void> listen(User user) {
System.out.println(user);
return Mono.empty();
}
3. 消息监听器容器基础设施
在大多数场景中,我们建议直接使用 ReactivePulsarListener
注解来消费 Pulsar 主题,因为该模型涵盖了广泛的应用用例。然而,了解 ReactivePulsarListener
的内部工作原理是很重要的。
消息监听器容器是使用 Spring for Apache Pulsar 进行消息消费的核心。ReactivePulsarListener
在幕后使用消息监听器容器基础设施来创建和管理底层的 Pulsar 消费者。
3.1. ReactivePulsarMessageListenerContainer
此消息监听器容器的契约通过 ReactivePulsarMessageListenerContainer
提供,其默认实现创建一个反应式的 Pulsar 消费者,并连接一个使用所创建消费者的反应式消息管道。
3.2. ReactiveMessagePipeline
管道是底层 Apache Pulsar Reactive 客户端的一个特性,它负责以反应式方式接收数据,然后将其传递给提供的消息处理器。反应式消息监听容器的实现要简单得多,因为管道处理了大部分工作。
3.3. ReactivePulsarMessageHandler
"监听器"方面由 ReactivePulsarMessageHandler
提供,其中有两种提供的实现:
-
ReactivePulsarOneByOneMessageHandler
- 一次处理一条消息 -
ReactivePulsarStreamingHandler
- 通过Flux
处理多条消息
如果在直接使用监听器容器时未指定主题信息,则会使用与 ReactivePulsarListener
相同的主题解析过程,唯一的例外是“消息类型默认”步骤被省略。
3.4. 处理启动失败
消息监听器容器在应用程序上下文刷新时启动。默认情况下,启动过程中遇到的任何故障都会被重新抛出,应用程序将无法启动。您可以通过相应容器属性上的 StartupFailurePolicy
来调整此行为。
可用的选项有:
-
Stop
(默认)- 记录并重新抛出异常,实际上停止应用程序 -
Continue
- 记录异常,使容器处于非运行状态,但不停止应用程序 -
Retry
- 记录异常,异步重试启动容器,但不停止应用程序。
默认的重试行为是重试 3 次,每次尝试之间间隔 10 秒。然而,可以在相应的容器属性上指定自定义重试模板。如果容器在重试次数用尽后仍无法重新启动,则会保持在非运行状态。
3.4.1. 配置
使用 Spring Boot
在使用 Spring Boot 时,您可以注册一个 PulsarContainerFactoryCustomizer<DefaultReactivePulsarListenerContainerFactory<?>>
bean 来设置容器启动属性。
不使用 Spring Boot
但是,如果你是手动配置组件,则在构建消息监听容器工厂时,需要相应地更新容器启动属性。
4. 并发
在流模式(stream = true
)下消费记录时,并发性通过客户端实现中的底层 Reactive 支持自然地实现。
然而,在逐个处理消息时,可以通过指定并发来增加处理吞吐量。只需在 @ReactivePulsarListener
上设置 concurrency
属性即可。此外,当 concurrency > 1
时,你可以通过在注解上设置 useKeyOrderedProcessing = "true"
来确保按 key 排序的消息被发送到同一个处理器。
同样,ReactiveMessagePipeline
负责主要的工作,我们只需要设置它的属性即可。
5. Pulsar Headers
Pulsar 消息元数据可以作为 Spring 消息头来使用。可用的消息头列表可以在 PulsarHeaders.java 中找到。
5.1. 在 OneByOne 监听器中访问
以下示例展示了如何在使用逐条消息监听器时访问 Pulsar Headers:
@ReactivePulsarListener(topics = "some-topic")
Mono<Void> listen(String data,
@Header(PulsarHeaders.MESSAGE_ID) MessageId messageId,
@Header("foo") String foo) {
System.out.println("Received " + data + " w/ id=" + messageId + " w/ foo=" + foo);
return Mono.empty();
}
在前面的示例中,我们访问了 messageId
消息元数据的值以及一个名为 foo
的自定义消息属性。每个头部字段都使用了 Spring 的 @Header
注解。
你也可以使用 Pulsar 的 Message
作为信封来携带负载。这样做时,用户可以直接调用 Pulsar 消息上的相应方法来检索元数据。但是,为了方便起见,你也可以通过使用 Header
注解来检索它。请注意,你也可以使用 Spring 消息传递的 Message
信封来携带负载,然后通过使用 @Header
来检索 Pulsar 头信息。
5.2. 在流式监听器中访问
在使用流式消息监听器时,对头部的支持是有限的。只有当 Flux
包含 Spring org.springframework.messaging.Message
元素时,才会填充头部。此外,Spring 的 @Header
注解不能用于检索数据。您必须直接调用 Spring 消息上的相应方法来检索数据。
6. 消息确认
框架会自动处理消息确认。但是,监听器方法必须发送一个信号,指示消息是否已成功处理。然后,容器实现将使用该信号来执行 ack 或 nack 操作。这与它的命令式对应方式略有不同,在命令式方式中,除非方法抛出异常,否则信号被默认为是正向的。
6.1. OneByOne Listener
单个消息(又称 OneByOne)消息监听器方法返回一个 Mono<Void>
,以指示消息是否成功处理。Mono.empty()
表示成功(确认),而 Mono.error()
表示失败(否定确认)。
6.2. 流式监听器
流式监听器方法返回一个 Flux<MessageResult<Void>>
,其中每个 MessageResult
元素代表一个已处理的消息,并持有消息 id、值以及它是否被确认。MessageResult
有一组 acknowledge
和 negativeAcknowledge
静态工厂方法,可以用来创建适当的 MessageResult
实例。
7. 消息重发和错误处理
Apache Pulsar 提供了多种原生的消息重传和错误处理策略。我们将了解一下这些策略,并学习如何通过 Spring for Apache Pulsar 使用它们。
7.1. Acknowledgment Timeout
默认情况下,Pulsar 消费者在消费者崩溃之前不会重新发送消息,但你可以通过在 Pulsar 消费者上设置一个 ack 超时来改变这种行为。如果 ack 超时属性的值大于零,并且 Pulsar 消费者在该超时时间内未确认消息,则会重新发送该消息。
你可以通过诸如消费者自定义器这样的方式直接将此属性指定为 Pulsar 消费者属性。
@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {
return b -> b.property("ackTimeoutMillis", "60000");
}
7.2. Negative Acknowledgment Redelivery Delay
当否定确认时,Pulsar 消费者让你指定应用程序希望如何重新传递消息。默认是在一分钟内重新传递消息,但你可以通过消费者定制器来更改它,例如:
@Bean
ReactiveMessageConsumerBuilderCustomizer<String> consumerCustomizer() {
return b -> b.property("negativeAckRedeliveryDelay", "10ms");
}
7.3. Dead Letter Topic
Apache Pulsar 允许应用程序在使用 Shared
订阅类型的情况下使用死信主题。对于 Exclusive
和 Failover
订阅类型,此功能不可用。基本思路是,如果一条消息被重试了一定次数(可能是由于 ack 超时或 nack 重新传递),一旦重试次数用尽,该消息可以被发送到一个称为死信队列(DLQ)的特殊主题。让我们通过检查一些代码片段来看看此功能的实际应用细节:
@Configuration(proxyBeanMethods = false)
class DeadLetterPolicyConfig {
@ReactivePulsarListener(
topics = "topic-with-dlp",
subscriptionType = SubscriptionType.Shared,
deadLetterPolicy = "myDeadLetterPolicy",
consumerCustomizer = "ackTimeoutCustomizer" )
void listen(String msg) {
throw new RuntimeException("fail " + msg);
}
@ReactivePulsarListener(topics = "my-dlq-topic")
void listenDlq(String msg) {
System.out.println("From DLQ: " + msg);
}
@Bean
DeadLetterPolicy myDeadLetterPolicy() {
return DeadLetterPolicy.builder().maxRedeliverCount(10).deadLetterTopic("my-dlq-topic").build();
}
@Bean
ReactiveMessageConsumerBuilderCustomizer<String> ackTimeoutCustomizer() {
return b -> b.property("ackTimeoutMillis", "1000");
}
}
首先,我们有一个特殊的 DeadLetterPolicy
的 bean,它被命名为 deadLetterPolicy
(你可以根据需要命名)。这个 bean 指定了许多事项,例如最大投递次数(本例中为 10)和死信主题的名称 — 在本例中为 my-dlq-topic
。如果你不指定一个 DLQ 主题名称,在 Pulsar 中默认名称为 <topicname>-<subscriptionname>-DLQ
。接下来,我们通过设置 deadLetterPolicy
属性将这个 bean 名称提供给 ReactivePulsarListener
。请注意,ReactivePulsarListener
的订阅类型为 Shared
,因为 DLQ 功能仅适用于共享订阅。这段代码主要是为了演示目的,所以我们提供了 1000 的 ackTimeoutMillis
值。其想法是,代码抛出异常,如果 Pulsar 在 1 秒内没有收到确认消息,则进行重试。如果该循环持续十次(因为这是我们在 DeadLetterPolicy
中的最大重投递计数),Pulsar 消费者会将消息发布到 DLQ 主题。我们还有一个 ReactivePulsarListener
监听 DLQ 主题,以接收发布到 DLQ 主题的数据。
8. Pulsar Reader 支持
框架通过 ReactivePulsarReaderFactory
以反应式的方式提供了对 Pulsar Reader 的支持。
Spring Boot 提供了这个读取器工厂,可以使用任何 spring.pulsar.reader.* 应用属性进行配置。