跳到主要内容

动态创建容器

ChatGPT-4o-mini 中英对照 Dynamically Creating Containers

有几种技术可以在运行时创建监听器容器。本节将探讨其中的一些技术。

MessageListener 实现

如果您直接实现自己的监听器,可以简单地使用容器工厂为该监听器创建一个原始容器:

public class MyListener implements MessageListener<String, String> {

@Override
public void onMessage(ConsumerRecord<String, String> data) {
// ...
}

}

private ConcurrentMessageListenerContainer<String, String> createContainer(
ConcurrentKafkaListenerContainerFactory<String, String> factory, String topic, String group) {

ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(topic);
container.getContainerProperties().setMessageListener(new MyListener());
container.getContainerProperties().setGroupId(group);
container.setBeanName(group);
container.start();
return container;
}
java

Prototype Beans

使用 @KafkaListener 注解的方法的容器可以通过将 bean 声明为原型动态创建:

public class MyPojo {

private final String id;

private final String topic;

public MyPojo(String id, String topic) {
this.id = id;
this.topic = topic;
}

public String getId() {
return this.id;
}

public String getTopic() {
return this.topic;
}

@KafkaListener(id = "#{__listener.id}", topics = "#{__listener.topic}")
public void listen(String in) {
System.out.println(in);
}

}

@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
MyPojo pojo(String id, String topic) {
return new MyPojo(id, topic);
}

applicationContext.getBean(MyPojo.class, "one", "topic2");
applicationContext.getBean(MyPojo.class, "two", "topic3");
java
important

监听器必须具有唯一的 ID。从版本 2.8.9 开始,KafkaListenerEndpointRegistry 增加了一个新方法 unregisterListenerContainer(String id),允许您重新使用一个 ID。注销一个容器并不会 stop() 该容器,您必须自己执行此操作。