跳到主要内容

重新平衡监听器

ChatGPT-4o-mini 中英对照 Rebalancing Listeners

ContainerProperties 有一个属性叫做 consumerRebalanceListener,它接受 Kafka 客户端的 ConsumerRebalanceListener 接口的实现。如果没有提供这个属性,容器将配置一个记录日志的监听器,以 INFO 级别记录再平衡事件。该框架还添加了一个子接口 ConsumerAwareRebalanceListener。以下列表显示了 ConsumerAwareRebalanceListener 接口的定义:

public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {

void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

void onPartitionsLost(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

}
java

注意,当分区被撤销时,有两个回调。第一个回调会立即被调用。第二个回调在任何待处理的偏移量被提交后调用。如果您希望在某个外部存储库中维护偏移量,这将很有用,以下示例演示了这一点:

containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {

@Override
public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// acknowledge any pending Acknowledgments (if using manual acks)
}

@Override
public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// ...
store(consumer.position(partition));
// ...
}

@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// ...
consumer.seek(partition, offsetTracker.getOffset() + 1);
// ...
}
});
java
important

从版本 2.4 开始,新增了一个方法 onPartitionsLost()(与 ConsumerRebalanceLister 中同名的方法类似)。ConsumerRebalanceLister 的默认实现简单地调用 onPartitionsRevoked。而 ConsumerAwareRebalanceListener 的默认实现则什么也不做。当为监听器容器提供自定义监听器(任一类型)时,重要的是你的实现不要在 onPartitionsLost 中调用 onPartitionsRevoked。如果你实现了 ConsumerRebalanceListener,你应该重写默认方法。这是因为监听器容器会在调用你实现的方法后,从其 onPartitionsLost 的实现中调用自己的 onPartitionsRevoked。如果你的实现委托给默认行为,每次 Consumer 在容器的监听器上调用该方法时,onPartitionsRevoked 将被调用两次。