连接到 Kafka
- 
KafkaAdmin- 请参见 Configuring Topics - 
ProducerFactory- 请参见 Sending Messages - 
ConsumerFactory- 请参见 Receiving Messages 
从版本 2.5 开始,这些都扩展了 KafkaResourceFactory。这允许通过向其配置中添加 Supplier<String> 来在运行时更改引导服务器:setBootstrapServersSupplier(() -> …)。这将在所有新连接时被调用,以获取服务器列表。消费者和生产者通常是长生命周期的。要关闭现有的生产者,请在 DefaultKafkaProducerFactory 上调用 reset()。要关闭现有的消费者,请在 KafkaListenerEndpointRegistry 上调用 stop()(然后 start()),以及在任何其他监听容器 bean 上调用 stop() 和 start()。
为了方便,框架还提供了一个 ABSwitchCluster,它支持两组引导服务器;其中一组在任何时候都是活动的。配置 ABSwitchCluster 并将其添加到生产者和消费者工厂,以及 KafkaAdmin,通过调用 setBootstrapServersSupplier()。当你想要切换时,调用 primary() 或 secondary(),并在生产者工厂上调用 reset() 以建立新的连接;对于消费者,调用 stop() 和 start() 所有监听容器。当使用 @KafkaListener 时,调用 stop() 和 start() KafkaListenerEndpointRegistry bean。
请参阅 Javadocs 以获取更多信息。
Factory Listeners
从版本 2.5 开始,DefaultKafkaProducerFactory 和 DefaultKafkaConsumerFactory 可以配置一个 Listener,以便在创建或关闭生产者或消费者时接收通知。
interface Listener<K, V> {
    default void producerAdded(String id, Producer<K, V> producer) {
    }
    default void producerRemoved(String id, Producer<K, V> producer) {
    }
}
interface Listener<K, V> {
    default void consumerAdded(String id, Consumer<K, V> consumer) {
    }
    default void consumerRemoved(String id, Consumer<K, V> consumer) {
    }
}
在每种情况下,id 是通过将 client-id 属性(在创建后从 metrics() 中获取)附加到工厂 beanName 属性上,并用 . 分隔而创建的。
这些监听器可以用于,例如,在创建新客户端时创建并绑定一个 Micrometer KafkaClientMetrics 实例(并在客户端关闭时关闭它)。
该框架提供了正是这样做的监听器;请参见 Micrometer Native Metrics。
默认客户端 ID 前缀
从版本 3.2 开始,对于使用 spring.application.name 属性定义应用名称的 Spring Boot 应用程序,该名称现在用作这些客户端类型的自动生成客户端 ID 的默认前缀:
- 
不使用消费者组的消费者客户端
 - 
生产者客户端
 - 
管理客户端
 
这使得在服务器端识别这些客户端变得更容易,以便进行故障排除或应用配额。
表 1. 结果为 spring.application.name=myapp 的 Spring Boot 应用程序的示例客户端 ID
| 客户端类型 | 无应用名称 | 有应用名称 | 
|---|---|---|
| 无消费者组的消费者 | consumer-null-1 | myapp-consumer-1 | 
| 有消费者组 "mygroup" 的消费者 | consumer-mygroup-1 | consumer-mygroup-1 | 
| 生产者 | producer-1 | myapp-producer-1 | 
| 管理员 | adminclient-1 | myapp-admin-1 |