空负载与“墓碑”记录的日志压缩
在使用日志压缩时,你可以发送和接收带有 null
负载的消息,以标识某个键的删除。你也可能由于其他原因接收到 null
值,例如当反序列化器无法反序列化某个值时,它可能会返回 null
。
生成空负载
要使用 PulsarTemplate
发送一个 null
负载,你可以使用流畅的 API 并将 null
传递给 newMessage()
方法的 value
参数,例如:
pulsarTemplate
.newMessage(null)
.withTopic("my-topic")
.withSchema(Schema.STRING)
.withMessageCustomizer((mb) -> mb.key("key:1234"))
.send();
备注
在发送空值时,必须指定模式类型,因为系统无法从 null
负载中确定消息的类型。
消费空负载
对于 @PulsarListener
和 @PulsarReader
,null
负载会根据其消息参数的类型传递到监听器方法中,具体如下:
参数类型 | 传入值 |
---|---|
基本类型 | null |
用户自定义类型 | null |
org.apache.pulsar.client.api.Message<T> | 非空的 Pulsar 消息,其 getValue() 返回 null |
org.springframework.messaging.Message<T> | 非空的 Spring 消息,其 getPayload() 返回 PulsarNull |
List<X> | 非空列表,其条目 (X ) 是上述类型之一,并相应地行为(即基本类型条目为 null 等) |
org.apache.pulsar.client.api.Messages<T> | 非空的容器,包含非空的 Pulsar 消息,其 getValue() 返回 null |
important
当传入的值为 null
时(即使用原始类型或用户定义类型的单记录监听器),你必须使用 @Payload
参数注解,并设置 required = false
。
important
当使用 Spring 的 org.springframework.messaging.Message
作为监听器的有效负载类型时,其泛型信息必须足够宽泛以接受 Message<PulsarNull>
(例如 Message
、Message<?>
或 Message<Object>
)。这是由于 Spring 的 Message
不允许其有效负载为 null
值,而是使用 PulsarNull
作为占位符。
如果这是一个用于压缩日志的墓碑消息(tombstone message),通常你还需要键(key),以便你的应用程序可以确定哪个键被“删除
”了。以下示例展示了这样的配置:
@PulsarListener(
topics = "my-topic",
subscriptionName = "my-topic-sub",
schemaType = SchemaType.STRING)
void myListener(
@Payload(required = false) String msg,
@Header(PulsarHeaders.KEY) String key) {
...
}
备注
@PulsarReader
目前还不支持 @Header
参数,因此在日志压缩场景中用处不大。