Null Payloads 和 'Tombstone' 记录的日志压缩
当你使用 Log Compaction 时,你可以发送和接收 null
负载的消息,以识别一个键的删除。
您还可能因其他原因接收到 null
值,例如当 Deserializer
无法反序列化一个值时,它可能会返回 null
。
要通过使用 KafkaTemplate
发送 null
负载,可以将 null 传递给 send()
方法的值参数。一个例外是 send(Message<?> message)
变体。由于 spring-messaging
的 Message<?>
不能有 null
负载,因此可以使用一种特殊的负载类型 KafkaNull
,框架会发送 null
。为了方便,提供了静态的 KafkaNull.INSTANCE
。
当你使用消息监听容器时,接收到的 ConsumerRecord
的 value()
为 null
。
要配置 @KafkaListener
以处理 null
有效负载,您必须使用 @Payload
注解,并将 required
设置为 false
。如果这是一个用于压缩日志的墓碑消息,通常还需要键,以便您的应用程序能够确定哪个键被 "删除
"。以下示例展示了这样的配置:
@KafkaListener(id = "deletableListener", topics = "myTopic")
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_KEY) String key) {
// value == null represents key deletion
}
当您使用类级别的 @KafkaListener
并且有多个 @KafkaHandler
方法时,需要一些额外的配置。具体来说,您需要一个带有 KafkaNull
负载的 @KafkaHandler
方法。以下示例展示了如何配置一个:
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String cat) {
...
}
@KafkaHandler
public void listen(Integer hat) {
...
}
@KafkaHandler
public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_KEY) int key) {
...
}
}
请注意,参数是 null
,而不是 KafkaNull
。
查看 手动分配所有分区。
此功能需要使用 KafkaNullAwarePayloadArgumentResolver
,框架将在使用默认的 MessageHandlerMethodFactory
时进行配置。当使用自定义的 MessageHandlerMethodFactory
时,请参阅 将自定义 HandlerMethodArgumentResolver 添加到 @KafkaListener。