跳到主要内容

异步消费者

DeepSeek V3 中英对照 Asynchronous Consumer

important

Spring AMQP 还通过使用 @RabbitListener 注解支持注解驱动的监听器端点,并提供了一个开放的架构以编程方式注册端点。这是迄今为止设置异步消费者最方便的方式。更多详情请参阅注解驱动的监听器端点

important

prefetch 的默认值过去是 1,这可能导致高效消费者的利用率不足。从 2.0 版本开始,默认的 prefetch 值现在为 250,这应该在大多数常见场景中保持消费者的忙碌状态,从而提高吞吐量。

然而,在某些场景下,prefetch 值应该设置得较低:

  • 对于大消息,特别是当处理速度较慢时(消息可能会在客户端进程中累积大量内存)

  • 当需要严格的消息顺序时(在这种情况下,prefetch 值应该重新设置为 1)

  • 其他特殊情况

此外,在消息量较低且存在多个消费者(包括单个监听器容器实例中的并发)的情况下,您可能希望减少 prefetch 值,以便在消费者之间更均匀地分配消息。

请参阅 消息监听器容器配置

有关 prefetch 的更多背景信息,请参阅这篇关于 RabbitMQ 中的消费者利用率 的文章和这篇关于 队列理论 的文章。

消息监听器

对于异步的 Message 接收,涉及到一个专门的组件(不是 AmqpTemplate)。该组件是一个用于 Message 消费回调的容器。我们稍后在本节中讨论容器及其属性。不过,首先我们应该看一下回调,因为这是你的应用程序代码与消息系统集成的地方。回调有几个选项,首先是实现 MessageListener 接口,如下面的代码清单所示:

public interface MessageListener {
void onMessage(Message message);
}
java

如果你的回调逻辑因任何原因依赖于 AMQP Channel 实例,你可以使用 ChannelAwareMessageListener。它看起来类似,但有一个额外的参数。以下代码展示了 ChannelAwareMessageListener 接口的定义:

public interface ChannelAwareMessageListener {
void onMessage(Message message, Channel channel) throws Exception;
}
java
important

在 2.1 版本中,此接口从包 o.s.amqp.rabbit.core 迁移到了 o.s.amqp.rabbit.listener.api

MessageListenerAdapter

如果您更希望在应用程序逻辑与消息传递 API 之间保持更严格的分离,您可以依赖框架提供的适配器实现。这通常被称为“消息驱动的 POJO”支持。

备注

版本 1.5 引入了一种更灵活的 POJO 消息传递机制,即 @RabbitListener 注解。更多信息请参见注解驱动的监听器端点

在使用适配器时,你只需要提供一个引用,指向适配器本身应该调用的实例。以下示例展示了如何做到这一点:

MessageListenerAdapter listener = new MessageListenerAdapter(somePojo);
listener.setDefaultListenerMethod("myMethod");
java

你可以继承适配器并实现 getListenerMethodName() 方法,以便根据消息动态选择不同的方法。该方法有两个参数,originalMessageextractedMessage,后者是任何转换的结果。默认情况下,配置了一个 SimpleMessageConverter。有关更多信息以及可用的其他转换器的信息,请参阅 SimpleMessageConverter

从 1.4.2 版本开始,原始消息具有 consumerQueueconsumerTag 属性,这些属性可以用来确定消息是从哪个队列接收的。

从 1.5 版本开始,你可以配置一个消费者队列或标签到方法名称的映射,以动态选择要调用的方法。如果映射中没有条目,我们将回退到默认的监听器方法。默认的监听器方法(如果未设置)是 handleMessage

从 2.0 版本开始,提供了一个便捷的 FunctionalInterface。以下列表展示了 FunctionalInterface 的定义:

@FunctionalInterface
public interface ReplyingMessageListener<T, R> {

R handleMessage(T t);

}
java

该接口通过使用 Java 8 的 lambda 表达式,方便地对适配器进行配置,如下例所示:

new MessageListenerAdapter((ReplyingMessageListener<String, String>) data -> {
...
return result;
}));
java

从 2.2 版本开始,buildListenerArguments(Object) 方法已被弃用,取而代之的是新的 buildListenerArguments(Object, Channel, Message) 方法。新方法帮助监听器获取 ChannelMessage 参数,以便执行更多操作,例如在手动确认模式下调用 channel.basicReject(long, boolean)。以下清单展示了一个最基本的示例:

public class ExtendedListenerAdapter extends MessageListenerAdapter {

@Override
protected Object[] buildListenerArguments(Object extractedMessage, Channel channel, Message message) {
return new Object[]{extractedMessage, channel, message};
}

}
java

现在,如果你需要接收“channel”和“message”,你可以像配置 MessageListenerAdapter 一样配置 ExtendedListenerAdapter。监听器的参数应设置为 buildListenerArguments(Object, Channel, Message) 返回的值,如下面的监听器示例所示:

