跳到主要内容

请求/回复消息传递

DeepSeek V3 中英对照 Request/Reply Messaging

AmqpTemplate 还提供了多种 sendAndReceive 方法,这些方法接受与之前描述的单向发送操作相同的参数选项(exchangeroutingKeyMessage)。这些方法在请求-回复场景中非常有用,因为它们在发送之前处理了必要的 reply-to 属性的配置,并且可以在为特定目的内部创建的独占队列上监听回复消息。

类似的请求-回复方法也可用,其中 MessageConverter 会同时应用于请求和回复。这些方法命名为 convertSendAndReceive。更多详细信息,请参阅 AmqpTemplate 的 Javadoc

从 1.5.0 版本开始,每个 sendAndReceive 方法变体都有一个重载版本,该版本接受 CorrelationData。结合正确配置的连接工厂,这使得可以接收操作发送端的发布者确认。有关更多信息,请参阅 相关的发布者确认和返回 以及 RabbitOperations 的 Javadoc

从 2.0 版本开始,这些方法(convertSendAndReceiveAsType)提供了变体,接受一个额外的 ParameterizedTypeReference 参数,用于转换复杂的返回类型。模板必须配置一个 SmartMessageConverter。更多信息请参见 使用 RabbitTemplate 从消息转换

从 2.1 版本开始,你可以通过配置 RabbitTemplatenoLocalReplyConsumer 选项来控制回复消费者的 noLocal 标志。默认情况下,该标志为 false

回复超时

默认情况下,sendreceive 方法在五秒后超时并返回 null。你可以通过设置 replyTimeout 属性来修改此行为。从 1.5 版本开始,如果你将 mandatory 属性设置为 true(或者 mandatory-expression 对某个特定消息的求值结果为 true),如果消息无法传递到队列,则会抛出 AmqpMessageReturnedException 异常。该异常包含 returnedMessagereplyCodereplyText 属性,以及用于发送的 exchangeroutingKey

备注

此功能使用发布者返回。你可以通过在 CachingConnectionFactory 上设置 publisherReturnstrue 来启用它(参见 发布者确认和返回)。此外,你不能在 RabbitTemplate 中注册你自己的 ReturnCallback

从 2.1.2 版本开始,新增了一个 replyTimedOut 方法,允许子类在超时时收到通知,以便它们可以清理任何保留的状态。

从 2.0.11 和 2.1.3 版本开始,当你使用默认的 DirectReplyToMessageListenerContainer 时,可以通过设置模板的 replyErrorHandler 属性来添加一个错误处理器。该错误处理器会在任何投递失败时被调用,例如延迟的回复和没有相关头信息的消息。传入的异常是一个 ListenerExecutionFailedException,它有一个 failedMessage 属性。

RabbitMQ 直接回复

important

从 3.4.0 版本开始,RabbitMQ 服务器支持 direct reply-to。这消除了使用固定回复队列的主要原因(以避免为每个请求创建临时队列的需求)。从 Spring AMQP 1.4.1 版本开始,默认情况下(如果服务器支持)会使用 direct reply-to,而不是创建临时回复队列。当没有提供 replyQueue(或者它被设置为 amq.rabbitmq.reply-to 名称)时,RabbitTemplate 会自动检测是否支持 direct reply-to,并选择使用它或回退到使用临时回复队列。当使用 direct reply-to 时,不需要配置 reply-listener,也不应该配置它。

回复监听器仍然支持命名队列(除了 amq.rabbitmq.reply-to),允许控制回复并发性等。

从 1.6 版本开始,如果你希望为每个回复使用临时的、独占的、自动删除的队列,请将 useTemporaryReplyQueues 属性设置为 true。如果你设置了 replyAddress,则此属性将被忽略。

你可以通过子类化 RabbitTemplate 并重写 useDirectReplyTo() 方法来改变决定是否使用 direct reply-to 的标准。该方法仅在发送第一个请求时调用一次。

在 2.0 版本之前,RabbitTemplate 为每个请求创建一个新的消费者,并在收到回复(或超时)时取消该消费者。现在,模板改用 DirectReplyToMessageListenerContainer,使得消费者可以被重用。模板仍然负责关联回复,因此不存在延迟回复发送给不同发送者的风险。如果你想恢复到之前的行为,可以将 useDirectReplyToContainer 属性(在使用 XML 配置时为 direct-reply-to-container)设置为 false。

AsyncRabbitTemplate 没有这样的选项。当使用 direct reply-to 时,它总是使用 DirectReplyToContainer 来处理回复。

从版本 2.3.7 开始,模板新增了一个属性 useChannelForCorrelation。当该属性为 true 时,服务器无需将关联 ID 从请求消息头复制到回复消息中。相反,用于发送请求的通道将用于将回复与请求关联起来。

带回复队列的消息关联

