跳到主要内容

消息生产

DeepSeek V3 中英对照 Message Production

目录

1. ReactivePulsarTemplate

在 Pulsar 生产者端,Spring Boot 自动配置提供了一个 ReactivePulsarTemplate 用于发布记录。该模板实现了一个名为 ReactivePulsarOperations 的接口,并通过其契约提供了发布记录的方法。

该模板提供了接受单个消息并返回 Mono<MessageId> 的发送方法。它还提供了接受多个消息(以 ReactiveStreams Publisher 类型的形式)并返回 Flux<MessageId> 的发送方法。

备注

对于不包含主题参数的 API 变体,将使用主题解析过程来确定目标主题。

1.1. Fluent API

该模板提供了一个流畅的构建器 来处理更复杂的发送请求。

1.2. 消息自定义

你可以指定一个 MessageSpecBuilderCustomizer 来配置发送的消息。例如,以下代码展示了如何发送一个带键的消息:

template.newMessage(msg)
.withMessageCustomizer((mc) -> mc.key("foo-msg-key"))
.send();
java

1.3. 发送者自定义

你可以指定一个 ReactiveMessageSenderBuilderCustomizer 来配置底层的 Pulsar 发送者构建器,该构建器最终用于构建用于发送出站消息的发送者。

注意

请谨慎使用,因为这将赋予对发送者构建器的完全访问权限,调用其某些方法(如 create)可能会产生意外的副作用。

例如,以下代码展示了如何禁用批处理并启用分块:

template.newMessage(msg)
.withSenderCustomizer((sc) -> sc.enableChunking(true).enableBatching(false))
.send();
java

另一个示例展示了在将记录发布到分区主题时如何使用自定义路由。在发送者构建器上指定您的自定义 MessageRouter 实现,例如:

template.newMessage(msg)
.withSenderCustomizer((sc) -> sc.messageRouter(messageRouter))
.send();
java
提示

请注意,在使用 MessageRouter 时,spring.pulsar.producer.message-routing-mode 的唯一有效设置是 custom

2. 指定模式信息

如果你使用 Java 的基本类型(primitive types),框架会自动检测 schema,因此你无需为发布数据指定任何 schema 类型。对于非基本类型,如果在 ReactivePulsarTemplate 上调用发送操作时没有显式指定 Schema,Spring for Apache Pulsar 框架将尝试从类型构建一个 Schema.JSON

important

当前支持的复杂模式类型包括 JSON、AVRO、PROTOBUF、AUTO_PRODUCE_BYTES 和 KEY_VALUE(带 INLINE 编码)。

2.1. 自定义 Schema 映射

作为在 ReactivePulsarTemplate 上调用发送操作时为复杂类型指定 schema 的替代方案,可以配置 schema 解析器来映射这些类型。这样就无需再指定 schema,因为框架会根据发出的消息类型来查询解析器。

2.1.1. 配置属性

可以通过 spring.pulsar.defaults.type-mappings 属性来配置 schema 映射。以下示例使用 application.yml 文件为 UserAddress 复杂对象分别添加了使用 AVROJSON schema 的映射:

spring:
pulsar:
defaults:
type-mappings:
- message-type: com.acme.User
schema-info:
schema-type: AVRO
- message-type: com.acme.Address
schema-info:
schema-type: JSON
yaml
备注

message-type 是消息类的完全限定名。

2.1.2. Schema 解析器定制器

首选的方法是通过上述属性添加映射。然而,如果需要更多的控制,你可以提供一个 schema resolver 自定义器来添加映射。

以下示例使用模式解析器自定义工具,分别使用 AVROJSON 模式为 UserAddress 复杂对象添加映射:

@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
return (schemaResolver) -> {
schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
}
}
java

2.1.3. 类型映射注解

另一种为特定消息类型指定默认模式信息的方法,是在消息类上使用 @PulsarMessage 注解。可以通过注解中的 schemaType 属性来指定模式信息。

以下示例配置系统在使用 Foo 类型的消息时,将 JSON 作为默认的 schema 进行生成或消费:

@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}
java

通过此配置,无需在发送操作时指定 schema。

2.2. 使用 AUTO_SCHEMA 进行生产

如果无法提前知道 Pulsar topic 的模式类型,你可以使用 AUTO_PRODUCE 模式,安全地将原始的 JSON 或 Avro 负载作为 byte[] 发布。

在这种情况下,生产者会验证出站字节是否与目标主题的 schema 兼容。

只需在模板发送操作中指定 Schema.AUTO_PRODUCE_BYTES() 的 schema,如下例所示:

void sendUserAsBytes(ReactivePulsarTemplate<byte[]> template, byte[] userAsBytes) {
template.send("user-topic", userAsBytes, Schema.AUTO_PRODUCE_BYTES()).subscribe();
}
java
备注

此功能仅支持 Avro 和 JSON 架构类型。

3. ReactivePulsarSenderFactory

ReactivePulsarTemplate 依赖于 ReactivePulsarSenderFactory 来实际创建底层的 sender。

Spring Boot 提供了这个发送者工厂,可以使用任何 spring.pulsar.producer.* 应用程序属性进行配置。

备注

如果直接使用发送者工厂 API 时未指定主题信息,则使用与 ReactivePulsarTemplate 相同的主题解析过程,但有一个例外,即省略了“消息类型默认”步骤。

3.1. Producer 缓存

每个底层的 Pulsar 生产者都会消耗资源。为了提高性能并避免持续创建生产者,底层 Apache Pulsar Reactive 客户端中的 ReactiveMessageSenderCache 会缓存它创建的生产者。这些生产者以 LRU(最近最少使用)的方式进行缓存,并在配置的时间段内未使用时被驱逐。

您可以通过指定任何 spring.pulsar.producer.cache.* 应用程序属性来配置缓存设置。