跳到主要内容

暂停和恢复监听器容器

ChatGPT-4o-mini 中英对照 Pausing and Resuming Listener Containers

版本 2.1.3 为监听器容器添加了 pause()resume() 方法。之前,您可以在 ConsumerAwareMessageListener 中暂停消费者,并通过监听 ListenerContainerIdleEvent 来恢复它,这提供了对 Consumer 对象的访问。虽然您可以通过使用事件监听器在空闲容器中暂停消费者,但在某些情况下,这并不是线程安全的,因为无法保证事件监听器是在消费者线程上被调用。为了安全地暂停和恢复消费者,您应该使用监听器容器上的 pauseresume 方法。pause() 在下一个 poll() 之前生效;resume() 在当前 poll() 返回后生效。当容器被暂停时,它会继续 poll() 消费者,避免在使用组管理时发生重新平衡,但不会检索任何记录。有关更多信息,请参阅 Kafka 文档。

从版本 2.1.5 开始,您可以调用 isPauseRequested() 来查看是否已调用 pause()。然而,消费者可能尚未实际暂停。isConsumerPaused() 如果所有 Consumer 实例都已实际暂停,则返回 true。

此外(自 2.1.5 版本起),ConsumerPausedEventConsumerResumedEvent 实例以容器作为 source 属性发布,并且涉及的 TopicPartition 实例在 partitions 属性中。

从版本 2.9 开始,新的容器属性 pauseImmediate 被引入,当其设置为 true 时,暂停将在当前记录处理完毕后生效。默认情况下,暂停在处理完上一次轮询的所有记录后生效。请参见 pauseImmediate

以下简单的 Spring Boot 应用程序演示了如何使用容器注册表获取对 @KafkaListener 方法容器的引用,并暂停或恢复其消费者,以及接收相应的事件:

@SpringBootApplication
public class Application implements ApplicationListener<KafkaEvent> {

public static void main(String[] args) {
SpringApplication.run(Application.class, args).close();
}

@Override
public void onApplicationEvent(KafkaEvent event) {
System.out.println(event);
}

@Bean
public ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
KafkaTemplate<String, String> template) {
return args -> {
template.send("pause.resume.topic", "thing1");
Thread.sleep(10_000);
System.out.println("pausing");
registry.getListenerContainer("pause.resume").pause();
Thread.sleep(10_000);
template.send("pause.resume.topic", "thing2");
Thread.sleep(10_000);
System.out.println("resuming");
registry.getListenerContainer("pause.resume").resume();
Thread.sleep(10_000);
};
}

@KafkaListener(id = "pause.resume", topics = "pause.resume.topic")
public void listen(String in) {
System.out.println(in);
}

@Bean
public NewTopic topic() {
return TopicBuilder.name("pause.resume.topic")
.partitions(2)
.replicas(1)
.build();
}

}
java

以下列表显示了前一个示例的结果:

partitions assigned: [pause.resume.topic-1, pause.resume.topic-0]
thing1
pausing
ConsumerPausedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
resuming
ConsumerResumedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
thing2
none