跳到主要内容

主题命名

ChatGPT-4o-mini 中英对照 Topic Naming

重试主题和 DLT 通过在主要主题后添加提供的或默认值,并附加该主题的延迟或索引来命名。

示例:

"my-topic" → "my-topic-retry-0", "my-topic-retry-1", …​, "my-topic-dlt"

"my-other-topic" → "my-topic-myRetrySuffix-1000", "my-topic-myRetrySuffix-2000", …​, "my-topic-myDltSuffix"

备注

默认行为是为每次尝试创建单独的重试主题,并附加一个索引值:retry-0, retry-1, …​, retry-n。因此,默认情况下,重试主题的数量是配置的 maxAttempts 减去 1。

您可以 配置后缀,选择是否附加 尝试索引或延迟,在使用固定退避时使用 单个重试主题,以及在使用指数退避时对具有 maxInterval 的尝试使用 单个重试主题

重试主题和 DLT 后缀

您可以指定将用于重试和 DLT 主题的后缀。

@RetryableTopic(retryTopicSuffix = "-my-retry-suffix", dltTopicSuffix = "-my-dlt-suffix")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
java
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.retryTopicSuffix("-my-retry-suffix")
.dltTopicSuffix("-my-dlt-suffix")
.create(template);
}
java
备注

默认的后缀是 "-retry" 和 "-dlt",分别用于重试主题和 dlt。

Appending the Topic’s Index or Delay

您可以在后缀后附加主题的索引或延迟值。

@RetryableTopic(topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
java
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.suffixTopicsWithIndexValues()
.create(template);
}
java
备注

默认行为是以延迟值作为后缀,除了具有多个主题的固定延迟配置,在这种情况下,主题会以主题的索引作为后缀。

单一主题的固定延迟重试

如果您使用的是固定延迟策略,例如 FixedBackOffPolicyNoBackOffPolicy,您可以使用单个主题来实现非阻塞重试。该主题将以提供的或默认的后缀作为后缀,并且不会附加索引或延迟值。

备注

之前的 FixedDelayStrategy 现在已被弃用,可以用 SameIntervalTopicReuseStrategy 替代。

@RetryableTopic(backoff = @Backoff(2_000), fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
java
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackOff(3_000)
.maxAttempts(5)
.useSingleTopicForFixedDelays()
.create(template);
}
java
备注

默认行为是为每次尝试创建单独的重试主题,后面附加它们的索引值:retry-0, retry-1, …​

Single Topic for maxInterval Exponential Delay

如果您正在使用指数退避策略(ExponentialBackOffPolicy),您可以使用单个重试主题来实现非阻塞重试,这些重试的延迟是配置的 maxInterval

这个“最终”重试主题将会附加提供的或默认的后缀,并且将附加索引或 maxInterval 值。

备注

通过选择使用单一主题进行重试,并设置 maxInterval 延迟,配置一个长时间重试的指数重试策略可能变得更可行,因为在这种方法中,您不需要大量的主题。

从 3.2 开始,默认行为是对相同的时间间隔重用重试主题,当使用指数退避时,重试主题会以延迟值作为后缀,最后一个重试主题会对相同的时间间隔进行重用(对应于 maxInterval 延迟)。

例如,当配置指数退避时,使用 initialInterval=1_000multiplier=2maxInterval=16_000,为了保持尝试一个小时,需要将 maxAttempts 配置为 229,默认所需的重试主题将是:

  • -retry-1000

  • -retry-2000

  • -retry-4000

  • -retry-8000

  • -retry-16000

当使用与配置的 maxAttempts 减 1 相等的重试主题数量的策略时,最后一个重试主题(对应于 maxInterval 延迟)将被附加一个额外的索引:

  • -retry-1000

  • -retry-2000

  • -retry-4000

  • -retry-8000

  • -retry-16000-0

  • -retry-16000-1

  • -retry-16000-2

  • …​

  • -retry-16000-224

如果需要多个主题,可以使用以下配置。

@RetryableTopic(attempts = 230,
backoff = @Backoff(delay = 1_000, multiplier = 2, maxDelay = 16_000),
sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.MULTIPLE_TOPICS)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
java
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.exponentialBackoff(1_000, 2, 16_000)
.maxAttempts(230)
.useSingleTopicForSameIntervals()
.create(template);
}
java

自定义命名策略

更复杂的命名策略可以通过注册一个实现了 RetryTopicNamesProviderFactory 的 bean 来实现。默认实现是 SuffixingRetryTopicNamesProviderFactory,可以通过以下方式注册不同的实现:

@Override
protected RetryTopicComponentFactory createComponentFactory() {
return new RetryTopicComponentFactory() {
@Override
public RetryTopicNamesProviderFactory retryTopicNamesProviderFactory() {
return new CustomRetryTopicNamesProviderFactory();
}
};
}
java

作为一个例子,以下实现除了标准后缀外,还为 retry/dlt 主题名称添加了前缀:

public class CustomRetryTopicNamesProviderFactory implements RetryTopicNamesProviderFactory {

@Override
public RetryTopicNamesProvider createRetryTopicNamesProvider(
DestinationTopic.Properties properties) {

if (properties.isMainEndpoint()) {
return new SuffixingRetryTopicNamesProvider(properties);
}
else {
return new SuffixingRetryTopicNamesProvider(properties) {

@Override
public String getTopicName(String topic) {
return "my-prefix-" + super.getTopicName(topic);
}

};
}
}

}
java