动态创建容器
有几种技术可以在运行时创建监听器容器。本节将探讨其中的一些技术。
MessageListener 实现
如果您直接实现自己的监听器,可以简单地使用容器工厂为该监听器创建一个原始容器:
- Java
 - Kotlin
 
public class MyListener implements MessageListener<String, String> {
    @Override
    public void onMessage(ConsumerRecord<String, String> data) {
        // ...
    }
}
private ConcurrentMessageListenerContainer<String, String> createContainer(
        ConcurrentKafkaListenerContainerFactory<String, String> factory, String topic, String group) {
    ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(topic);
    container.getContainerProperties().setMessageListener(new MyListener());
    container.getContainerProperties().setGroupId(group);
    container.setBeanName(group);
    container.start();
    return container;
}
class MyListener : MessageListener<String?, String?> {
    override fun onMessage(data: ConsumerRecord<String?, String?>) {
        // ...
    }
}
private fun createContainer(
    factory: ConcurrentKafkaListenerContainerFactory<String, String>, topic: String, group: String
): ConcurrentMessageListenerContainer<String, String> {
    val container = factory.createContainer(topic)
    container.containerProperties.messageListener = MyListener()
    container.containerProperties.groupId = group
    container.beanName = group
    container.start()
    return container
}
Prototype Beans
使用 @KafkaListener 注解的方法的容器可以通过将 bean 声明为原型动态创建:
- Java
 - Kotlin
 
public class MyPojo {
    private final String id;
    private final String topic;
    public MyPojo(String id, String topic) {
        this.id = id;
        this.topic = topic;
    }
    public String getId() {
        return this.id;
    }
    public String getTopic() {
        return this.topic;
    }
    @KafkaListener(id = "#{__listener.id}", topics = "#{__listener.topic}")
    public void listen(String in) {
        System.out.println(in);
    }
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
MyPojo pojo(String id, String topic) {
    return new MyPojo(id, topic);
}
applicationContext.getBean(MyPojo.class, "one", "topic2");
applicationContext.getBean(MyPojo.class, "two", "topic3");
class MyPojo(val id: String, val topic: String) {
    @KafkaListener(id = "#{__listener.id}", topics = ["#{__listener.topic}"])
    fun listen(`in`: String?) {
        println(`in`)
    }
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
fun pojo(id: String, topic: String): MyPojo {
    return MyPojo(id, topic)
}
applicationContext.getBean(MyPojo::class.java, "one", "topic2")
applicationContext.getBean(MyPojo::class.java, "two", "topic3")
important
监听器必须具有唯一的 ID。从版本 2.8.9 开始,KafkaListenerEndpointRegistry 增加了一个新方法 unregisterListenerContainer(String id),允许您重新使用一个 ID。注销一个容器并不会 stop() 该容器,您必须自己执行此操作。