跳到主要内容

快速导览

ChatGPT-4o-mini 中英对照 Quick Tour

前提条件:您必须安装并运行 Apache Kafka。然后,您必须将 Spring for Apache Kafka (spring-kafka) JAR 及其所有依赖项放在您的类路径上。最简单的方法是在您的构建工具中声明一个依赖项。

如果您没有使用 Spring Boot,请在项目中将 spring-kafka jar 声明为依赖项。

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.3.3</version>
</dependency>
xml
important

当使用 Spring Boot 时,(如果你没有使用 start.spring.io 创建你的项目),可以省略版本,Boot 将自动引入与您的 Boot 版本兼容的正确版本:

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
xml

然而,最快的入门方式是使用 start.spring.io (或在 Spring Tool Suite 和 Intellij IDEA 中的向导)创建一个项目,并选择“Spring for Apache Kafka”作为依赖项。

兼容性

此快速导览适用于以下版本:

  • Apache Kafka Clients 3.7.x

  • Spring Framework 6.1.x

  • 最低 Java 版本:17

开始使用

最简单的入门方法是使用 start.spring.io (或 Spring Tool Suite 和 Intellij IDEA 中的向导)创建一个项目,选择“Spring for Apache Kafka”作为依赖项。有关其意见导向的基础设施 Bean 的自动配置的更多信息,请参阅 Spring Boot 文档

这是一个最小的消费者应用程序。

Spring Boot 消费者应用

@SpringBootApplication
public class Application {

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}

@Bean
public NewTopic topic() {
return TopicBuilder.name("topic1")
.partitions(10)
.replicas(1)
.build();
}

@KafkaListener(id = "myId", topics = "topic1")
public void listen(String in) {
System.out.println(in);
}

}
java
spring.kafka.consumer.auto-offset-reset=earliest
properties

NewTopic bean 会在代理上创建主题;如果主题已经存在,则不需要它。

Spring Boot Producer App

@SpringBootApplication
public class Application {

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}

@Bean
public NewTopic topic() {
return TopicBuilder.name("topic1")
.partitions(10)
.replicas(1)
.build();
}

@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> {
template.send("topic1", "test");
};
}

}
java

使用 Java 配置(不使用 Spring Boot)

important

Spring for Apache Kafka 旨在用于 Spring 应用程序上下文中。例如,如果您在 Spring 上下文之外自己创建监听器容器,则除非满足容器实现的所有 ...Aware 接口,否则并非所有功能都能正常工作。

这是一个不使用 Spring Boot 的应用程序示例;它同时具有 ConsumerProducer

public class Sender {

public static void main(String[] args) {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(Config.class);
context.getBean(Sender.class).send("test", 42);
}

private final KafkaTemplate<Integer, String> template;

public Sender(KafkaTemplate<Integer, String> template) {
this.template = template;
}

public void send(String toSend, int key) {
this.template.send("topic1", key, toSend);
}

}

public class Listener {

@KafkaListener(id = "listen1", topics = "topic1")
public void listen1(String in) {
System.out.println(in);
}

}

@Configuration
@EnableKafka
public class Config {

@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String>
kafkaListenerContainerFactory(ConsumerFactory<Integer, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
return factory;
}

@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerProps());
}

private Map<String, Object> consumerProps() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// ...
return props;
}

@Bean
public Sender sender(KafkaTemplate<Integer, String> template) {
return new Sender(template);
}

@Bean
public Listener listener() {
return new Listener();
}

@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(senderProps());
}

private Map<String, Object> senderProps() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
//...
return props;
}

@Bean
public KafkaTemplate<Integer, String> kafkaTemplate(ProducerFactory<Integer, String> producerFactory) {
return new KafkaTemplate<>(producerFactory);
}

}
java

如您所见,在不使用 Spring Boot 的情况下,您必须定义多个基础设施 bean。