跳到主要内容

Producer Interceptor 在 Spring 中的管理

ChatGPT-4o-mini 中英对照 Producer Interceptor Managed in Spring

从版本 3.0.0 开始,对于生产者拦截器,您可以让 Spring 直接将其作为一个 bean 管理,而不是将拦截器的类名提供给 Apache Kafka 生产者配置。如果您选择这种方法,则需要在 KafkaTemplate 上设置此生产者拦截器。以下是一个示例,使用上面的 MyProducerInterceptor,但不使用内部配置属性。

public class MyProducerInterceptor implements ProducerInterceptor<String, String> {

private final SomeBean bean;

public MyProducerInterceptor(SomeBean bean) {
this.bean = bean;
}

@Override
public void configure(Map<String, ?> configs) {
}

@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
this.bean.someMethod("producer interceptor");
return record;
}

@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}

@Override
public void close() {
}

}
java
@Bean
public MyProducerInterceptor myProducerInterceptor(SomeBean someBean) {
return new MyProducerInterceptor(someBean);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> pf, MyProducerInterceptor myProducerInterceptor) {
KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(pf);
kafkaTemplate.setProducerInterceptor(myProducerInterceptor);
}
none

在记录发送之前,生产者拦截器的 onSend 方法被调用。一旦服务器在发布数据时发送确认,onAcknowledgement 方法就会被调用。onAcknowledgement 在生产者调用任何用户回调之前被调用。

如果您有多个通过 Spring 管理的此类生产者拦截器需要应用于 KafkaTemplate,则需要改用 CompositeProducerInterceptorCompositeProducerInterceptor 允许按顺序添加各个生产者拦截器。底层 ProducerInterceptor 实现的方法将按照它们被添加到 CompositeProducerInterceptor 的顺序被调用。