@KafkaListener 注解
@KafkaListener Annotation
@KafkaListener 注解用于将一个 bean 方法指定为监听器容器的监听器。该 bean 被包装在一个 MessagingMessageListenerAdapter 中,并配置了各种功能,例如转换器,用于在必要时将数据转换为与方法参数匹配的格式。
您可以通过使用 #{…} 或属性占位符 (${…}) 来使用 SpEL 配置注解上的大多数属性。有关更多信息,请参见 Javadoc。
Record Listeners
@KafkaListener 注解提供了一种简单的 POJO 监听器机制。以下示例展示了如何使用它:
public class Listener {
    @KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
    public void listen(String data) {
        ...
    }
}
这个机制要求在你的一个 @Configuration 类上使用 @EnableKafka 注解,并且需要一个监听器容器工厂,用于配置底层的 ConcurrentMessageListenerContainer。默认情况下,期望有一个名为 kafkaListenerContainerFactory 的 bean。以下示例展示了如何使用 ConcurrentMessageListenerContainer:
@Configuration
@EnableKafka
public class KafkaConfig {
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }
    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        ...
        return props;
    }
}
请注意,要设置容器属性,必须在工厂上使用 getContainerProperties() 方法。它被用作注入到容器中的实际属性的模板。
从版本 2.1.1 开始,您现在可以为通过注解创建的消费者设置 client.id 属性。clientIdPrefix 以 -n 结尾,其中 n 是一个整数,表示在使用并发时的容器编号。
从版本 2.2 开始,您现在可以通过在注解本身上使用属性来覆盖容器工厂的 concurrency 和 autoStartup 属性。这些属性可以是简单值、属性占位符或 SpEL 表达式。以下示例展示了如何做到这一点:
@KafkaListener(id = "myListener", topics = "myTopic",
        autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
    ...
}
显式分区分配
您还可以配置具有显式主题和分区的 POJO 监听器(可选地,还可以配置它们的初始偏移量)。以下示例演示了如何做到这一点:
@KafkaListener(id = "thing2", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
          @TopicPartition(topic = "topic2", partitions = "0",
             partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}
您可以在 partitions 或 partitionOffsets 属性中指定每个分区,但不能同时指定两者。
与大多数注解属性一样,您可以使用 SpEL 表达式;有关如何生成大量分区的示例,请参见 手动分配所有分区。
从版本 2.5.5 开始,您可以对所有分配的分区应用初始偏移量:
@KafkaListener(id = "thing3", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" },
             partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}
* 通配符表示 partitions 属性中的所有分区。在每个 @TopicPartition 中只能有一个带有通配符的 @PartitionOffset。
此外,当监听器实现 ConsumerSeekAware 时,即使在使用手动分配的情况下,onPartitionsAssigned 现在也会被调用。这允许在那个时刻进行任何任意的偏移量查找操作。
从版本 2.6.4 开始,您可以指定一个以逗号分隔的分区列表或分区范围:
@KafkaListener(id = "pp", autoStartup = "false",
        topicPartitions = @TopicPartition(topic = "topic1",
                partitions = "0-5, 7, 10-15"))
public void process(String in) {
    ...
}
范围是包含的;上述示例将分配分区 0, 1, 2, 3, 4, 5, 7, 10, 11, 12, 13, 14, 15。
在指定初始偏移量时,可以使用相同的技术:
@KafkaListener(id = "thing3", topicPartitions =
        { @TopicPartition(topic = "topic1",
             partitionOffsets = @PartitionOffset(partition = "0-5", initialOffset = "0"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}
初始偏移量将应用于所有 6 个分区。
自 3.2 以来,@PartitionOffset 支持 SeekPosition.END、SeekPosition.BEGINNING、SeekPosition.TIMESTAMP,seekPosition 匹配 SeekPosition 枚举名称:
@KafkaListener(id = "seekPositionTime", topicPartitions = {
        @TopicPartition(topic = TOPIC_SEEK_POSITION, partitionOffsets = {
                @PartitionOffset(partition = "0", initialOffset = "723916800000", seekPosition = "TIMESTAMP"),
                @PartitionOffset(partition = "1", initialOffset = "0", seekPosition = "BEGINNING"),
                @PartitionOffset(partition = "2", initialOffset = "0", seekPosition = "END")
        })
})
public void listen(ConsumerRecord<?, ?> record) {
    ...
}
如果 seekPosition 设置为 END 或 BEGINNING,将忽略 initialOffset 和 relativeToCurrent。如果 seekPosition 设置为 TIMESTAMP,则 initialOffset 表示时间戳。
手动确认
当使用手动 AckMode 时,您还可以向监听器提供 Acknowledgment。要激活手动 AckMode,您需要在 ContainerProperties 中将 ack-mode 设置为适当的手动模式。以下示例还展示了如何使用不同的容器工厂。此自定义容器工厂必须通过调用 getContainerProperties() 并在其上调用 setAckMode 来将 AckMode 设置为手动类型。否则,Acknowledgment 对象将为 null。
@KafkaListener(id = "cat", topics = "myTopic",
          containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
    ...
    ack.acknowledge();
}
Consumer Record Metadata
最后,关于记录的元数据可以从消息头中获取。您可以使用以下头部名称来检索消息的头部:
- 
KafkaHeaders.OFFSET - 
KafkaHeaders.RECEIVED_KEY - 
KafkaHeaders.RECEIVED_TOPIC - 
KafkaHeaders.RECEIVED_PARTITION - 
KafkaHeaders.RECEIVED_TIMESTAMP - 
KafkaHeaders.TIMESTAMP_TYPE 
从版本 2.5 开始,如果传入记录的键为 null,则 RECEIVED_KEY 不再存在;之前该头部会填充为 null 值。此更改是为了使框架与 spring-messaging 的约定保持一致,其中 null 值的头部是不存在的。
以下示例展示了如何使用 headers:
@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
        @Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Integer key,
        @Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
        ) {
    ...
}
参数注解(@Payload,@Header)必须在监听器方法的具体实现上指定;如果它们在接口上定义,则不会被检测到。
从版本 2.5 开始,您可以在 ConsumerRecordMetadata 参数中接收记录元数据,而不是使用离散的头部。
@KafkaListener(...)
public void listen(String str, ConsumerRecordMetadata meta) {
    ...
}
这包含了来自 ConsumerRecord 的所有数据,除了键和值。
批处理监听器
从版本 1.1 开始,您可以配置 @KafkaListener 方法以接收从消费者轮询中接收到的整个消费者记录批次。
非阻塞重试 不支持批处理监听器。
要配置监听器容器工厂以创建批量监听器,可以设置 batchListener 属性。以下示例演示了如何做到这一点:
@Bean
public KafkaListenerContainerFactory<?> batchFactory() {
    ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setBatchListener(true);  // <<<<<<<<<<<<<<<<<<<<<<<<<
   return factory;
}
从版本 2.8 开始,您可以使用 @KafkaListener 注解上的 batch 属性覆盖工厂的 batchListener 属性。这与对 容器错误处理程序 的更改一起,允许使用相同的工厂来处理记录和批量监听器。
从版本 2.9.6 开始,容器工厂为 recordMessageConverter 和 batchMessageConverter 属性提供了单独的 setter 方法。之前只有一个属性 messageConverter,适用于记录和批处理监听器。
以下示例展示了如何接收一系列有效负载:
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
    ...
}
主题、分区、偏移等信息可在与负载并行的头部中找到。以下示例展示了如何使用这些头部:
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
        @Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
        @Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
        @Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
        @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
    ...
}
或者,您可以接收一个 List 的 Message<?> 对象,每个消息中包含每个偏移量和其他详细信息,但它必须是方法中定义的唯一参数(除了在使用手动提交时可选的 Acknowledgment 和/或 Consumer<?, ?> 参数)。以下示例展示了如何做到这一点:
@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
public void listen1(List<Message<?>> list) {
    ...
}
@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen2(List<Message<?>> list, Acknowledgment ack) {
    ...
}
@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory")
public void listen3(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) {
    ...
}
在这种情况下,负载不会进行任何转换。
如果 BatchMessagingMessageConverter 配置了 RecordMessageConverter,您还可以向 Message 参数添加一个泛型类型,并且有效载荷将被转换。有关更多信息,请参见 Payload Conversion with Batch Listeners。
您还可以接收一个 ConsumerRecord<?, ?> 对象的列表,但它必须是方法中唯一的参数(除了在使用手动提交和 Consumer<?, ?> 参数时可选的 Acknowledgment)。以下示例展示了如何做到这一点:
@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list) {
    ...
}
@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) {
    ...
}
从版本 2.2 开始,监听器可以接收 poll() 方法返回的完整 ConsumerRecords<?, ?> 对象,这让监听器可以访问其他方法,例如 partitions()(返回列表中的 TopicPartition 实例)和 records(TopicPartition)(获取特定的记录)。同样,这必须是该方法的唯一参数(除了在使用手动提交或 Consumer<?, ?> 参数时的可选 Acknowledgment)。以下示例演示了如何做到这一点:
@KafkaListener(id = "pollResults", topics = "myTopic", containerFactory = "batchFactory")
public void pollResults(ConsumerRecords<?, ?> records) {
    ...
}
如果容器工厂配置了 RecordFilterStrategy,则对于 ConsumerRecords<?, ?> 监听器将被忽略,并会发出 WARN 日志消息。只有在使用 <List<?>> 形式的监听器时,才能对记录进行批量过滤。默认情况下,记录是逐个过滤的;从版本 2.8 开始,您可以覆盖 filterBatch 以在一次调用中过滤整个批次。
注解属性
从版本 2.0 开始,id 属性(如果存在)将用作 Kafka 消费者的 group.id 属性,覆盖消费者工厂中配置的属性(如果存在)。您也可以显式设置 groupId,或者将 idIsGroup 设置为 false,以恢复使用消费者工厂 group.id 的先前行为。
您可以在大多数注解属性中使用属性占位符或 SpEL 表达式,如下例所示:
@KafkaListener(topics = "${some.property}")
@KafkaListener(topics = "#{someBean.someProperty}",
    groupId = "#{someBean.someProperty}.group")
