跳到主要内容

Null Payloads 和 'Tombstone' 记录的日志压缩

ChatGPT-4o-mini 中英对照 Null Payloads and Log Compaction of 'Tombstone' Records

当你使用 Log Compaction 时,你可以发送和接收 null 负载的消息,以识别一个键的删除。

您还可能因其他原因接收到 null 值,例如当 Deserializer 无法反序列化一个值时,它可能会返回 null

要通过使用 KafkaTemplate 发送 null 负载,可以将 null 传递给 send() 方法的值参数。一个例外是 send(Message<?> message) 变体。由于 spring-messagingMessage<?> 不能有 null 负载,因此可以使用一种特殊的负载类型 KafkaNull,框架会发送 null。为了方便,提供了静态的 KafkaNull.INSTANCE

当你使用消息监听容器时,接收到的 ConsumerRecordvalue()null

要配置 @KafkaListener 以处理 null 有效负载,您必须使用 @Payload 注解,并将 required 设置为 false。如果这是一个用于压缩日志的墓碑消息,通常还需要键,以便您的应用程序能够确定哪个键被 "删除"。以下示例展示了这样的配置:

@KafkaListener(id = "deletableListener", topics = "myTopic")
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_KEY) String key) {
// value == null represents key deletion
}
java

当您使用类级别的 @KafkaListener 并且有多个 @KafkaHandler 方法时,需要一些额外的配置。具体来说,您需要一个带有 KafkaNull 负载的 @KafkaHandler 方法。以下示例展示了如何配置一个:

@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {

@KafkaHandler
public void listen(String cat) {
...
}

@KafkaHandler
public void listen(Integer hat) {
...
}

@KafkaHandler
public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_KEY) int key) {
...
}

}
java

请注意,参数是 null,而不是 KafkaNull

提示
important

此功能需要使用 KafkaNullAwarePayloadArgumentResolver,框架将在使用默认的 MessageHandlerMethodFactory 时进行配置。当使用自定义的 MessageHandlerMethodFactory 时,请参阅 将自定义 HandlerMethodArgumentResolver 添加到 @KafkaListener