程序化构建
该功能旨在与 @KafkaListener 一起使用;然而,一些用户请求有关如何以编程方式配置非阻塞重试的信息。以下 Spring Boot 应用程序提供了如何做到这一点的示例。
@SpringBootApplication
public class Application extends RetryTopicConfigurationSupport {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
    @Bean
    RetryTopicConfiguration retryConfig(KafkaTemplate<String, String> template) {
        return RetryTopicConfigurationBuilder.newInstance()
                .maxAttempts(4)
                .autoCreateTopicsWith(2, (short) 1)
                .create(template);
    }
    @Bean
    TaskScheduler scheduler() {
        return new ThreadPoolTaskScheduler();
    }
    @Bean
    @Order(0)
    SmartInitializingSingleton dynamicRetry(RetryTopicConfigurer configurer, RetryTopicConfiguration config,
            KafkaListenerAnnotationBeanPostProcessor<?, ?> bpp, KafkaListenerContainerFactory<?> factory,
            Listener listener, KafkaListenerEndpointRegistry registry) {
        return () -> {
            KafkaListenerEndpointRegistrar registrar = bpp.getEndpointRegistrar();
            MethodKafkaListenerEndpoint<String, String> mainEndpoint = new MethodKafkaListenerEndpoint<>();
            EndpointProcessor endpointProcessor = endpoint -> {
                // customize as needed (e.g. apply attributes to retry endpoints).
                if (!endpoint.equals(mainEndpoint)) {
                    endpoint.setConcurrency(1);
                }
                // these are required
                endpoint.setMessageHandlerMethodFactory(bpp.getMessageHandlerMethodFactory());
                endpoint.setTopics("topic");
                endpoint.setId("id");
                endpoint.setGroupId("group");
            };
            mainEndpoint.setBean(listener);
            try {
                mainEndpoint.setMethod(Listener.class.getDeclaredMethod("onMessage", ConsumerRecord.class));
            }
            catch (NoSuchMethodException | SecurityException ex) {
                throw new IllegalStateException(ex);
            }
            mainEndpoint.setConcurrency(2);
            mainEndpoint.setTopics("topic");
            mainEndpoint.setId("id");
            mainEndpoint.setGroupId("group");
            configurer.processMainAndRetryListeners(endpointProcessor, mainEndpoint, config, registrar, factory,
                    "kafkaListenerContainerFactory");
        };
    }
    @Bean
    ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            template.send("topic", "test");
        };
    }
}
@Component
class Listener implements MessageListener<String, String> {
    @Override
    public void onMessage(ConsumerRecord<String, String> record) {
        System.out.println(KafkaUtils.format(record));
        throw new RuntimeException("test");
    }
}
important
主题的自动创建仅在配置在应用程序上下文刷新之前被处理时发生,如上面的示例所示。要在运行时配置容器,主题需要使用其他技术创建。