跳到主要内容

过滤消息

ChatGPT-4o-mini 中英对照 Filtering Messages

在某些场景中,例如重新平衡,已经处理过的消息可能会被重新投递。框架无法知道这样的消息是否已经被处理。这是一个应用层的功能。这被称为 幂等接收器 模式,Spring Integration 提供了一个 实现

Spring for Apache Kafka 项目还提供了一些帮助,通过 FilteringMessageListenerAdapter 类,该类可以包装你的 MessageListener。这个类接受一个 RecordFilterStrategy 的实现,你可以在其中实现 filter 方法,以指示某条消息是重复的并应该被丢弃。它还有一个名为 ackDiscarded 的附加属性,用于指示适配器是否应该确认被丢弃的记录。默认值为 false

当你使用 @KafkaListener 时,设置 RecordFilterStrategy(可选地设置 ackDiscarded)在容器工厂上,以便监听器被包装在适当的过滤适配器中。

此外,提供了一个 FilteringBatchMessageListenerAdapter,用于当您使用批量 message listener 时。

important

如果你的 @KafkaListener 接收到 ConsumerRecords<?, ?> 而不是 List<ConsumerRecord<?, ?>>,则 FilteringBatchMessageListenerAdapter 将被忽略,因为 ConsumerRecords 是不可变的。

从版本 2.8.4 开始,您可以通过在监听器注解上使用 filter 属性来覆盖监听器容器工厂的默认 RecordFilterStrategy

@KafkaListener(id = "filtered", topics = "topic", filter = "differentFilter")
public void listen(Thing thing) {
...
}
java

从版本 3.3 开始,支持忽略因 RecordFilterStrategy 过滤而产生的空批次。在实现 RecordFilterStrategy 时,可以通过 ignoreEmptyBatch() 进行配置。默认设置为 false,这表示即使所有的 ConsumerRecord 都被过滤掉,KafkaListener 仍然会被调用。

如果返回 true,则在所有 ConsumerRecord 被过滤掉时,KafkaListener 将不会被调用。然而,提交到 broker 的操作仍然会执行。

如果返回 false,则在所有 ConsumerRecord 被过滤掉时,将调用 KafkaListener

这里有一些例子。

public class IgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy {
...
@Override
public List<ConsumerRecord<String, String>> filterBatch(
List<ConsumerRecord<String, String>> consumerRecords) {
return List.of();
}

@Override
public boolean ignoreEmptyBatch() {
return true;
}
};

// NOTE: ignoreEmptyBatchRecordFilterStrategy is bean name of IgnoreEmptyBatchRecordFilterStrategy instance.
@KafkaListener(id = "filtered", topics = "topic", filter = "ignoreEmptyBatchRecordFilterStrategy")
public void listen(List<Thing> things) {
...
}
java

在这种情况下,IgnoreEmptyBatchRecordFilterStrategy 始终返回空列表,并且将 ignoreEmptyBatch() 的结果返回为 true。因此,KafkaListener#listen(…​) 将根本不会被调用。

public class NotIgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy {
...
@Override
public List<ConsumerRecord<String, String>> filterBatch(
List<ConsumerRecord<String, String>> consumerRecords) {
return List.of();
}

@Override
public boolean ignoreEmptyBatch() {
return false;
}
};

// NOTE: notIgnoreEmptyBatchRecordFilterStrategy is bean name of NotIgnoreEmptyBatchRecordFilterStrategy instance.
@KafkaListener(id = "filtered", topics = "topic", filter = "notIgnoreEmptyBatchRecordFilterStrategy")
public void listen(List<Thing> things) {
...
}
java

然而,在这种情况下,IgnoreEmptyBatchRecordFilterStrategy 始终返回空列表,并且将 ignoreEmptyBatch() 的结果返回为 false。因此,KafkaListener#listen(…​) 将始终被调用。