空负载与“墓碑”记录的日志压缩
在使用日志压缩时,你可以发送和接收带有 null
负载的消息来标识某个键的删除。你也有可能因为其他原因接收到 null
值,例如当反序列化器无法反序列化某个值时,它可能会返回 null
。
生成空负载
你可以通过向 ReactivePulsarTemplate
的 send
方法传递一个 null
消息参数值来发送 null
值,例如:
reactiveTemplate
.send(null, Schema.STRING)
.subscribe();
备注
在发送空值时,必须指定模式类型,因为系统无法从 null
负载中确定消息的类型。
消费空负载
对于 @ReactivePularListener
,null
负载会根据消息参数的类型传递到监听器方法中,具体如下:
参数类型 | 传入值 |
---|---|
基本类型 | null |
用户自定义类型 | null |
org.apache.pulsar.client.api.Message<T> | 非空的 Pulsar 消息,其 getValue() 返回 null |
org.springframework.messaging.Message<T> | 非空的 Spring 消息,其 getPayload() 返回 PulsarNull |
Flux<org.apache.pulsar.client.api.Message<T>> | 非空的 flux,其条目为非空的 Pulsar 消息,其 getValue() 返回 null |
Flux<org.springframework.messaging.Message<T>> | 非空的 flux,其条目为非空的 Spring 消息,其 getPayload() 返回 PulsarNull |
important
当传入的值为 null
时(即使用原始类型或用户定义类型的单记录监听器),你必须使用 @Payload
参数注解,并将 required
设置为 false
。
important
当在监听器负载类型中使用 Spring 的 org.springframework.messaging.Message
时,其泛型类型信息必须足够宽泛以接受 Message<PulsarNull>
(例如 Message
、Message<?>
或 Message<Object>
)。这是因为 Spring Message 不允许其负载为 null 值,而是使用 PulsarNull
占位符。
如果这是压缩日志的墓碑消息(tombstone message),通常还需要键(key),以便你的应用程序能够确定哪个键被“删除
”了。以下示例展示了这样的配置:
@ReactivePulsarListener(
topics = "my-topic",
subscriptionName = "my-topic-sub",
schemaType = SchemaType.STRING)
Mono<Void> myListener(
@Payload(required = false) String msg,
@Header(PulsarHeaders.KEY) String key) {
...
}
备注
当使用流式消息监听器(Flux
)时,头部的支持是有限的,因此在日志压缩场景中用处不大。