跳到主要内容

启用监听器端点注解

DeepSeek V3 中英对照 Enable Listener Endpoint Annotations

要启用对 @RabbitListener 注解的支持,你可以在你的某个 @Configuration 类上添加 @EnableRabbit。以下示例展示了如何操作:

@Configuration
@EnableRabbit
public class AppConfig {

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setConcurrentConsumers(3);
factory.setMaxConcurrentConsumers(10);
factory.setContainerCustomizer(container -> /* customize the container */);
return factory;
}
}
java

从 2.0 版本开始,还提供了 DirectMessageListenerContainerFactory。它用于创建 DirectMessageListenerContainer 实例。

备注

有关帮助您在 SimpleRabbitListenerContainerFactoryDirectRabbitListenerContainerFactory 之间做出选择的信息,请参阅选择容器

从 2.2.2 版本开始,你可以提供一个 ContainerCustomizer 实现(如上所示)。这可以用于在容器创建和配置之后进一步配置容器;例如,你可以使用它来设置容器工厂未暴露的属性。

版本 2.4.8 提供了 CompositeContainerCustomizer,适用于你希望应用多个自定义器的情况。

默认情况下,基础设施会寻找名为 rabbitListenerContainerFactory 的 bean 作为工厂的来源,用于创建消息监听容器。在这种情况下,忽略 RabbitMQ 基础设施的设置,processOrder 方法可以在核心线程池大小为 3 个线程、最大线程池大小为 10 个线程的情况下被调用。

你可以为每个注解自定义要使用的监听器容器工厂,或者通过实现 RabbitListenerConfigurer 接口来配置一个显式的默认值。只有当至少有一个端点在没有指定特定容器工厂的情况下注册时,才需要默认值。有关完整详情和示例,请参阅 Javadoc

容器工厂提供了添加 MessagePostProcessor 实例的方法,这些实例在接收消息后(在调用监听器之前)和发送回复之前被应用。

有关回复的信息,请参见回复管理

从 2.0.6 版本开始,你可以在监听器容器工厂中添加一个 RetryTemplateRecoveryCallback。这在发送回复时使用。当重试次数耗尽时,RecoveryCallback 会被调用。你可以使用 SendRetryContextAccessor 从上下文中获取信息。以下示例展示了如何实现这一点:

factory.setRetryTemplate(retryTemplate);
factory.setReplyRecoveryCallback(ctx -> {
Message failed = SendRetryContextAccessor.getMessage(ctx);
Address replyTo = SendRetryContextAccessor.getAddress(ctx);
Throwable t = ctx.getLastThrowable();
...
return null;
});
java

如果你更喜欢 XML 配置,可以使用 <rabbit:annotation-driven> 元素。任何带有 @RabbitListener 注解的 Bean 都会被检测到。

对于 SimpleRabbitListenerContainer 实例,你可以使用类似以下的 XML 配置:

<rabbit:annotation-driven/>

<bean id="rabbitListenerContainerFactory"
class="org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="concurrentConsumers" value="3"/>
<property name="maxConcurrentConsumers" value="10"/>
</bean>
xml

对于 DirectMessageListenerContainer 实例,你可以使用类似于以下的 XML 配置:

<rabbit:annotation-driven/>

<bean id="rabbitListenerContainerFactory"
class="org.springframework.amqp.rabbit.config.DirectRabbitListenerContainerFactory">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="consumersPerQueue" value="3"/>
</bean>
xml

从 2.0 版本开始,@RabbitListener 注解引入了一个 concurrency 属性。该属性支持 SpEL 表达式(#{…​})和属性占位符(${…​})。其含义和允许的值取决于容器类型,具体如下:

  • 对于 DirectMessageListenerContainer,该值必须是一个单一的整数值,用于设置容器上的 consumersPerQueue 属性。

  • 对于 SimpleRabbitListenerContainer,该值可以是一个单一的整数值,用于设置容器上的 concurrentConsumers 属性,或者可以是 m-n 的形式,其中 mconcurrentConsumers 属性,nmaxConcurrentConsumers 属性。

无论哪种情况,此设置都会覆盖工厂上的设置。以前,如果您的监听器需要不同的并发设置,您必须定义不同的容器工厂。

该注解还允许通过 autoStartupexecutor(自 2.2 版本起)注解属性来覆盖工厂的 autoStartuptaskExecutor 属性。为每个监听器使用不同的执行器可能有助于在日志和线程转储中识别与每个监听器相关的线程。

版本 2.2 还添加了 ackMode 属性,该属性允许你覆盖容器工厂的 acknowledgeMode 属性。

@RabbitListener(id = "manual.acks.1", queues = "manual.acks.1", ackMode = "MANUAL")
public void manual1(String in, Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {

...
channel.basicAck(tag, false);
}
java