强制消费者再平衡
Kafka 客户端现在支持触发 强制再平衡 的选项。从版本 3.1.2
开始,Spring for Apache Kafka 提供了一个选项,可以通过消息监听容器在 Kafka 消费者上调用此 API。当调用此 API 时,它只是提醒 Kafka 消费者触发一个强制再平衡;实际的再平衡将仅在下一个 poll()
操作中发生。如果已经有一个再平衡正在进行,调用强制再平衡将是一个 NO-OP。调用者必须等待当前再平衡完成后才能调用另一个。有关更多详细信息,请参见 enforceRebalance
的 javadocs。
以下代码片段展示了使用消息监听容器强制重新平衡的本质。
@KafkaListener(id = "my.id", topics = "my-topic")
void listen(ConsumerRecord<String, String> in) {
System.out.println("From KafkaListener: " + in);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, Object> template, KafkaListenerEndpointRegistry registry) {
return args -> {
final MessageListenerContainer listenerContainer = registry.getListenerContainer("my.id");
System.out.println("Enforcing a rebalance");
Thread.sleep(5_000);
listenerContainer.enforceRebalance();
Thread.sleep(5_000);
};
}
如上面的代码所示,应用程序使用 KafkaListenerEndpointRegistry
来访问消息监听容器,然后调用 enforceRebalance
API。当在监听容器上调用 enforceRebalance
时,它将调用委托给底层的 Kafka 消费者。Kafka 消费者将在下一个 poll()
操作中触发重新平衡。