在使用固定的回复队列(除了 amq.rabbitmq.reply-to 之外)时,必须提供关联数据,以便将回复与请求关联起来。请参阅 RabbitMQ 远程过程调用 (RPC)。默认情况下,标准的 correlationId 属性用于保存关联数据。但是,如果您希望使用自定义属性来保存关联数据,可以在 <rabbit-template/> 上设置 correlation-key 属性。将该属性显式设置为 correlationId 与省略该属性效果相同。客户端和服务器必须使用相同的头部来保存关联数据。

备注

Spring AMQP 版本 1.1 使用了一个名为 spring_reply_correlation 的自定义属性来处理这些数据。如果你想在当前版本中恢复这种行为(可能是为了保持与使用 1.1 版本的其他应用程序的兼容性),你必须将该属性设置为 spring_reply_correlation

默认情况下,模板会生成自己的关联 ID(忽略任何用户提供的值)。如果您希望使用自己的关联 ID,请将 RabbitTemplate 实例的 userCorrelationId 属性设置为 true

:::重要
为了确保请求的回复不会被错误地返回,关联 ID 必须是唯一的。
:::

回复监听器容器

在使用 RabbitMQ 3.4.0 之前的版本时,每个回复都会使用一个新的临时队列。然而,可以在模板上配置一个单一的回复队列,这样效率更高,并且允许你为该队列设置参数。然而,在这种情况下,你还必须提供一个 <reply-listener/> 子元素。该元素为回复队列提供了一个监听器容器,模板作为监听器。所有在 <listener-container/> 上允许的 消息监听器容器配置 属性都可以在该元素上使用,除了 connection-factorymessage-converter,这两者是从模板的配置中继承的。

important

如果你运行多个应用程序实例或使用多个 RabbitTemplate 实例,必须为每个实例使用唯一的回复队列。RabbitMQ 无法从队列中选择消息,因此,如果它们都使用相同的队列,每个实例将竞争回复,并不一定能接收到自己的消息。

以下示例定义了一个带有连接工厂的 rabbit 模板:

<rabbit:template id="amqpTemplate"
connection-factory="connectionFactory"
reply-queue="replies"
reply-address="replyEx/routeReply">
<rabbit:reply-listener/>
</rabbit:template>
xml

虽然容器和模板共享一个连接工厂,但它们并不共享一个通道。因此,请求和回复不会在同一个事务中执行(如果是事务性的)。

备注

在 1.5.0 版本之前,reply-address 属性不可用。回复总是通过使用默认交换机和 reply-queue 名称作为路由键进行路由。这仍然是默认行为,但现在你可以指定新的 reply-address 属性。reply-address 可以包含一个格式为 <exchange>/<routingKey> 的地址,回复将被路由到指定的交换机,并使用路由键路由到绑定的队列。reply-address 优先于 reply-queue。当仅使用 reply-address 时,<reply-listener> 必须配置为一个单独的 <listener-container> 组件。reply-addressreply-queue(或 <listener-container> 上的 queues 属性)必须在逻辑上引用相同的队列。

在这种配置下,使用 SimpleListenerContainer 来接收回复,RabbitTemplate 作为 MessageListener。当使用 <rabbit:template/> 命名空间元素定义一个模板时,如前面的示例所示,解析器会定义容器并将模板作为监听器连接起来。

备注

当模板不使用固定的 replyQueue(或使用直接回复到 — 参见 RabbitMQ 直接回复到),则不需要监听容器。在使用 RabbitMQ 3.4.0 或更高版本时,直接 reply-to 是首选机制。

如果你将 RabbitTemplate 定义为一个 <bean/> 或者使用 @Configuration 类将其定义为 @Bean,或者在编程方式创建模板时,你需要自行定义并连接回复监听器容器。如果未能做到这一点,模板将永远不会接收到回复,最终会导致超时并在调用 sendAndReceive 方法时返回 null 作为回复。

从版本 1.5 开始,RabbitTemplate 会检测它是否被配置为 MessageListener 以接收回复。如果没有配置,尝试发送和接收带有回复地址的消息将失败,并抛出 IllegalStateException 异常(因为回复永远不会被接收)。

此外,如果使用的是简单的 replyAddress(队列名称),回复监听容器会验证它是否正在监听具有相同名称的队列。如果回复地址是一个交换机和路由键,则无法执行此检查,并且会写入一条调试日志消息。

important

当您自己配置回复监听器和模板时,确保模板的 replyAddress 和容器的 queues(或 queueNames)属性引用同一个队列非常重要。模板会将回复地址插入到出站消息的 replyTo 属性中。

下面的清单展示了如何手动连接 bean 的示例:

<bean id="amqpTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<constructor-arg ref="connectionFactory" />
<property name="exchange" value="foo.exchange" />
<property name="routingKey" value="foo" />
<property name="replyQueue" ref="replyQ" />
<property name="replyTimeout" value="600000" />
<property name="useDirectReplyToContainer" value="false" />
</bean>

