跳到主要内容

空负载与“墓碑”记录的日志压缩

DeepSeek V3 中英对照 Null Payloads and Log Compaction of 'Tombstone' Records

在使用日志压缩时,你可以发送和接收带有 null 负载的消息,以标识某个键的删除。你也可能由于其他原因接收到 null 值,例如当反序列化器无法反序列化某个值时,它可能会返回 null

生成空负载

要使用 PulsarTemplate 发送一个 null 负载,你可以使用流畅的 API 并将 null 传递给 newMessage() 方法的 value 参数,例如:

pulsarTemplate
.newMessage(null)
.withTopic("my-topic")
.withSchema(Schema.STRING)
.withMessageCustomizer((mb) -> mb.key("key:1234"))
.send();
java
备注

在发送空值时,必须指定模式类型,因为系统无法从 null 负载中确定消息的类型。

消费空负载

对于 @PulsarListener@PulsarReadernull 负载会根据其消息参数的类型传递到监听器方法中,具体如下:

参数类型传入值
基本类型null
用户自定义类型null
org.apache.pulsar.client.api.Message<T>非空的 Pulsar 消息,其 getValue() 返回 null
org.springframework.messaging.Message<T>非空的 Spring 消息,其 getPayload() 返回 PulsarNull
List<X>非空列表,其条目 (X) 是上述类型之一,并相应地行为(即基本类型条目为 null 等)
org.apache.pulsar.client.api.Messages<T>非空的容器,包含非空的 Pulsar 消息,其 getValue() 返回 null
important

当传入的值为 null 时(即使用原始类型或用户定义类型的单记录监听器),你必须使用 @Payload 参数注解,并设置 required = false

important

当使用 Spring 的 org.springframework.messaging.Message 作为监听器的有效负载类型时,其泛型信息必须足够宽泛以接受 Message<PulsarNull>(例如 MessageMessage<?>Message<Object>)。这是由于 Spring 的 Message 不允许其有效负载为 null 值,而是使用 PulsarNull 作为占位符。

如果这是一个用于压缩日志的墓碑消息(tombstone message),通常你还需要键(key),以便你的应用程序可以确定哪个键被“删除”了。以下示例展示了这样的配置:

@PulsarListener(
topics = "my-topic",
subscriptionName = "my-topic-sub",
schemaType = SchemaType.STRING)
void myListener(
@Payload(required = false) String msg,
@Header(PulsarHeaders.KEY) String key) {
...
}
java
备注

@PulsarReader 目前还不支持 @Header 参数,因此在日志压缩场景中用处不大。