跳到主要内容

使用 Spring 进行 Apache Kafka

ChatGPT-4o-mini 中英对照 Using Spring for Apache Kafka

本节提供了对影响使用 Spring for Apache Kafka 的各种问题的详细解释。有关快速但不太详细的介绍,请参见 Quick Tour

部分总结

📄️ 精确一次语义

您可以为 KafkaAwareTransactionManager 实例提供一个监听器容器。当配置完成后,容器在调用监听器之前会启动一个事务。监听器执行的任何 KafkaTemplate 操作都参与该事务。如果监听器成功处理了记录(或在使用 BatchMessageListener 时处理多个记录),容器会通过 producer.sendOffsetsToTransaction() 将偏移量发送到事务中,然后事务管理器提交该事务。如果监听器抛出异常,则事务会回滚,消费者会重新定位,以便在下次轮询时可以检索回滚的记录。有关更多信息以及处理重复失败记录的内容,请参见 After-rollback Processor。

📄️ 暂停和恢复监听器容器

版本 2.1.3 为监听器容器添加了 pause() 和 resume() 方法。之前,您可以在 ConsumerAwareMessageListener 中暂停消费者,并通过监听 ListenerContainerIdleEvent 来恢复它,该事件提供对 Consumer 对象的访问。虽然您可以通过使用事件监听器在空闲容器中暂停消费者,但在某些情况下,这并不是线程安全的,因为无法保证事件监听器是在消费者线程上调用的。为了安全地暂停和恢复消费者,您应该使用监听器容器上的 pause 和 resume 方法。pause() 在下一个 poll() 之前生效;resume() 在当前 poll() 返回后生效。当容器被暂停时,它会继续对消费者进行 poll(),避免在使用组管理时发生重新平衡,但不会检索任何记录。有关更多信息,请参见 Kafka 文档。

📄️ 在监听器容器上暂停和恢复分区

自版本 2.7 起,您可以通过在监听器容器中使用 pausePartition(TopicPartition topicPartition) 和 resumePartition(TopicPartition topicPartition) 方法来暂停和恢复分配给该消费者的特定分区的消费。暂停和恢复分别发生在 poll() 之前和之后,类似于 pause() 和 resume() 方法。isPartitionPauseRequested() 方法如果请求暂停该分区,则返回 true。isPartitionPaused() 方法如果该分区已有效暂停,则返回 true。