跳到主要内容

配置主题

ChatGPT-4o-mini 中英对照 Configuring Topics

如果您在应用程序上下文中定义一个 KafkaAdmin bean,它可以自动将主题添加到代理中。为此,您可以为每个主题向应用程序上下文添加一个 NewTopic @Bean。版本 2.3 引入了一个新类 TopicBuilder,以便更方便地创建此类 bean。以下示例演示了如何做到这一点:

@Bean
public KafkaAdmin admin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
return new KafkaAdmin(configs);
}

@Bean
public NewTopic topic1() {
return TopicBuilder.name("thing1")
.partitions(10)
.replicas(3)
.compact()
.build();
}

@Bean
public NewTopic topic2() {
return TopicBuilder.name("thing2")
.partitions(10)
.replicas(3)
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build();
}

@Bean
public NewTopic topic3() {
return TopicBuilder.name("thing3")
.assignReplicas(0, List.of(0, 1))
.assignReplicas(1, List.of(1, 2))
.assignReplicas(2, List.of(2, 0))
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build();
}
java

从版本 2.6 开始,您可以省略 partitions() 和/或 replicas(),并将应用代理的默认值到这些属性。代理版本必须至少为 2.4.0 才能支持此功能 - 请参见 KIP-464

@Bean
public NewTopic topic4() {
return TopicBuilder.name("defaultBoth")
.build();
}

@Bean
public NewTopic topic5() {
return TopicBuilder.name("defaultPart")
.replicas(1)
.build();
}

@Bean
public NewTopic topic6() {
return TopicBuilder.name("defaultRepl")
.partitions(3)
.build();
}
java

从版本 2.7 开始,您可以在单个 KafkaAdmin.NewTopics bean 定义中声明多个 NewTopic

@Bean
public KafkaAdmin.NewTopics topics456() {
return new NewTopics(
TopicBuilder.name("defaultBoth")
.build(),
TopicBuilder.name("defaultPart")
.replicas(1)
.build(),
TopicBuilder.name("defaultRepl")
.partitions(3)
.build());
}
java
important

在使用 Spring Boot 时,KafkaAdmin bean 会自动注册,因此您只需要 NewTopic(和/或 NewTopics@Bean

默认情况下,如果代理不可用,将记录一条消息,但上下文会继续加载。您可以通过编程方式调用管理员的 initialize() 方法以稍后重试。如果您希望将此条件视为致命错误,请将管理员的 fatalIfBrokerNotAvailable 属性设置为 true。上下文将无法初始化。

备注

如果代理支持它(1.0.0 或更高版本),管理员会增加分区数量,如果发现现有主题的分区数量少于 NewTopic.numPartitions

从版本 2.7 开始,KafkaAdmin 提供了在运行时创建和检查主题的方法。

  • createOrModifyTopics

  • describeTopics

对于更高级的功能,您可以直接使用 AdminClient。以下示例演示了如何做到这一点:

@Autowired
private KafkaAdmin admin;

...

AdminClient client = AdminClient.create(admin.getConfigurationProperties());
...
client.close();
java

从版本 2.9.10 和 3.0.9 开始,您可以提供一个 Predicate<NewTopic>,该谓词可以用于确定是否应考虑创建或修改特定的 NewTopic bean。这在您有多个指向不同集群的 KafkaAdmin 实例,并希望选择每个管理员应创建或修改的主题时非常有用。

admin.setCreateOrModifyTopic(nt -> !nt.name().equals("dontCreateThisOne"));
java