<bean class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<constructor-arg ref="connectionFactory" />
<property name="queues" ref="replyQ" />
<property name="messageListener" ref="amqpTemplate" />
</bean>

<rabbit:queue id="replyQ" name="my.reply.queue" />
xml
@Bean
public RabbitTemplate amqpTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMessageConverter(msgConv());
rabbitTemplate.setReplyAddress(replyQueue().getName());
rabbitTemplate.setReplyTimeout(60000);
rabbitTemplate.setUseDirectReplyToContainer(false);
return rabbitTemplate;
}

@Bean
public SimpleMessageListenerContainer replyListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueues(replyQueue());
container.setMessageListener(amqpTemplate());
return container;
}

@Bean
public Queue replyQueue() {
return new Queue("my.reply.queue");
}
java

一个完整的示例展示了如何使用固定的回复队列配置 RabbitTemplate,以及一个处理请求并返回回复的“远程”监听器容器,该示例可以在这个测试用例中找到。

important

当回复超时(replyTimeout)时,sendAndReceive() 方法会返回 null。

在 1.3.6 版本之前,超时消息的延迟回复只会被记录下来。现在,如果收到延迟回复,它将被拒绝(模板会抛出 AmqpRejectAndDontRequeueException)。如果回复队列配置为将拒绝的消息发送到死信交换器(dead letter exchange),则可以检索该回复以供后续分析。为此,将一个队列绑定到配置的死信交换器,并使用与回复队列名称相同的路由键(routing key)。

有关配置死信(Dead Letter)的更多信息,请参阅 RabbitMQ 死信文档。你也可以查看 FixedReplyQueueDeadLetterTests 测试用例作为示例。

异步 Rabbit 模板

版本 1.6 引入了 AsyncRabbitTemplate。它具有与 AmqpTemplate 上类似的 sendAndReceive(以及 convertSendAndReceive)方法。然而,它们不会阻塞,而是返回一个 CompletableFuture

sendAndReceive 方法返回一个 RabbitMessageFutureconvertSendAndReceive 方法返回一个 RabbitConverterFuture

你可以稍后通过调用 get() 方法同步获取结果,也可以注册一个回调函数,该回调函数会在结果可用时被异步调用。以下代码展示了这两种方法:

@Autowired
private AsyncRabbitTemplate template;

...

public void doSomeWorkAndGetResultLater() {

...

CompletableFuture<String> future = this.template.convertSendAndReceive("foo");

// do some more work

String reply = null;
try {
reply = future.get(10, TimeUnit.SECONDS);
}
catch (ExecutionException e) {
...
}

...

}

public void doSomeWorkAndGetResultAsync() {

...

RabbitConverterFuture<String> future = this.template.convertSendAndReceive("foo");
future.whenComplete((result, ex) -> {
if (ex == null) {
// success
}
else {
// failure
}
});

...

}
java

如果设置了 mandatory 并且消息无法被投递,future 会抛出一个 ExecutionException,其原因是 AmqpMessageReturnedException,该异常封装了返回的消息以及有关返回的信息。

如果设置了 enableConfirmsfuture 将具有一个名为 confirm 的属性,它本身是一个 CompletableFuture<Boolean>,其中 true 表示发布成功。如果 confirmfuturefalseRabbitFuture 将具有另一个名为 nackCause 的属性,该属性包含失败的原因(如果有的话)。

important

如果在收到回复之后才收到发布者确认,则该确认将被丢弃,因为回复意味着发布成功。

你可以在模板上设置 receiveTimeout 属性来超时回复(默认值为 30000 - 30 秒)。如果发生超时,future 将以 AmqpReplyTimeoutException 完成。

该模板实现了 SmartLifecycle。当存在未完成的回复时停止模板,会导致未完成的 Future 实例被取消。

从 2.0 版本开始,异步模板现在支持 direct reply-to 而不是配置的回复队列。要启用此功能,请使用以下构造函数之一:

public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey)

public AsyncRabbitTemplate(RabbitTemplate template)
java

请参阅 RabbitMQ 直接回复 以了解如何与同步的 RabbitTemplate 一起使用直接回复功能。

2.0 版本引入了这些方法的变体(convertSendAndReceiveAsType),这些方法接受一个额外的 ParameterizedTypeReference 参数,以转换复杂的返回类型。你必须为底层的 RabbitTemplate 配置一个 SmartMessageConverter。有关更多信息,请参阅 使用 RabbitTemplate 从消息转换

使用 AMQP 进行 Spring 远程调用

Spring remoting 不再受支持,因为该功能已从 Spring Framework 中移除。

使用 RabbitTemplatesendAndReceive 操作(客户端)并替换为 @RabbitListener