请求/回复消息传递
AmqpTemplate
还提供了多种 sendAndReceive
方法,这些方法接受与之前描述的单向发送操作相同的参数选项(exchange
、routingKey
和 Message
)。这些方法在请求-回复场景中非常有用,因为它们在发送之前处理了必要的 reply-to
属性的配置,并且可以在为特定目的内部创建的独占队列上监听回复消息。
类似的请求-回复方法也可用,其中 MessageConverter
会同时应用于请求和回复。这些方法命名为 convertSendAndReceive
。更多详细信息,请参阅 AmqpTemplate 的 Javadoc。
从 1.5.0 版本开始,每个 sendAndReceive
方法变体都有一个重载版本,该版本接受 CorrelationData
。结合正确配置的连接工厂,这使得可以接收操作发送端的发布者确认。有关更多信息,请参阅 相关的发布者确认和返回 以及 RabbitOperations 的 Javadoc。
从 2.0 版本开始,这些方法(convertSendAndReceiveAsType
)提供了变体,接受一个额外的 ParameterizedTypeReference
参数,用于转换复杂的返回类型。模板必须配置一个 SmartMessageConverter
。更多信息请参见 使用 RabbitTemplate 从消息转换。
从 2.1 版本开始,你可以通过配置 RabbitTemplate
的 noLocalReplyConsumer
选项来控制回复消费者的 noLocal
标志。默认情况下,该标志为 false
。
回复超时
默认情况下,send
和 receive
方法在五秒后超时并返回 null
。你可以通过设置 replyTimeout
属性来修改此行为。从 1.5 版本开始,如果你将 mandatory
属性设置为 true
(或者 mandatory-expression
对某个特定消息的求值结果为 true
),如果消息无法传递到队列,则会抛出 AmqpMessageReturnedException
异常。该异常包含 returnedMessage
、replyCode
和 replyText
属性,以及用于发送的 exchange
和 routingKey
。
此功能使用发布者返回。你可以通过在 CachingConnectionFactory
上设置 publisherReturns
为 true
来启用它(参见 发布者确认和返回)。此外,你不能在 RabbitTemplate
中注册你自己的 ReturnCallback
。
从 2.1.2 版本开始,新增了一个 replyTimedOut
方法,允许子类在超时时收到通知,以便它们可以清理任何保留的状态。
从 2.0.11 和 2.1.3 版本开始,当你使用默认的 DirectReplyToMessageListenerContainer
时,可以通过设置模板的 replyErrorHandler
属性来添加一个错误处理器。该错误处理器会在任何投递失败时被调用,例如延迟的回复和没有相关头信息的消息。传入的异常是一个 ListenerExecutionFailedException
,它有一个 failedMessage
属性。
RabbitMQ 直接回复
从 3.4.0 版本开始,RabbitMQ 服务器支持 direct reply-to。这消除了使用固定回复队列的主要原因(以避免为每个请求创建临时队列的需求)。从 Spring AMQP 1.4.1 版本开始,默认情况下(如果服务器支持)会使用 direct reply-to,而不是创建临时回复队列。当没有提供 replyQueue
(或者它被设置为 amq.rabbitmq.reply-to
名称)时,RabbitTemplate
会自动检测是否支持 direct reply-to,并选择使用它或回退到使用临时回复队列。当使用 direct reply-to 时,不需要配置 reply-listener
,也不应该配置它。
回复监听器仍然支持命名队列(除了 amq.rabbitmq.reply-to
),允许控制回复并发性等。
从 1.6 版本开始,如果你希望为每个回复使用临时的、独占的、自动删除的队列,请将 useTemporaryReplyQueues
属性设置为 true
。如果你设置了 replyAddress
,则此属性将被忽略。
你可以通过子类化 RabbitTemplate
并重写 useDirectReplyTo()
方法来改变决定是否使用 direct reply-to 的标准。该方法仅在发送第一个请求时调用一次。
在 2.0 版本之前,RabbitTemplate
为每个请求创建一个新的消费者,并在收到回复(或超时)时取消该消费者。现在,模板改用 DirectReplyToMessageListenerContainer
,使得消费者可以被重用。模板仍然负责关联回复,因此不存在延迟回复发送给不同发送者的风险。如果你想恢复到之前的行为,可以将 useDirectReplyToContainer
属性(在使用 XML 配置时为 direct-reply-to-container
)设置为 false。
AsyncRabbitTemplate
没有这样的选项。当使用 direct reply-to 时,它总是使用 DirectReplyToContainer
来处理回复。
从版本 2.3.7 开始,模板新增了一个属性 useChannelForCorrelation
。当该属性为 true
时,服务器无需将关联 ID 从请求消息头复制到回复消息中。相反,用于发送请求的通道将用于将回复与请求关联起来。
带回复队列的消息关联
在使用固定的回复队列(除了 amq.rabbitmq.reply-to
之外)时,必须提供关联数据,以便将回复与请求关联起来。请参阅 RabbitMQ 远程过程调用 (RPC)。默认情况下,标准的 correlationId
属性用于保存关联数据。但是,如果您希望使用自定义属性来保存关联数据,可以在 <rabbit-template/> 上设置 correlation-key
属性。将该属性显式设置为 correlationId
与省略该属性效果相同。客户端和服务器必须使用相同的头部来保存关联数据。
Spring AMQP 版本 1.1 使用了一个名为 spring_reply_correlation
的自定义属性来处理这些数据。如果你想在当前版本中恢复这种行为(可能是为了保持与使用 1.1 版本的其他应用程序的兼容性),你必须将该属性设置为 spring_reply_correlation
。
默认情况下,模板会生成自己的关联 ID(忽略任何用户提供的值)。如果您希望使用自己的关联 ID,请将 RabbitTemplate
实例的 userCorrelationId
属性设置为 true
。
:::重要
为了确保请求的回复不会被错误地返回,关联 ID 必须是唯一的。
:::
回复监听器容器
在使用 RabbitMQ 3.4.0 之前的版本时,每个回复都会使用一个新的临时队列。然而,可以在模板上配置一个单一的回复队列,这样效率更高,并且允许你为该队列设置参数。然而,在这种情况下,你还必须提供一个 <reply-listener/>
子元素。该元素为回复队列提供了一个监听器容器,模板作为监听器。所有在 <listener-container/>
上允许的 消息监听器容器配置 属性都可以在该元素上使用,除了 connection-factory
和 message-converter
,这两者是从模板的配置中继承的。
如果你运行多个应用程序实例或使用多个 RabbitTemplate
实例,必须为每个实例使用唯一的回复队列。RabbitMQ 无法从队列中选择消息,因此,如果它们都使用相同的队列,每个实例将竞争回复,并不一定能接收到自己的消息。
以下示例定义了一个带有连接工厂的 rabbit 模板:
<rabbit:template id="amqpTemplate"
connection-factory="connectionFactory"
reply-queue="replies"
reply-address="replyEx/routeReply">
<rabbit:reply-listener/>
</rabbit:template>
虽然容器和模板共享一个连接工厂,但它们并不共享一个通道。因此,请求和回复不会在同一个事务中执行(如果是事务性的)。
在 1.5.0 版本之前,reply-address
属性不可用。回复总是通过使用默认交换机和 reply-queue
名称作为路由键进行路由。这仍然是默认行为,但现在你可以指定新的 reply-address
属性。reply-address
可以包含一个格式为 <exchange>/<routingKey>
的地址,回复将被路由到指定的交换机,并使用路由键路由到绑定的队列。reply-address
优先于 reply-queue
。当仅使用 reply-address
时,<reply-listener>
必须配置为一个单独的 <listener-container>
组件。reply-address
和 reply-queue
(或 <listener-container>
上的 queues
属性)必须在逻辑上引用相同的队列。
在这种配置下,使用 SimpleListenerContainer
来接收回复,RabbitTemplate
作为 MessageListener
。当使用 <rabbit:template/>
命名空间元素定义一个模板时,如前面的示例所示,解析器会定义容器并将模板作为监听器连接起来。
当模板不使用固定的 replyQueue
(或使用直接回复到 — 参见 RabbitMQ 直接回复到),则不需要监听容器。在使用 RabbitMQ 3.4.0 或更高版本时,直接 reply-to
是首选机制。
如果你将 RabbitTemplate
定义为一个 <bean/>
或者使用 @Configuration
类将其定义为 @Bean
,或者在编程方式创建模板时,你需要自行定义并连接回复监听器容器。如果未能做到这一点,模板将永远不会接收到回复,最终会导致超时并在调用 sendAndReceive
方法时返回 null 作为回复。
从版本 1.5 开始,RabbitTemplate
会检测它是否被配置为 MessageListener
以接收回复。如果没有配置,尝试发送和接收带有回复地址的消息将失败,并抛出 IllegalStateException
异常(因为回复永远不会被接收)。
此外,如果使用的是简单的 replyAddress
(队列名称),回复监听容器会验证它是否正在监听具有相同名称的队列。如果回复地址是一个交换机和路由键,则无法执行此检查,并且会写入一条调试日志消息。
当您自己配置回复监听器和模板时,确保模板的 replyAddress
和容器的 queues
(或 queueNames
)属性引用同一个队列非常重要。模板会将回复地址插入到出站消息的 replyTo
属性中。
下面的清单展示了如何手动连接 bean 的示例:
<bean id="amqpTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
<constructor-arg ref="connectionFactory" />
<property name="exchange" value="foo.exchange" />
<property name="routingKey" value="foo" />
<property name="replyQueue" ref="replyQ" />
<property name="replyTimeout" value="600000" />
<property name="useDirectReplyToContainer" value="false" />
</bean>
<bean class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<constructor-arg ref="connectionFactory" />
<property name="queues" ref="replyQ" />
<property name="messageListener" ref="amqpTemplate" />
</bean>
<rabbit:queue id="replyQ" name="my.reply.queue" />
@Bean
public RabbitTemplate amqpTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMessageConverter(msgConv());
rabbitTemplate.setReplyAddress(replyQueue().getName());
rabbitTemplate.setReplyTimeout(60000);
rabbitTemplate.setUseDirectReplyToContainer(false);
return rabbitTemplate;
}
@Bean
public SimpleMessageListenerContainer replyListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueues(replyQueue());
container.setMessageListener(amqpTemplate());
return container;
}
@Bean
public Queue replyQueue() {
return new Queue("my.reply.queue");
}
一个完整的示例展示了如何使用固定的回复队列配置 RabbitTemplate
,以及一个处理请求并返回回复的“远程”监听器容器,该示例可以在这个测试用例中找到。
当回复超时(replyTimeout
)时,sendAndReceive()
方法会返回 null。
在 1.3.6 版本之前,超时消息的延迟回复只会被记录下来。现在,如果收到延迟回复,它将被拒绝(模板会抛出 AmqpRejectAndDontRequeueException
)。如果回复队列配置为将拒绝的消息发送到死信交换器(dead letter exchange),则可以检索该回复以供后续分析。为此,将一个队列绑定到配置的死信交换器,并使用与回复队列名称相同的路由键(routing key)。
有关配置死信(Dead Letter)的更多信息,请参阅 RabbitMQ 死信文档。你也可以查看 FixedReplyQueueDeadLetterTests
测试用例作为示例。
异步 Rabbit 模板
版本 1.6 引入了 AsyncRabbitTemplate
。它具有与 AmqpTemplate 上类似的 sendAndReceive
(以及 convertSendAndReceive
)方法。然而,它们不会阻塞,而是返回一个 CompletableFuture
。
sendAndReceive
方法返回一个 RabbitMessageFuture
。convertSendAndReceive
方法返回一个 RabbitConverterFuture
。
你可以稍后通过调用 get()
方法同步获取结果,也可以注册一个回调函数,该回调函数会在结果可用时被异步调用。以下代码展示了这两种方法:
@Autowired
private AsyncRabbitTemplate template;
...
public void doSomeWorkAndGetResultLater() {
...
CompletableFuture<String> future = this.template.convertSendAndReceive("foo");
// do some more work
String reply = null;
try {
reply = future.get(10, TimeUnit.SECONDS);
}
catch (ExecutionException e) {
...
}
...
}
public void doSomeWorkAndGetResultAsync() {
...
RabbitConverterFuture<String> future = this.template.convertSendAndReceive("foo");
future.whenComplete((result, ex) -> {
if (ex == null) {
// success
}
else {
// failure
}
});
...
}
如果设置了 mandatory
并且消息无法被投递,future 会抛出一个 ExecutionException
,其原因是 AmqpMessageReturnedException
,该异常封装了返回的消息以及有关返回的信息。
如果设置了 enableConfirms
,future
将具有一个名为 confirm
的属性,它本身是一个 CompletableFuture<Boolean>
,其中 true
表示发布成功。如果 confirm
的 future
是 false
,RabbitFuture
将具有另一个名为 nackCause
的属性,该属性包含失败的原因(如果有的话)。
如果在收到回复之后才收到发布者确认,则该确认将被丢弃,因为回复意味着发布成功。
你可以在模板上设置 receiveTimeout
属性来超时回复(默认值为 30000
- 30 秒)。如果发生超时,future 将以 AmqpReplyTimeoutException
完成。
该模板实现了 SmartLifecycle
。当存在未完成的回复时停止模板,会导致未完成的 Future
实例被取消。
从 2.0 版本开始,异步模板现在支持 direct reply-to 而不是配置的回复队列。要启用此功能,请使用以下构造函数之一:
public AsyncRabbitTemplate(ConnectionFactory connectionFactory, String exchange, String routingKey)
public AsyncRabbitTemplate(RabbitTemplate template)
请参阅 RabbitMQ 直接回复 以了解如何与同步的 RabbitTemplate
一起使用直接回复功能。
2.0 版本引入了这些方法的变体(convertSendAndReceiveAsType
),这些方法接受一个额外的 ParameterizedTypeReference
参数,以转换复杂的返回类型。你必须为底层的 RabbitTemplate
配置一个 SmartMessageConverter
。有关更多信息,请参阅 使用 RabbitTemplate 从消息转换。
使用 AMQP 进行 Spring 远程调用
Spring remoting 不再受支持,因为该功能已从 Spring Framework 中移除。
使用 RabbitTemplate
的 sendAndReceive
操作(客户端)并替换为 @RabbitListener
。