跳到主要内容

轮询消费者

DeepSeek V3 中英对照 Polling Consumer

AmqpTemplate 本身可以用于轮询 Message 接收。默认情况下,如果没有可用的消息,会立即返回 null,不会阻塞。从 1.5 版本开始,你可以设置一个 receiveTimeout,单位为毫秒,接收方法会阻塞直到该时长,等待消息的到来。小于零的值意味着无限期阻塞(或至少直到与代理的连接丢失)。1.6 版本引入了 receive 方法的变体,允许在每次调用时传递超时参数。

警告

由于接收操作为每条消息创建一个新的 QueueingConsumer,因此这种技术并不真正适合高吞吐量的环境。对于这些用例,请考虑使用异步消费者或将 receiveTimeout 设置为零。

从 2.4.8 版本开始,当使用非零超时时间时,你可以指定传递给 basicConsume 方法的参数,这些参数用于将消费者与通道关联起来。例如:template.addConsumerArg("x-priority", 10)

有四种简单的 receive 方法可用。与发送端的 Exchange 类似,有一种方法要求已在模板本身上设置了默认的队列属性,还有一种方法在运行时接受一个队列参数。1.6 版本引入了接受 timeoutMillis 的变体,以在每个请求的基础上覆盖 receiveTimeout。以下列表展示了这四种方法的定义:

Message receive() throws AmqpException;

Message receive(String queueName) throws AmqpException;

Message receive(long timeoutMillis) throws AmqpException;

Message receive(String queueName, long timeoutMillis) throws AmqpException;
java

与发送消息的情况类似,AmqpTemplate 提供了一些便捷方法用于接收 POJO 而不是 Message 实例,并且实现类提供了一种方式来定制用于创建返回 ObjectMessageConverter。以下清单展示了这些方法:

Object receiveAndConvert() throws AmqpException;

Object receiveAndConvert(String queueName) throws AmqpException;

Object receiveAndConvert(long timeoutMillis) throws AmqpException;

Object receiveAndConvert(String queueName, long timeoutMillis) throws AmqpException;
java

从 2.0 版本开始,这些方法有了变体,它们接受一个额外的 ParameterizedTypeReference 参数来转换复杂类型。模板必须配置一个 SmartMessageConverter。有关更多信息,请参阅使用 RabbitTemplate 从消息转换

sendAndReceive 方法类似,从 1.3 版本开始,AmqpTemplate 提供了几个方便的 receiveAndReply 方法,用于同步接收、处理和回复消息。以下清单展示了这些方法的定义:

<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback)
throws AmqpException;

<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback)
throws AmqpException;

<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
String replyExchange, String replyRoutingKey) throws AmqpException;

<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
String replyExchange, String replyRoutingKey) throws AmqpException;

<R, S> boolean receiveAndReply(ReceiveAndReplyCallback<R, S> callback,
ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;

<R, S> boolean receiveAndReply(String queueName, ReceiveAndReplyCallback<R, S> callback,
ReplyToAddressCallback<S> replyToAddressCallback) throws AmqpException;
java

AmqpTemplate 的实现负责处理 receivereply 阶段。在大多数情况下,你只需要提供一个 ReceiveAndReplyCallback 的实现来为接收到的消息执行一些业务逻辑,并在需要时构建一个回复对象或消息。注意,ReceiveAndReplyCallback 可以返回 null。在这种情况下,不会发送任何回复,receiveAndReply 的工作方式类似于 receive 方法。这使得同一个队列可以用于混合消息,其中一些消息可能不需要回复。

仅在提供的回调不是 ReceiveAndReplyMessageCallback 实例时,才会应用自动消息(请求和回复)转换,因为 ReceiveAndReplyMessageCallback 提供了一个原始的消息交换契约。

ReplyToAddressCallback 在需要自定义逻辑以根据接收到的消息和 ReceiveAndReplyCallback 的回复在运行时确定 replyTo 地址的情况下非常有用。默认情况下,请求消息中的 replyTo 信息用于路由回复。

以下清单展示了一个基于 POJO 的接收和回复示例:

boolean received =
this.template.receiveAndReply(ROUTE, new ReceiveAndReplyCallback<Order, Invoice>() {

public Invoice handle(Order order) {
return processOrder(order);
}
});
if (received) {
log.info("We received an order!");
}
java