空负载与“墓碑”记录的日志压缩
在使用日志压缩时,你可以发送和接收带有 null 负载的消息,以标识某个键的删除。你也可能由于其他原因接收到 null 值,例如当反序列化器无法反序列化某个值时,它可能会返回 null。
生成空负载
要使用 PulsarTemplate 发送一个 null 负载,你可以使用流畅的 API 并将 null 传递给 newMessage() 方法的 value 参数,例如:
pulsarTemplate
.newMessage(null)
.withTopic("my-topic")
.withSchema(Schema.STRING)
.withMessageCustomizer((mb) -> mb.key("key:1234"))
.send();
备注
在发送空值时,必须指定模式类型,因为系统无法从 null 负载中确定消息的类型。
消费空负载
对于 @PulsarListener 和 @PulsarReader,null 负载会根据其消息参数的类型传递到监听器方法中,具体如下:
| 参数类型 | 传入值 |
|---|---|
| 基本类型 | null |
| 用户自定义类型 | null |
org.apache.pulsar.client.api.Message<T> | 非空的 Pulsar 消息,其 getValue() 返回 null |
org.springframework.messaging.Message<T> | 非空的 Spring 消息,其 getPayload() 返回 PulsarNull |
List<X> | 非空列表,其条目 (X) 是上述类型之一,并相应地行为(即基本类型条目为 null 等) |
org.apache.pulsar.client.api.Messages<T> | 非空的容器,包含非空的 Pulsar 消息,其 getValue() 返回 null |
important
当传入的值为 null 时(即使用原始类型或用户定义类型的单记录监听器),你必须使用 @Payload 参数注解,并设置 required = false。
important
当使用 Spring 的 org.springframework.messaging.Message 作为监听器的有效负载类型时,其泛型信息必须足够宽泛以接受 Message<PulsarNull>(例如 Message、Message<?> 或 Message<Object>)。这是由于 Spring 的 Message 不允许其有效负载为 null 值,而是使用 PulsarNull 作为占位符。
如果这是一个用于压缩日志的墓碑消息(tombstone message),通常你还需要键(key),以便你的应用程序可以确定哪个键被“删除”了。以下示例展示了这样的配置:
@PulsarListener(
topics = "my-topic",
subscriptionName = "my-topic-sub",
schemaType = SchemaType.STRING)
void myListener(
@Payload(required = false) String msg,
@Header(PulsarHeaders.KEY) String key) {
...
}
备注
@PulsarReader 目前还不支持 @Header 参数,因此在日志压缩场景中用处不大。