接收消息
您可以通过配置 MessageListenerContainer
并提供消息监听器,或者使用 @KafkaListener
注解来接收消息。
部分总结
📄️ 消息监听器
当您使用消息监听容器时,必须提供一个监听器来接收数据。目前支持八个消息监听器接口。以下列表显示了这些接口:
📄️ 消息监听器容器
提供了两个 MessageListenerContainer 实现:
📄️ 手动提交偏移量
通常,当使用 AckMode.MANUAL 或 AckMode.MANUAL\_IMMEDIATE 时,确认必须按顺序进行,因为 Kafka 不为每条记录维护状态,只为每个组/分区维护一个已提交的偏移量。从版本 2.8 开始,您现在可以设置容器属性 asyncAcks,这允许对由 poll 返回的记录的确认以任意顺序进行。监听器容器将推迟无序提交,直到收到缺失的确认。在所有先前 poll 的偏移量都已提交之前,消费者将被暂停(不交付新记录)。
📄️ 异步 @KafkaListener 返回类型
从版本 3.2 开始,@KafkaListener (和 @KafkaHandler)方法可以指定异步返回类型,从而允许异步发送回复。返回类型包括 CompletableFuture 和 Kotlin 挂起函数。
📄️ @KafkaListener 注解
@KafkaListener 注解用于指定一个 bean 方法作为监听容器的监听器。该 bean 被包装在一个 MessagingMessageListenerAdapter 中,并配置了各种特性,例如转换器,以便在必要时将数据转换为与方法参数匹配的格式。
📄️ 获取 Consumer group.id
在多个容器中运行相同的监听器代码时,能够确定记录来自哪个容器(通过其 group.id 消费者属性识别)可能是有用的。
📄️ 容器线程命名
TaskExecutor 用于调用消费者和监听器。您可以通过设置容器的 ContainerProperties 的 consumerExecutor 属性来提供自定义执行器。当使用池化执行器时,请确保有足够的线程可用于处理所有容器中的并发性。当使用 ConcurrentMessageListenerContainer 时,执行器中的一个线程将用于每个消费者(并发性)。
📄️ @KafkaListener 作为元注解
从版本 2.2 开始,您现在可以将 @KafkaListener 用作元注解。以下示例演示了如何做到这一点:
📄️ @KafkaListener 在一个类上
当你在类级别使用 @KafkaListener 时,必须在方法级别指定 @KafkaHandler。当消息被传递时,转换后的消息有效载荷类型用于确定调用哪个方法。以下示例展示了如何做到这一点:
📄️ @KafkaListener 属性修改
从版本 2.7.2 开始,您现在可以在容器创建之前以编程方式修改注解属性。为此,请将一个或多个 KafkaListenerAnnotationBeanPostProcessor.AnnotationEnhancer 添加到应用程序上下文中。AnnotationEnhancer 是一个 BiFunction\<Map\<String, Object>, AnnotatedElement, Map\<String, Object>,并且必须返回一个属性映射。属性值可以包含 SpEL 和/或属性占位符;增强器在执行任何解析之前被调用。如果存在多个增强器,并且它们实现了 Ordered,它们将按顺序被调用。
📄️ @KafkaListener 生命周期管理
为 @KafkaListener 注解创建的监听器容器不是应用程序上下文中的 bean。相反,它们是与类型为 KafkaListenerEndpointRegistry 的基础设施 bean 注册的。这个 bean 是由框架自动声明的,并管理容器的生命周期;它会自动启动任何 autoStartup 设置为 true 的容器。所有由所有容器工厂创建的容器必须处于相同的阶段。有关更多信息,请参见监听器容器自动启动。您可以通过使用注册表以编程方式管理生命周期。启动或停止注册表将启动或停止所有注册的容器。或者,您可以通过使用其 id 属性获取对单个容器的引用。您可以在注解上设置 autoStartup,这将覆盖配置到容器工厂中的默认设置。您可以从应用程序上下文中获取对 bean 的引用,例如自动装配,以管理其注册的容器。以下示例演示了如何做到这一点:
📄️ @KafkaListener @Payload 验证
从版本 2.2 开始,现在更容易为 @KafkaListener @Payload 参数添加验证器。之前,您必须配置一个自定义的 DefaultMessageHandlerMethodFactory 并将其添加到注册器中。现在,您可以将验证器直接添加到注册器本身。以下代码展示了如何做到这一点:
📄️ 重新平衡监听器
ContainerProperties 有一个名为 consumerRebalanceListener 的属性,它接受 Kafka 客户端的 ConsumerRebalanceListener 接口的实现。如果没有提供此属性,容器会配置一个记录日志的监听器,以 INFO 级别记录再平衡事件。该框架还添加了一个子接口 ConsumerAwareRebalanceListener。以下列表显示了 ConsumerAwareRebalanceListener 接口的定义:
📄️ 强制消费者重新平衡
Kafka 客户端现在支持触发强制再平衡的选项。从版本 3.1.2 开始,Spring for Apache Kafka 提供了一个选项,可以通过消息监听容器在 Kafka 消费者上调用此 API。当调用此 API 时,它只是提醒 Kafka 消费者触发强制再平衡;实际的再平衡将仅在下一个 poll() 操作中发生。如果已经有再平衡正在进行中,调用强制再平衡将是一个 NO-OP。调用者必须等待当前的再平衡完成后才能调用另一个。有关 enforceRebalance 的更多详细信息,请参见 javadocs。
📄️ 使用 @SendTo 转发监听器结果
从版本 2.0 开始,如果你还在 @KafkaListener 上添加了 @SendTo 注解,并且方法调用返回了结果,则该结果将被转发到 @SendTo 指定的主题。
📄️ 过滤消息
在某些场景中,例如重新平衡,已经处理过的消息可能会被重新投递。框架无法知道这样的消息是否已经被处理。这是一个应用层的功能。这被称为幂等接收器模式,Spring Integration 提供了它的实现。
📄️ 重试交付
请参见 Handling Exceptions 中的 DefaultErrorHandler。
📄️ 从 @KafkaListeners 开始按顺序处理
一个常见的用例是在另一个监听器消费完主题中的所有记录后启动一个监听器。例如,您可能希望在处理其他主题的记录之前,将一个或多个压缩主题的内容加载到内存中。从版本 2.7.3 开始,引入了一个新组件 ContainerGroupSequencer。它使用 @KafkaListener 的 containerGroup 属性将容器分组在一起,并在当前组中的所有容器都处于空闲状态时启动下一个组中的容器。
📄️ 使用 KafkaTemplate 接收
本节介绍如何使用 KafkaTemplate 接收消息。