跳到主要内容

@KafkaListener 在一个类上

ChatGPT-4o-mini 中英对照 @KafkaListener on a Class @KafkaListener on a Class

当你在类级别使用 @KafkaListener 时,必须在方法级别指定 @KafkaHandler。当消息被传递时,转换后的消息负载类型用于确定调用哪个方法。以下示例演示了如何做到这一点:

@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {

@KafkaHandler
public void listen(String foo) {
...
}

@KafkaHandler
public void listen(Integer bar) {
...
}

@KafkaHandler(isDefault = true)
public void listenDefault(Object object) {
...
}

}
java

从版本 2.1.3 开始,您可以将 @KafkaHandler 方法指定为默认方法,如果没有其他方法匹配,则会调用该方法。最多只能指定一个这样的默认方法。在使用 @KafkaHandler 方法时,负载必须已经转换为域对象(以便进行匹配)。请使用自定义反序列化器、JsonDeserializer 或将其 TypePrecedence 设置为 TYPE_IDJsonMessageConverter。有关更多信息,请参见 Serialization, Deserialization, and Message Conversion

important

由于 Spring 在解析方法参数时的一些限制,默认的 @KafkaHandler 不能接收离散的头信息;它必须使用 ConsumerRecordMetadata,如 Consumer Record Metadata 中所讨论的。

例如:

@KafkaHandler(isDefault = true)
public void listenDefault(Object object, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
...
}
java

如果对象是一个 String,这将不起作用;topic 参数也会引用 object

如果您需要关于记录的元数据在默认方法中,请使用以下内容:

@KafkaHandler(isDefault = true)
void listen(Object in, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {
String topic = meta.topic();
...
}
java