跳到主要内容

接收消息

ChatGPT-4o-mini 中英对照 Receiving Messages

您可以通过配置 MessageListenerContainer 并提供消息监听器,或者使用 @KafkaListener 注解来接收消息。

部分总结

📄️ 手动提交偏移量

通常,当使用 AckMode.MANUAL 或 AckMode.MANUAL\_IMMEDIATE 时,确认必须按顺序进行,因为 Kafka 不为每条记录维护状态,只为每个组/分区维护一个已提交的偏移量。从版本 2.8 开始,您现在可以设置容器属性 asyncAcks,这允许对由 poll 返回的记录的确认以任意顺序进行。监听器容器将推迟无序提交,直到收到缺失的确认。在所有先前 poll 的偏移量都已提交之前,消费者将被暂停(不交付新记录)。

📄️ @KafkaListener 属性修改

从版本 2.7.2 开始,您现在可以在容器创建之前以编程方式修改注解属性。为此,请将一个或多个 KafkaListenerAnnotationBeanPostProcessor.AnnotationEnhancer 添加到应用程序上下文中。AnnotationEnhancer 是一个 BiFunction\<Map\<String, Object>, AnnotatedElement, Map\<String, Object>,并且必须返回一个属性映射。属性值可以包含 SpEL 和/或属性占位符;增强器在执行任何解析之前被调用。如果存在多个增强器,并且它们实现了 Ordered,它们将按顺序被调用。

📄️ @KafkaListener 生命周期管理

为 @KafkaListener 注解创建的监听器容器不是应用程序上下文中的 bean。相反,它们是与类型为 KafkaListenerEndpointRegistry 的基础设施 bean 注册的。这个 bean 是由框架自动声明的,并管理容器的生命周期;它会自动启动任何 autoStartup 设置为 true 的容器。所有由所有容器工厂创建的容器必须处于相同的阶段。有关更多信息,请参见监听器容器自动启动。您可以通过使用注册表以编程方式管理生命周期。启动或停止注册表将启动或停止所有注册的容器。或者,您可以通过使用其 id 属性获取对单个容器的引用。您可以在注解上设置 autoStartup,这将覆盖配置到容器工厂中的默认设置。您可以从应用程序上下文中获取对 bean 的引用,例如自动装配,以管理其注册的容器。以下示例演示了如何做到这一点:

📄️ 强制消费者重新平衡

Kafka 客户端现在支持触发强制再平衡的选项。从版本 3.1.2 开始,Spring for Apache Kafka 提供了一个选项,可以通过消息监听容器在 Kafka 消费者上调用此 API。当调用此 API 时,它只是提醒 Kafka 消费者触发强制再平衡;实际的再平衡将仅在下一个 poll() 操作中发生。如果已经有再平衡正在进行中,调用强制再平衡将是一个 NO-OP。调用者必须等待当前的再平衡完成后才能调用另一个。有关 enforceRebalance 的更多详细信息,请参见 javadocs。

📄️ 从 @KafkaListeners 开始按顺序处理

一个常见的用例是在另一个监听器消费完主题中的所有记录后启动一个监听器。例如,您可能希望在处理其他主题的记录之前,将一个或多个压缩主题的内容加载到内存中。从版本 2.7.3 开始,引入了一个新组件 ContainerGroupSequencer。它使用 @KafkaListener 的 containerGroup 属性将容器分组在一起,并在当前组中的所有容器都处于空闲状态时启动下一个组中的容器。