使用 Spring 进行 Apache Kafka
本节提供了对影响使用 Spring for Apache Kafka 的各种问题的详细解释。有关快速但不太详细的介绍,请参见 Quick Tour。
部分总结
📄️ 连接到 Kafka
KafkaAdmin - 参见 配置主题
📄️ 配置主题
如果您在应用程序上下文中定义了一个 KafkaAdmin bean,它可以自动将主题添加到代理。为此,您可以为每个主题向应用程序上下文添加一个 NewTopic @Bean。版本 2.3 引入了一个新的类 TopicBuilder,以使创建此类 bean 更加方便。以下示例演示了如何做到这一点:
📄️ 发送消息
本节介绍如何发送消息。
🗃️ 接受消息
19 个项目
📄️ 监听器容器属性
ackCount
📄️ 动态创建容器
有几种技术可以在运行时创建监听器容器。本节将探讨其中的一些技术。
📄️ 应用程序事件
以下 Spring 应用程序事件由监听器容器及其消费者发布:
📄️ 主题/分区初始偏移量
有几种方法可以设置分区的初始偏移量。
📄️ 寻求特定偏移量
为了进行查找,您的监听器必须实现 ConsumerSeekAware 接口,该接口具有以下方法:
📄️ 容器工厂
如在 @KafkaListener 注解中讨论的,ConcurrentKafkaListenerContainerFactory 用于为注解方法创建容器。
📄️ 线程安全
当使用并发消息监听器容器时,单个监听器实例会在所有消费者线程上被调用。因此,监听器需要是线程安全的,最好使用无状态监听器。如果无法使您的监听器线程安全,或者添加同步会显著降低添加并发的好处,您可以使用以下几种技术之一:
📄️ 监控
从版本 2.3 开始,如果在类路径中检测到 Micrometer,并且应用程序上下文中存在单个 MeterRegistry,监听器容器将自动创建和更新监听器的 Micrometer Timers。可以通过将 ContainerProperty 的 micrometerEnabled 设置为 false 来禁用这些计时器。
📄️ 事务
本节描述了 Spring for Apache Kafka 如何支持事务。
📄️ 精确一次语义
您可以为 KafkaAwareTransactionManager 实例提供一个监听器容器。当配置完成后,容器在调用监听器之前会启动一个事务。监听器执行的任何 KafkaTemplate 操作都参与该事务。如果监听器成功处理了记录(或在使用 BatchMessageListener 时处理多个记录),容器会通过 producer.sendOffsetsToTransaction() 将偏移量发送到事务中,然后事务管理器提交该事务。如果监听器抛出异常,则事务会回滚,消费者会重新定位,以便在下次轮询时可以检索回滚的记录。有关更多信息以及处理重复失败记录的内容,请参见 After-rollback Processor。
📄️ 将 Spring Beans 接入生产者/消费者拦截器
Apache Kafka 提供了一种机制,可以将拦截器添加到生产者和消费者。这些对象由 Kafka 管理,而不是 Spring,因此正常的 Spring 依赖注入无法用于连接依赖的 Spring Bean。然而,您可以使用拦截器的 config() 方法手动连接这些依赖项。以下 Spring Boot 应用程序展示了如何通过重写 Spring Boot 的默认工厂,将一些依赖的 Bean 添加到配置属性中。
📄️ 在 Spring 中管理生产者拦截器
从版本 3.0.0 开始,对于生产者拦截器,您可以让 Spring 直接将其作为一个 bean 管理,而不是将拦截器的类名提供给 Apache Kafka 生产者配置。如果您选择这种方法,则需要在 KafkaTemplate 上设置此生产者拦截器。以下是一个使用上面相同的 MyProducerInterceptor 的示例,但更改为不使用内部配置属性。
📄️ 暂停和恢复监听器容器
版本 2.1.3 为监听器容器添加了 pause() 和 resume() 方法。之前,您可以在 ConsumerAwareMessageListener 中暂停消费者,并通过监听 ListenerContainerIdleEvent 来恢复它,该事件提供对 Consumer 对象的访问。虽然您可以通过使用事件监听器在空闲容器中暂停消费者,但在某些情况下,这并不是线程安全的,因为无法保证事件监听器是在消费者线程上调用的。为了安全地暂停和恢复消费者,您应该使用监听器容器上的 pause 和 resume 方法。pause() 在下一个 poll() 之前生效;resume() 在当前 poll() 返回后生效。当容器被暂停时,它会继续对消费者进行 poll(),避免在使用组管理时发生重新平衡,但不会检索任何记录。有关更多信息,请参见 Kafka 文档。
📄️ 在监听器容器上暂停和恢复分区
自版本 2.7 起,您可以通过在监听器容器中使用 pausePartition(TopicPartition topicPartition) 和 resumePartition(TopicPartition topicPartition) 方法来暂停和恢复分配给该消费者的特定分区的消费。暂停和恢复分别发生在 poll() 之前和之后,类似于 pause() 和 resume() 方法。isPartitionPauseRequested() 方法如果请求暂停该分区,则返回 true。isPartitionPaused() 方法如果该分区已有效暂停,则返回 true。
📄️ 序列化、反序列化和消息转换
Apache Kafka 提供了一个高层次的 API,用于序列化和反序列化记录值以及它们的键。它通过 org.apache.kafka.common.serialization.Serializer 和 org.apache.kafka.common.serialization.Deserializer 抽象提供了一些内置实现。同时,我们可以通过使用 Producer 或 Consumer 配置属性来指定序列化器和反序列化器类。以下示例演示了如何做到这一点:
📄️ 消息头
0.11.0.0 客户端引入了对消息中头部的支持。从 2.0 版本开始,Spring for Apache Kafka 现在支持将这些头部映射到 spring-messaging 的 MessageHeaders 中。
📄️ 空载荷与“墓碑”记录的日志压缩
当您使用日志压缩时,您可以发送和接收负载为 null 的消息,以标识键的删除。
📄️ 处理异常
本节描述了在使用 Spring for Apache Kafka 时如何处理可能出现的各种异常。
📄️ JAAS 和 Kerberos
从版本 2.0 开始,添加了一个 KafkaJaasLoginModuleInitializer 类,以帮助进行 Kerberos 配置。您可以将此 bean 及其所需的配置添加到您的应用程序上下文中。以下示例配置了这样的 bean: