跳到主要内容

空负载与“墓碑”记录的日志压缩

DeepSeek V3 中英对照 Null Payloads and Log Compaction of 'Tombstone' Records

在使用日志压缩时,你可以发送和接收带有 null 负载的消息来标识某个键的删除。你也有可能因为其他原因接收到 null 值,例如当反序列化器无法反序列化某个值时,它可能会返回 null

生成空负载

你可以通过向 ReactivePulsarTemplatesend 方法传递一个 null 消息参数值来发送 null 值,例如:

reactiveTemplate
.send(null, Schema.STRING)
.subscribe();
java
备注

在发送空值时,必须指定模式类型,因为系统无法从 null 负载中确定消息的类型。

消费空负载

对于 @ReactivePularListenernull 负载会根据消息参数的类型传递到监听器方法中,具体如下:

参数类型传入值
基本类型null
用户自定义类型null
org.apache.pulsar.client.api.Message<T>非空的 Pulsar 消息,其 getValue() 返回 null
org.springframework.messaging.Message<T>非空的 Spring 消息,其 getPayload() 返回 PulsarNull
Flux<org.apache.pulsar.client.api.Message<T>>非空的 flux,其条目为非空的 Pulsar 消息,其 getValue() 返回 null
Flux<org.springframework.messaging.Message<T>>非空的 flux,其条目为非空的 Spring 消息,其 getPayload() 返回 PulsarNull
important

当传入的值为 null 时(即使用原始类型或用户定义类型的单记录监听器),你必须使用 @Payload 参数注解,并将 required 设置为 false

important

当在监听器负载类型中使用 Spring 的 org.springframework.messaging.Message 时,其泛型类型信息必须足够宽泛以接受 Message<PulsarNull>(例如 MessageMessage<?>Message<Object>)。这是因为 Spring Message 不允许其负载为 null 值,而是使用 PulsarNull 占位符。

如果这是压缩日志的墓碑消息(tombstone message),通常还需要键(key),以便你的应用程序能够确定哪个键被“删除”了。以下示例展示了这样的配置:

@ReactivePulsarListener(
topics = "my-topic",
subscriptionName = "my-topic-sub",
schemaType = SchemaType.STRING)
Mono<Void> myListener(
@Payload(required = false) String msg,
@Header(PulsarHeaders.KEY) String key) {
...
}
java
备注

当使用流式消息监听器(Flux)时,头部的支持是有限的,因此在日志压缩场景中用处不大。