跳到主要内容

将 Spring Beans 接入生产者/消费者拦截器

ChatGPT-4o-mini 中英对照 Wiring Spring Beans into Producer/Consumer Interceptors

Apache Kafka 提供了一种机制,可以将拦截器添加到生产者和消费者。这些对象由 Kafka 管理,而不是 Spring,因此正常的 Spring 依赖注入无法用于连接依赖的 Spring Bean。然而,您可以使用拦截器的 config() 方法手动连接这些依赖项。以下 Spring Boot 应用程序通过重写 Spring Boot 的默认工厂,展示了如何将一些依赖的 Bean 添加到配置属性中。

@SpringBootApplication
public class Application {

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}

@Bean
public ConsumerFactory<?, ?> kafkaConsumerFactory(SomeBean someBean) {
Map<String, Object> consumerProperties = new HashMap<>();
// consumerProperties.put(..., ...)
// ...
consumerProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName());
consumerProperties.put("some.bean", someBean);
return new DefaultKafkaConsumerFactory<>(consumerProperties);
}

@Bean
public ProducerFactory<?, ?> kafkaProducerFactory(SomeBean someBean) {
Map<String, Object> producerProperties = new HashMap<>();
// producerProperties.put(..., ...)
// ...
producerProperties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyProducerInterceptor.class.getName());
producerProperties.put("some.bean", someBean);
return new DefaultKafkaProducerFactory<>(producerProperties);
}

@Bean
public SomeBean someBean() {
return new SomeBean();
}

@KafkaListener(id = "kgk897", topics = "kgh897")
public void listen(String in) {
System.out.println("Received " + in);
}

@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> template.send("kgh897", "test");
}

@Bean
public NewTopic kRequests() {
return TopicBuilder.name("kgh897")
.partitions(1)
.replicas(1)
.build();
}

}
java
public class SomeBean {

public void someMethod(String what) {
System.out.println(what + " in my foo bean");
}

}
java
public class MyProducerInterceptor implements ProducerInterceptor<String, String> {

private SomeBean bean;

@Override
public void configure(Map<String, ?> configs) {
this.bean = (SomeBean) configs.get("some.bean");
}

@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
this.bean.someMethod("producer interceptor");
return record;
}

@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}

@Override
public void close() {
}

}
java
public class MyConsumerInterceptor implements ConsumerInterceptor<String, String> {

private SomeBean bean;

@Override
public void configure(Map<String, ?> configs) {
this.bean = (SomeBean) configs.get("some.bean");
}

@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
this.bean.someMethod("consumer interceptor");
return records;
}

@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
}

@Override
public void close() {
}

}
java

结果:

producer interceptor in my foo bean
consumer interceptor in my foo bean
Received test
none