消息监听器
当你使用 message listener container 时,必须提供一个监听器来接收数据。目前支持八个消息监听器接口。以下列表显示了这些接口:
public interface MessageListener<K, V> { 1
void onMessage(ConsumerRecord<K, V> data);
}
public interface AcknowledgingMessageListener<K, V> { 2
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}
public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { 3
void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
}
public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { 4
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
public interface BatchMessageListener<K, V> { 5
void onMessage(List<ConsumerRecord<K, V>> data);
}
public interface BatchAcknowledgingMessageListener<K, V> { 6
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}
public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { 7
void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
}
public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { 8
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
使用此接口处理从 Kafka 消费者
poll()
操作接收到的单个ConsumerRecord
实例,当使用自动提交或其中一种容器管理的 提交方法 时。使用此接口处理从 Kafka 消费者
poll()
操作接收到的单个ConsumerRecord
实例,当使用其中一种手动 提交方法 时。使用此接口处理从 Kafka 消费者
poll()
操作接收到的单个ConsumerRecord
实例,当使用自动提交或其中一种容器管理的 提交方法 时。提供对Consumer
对象的访问。使用此接口处理从 Kafka 消费者
poll()
操作接收到的单个ConsumerRecord
实例,当使用其中一种手动 提交方法 时。提供对Consumer
对象的访问。使用此接口处理从 Kafka 消费者
poll()
操作接收到的所有ConsumerRecord
实例,当使用自动提交或其中一种容器管理的 提交方法 时。使用此接口时不支持AckMode.RECORD
,因为监听器接收到的是完整的批次。使用此接口处理从 Kafka 消费者
poll()
操作接收到的所有ConsumerRecord
实例,当使用其中一种手动 提交方法 时。使用此接口处理从 Kafka 消费者
poll()
操作接收到的所有ConsumerRecord
实例,当使用自动提交或其中一种容器管理的 提交方法 时。使用此接口时不支持AckMode.RECORD
,因为监听器接收到的是完整的批次。提供对Consumer
对象的访问。使用此接口处理从 Kafka 消费者
poll()
操作接收到的所有ConsumerRecord
实例,当使用其中一种手动 提交方法 时。提供对Consumer
对象的访问。
Consumer
对象不是线程安全的。您必须仅在调用监听器的线程上调用其方法。
您不应该在监听器中执行任何影响消费者位置或已提交偏移量的 Consumer<?, ?>
方法;容器需要管理这些信息。