消息生产
目录
1. Pulsar 模板
在 Pulsar 生产者端,Spring Boot 自动配置提供了一个 PulsarTemplate 用于发布记录。该模板实现了一个名为 PulsarOperations 的接口,并通过其契约提供了发布记录的方法。
这些发送 API 方法分为两类:send 和 sendAsync。send 方法通过使用 Pulsar 生产者的同步发送功能来阻塞调用。一旦消息在 broker 上持久化,它们会返回已发布消息的 MessageId。sendAsync 方法调用是非阻塞的异步调用。它们返回一个 CompletableFuture,你可以使用它在消息发布后异步接收消息 ID。
对于不包含主题参数的 API 变体,将使用主题解析过程来确定目标主题。
1.1. 简单 API
该模板提供了一些方法(以 'send' 为前缀)用于简单的发送请求。对于更复杂的发送请求,一个流畅的 API 允许你配置更多选项。
1.2. Fluent API
该模板提供了一个流畅构建器来处理更复杂的发送请求。
1.3. 消息定制
你可以指定一个 TypedMessageBuilderCustomizer 来配置外发消息。例如,以下代码展示了如何发送一个带键的消息:
template.newMessage(msg)
.withMessageCustomizer((mb) -> mb.key("foo-msg-key"))
.send();
1.4. 生产者自定义
你可以指定一个 ProducerBuilderCustomizer 来配置底层的 Pulsar producer builder,该 builder 最终会构建用于发送传出消息的 producer。
请谨慎使用,因为这提供了对生产者构建器的完全访问权限,调用其某些方法(如 create)可能会产生意想不到的副作用。
例如,以下代码展示了如何禁用批处理并启用分块处理:
template.newMessage(msg)
.withProducerCustomizer((pb) -> pb.enableChunking(true).enableBatching(false))
.send();
这个例子展示了在向分区主题发布记录时如何使用自定义路由。在 Producer 构建器上指定你的自定义 MessageRouter 实现,例如:
template.newMessage(msg)
.withProducerCustomizer((pb) -> pb.messageRouter(messageRouter))
.send();
注意,当使用 MessageRouter 时,spring.pulsar.producer.message-routing-mode 的唯一有效设置是 custom。
这个例子展示了如何添加一个 ProducerInterceptor,它将在消息被生产者接收并发布到 brokers 之前拦截并修改消息:
template.newMessage(msg)
.withProducerCustomizer((pb) -> pb.intercept(interceptor))
.send();
自定义程序将仅适用于用于发送操作的生产者。如果你想将自定义程序应用于所有生产者,必须按照全局生产者自定义中所述的方式将它们提供给生产者工厂。
在使用 Lambda 自定义器时,必须遵循“关于 Lambda 自定义器的注意事项”中描述的规则。
2. 指定模式信息
如果你使用 Java 的基本类型(primitive types),框架会自动为你检测 schema,你无需为发布数据指定任何 schema 类型。对于非基本类型,如果在 PulsarTemplate 上调用发送操作时没有显式指定 Schema,Spring for Apache Pulsar 框架会尝试从类型构建一个 Schema.JSON。
目前支持的复杂模式类型包括 JSON、AVRO、PROTOBUF、AUTO_PRODUCE_BYTES 以及带有内联编码的 KEY_VALUE。
2.1. 自定义 Schema 映射
作为在 PulsarTemplate 上调用发送操作时为复杂类型指定 schema 的替代方案,可以为类型配置 schema 解析器映射。这样就不需要指定 schema,因为框架会根据传出消息类型来查询解析器。
2.1.1. 配置属性
可以使用 spring.pulsar.defaults.type-mappings 属性来配置 Schema 映射。以下示例使用 application.yml 分别为 User 和 Address 复杂对象添加 AVRO 和 JSON 模式的映射:
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.acme.User
schema-info:
schema-type: AVRO
- message-type: com.acme.Address
schema-info:
schema-type: JSON
message-type 是消息类的完全限定名称。
2.1.2. Schema resolver 自定义器
首选的方法是通过上述属性来添加映射。然而,如果需要更多的控制,你可以提供一个 schema resolver 自定义器来添加映射。
以下示例使用模式解析器自定义器,分别为 User 和 Address 复杂对象添加 AVRO 和 JSON 模式的映射:
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
return (schemaResolver) -> {
schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
}
}
2.1.3. 类型映射注解
另一种为特定消息类型指定默认模式信息的方法是通过使用 @PulsarMessage 注解来标记消息类。可以通过注解上的 schemaType 属性来指定模式信息。
以下示例配置系统在生成或消费类型为 Foo 的消息时,将 JSON 作为默认的模式(schema):
@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}
有了这个配置后,在发送操作时就不需要再指定 schema 了。
2.2. 使用 AUTO_SCHEMA 进行生成
如果无法提前知道 Pulsar topic 的 schema 类型,你可以使用 AUTO_PRODUCE schema 来安全地将原始 JSON 或 Avro 负载作为 byte[] 发布。
在这种情况下,生产者会验证出站字节是否与目标主题的 schema 兼容。
只需在模板发送操作上指定 Schema.AUTO_PRODUCE_BYTES() 的 schema,如下例所示:
void sendUserAsBytes(PulsarTemplate<byte[]> template, byte[] userAsBytes) {
template.send("user-topic", userAsBytes, Schema.AUTO_PRODUCE_BYTES());
}
这仅在 Avro 和 JSON schema 类型中受支持。
3. Pulsar 生产者工厂
PulsarTemplate 依赖于 PulsarProducerFactory 来实际创建底层的生产者。Spring Boot 自动配置也提供了这个生产者工厂,你可以通过指定任何 spring.pulsar.producer.* 应用属性来进一步配置它。
如果直接使用生产者工厂 API 时未指定主题信息,则会使用与 PulsarTemplate 相同的主题解析流程,但有一个例外,即省略了“消息类型默认值”步骤。
3.1. 全局生产者自定义
该框架提供了 ProducerBuilderCustomizer 契约,允许你配置用于构建每个生产者的底层构建器。要自定义所有生产者,你可以将自定义器列表传递给 PulsarProducerFactory 构造函数。当使用多个自定义器时,它们将按照它们在列表中出现的顺序依次应用。
如果你使用 Spring Boot 自动配置,你可以将自定义器指定为 bean,它们将自动传递给 PulsarProducerFactory,并根据它们的 @Order 注解进行排序。
如果你想将自定义器仅应用于单个生产者,可以使用 Fluent API 并在发送时指定自定义器。
4. Pulsar 生产者缓存
每个底层的 Pulsar 生产者都会消耗资源。为了提高性能并避免持续创建生产者,生产者工厂会缓存其创建的生产者。它们以 LRU(最近最少使用)的方式被缓存,并在配置的时间段内未被使用时被淘汰。缓存键由足够的信息组成,以确保在后续的创建请求中,调用者会返回相同的生产者。
此外,您可以通过指定任何 spring.pulsar.producer.cache.* 应用程序属性来配置缓存设置。
4.1. 关于 Lambda 定制器的注意事项
任何用户提供的 producer 自定义器也会包含在缓存键中。由于缓存键依赖于 equals/hashCode 的有效实现,因此在使用 Lambda 自定义器时必须格外小心。
规则: 两个作为 Lambda 实现的自定义器(customizer)当且仅当它们使用相同的 Lambda 实例并且不需要在其闭包外定义的任何变量时,才会在 equals/hashCode 上匹配。
为了澄清上述规则,我们将看几个例子。在下面的例子中,自定义器被定义为一个内联 Lambda,这意味着每次调用 sendUser 时都会使用相同的 Lambda 实例。此外,它不需要闭包之外的变量。因此,它将匹配为缓存键。
void sendUser() {
var user = randomUser();
template.newMessage(user)
.withTopic("user-topic")
.withProducerCustomizer((b) -> b.producerName("user"))
.send();
}
在下一个案例中,定制器被定义为一个内联的 Lambda,这意味着每次调用 sendUser 都会使用相同的 Lambda 实例。然而,它需要一个在其闭包之外的变量。因此,它不会作为缓存键匹配。
void sendUser() {
var user = randomUser();
var name = randomName();
template.newMessage(user)
.withTopic("user-topic")
.withProducerCustomizer((b) -> b.producerName(name))
.send();
}
在这个最后的例子中,自定义器被定义为一个内联 Lambda,这意味着每次调用 sendUser 时都会使用同一个 Lambda 实例。虽然它确实使用了一个变量名,但这个变量名并不来源于其闭包外部,因此将作为缓存键匹配。这说明变量可以在 Lambda 闭包内部使用,甚至可以调用静态方法。
void sendUser() {
var user = randomUser();
template.newMessage(user)
.withTopic("user-topic")
.withProducerCustomizer((b) -> {
var name = SomeHelper.someStaticMethod();
b.producerName(name);
})
.send();
}
规则: 如果你的 Lambda 自定义器没有被定义一次且仅一次(相同的实例在后续调用中被使用)或者 它需要在其闭包外部定义的变量,那么你必须提供一个具有有效 equals/hashCode 实现的自定义器实现。
如果不遵循这些规则,生产者的缓存将总是失效,您的应用程序性能将受到负面影响。
5. 在生产者端拦截消息
添加一个 ProducerInterceptor 可以让你在消息被发布到 brokers 之前拦截并修改生产者接收到的消息。为此,你可以将拦截器列表传递给 PulsarTemplate 构造函数。当使用多个拦截器时,它们应用的顺序与它们在列表中出现的顺序一致。
如果你使用 Spring Boot 自动配置,你可以将拦截器指定为 Beans。它们会自动传递给 PulsarTemplate。拦截器的顺序可以通过使用 @Order 注解来实现,如下所示:
@Bean
@Order(100)
ProducerInterceptor firstInterceptor() {
...
}
@Bean
@Order(200)
ProducerInterceptor secondInterceptor() {
...
}
如果你没有使用 starter,你需要自己配置并注册上述组件。