public void handleMessage(Object object, Channel channel, Message message) throws IOException {
...
}
java

容器

现在你已经了解了 Message 监听回调的各种选项,我们可以将注意力转向容器。基本上,容器处理“主动”职责,以便监听回调可以保持被动。容器是“生命周期”组件的一个示例。它提供了启动和停止的方法。在配置容器时,你实际上是在 AMQP Queue 和 MessageListener 实例之间架起了一座桥梁。你必须提供对 ConnectionFactory 的引用以及监听器应该消费消息的队列名称或 Queue 实例。

在 2.0 版本之前,只有一个监听器容器,即 SimpleMessageListenerContainer。现在有了第二个容器,即 DirectMessageListenerContainer。这两个容器之间的区别以及你在选择使用哪个容器时可能应用的标准在 选择容器 中有详细描述。

以下列表展示了最基本的示例,通过使用 SimpleMessageListenerContainer 来实现:

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory);
container.setQueueNames("some.queue");
container.setMessageListener(new MessageListenerAdapter(somePojo));
java

作为一个“活动”组件,最常见的方式是使用 bean 定义来创建监听器容器,以便它可以在后台运行。以下示例展示了如何使用 XML 来实现这一点:

<rabbit:listener-container connection-factory="rabbitConnectionFactory">
<rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
</rabbit:listener-container>
xml

以下列表展示了使用 XML 的另一种方式:

<rabbit:listener-container connection-factory="rabbitConnectionFactory" type="direct">
<rabbit:listener queues="some.queue" ref="somePojo" method="handle"/>
</rabbit:listener-container>
xml

前面的两个示例都创建了一个 DirectMessageListenerContainer(注意 type 属性,它默认为 simple)。

或者,您可能更喜欢使用 Java 配置,它看起来与前面的代码片段类似:

@Configuration
public class ExampleAmqpConfiguration {

@Bean
public SimpleMessageListenerContainer messageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(rabbitConnectionFactory());
container.setQueueName("some.queue");
container.setMessageListener(exampleListener());
return container;
}

@Bean
public CachingConnectionFactory rabbitConnectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}

@Bean
public MessageListener exampleListener() {
return new MessageListener() {
public void onMessage(Message message) {
System.out.println("received: " + message);
}
};
}
}
java

消费者优先级

从 RabbitMQ 3.2 版本开始,代理服务器现在支持消费者优先级(参见 使用 RabbitMQ 的消费者优先级)。这是通过在消费者上设置 x-priority 参数来启用的。SimpleMessageListenerContainer 现在支持设置消费者参数,如下例所示:

container.setConsumerArguments(Collections.
<String, Object> singletonMap("x-priority", Integer.valueOf(10)));
java

为了方便起见,命名空间在 listener 元素上提供了 priority 属性,如下例所示:

<rabbit:listener-container connection-factory="rabbitConnectionFactory">
<rabbit:listener queues="some.queue" ref="somePojo" method="handle" priority="10" />
</rabbit:listener-container>
xml

从 1.3 版本开始,你可以在运行时修改容器监听的队列。请参阅 监听器容器队列

auto-delete 队列

当容器配置为监听 auto-delete 队列时,队列具有 x-expires 选项,或者在 Broker 上配置了 Time-To-Live 策略时,当容器停止(即最后一个消费者被取消)时,Broker 会移除该队列。在 1.3 版本之前,由于队列丢失,容器无法重新启动。RabbitAdmin 仅在连接关闭或打开时自动重新声明队列等操作,而在容器停止和启动时不会发生这种情况。

从 1.3 版本开始,容器使用 RabbitAdmin 在启动期间重新声明任何缺失的队列。

你也可以将条件声明(参见条件声明)与一个 auto-startup="false" 的管理员一起使用,以推迟队列声明直到容器启动。以下示例展示了如何实现这一点:

<rabbit:queue id="otherAnon" declared-by="containerAdmin" />

<rabbit:direct-exchange name="otherExchange" auto-delete="true" declared-by="containerAdmin">
<rabbit:bindings>
<rabbit:binding queue="otherAnon" key="otherAnon" />
</rabbit:bindings>
</rabbit:direct-exchange>

<rabbit:listener-container id="container2" auto-startup="false">
<rabbit:listener id="listener2" ref="foo" queues="otherAnon" admin="containerAdmin" />
</rabbit:listener-container>

<rabbit:admin id="containerAdmin" connection-factory="rabbitConnectionFactory"
auto-startup="false" />
xml

在这种情况下,队列和交换器由 containerAdmin 声明,该容器具有 auto-startup="false",因此在上下文初始化期间不会声明这些元素。同时,出于同样的原因,容器也不会启动。当容器稍后启动时,它会使用其对 containerAdmin 的引用来声明这些元素。