跳到主要内容

回复管理

DeepSeek V3 中英对照 Reply Management

MessageListenerAdapter 中现有的支持已经允许你的方法具有非 void 返回类型。在这种情况下,调用的结果会被封装在一个消息中,发送到原始消息的 ReplyToAddress 头中指定的地址,或者发送到监听器上配置的默认地址。你可以通过使用消息抽象中的 @SendTo 注解来设置该默认地址。

假设我们的 processOrder 方法现在应该返回一个 OrderStatus,我们可以如下编写它来自动发送回复:

@RabbitListener(destination = "myQueue")
@SendTo("status")
public OrderStatus processOrder(Order order) {
// order processing
return status;
}
java

如果你需要以与传输无关的方式设置额外的头部信息,你可以返回一个 Message,如下所示:

@RabbitListener(destination = "myQueue")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
// order processing
return MessageBuilder
.withPayload(status)
.setHeader("code", 1234)
.build();
}
java

或者,你可以使用 MessagePostProcessor 并将其添加到 beforeSendReplyMessagePostProcessors 容器工厂属性中,以添加更多的头信息。从版本 2.2.3 开始,被调用的 bean/方法会在回复消息中提供,这可以在消息后处理器中使用,将信息传递回调用者:

factory.setBeforeSendReplyPostProcessors(msg -> {
msg.getMessageProperties().setHeader("calledBean",
msg.getMessageProperties().getTargetBean().getClass().getSimpleName());
msg.getMessageProperties().setHeader("calledMethod",
msg.getMessageProperties().getTargetMethod().getName());
return m;
});
java

从 2.2.5 版本开始,你可以配置一个 ReplyPostProcessor 来在发送回复消息之前对其进行修改;它会在 correlationId 标头被设置为与请求匹配之后调用。

@RabbitListener(queues = "test.header", group = "testGroup", replyPostProcessor = "echoCustomHeader")
public String capitalizeWithHeader(String in) {
return in.toUpperCase();
}

@Bean
public ReplyPostProcessor echoCustomHeader() {
return (req, resp) -> {
resp.getMessageProperties().setHeader("myHeader", req.getMessageProperties().getHeader("myHeader"));
return resp;
};
}
java

从 3.0 版本开始,您可以在容器工厂上配置后处理器,而不是在注解上配置。

factory.setReplyPostProcessorProvider(id -> (req, resp) -> {
resp.getMessageProperties().setHeader("myHeader", req.getMessageProperties().getHeader("myHeader"));
return resp;
});
java

id 参数是监听器的 id。

注解上的设置将覆盖工厂设置。

@SendTo 的值被假定为遵循 exchange/routingKey 模式的回复 exchangeroutingKey 对,其中这两个部分中的一个可以被省略。有效值如下:

  • thing1/thing2: replyTo 交换器和 routingKey
    thing1/: replyTo 交换器和默认(空)的 routingKey
    thing2/thing2: replyToroutingKey 和默认(空)的交换器。
    / 或空: replyTo 默认交换器和默认的 routingKey

此外,你也可以使用不带 value 属性的 @SendTo。这种情况等同于一个空的 sendTo 模式。@SendTo 仅当入站消息没有 replyToAddress 属性时才会被使用。

从 1.5 版本开始,@SendTo 的值可以是一个 bean 初始化的 SpEL 表达式,如下例所示:

@RabbitListener(queues = "test.sendTo.spel")
@SendTo("#{spelReplyTo}")
public String capitalizeWithSendToSpel(String foo) {
return foo.toUpperCase();
}
...
@Bean
public String spelReplyTo() {
return "test.sendTo.reply.spel";
}
java

表达式必须计算为 String 类型,可以是一个简单的队列名称(发送到默认的交换机),或者采用 exchange/routingKey 的形式,如在前述示例之前所讨论的那样。

备注

#{…​} 表达式在初始化期间仅计算一次。

为了实现动态的回复路由,消息发送方应在消息中包含一个 reply_to 消息属性,或者使用备选的运行时 SpEL 表达式(在下一个示例之后进行描述)。

从 1.6 版本开始,@SendTo 可以是一个 SpEL 表达式,该表达式在运行时根据请求和回复进行评估,如下例所示:

@RabbitListener(queues = "test.sendTo.spel")
@SendTo("!{'some.reply.queue.with.' + result.queueName}")
public Bar capitalizeWithSendToSpel(Foo foo) {
return processTheFooAndReturnABar(foo);
}
java

SpEL 表达式的运行时特性通过 !{…​} 分隔符表示。表达式的求值上下文 #root 对象具有三个属性:

  • request: o.s.amqp.core.Message 请求对象。

  • source: 转换后的 o.s.messaging.Message<?>

  • result: 方法结果。

上下文包含一个映射属性访问器、一个标准类型转换器和一个 bean 解析器,这使得可以引用上下文中的其他 bean(例如,@someBeanName.determineReplyQ(request, result))。

总结一下,#{…​} 在初始化时被评估一次,其中 #root 对象是应用程序上下文。Bean 通过它们的名称被引用。!{…​} 在每条消息的运行时被评估,根对象具有前面列出的属性。Bean 通过它们的名称被引用,并在名称前加上 @ 前缀。

从 2.1 版本开始,还支持简单的属性占位符(例如 ${some.reply.to})。在早期版本中,可以使用以下方法作为替代方案,如下例所示:

@RabbitListener(queues = "foo")
@SendTo("#{environment['my.send.to']}")
public String listen(Message in) {
...
return ...
}
java