寻找特定偏移量
为了进行查找,您的监听器必须实现 ConsumerSeekAware
,该接口具有以下方法:
void registerSeekCallback(ConsumerSeekCallback callback);
void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
void onPartitionsRevoked(Collection<TopicPartition> partitions);
void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
registerSeekCallback
在容器启动时以及每当分区被分配时被调用。你应该在初始化后某个任意时间进行查找时使用这个回调。你应该保存对回调的引用。如果你在多个容器中(或在 ConcurrentMessageListenerContainer
中)使用相同的监听器,你应该将回调存储在 ThreadLocal
或其他以监听器 Thread
为键的结构中。
在使用组管理时,onPartitionsAssigned
在分区被分配时被调用。您可以使用此方法,例如,通过调用回调来设置分区的初始偏移量。您还可以使用此方法将该线程的回调与分配的分区关联(请参见下面的示例)。您必须使用回调参数,而不是传递给 registerSeekCallback
的参数。从版本 2.5.5 开始,即使在使用 手动分区分配 时,此方法也会被调用。
onPartitionsRevoked
在容器停止或 Kafka 撤销分配时被调用。你应该丢弃这个线程的回调,并移除与被撤销分区的任何关联。
回调具有以下方法:
void seek(String topic, int partition, long offset);
void seek(String topic, int partition, Function<Long, Long> offsetComputeFunction);
void seekToBeginning(String topic, int partition);
void seekToBeginning(Collection<TopicPartitions> partitions);
void seekToEnd(String topic, int partition);
void seekToEnd(Collection<TopicPartitions> partitions);
void seekRelative(String topic, int partition, long offset, boolean toCurrent);
void seekToTimestamp(String topic, int partition, long timestamp);
void seekToTimestamp(Collection<TopicPartition> topicPartitions, long timestamp);
String getGroupId();
seek
方法的两种不同变体提供了一种寻址到任意偏移量的方法。接受 Function
作为参数以计算偏移量的方法是在框架的 3.2 版本中添加的。该函数提供对当前偏移量的访问(消费者返回的当前位置信息,即下一个要获取的偏移量)。用户可以根据消费者中的当前偏移量在函数定义中决定要寻址到哪个偏移量。
seekRelative
在版本 2.3 中被添加,用于执行相对查找。
-
offset
为负且toCurrent
为false
- 相对于分区的末尾进行查找。 -
offset
为正且toCurrent
为false
- 相对于分区的开头进行查找。 -
offset
为负且toCurrent
为true
- 相对于当前位置进行查找(倒退)。 -
offset
为正且toCurrent
为true
- 相对于当前位置进行查找(快进)。
seekToTimestamp
方法也在版本 2.3 中添加。
在 onIdleContainer
或 onPartitionsAssigned
方法中寻求多个分区的相同时间戳时,第二种方法更为优先,因为在一次调用消费者的 offsetsForTimes
方法中查找时间戳的偏移量更为高效。当从其他位置调用时,容器将收集所有时间戳查找请求,并对 offsetsForTimes
进行一次调用。
您还可以在检测到空闲容器时,从 onIdleContainer()
执行查找操作。有关如何启用空闲容器检测,请参见 Detecting Idle and Non-Responsive Consumers。
seekToBeginning
方法接受一个集合,这在处理压缩主题时非常有用,例如,当您希望在每次启动应用程序时都寻求开始位置时:
public class MyListener implements ConsumerSeekAware {
...
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
callback.seekToBeginning(assignments.keySet());
}
}
要在运行时任意查找,请使用 registerSeekCallback
中适当线程的回调引用。
这是一个简单的 Spring Boot 应用程序,演示了如何使用回调;它向主题发送 10 条记录;在控制台中按 <Enter>
会导致所有分区回到开头。
@SpringBootApplication
public class SeekExampleApplication {
public static void main(String[] args) {
SpringApplication.run(SeekExampleApplication.class, args);
}
@Bean
public ApplicationRunner runner(Listener listener, KafkaTemplate<String, String> template) {
return args -> {
IntStream.range(0, 10).forEach(i -> template.send(
new ProducerRecord<>("seekExample", i % 3, "foo", "bar")));
while (true) {
System.in.read();
listener.seekToStart();
}
};
}
@Bean
public NewTopic topic() {
return new NewTopic("seekExample", 3, (short) 1);
}
}
@Component
class Listener implements ConsumerSeekAware {
private static final Logger logger = LoggerFactory.getLogger(Listener.class);
private final ThreadLocal<ConsumerSeekCallback> callbackForThread = new ThreadLocal<>();
private final Map<TopicPartition, ConsumerSeekCallback> callbacks = new ConcurrentHashMap<>();
@Override
public void registerSeekCallback(ConsumerSeekCallback callback) {
this.callbackForThread.set(callback);
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
assignments.keySet().forEach(tp -> this.callbacks.put(tp, this.callbackForThread.get()));
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
partitions.forEach(tp -> this.callbacks.remove(tp));
this.callbackForThread.remove();
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
}
@KafkaListener(id = "seekExample", topics = "seekExample", concurrency = "3")
public void listen(ConsumerRecord<String, String> in) {
logger.info(in.toString());
}
public void seekToStart() {
this.callbacks.forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
}
}
为了简化操作,版本 2.3 添加了 AbstractConsumerSeekAware
类,该类跟踪用于特定主题/分区的回调。以下示例展示了如何在每个分区中,每次容器空闲时,定位到最后处理的记录。它还具有允许任意外部调用将分区回退一条记录的方法。
public class SeekToLastOnIdleListener extends AbstractConsumerSeekAware {
@KafkaListener(id = "seekOnIdle", topics = "seekOnIdle")
public void listen(String in) {
...
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments,
ConsumerSeekCallback callback) {
assignments.keySet().forEach(tp -> callback.seekRelative(tp.topic(), tp.partition(), -1, true));
}
/**
* Rewind all partitions one record.
*/
public void rewindAllOneRecord() {
getTopicsAndCallbacks()
.forEach((tp, callbacks) ->
callbacks.forEach(callback -> callback.seekRelative(tp.topic(), tp.partition(), -1, true))
);
}
/**
* Rewind one partition one record.
*/
public void rewindOnePartitionOneRecord(String topic, int partition) {
getSeekCallbacksFor(new TopicPartition(topic, partition))
.forEach(callback -> callback.seekRelative(topic, partition, -1, true));
}
}
版本 2.6 向抽象类添加了便利方法:
-
seekToBeginning()
- 将所有分配的分区定位到开始。 -
seekToEnd()
- 将所有分配的分区定位到结束。 -
seekToTimestamp(long timestamp)
- 将所有分配的分区定位到该时间戳所表示的偏移量。
示例:
public class MyListener extends AbstractConsumerSeekAware {
@KafkaListener(...)
void listen(...) {
...
}
}
public class SomeOtherBean {
MyListener listener;
...
void someMethod() {
this.listener.seekToTimestamp(System.currentTimeMillis() - 60_000);
}
}
从版本 3.3 开始,在 ConsumerSeekAware.ConsumerSeekCallback
接口中引入了一个新方法 getGroupId()
。当你需要识别与特定 seek 回调相关联的消费者组时,这个方法特别有用。
当使用一个扩展了 AbstractConsumerSeekAware
的类时,在一个监听器中执行的寻址操作可能会影响同一类中的所有监听器。这可能并不是总是期望的行为。为了解决这个问题,您可以使用回调提供的 getGroupId()
方法。这使您能够选择性地执行寻址操作,仅针对感兴趣的消费者组。