从版本 2.1.2 开始,SpEL 表达式支持一个特殊的标记:__listener。它是一个伪 bean 名称,表示当前存在此注解的 bean 实例。
考虑以下示例:
@Bean
public Listener listener1() {
    return new Listener("topic1");
}
@Bean
public Listener listener2() {
    return new Listener("topic2");
}
根据前面的示例中的 beans,我们可以使用以下内容:
public class Listener {
    private final String topic;
    public Listener(String topic) {
        this.topic = topic;
    }
    @KafkaListener(topics = "#{__listener.topic}",
        groupId = "#{__listener.topic}.group")
    public void listen(...) {
        ...
    }
    public String getTopic() {
        return this.topic;
    }
}
如果在不太可能的情况下你有一个实际的 bean 叫做 __listener,你可以通过使用 beanRef 属性来更改表达式令牌。以下示例演示了如何做到这一点:
@KafkaListener(beanRef = "__x", topics = "#{__x.topic}", groupId = "#{__x.topic}.group")
从版本 2.2.4 开始,您可以直接在注解上指定 Kafka 消费者属性,这些属性将覆盖消费者工厂中配置的同名属性。您 不能 以这种方式指定 group.id 和 client.id 属性;它们将被忽略;请使用 groupId 和 clientIdPrefix 注解属性来设置这些。
属性被指定为单独的字符串,采用正常的 Java Properties 文件格式:foo:bar、foo=bar 或 foo bar,如下例所示:
@KafkaListener(topics = "myTopic", groupId = "group", properties = {
    "max.poll.interval.ms:60000",
    ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100"
})
以下是 Using RoutingKafkaTemplate 示例中相应监听器的示例。
@KafkaListener(id = "one", topics = "one")
public void listen1(String in) {
    System.out.println("1: " + in);
}
@KafkaListener(id = "two", topics = "two",
        properties = "value.deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer")
public void listen2(byte[] in) {
    System.out.println("2: " + new String(in));
}