消息生产
目录
1. Pulsar 模板
在 Pulsar 生产者端,Spring Boot 自动配置提供了一个 PulsarTemplate
用于发布记录。该模板实现了一个名为 PulsarOperations
的接口,并通过其契约提供了发布记录的方法。
这些发送 API 方法分为两类:send
和 sendAsync
。send
方法通过使用 Pulsar 生产者的同步发送功能来阻塞调用。一旦消息在 broker 上持久化,它们会返回已发布消息的 MessageId
。sendAsync
方法调用是非阻塞的异步调用。它们返回一个 CompletableFuture
,你可以使用它在消息发布后异步接收消息 ID。
对于不包含主题参数的 API 变体,将使用主题解析过程来确定目标主题。
1.1. 简单 API
该模板提供了一些方法(以 'send' 为前缀)用于简单的发送请求。对于更复杂的发送请求,一个流畅的 API 允许你配置更多选项。
1.2. Fluent API
该模板提供了一个流畅构建器来处理更复杂的发送请求。
1.3. 消息定制
你可以指定一个 TypedMessageBuilderCustomizer
来配置外发消息。例如,以下代码展示了如何发送一个带键的消息:
template.newMessage(msg)
.withMessageCustomizer((mb) -> mb.key("foo-msg-key"))
.send();
1.4. 生产者自定义
你可以指定一个 ProducerBuilderCustomizer
来配置底层的 Pulsar producer builder,该 builder 最终会构建用于发送传出消息的 producer。
请谨慎使用,因为这提供了对生产者构建器的完全访问权限,调用其某些方法(如 create
)可能会产生意想不到的副作用。
例如,以下代码展示了如何禁用批处理并启用分块处理:
template.newMessage(msg)
.withProducerCustomizer((pb) -> pb.enableChunking(true).enableBatching(false))
.send();
这个例子展示了在向分区主题发布记录时如何使用自定义路由。在 Producer
构建器上指定你的自定义 MessageRouter
实现,例如:
template.newMessage(msg)
.withProducerCustomizer((pb) -> pb.messageRouter(messageRouter))
.send();
注意,当使用 MessageRouter
时,spring.pulsar.producer.message-routing-mode
的唯一有效设置是 custom
。
这个例子展示了如何添加一个 ProducerInterceptor
,它将在消息被生产者接收并发布到 brokers 之前拦截并修改消息:
template.newMessage(msg)
.withProducerCustomizer((pb) -> pb.intercept(interceptor))
.send();
自定义程序将仅适用于用于发送操作的生产者。如果你想将自定义程序应用于所有生产者,必须按照全局生产者自定义中所述的方式将它们提供给生产者工厂。
在使用 Lambda 自定义器时,必须遵循“关于 Lambda 自定义器的注意事项”中描述的规则。
2. 指定模式信息
如果你使用 Java 的基本类型(primitive types),框架会自动为你检测 schema,你无需为发布数据指定任何 schema 类型。对于非基本类型,如果在 PulsarTemplate
上调用发送操作时没有显式指定 Schema,Spring for Apache Pulsar 框架会尝试从类型构建一个 Schema.JSON
。
目前支持的复杂模式类型包括 JSON、AVRO、PROTOBUF、AUTO_PRODUCE_BYTES 以及带有内联编码的 KEY_VALUE。
2.1. 自定义 Schema 映射
作为在 PulsarTemplate
上调用发送操作时为复杂类型指定 schema 的替代方案,可以为类型配置 schema 解析器映射。这样就不需要指定 schema,因为框架会根据传出消息类型来查询解析器。
2.1.1. 配置属性
可以使用 spring.pulsar.defaults.type-mappings
属性来配置 Schema 映射。以下示例使用 application.yml
分别为 User
和 Address
复杂对象添加 AVRO
和 JSON
模式的映射:
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.acme.User
schema-info:
schema-type: AVRO
- message-type: com.acme.Address
schema-info:
schema-type: JSON
message-type
是消息类的完全限定名称。
2.1.2. Schema resolver 自定义器
首选的方法是通过上述属性来添加映射。然而,如果需要更多的控制,你可以提供一个 schema resolver 自定义器来添加映射。
以下示例使用模式解析器自定义器,分别为 User
和 Address
复杂对象添加 AVRO
和 JSON
模式的映射:
@Bean
public SchemaResolverCustomizer<DefaultSchemaResolver> schemaResolverCustomizer() {
return (schemaResolver) -> {
schemaResolver.addCustomSchemaMapping(User.class, Schema.AVRO(User.class));
schemaResolver.addCustomSchemaMapping(Address.class, Schema.JSON(Address.class));
}
}
2.1.3. 类型映射注解
另一种为特定消息类型指定默认模式信息的方法是通过使用 @PulsarMessage
注解来标记消息类。可以通过注解上的 schemaType
属性来指定模式信息。
以下示例配置系统在生成或消费类型为 Foo
的消息时,将 JSON 作为默认的模式(schema):
@PulsarMessage(schemaType = SchemaType.JSON)
record Foo(String value) {
}
有了这个配置后,在发送操作时就不需要再指定 schema 了。
2.2. 使用 AUTO_SCHEMA 进行生成
如果无法提前知道 Pulsar topic 的 schema 类型,你可以使用 AUTO_PRODUCE schema 来安全地将原始 JSON 或 Avro 负载作为 byte[]
发布。
在这种情况下,生产者会验证出站字节是否与目标主题的 schema 兼容。
只需在模板发送操作上指定 Schema.AUTO_PRODUCE_BYTES()
的 schema,如下例所示:
void sendUserAsBytes(PulsarTemplate<byte[]> template, byte[] userAsBytes) {
template.send("user-topic", userAsBytes, Schema.AUTO_PRODUCE_BYTES());
}
这仅在 Avro 和 JSON schema 类型中受支持。
3. Pulsar 生产者工厂
PulsarTemplate
依赖于 PulsarProducerFactory
来实际创建底层的生产者。Spring Boot 自动配置也提供了这个生产者工厂,你可以通过指定任何 spring.pulsar.producer.* 应用属性来进一步配置它。
如果直接使用生产者工厂 API 时未指定主题信息,则会使用与 PulsarTemplate
相同的主题解析流程,但有一个例外,即省略了“消息类型默认值”步骤。
3.1. 全局生产者自定义
该框架提供了 ProducerBuilderCustomizer
契约,允许你配置用于构建每个生产者的底层构建器。要自定义所有生产者,你可以将自定义器列表传递给 PulsarProducerFactory
构造函数。当使用多个自定义器时,它们将按照它们在列表中出现的顺序依次应用。
如果你使用 Spring Boot 自动配置,你可以将自定义器指定为 bean,它们将自动传递给 PulsarProducerFactory
,并根据它们的 @Order
注解进行排序。
如果你想将自定义器仅应用于单个生产者,可以使用 Fluent API 并在发送时指定自定义器。
4. Pulsar 生产者缓存
每个底层的 Pulsar 生产者都会消耗资源。为了提高性能并避免持续创建生产者,生产者工厂会缓存其创建的生产者。它们以 LRU(最近最少使用)的方式被缓存,并在配置的时间段内未被使用时被淘汰。缓存键由足够的信息组成,以确保在后续的创建请求中,调用者会返回相同的生产者。
此外,您可以通过指定任何 spring.pulsar.producer.cache.* 应用程序属性来配置缓存设置。
4.1. 关于 Lambda 定制器的注意事项
任何用户提供的 producer 自定义器也会包含在缓存键中。由于缓存键依赖于 equals/hashCode
的有效实现,因此在使用 Lambda 自定义器时必须格外小心。
规则: 两个作为 Lambda 实现的自定义器(customizer)当且仅当它们使用相同的 Lambda 实例并且不需要在其闭包外定义的任何变量时,才会在 equals/hashCode
上匹配。
为了澄清上述规则,我们将看几个例子。在下面的例子中,自定义器被定义为一个内联 Lambda,这意味着每次调用 sendUser
时都会使用相同的 Lambda 实例。此外,它不需要闭包之外的变量。因此,它将匹配为缓存键。
void sendUser() {
var user = randomUser();
template.newMessage(user)
.withTopic("user-topic")
.withProducerCustomizer((b) -> b.producerName("user"))
.send();
}
在下一个案例中,定制器被定义为一个内联的 Lambda,这意味着每次调用 sendUser
都会使用相同的 Lambda 实例。然而,它需要一个在其闭包之外的变量。因此,它不会作为缓存键匹配。
void sendUser() {
var user = randomUser();
var name = randomName();
template.newMessage(user)
.withTopic("user-topic")
.withProducerCustomizer((b) -> b.producerName(name))
.send();
}
在这个最后的例子中,自定义器被定义为一个内联 Lambda,这意味着每次调用 sendUser
时都会使用同一个 Lambda 实例。虽然它确实使用了一个变量名,但这个变量名并不来源于其闭包外部,因此将作为缓存键匹配。这说明变量可以在 Lambda 闭包内部使用,甚至可以调用静态方法。
void sendUser() {
var user = randomUser();
template.newMessage(user)
.withTopic("user-topic")
.withProducerCustomizer((b) -> {
var name = SomeHelper.someStaticMethod();
b.producerName(name);
})
.send();
}
规则: 如果你的 Lambda 自定义器没有被定义一次且仅一次(相同的实例在后续调用中被使用)或者 它需要在其闭包外部定义的变量,那么你必须提供一个具有有效 equals/hashCode
实现的自定义器实现。
如果不遵循这些规则,生产者的缓存将总是失效,您的应用程序性能将受到负面影响。
5. 在生产者端拦截消息
添加一个 ProducerInterceptor
可以让你在消息被发布到 brokers 之前拦截并修改生产者接收到的消息。为此,你可以将拦截器列表传递给 PulsarTemplate
构造函数。当使用多个拦截器时,它们应用的顺序与它们在列表中出现的顺序一致。
如果你使用 Spring Boot 自动配置,你可以将拦截器指定为 Beans。它们会自动传递给 PulsarTemplate
。拦截器的顺序可以通过使用 @Order
注解来实现,如下所示:
@Bean
@Order(100)
ProducerInterceptor firstInterceptor() {
...
}
@Bean
@Order(200)
ProducerInterceptor secondInterceptor() {
...
}
如果你没有使用 starter,你需要自己配置并注册上述组件。