过滤消息
Spring for Apache Kafka 项目还提供了一些帮助,通过 FilteringMessageListenerAdapter
类,该类可以包装你的 MessageListener
。这个类接受一个 RecordFilterStrategy
的实现,你可以在其中实现 filter
方法,以指示某条消息是重复的并应该被丢弃。它还有一个名为 ackDiscarded
的附加属性,用于指示适配器是否应该确认被丢弃的记录。默认值为 false
。
当你使用 @KafkaListener
时,设置 RecordFilterStrategy
(可选地设置 ackDiscarded
)在容器工厂上,以便监听器被包装在适当的过滤适配器中。
此外,提供了一个 FilteringBatchMessageListenerAdapter
,用于当您使用批量 message listener 时。
如果你的 @KafkaListener
接收到 ConsumerRecords<?, ?>
而不是 List<ConsumerRecord<?, ?>>
,则 FilteringBatchMessageListenerAdapter
将被忽略,因为 ConsumerRecords
是不可变的。
从版本 2.8.4 开始,您可以通过在监听器注解上使用 filter
属性来覆盖监听器容器工厂的默认 RecordFilterStrategy
。
@KafkaListener(id = "filtered", topics = "topic", filter = "differentFilter")
public void listen(Thing thing) {
...
}
从版本 3.3 开始,支持忽略因 RecordFilterStrategy
过滤而产生的空批次。在实现 RecordFilterStrategy
时,可以通过 ignoreEmptyBatch()
进行配置。默认设置为 false
,这表示即使所有的 ConsumerRecord
都被过滤掉,KafkaListener
仍然会被调用。
如果返回 true
,则在所有 ConsumerRecord
被过滤掉时,KafkaListener
将不会被调用。然而,提交到 broker 的操作仍然会执行。
如果返回 false
,则在所有 ConsumerRecord
被过滤掉时,将调用 KafkaListener
。
这里有一些例子。
public class IgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy {
...
@Override
public List<ConsumerRecord<String, String>> filterBatch(
List<ConsumerRecord<String, String>> consumerRecords) {
return List.of();
}
@Override
public boolean ignoreEmptyBatch() {
return true;
}
};
// NOTE: ignoreEmptyBatchRecordFilterStrategy is bean name of IgnoreEmptyBatchRecordFilterStrategy instance.
@KafkaListener(id = "filtered", topics = "topic", filter = "ignoreEmptyBatchRecordFilterStrategy")
public void listen(List<Thing> things) {
...
}
在这种情况下,IgnoreEmptyBatchRecordFilterStrategy
始终返回空列表,并且将 ignoreEmptyBatch()
的结果返回为 true
。因此,KafkaListener#listen(…)
将根本不会被调用。
public class NotIgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy {
...
@Override
public List<ConsumerRecord<String, String>> filterBatch(
List<ConsumerRecord<String, String>> consumerRecords) {
return List.of();
}
@Override
public boolean ignoreEmptyBatch() {
return false;
}
};
// NOTE: notIgnoreEmptyBatchRecordFilterStrategy is bean name of NotIgnoreEmptyBatchRecordFilterStrategy instance.
@KafkaListener(id = "filtered", topics = "topic", filter = "notIgnoreEmptyBatchRecordFilterStrategy")
public void listen(List<Thing> things) {
...
}
然而,在这种情况下,IgnoreEmptyBatchRecordFilterStrategy
始终返回空列表,并且将 ignoreEmptyBatch()
的结果返回为 false
。因此,KafkaListener#listen(…)
将始终被调用。