跳到主要内容

AmqpTemplate

DeepSeek V3 中英对照 AmqpTemplate AmqpTemplate

与 Spring 框架及相关项目提供的许多其他高级抽象一样,Spring AMQP 提供了一个扮演核心角色的“模板”。定义主要操作的接口称为 AmqpTemplate。这些操作涵盖了发送和接收消息的通用行为。换句话说,它们并不是特定于任何实现的——因此名称中包含“AMQP”。另一方面,该接口的实现与 AMQP 协议的实现紧密相关。与 JMS(本身是一个接口级别的 API)不同,AMQP 是一个线路级别的协议。该协议的实现提供了它们自己的客户端库,因此模板接口的每个实现都依赖于特定的客户端库。目前,只有一个实现:RabbitTemplate。在接下来的示例中,我们经常使用 AmqpTemplate。然而,当您查看配置示例或任何实例化模板或调用 setter 方法的代码片段时,您可以看到实现类型(例如,RabbitTemplate)。

如前所述,AmqpTemplate 接口定义了所有用于发送和接收消息的基本操作。我们将在发送消息接收消息中分别探讨消息的发送和接收。

另请参阅 异步 Rabbit 模板

添加重试功能

从 1.3 版本开始,你现在可以配置 RabbitTemplate 使用 RetryTemplate 来帮助处理与 broker 连接的问题。有关完整信息,请参阅 spring-retry 项目。以下仅是一个使用指数退避策略和默认 SimpleRetryPolicy 的示例,该策略在将异常抛给调用者之前尝试三次。

以下示例使用了 XML 命名空间:

<rabbit:template id="template" connection-factory="connectionFactory" retry-template="retryTemplate"/>

<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
<property name="backOffPolicy">
<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
<property name="initialInterval" value="500" />
<property name="multiplier" value="10.0" />
<property name="maxInterval" value="10000" />
</bean>
</property>
</bean>
xml

以下示例在 Java 中使用了 @Configuration 注解:

@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
RetryTemplate retryTemplate = new RetryTemplate();
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(500);
backOffPolicy.setMultiplier(10.0);
backOffPolicy.setMaxInterval(10000);
retryTemplate.setBackOffPolicy(backOffPolicy);
template.setRetryTemplate(retryTemplate);
return template;
}
java

从 1.4 版本开始,除了 retryTemplate 属性外,RabbitTemplate 还支持 recoveryCallback 选项。它被用作 RetryTemplate.execute(RetryCallback<T, E> retryCallback, RecoveryCallback<T> recoveryCallback) 的第二个参数。

备注

RecoveryCallback 的功能有些受限,因为重试上下文仅包含 lastThrowable 字段。对于更复杂的用例,你应该使用外部的 RetryTemplate,这样你就可以通过上下文的属性向 RecoveryCallback 传递更多信息。以下示例展示了如何做到这一点:

retryTemplate.execute(
new RetryCallback<Object, Exception>() {

@Override
public Object doWithRetry(RetryContext context) throws Exception {
context.setAttribute("message", message);
return rabbitTemplate.convertAndSend(exchange, routingKey, message);
}

}, new RecoveryCallback<Object>() {

@Override
public Object recover(RetryContext context) throws Exception {
Object message = context.getAttribute("message");
Throwable t = context.getLastThrowable();
// Do something with message
return null;
}
});
}
java

在这种情况下,你不会RetryTemplate 注入到 RabbitTemplate 中。

发布是异步的 — 如何检测成功与失败

发布消息是一种异步机制,默认情况下,无法路由的消息会被 RabbitMQ 丢弃。为了确保成功发布,你可以接收到异步确认,如关联发布确认与返回中所述。考虑以下两种失败场景:

  • 发布到交换器但没有匹配的目标队列。

  • 发布到不存在的交换器。

第一种情况由发布者返回(publisher returns)所涵盖,如相关发布者确认与返回中所述。

对于第二种情况,消息会被丢弃且不会生成任何返回。底层通道会因异常而关闭。默认情况下,此异常会被记录,但你可以向 CachingConnectionFactory 注册一个 ChannelListener 来获取此类事件的通知。以下示例展示了如何添加一个 ConnectionListener

this.connectionFactory.addConnectionListener(new ConnectionListener() {

@Override
public void onCreate(Connection connection) {
}

@Override
public void onShutDown(ShutdownSignalException signal) {
...
}

});
java

你可以检查信号的 reason 属性来确定发生的问题。

为了在发送线程上检测异常,你可以在 RabbitTemplate 上设置 setChannelTransacted(true),并且异常会在 txCommit() 时被检测到。然而,事务会显著影响性能,因此在仅为这一用例启用事务之前,请仔细考虑。

关联的发布者确认与返回

AmqpTemplateRabbitTemplate 实现支持发布者确认和返回。

对于返回的消息,模板的 mandatory 属性必须设置为 true,或者 mandatory-expression 必须对特定消息评估为 true。此功能需要一个 CachingConnectionFactory,并且其 publisherReturns 属性设置为 true(参见 Publisher Confirms and Returns)。返回的消息通过注册一个 RabbitTemplate.ReturnsCallback 发送到客户端,该回调通过调用 setReturnsCallback(ReturnsCallback callback) 进行注册。回调必须实现以下方法:

void returnedMessage(ReturnedMessage returned);
java

ReturnedMessage 具有以下属性:

  • message - 返回的消息本身

  • replyCode - 表示返回原因的代码

  • replyText - 返回的文本原因 - 例如 NO_ROUTE

  • exchange - 消息发送到的 exchange

  • routingKey - 使用的 routing key

每个 RabbitTemplate 仅支持一个 ReturnsCallback。另请参阅 回复超时

对于发布者确认(也称为发布者确认),模板需要一个 CachingConnectionFactory,其 publisherConfirm 属性设置为 ConfirmType.CORRELATED。确认通过客户端注册一个 RabbitTemplate.ConfirmCallback 来发送,通过调用 setConfirmCallback(ConfirmCallback callback) 方法实现。回调必须实现以下方法:

void confirm(CorrelationData correlationData, boolean ack, String cause);
java

CorrelationData 是客户端在发送原始消息时提供的一个对象。acktrue 表示确认(ack),false 表示否定确认(nack)。对于 nack 实例,如果在生成 nack 时有可用的原因,cause 可能包含 nack 的原因。例如,当向一个不存在的交换器发送消息时,代理会关闭通道。关闭的原因会包含在 cause 中。cause 是在 1.4 版本中添加的。

一个 RabbitTemplate 仅支持一个 ConfirmCallback

备注

当 Rabbit 模板的发送操作完成时,通道会被关闭。当连接工厂缓存已满时(当缓存中有空间时,通道不会被物理关闭,返回和确认会正常进行),这会阻止确认或返回的接收。当缓存已满时,框架会将关闭操作延迟最多五秒钟,以便有时间接收确认和返回。使用确认时,通道在接收到最后一个确认后关闭。仅使用返回时,通道会保持打开状态整整五秒钟。我们通常建议将连接工厂的 channelCacheSize 设置为足够大的值,以便发布消息的通道返回到缓存中而不是被关闭。您可以通过使用 RabbitMQ 管理插件来监控通道的使用情况。如果您看到通道被频繁打开和关闭,您应该考虑增加缓存大小以减少服务器的开销。

important

在 2.1 版本之前,启用了发布者确认的通道在收到确认之前会被返回到缓存中。其他进程可能会检出该通道并执行某些操作,例如向不存在的交换器发布消息,这可能导致通道关闭。这可能会导致确认丢失。从 2.1 版本开始,当确认未完成时,不再将通道返回到缓存中。RabbitTemplate 在每次操作后会对通道执行逻辑上的 close()。通常,这意味着每个通道上只有一个未完成的确认。

备注

从 2.2 版本开始,回调会在连接工厂的 executor 线程之一上调用。这是为了避免在回调中执行 Rabbit 操作时可能出现的死锁问题。在之前的版本中,回调会直接在 amqp-client 连接的 I/O 线程上调用;如果你执行某些 RPC 操作(例如打开一个新通道),这将导致死锁,因为 I/O 线程会阻塞等待结果,而结果需要由 I/O 线程本身处理。在那些版本中,必须在回调中将工作(例如发送消息)交给另一个线程处理。由于框架现在将回调调用交给 executor 处理,因此不再需要这样做。

important

只要返回回调在 60 秒或更短时间内执行,确保在确认之前收到返回消息的保证仍然有效。确认计划在返回回调退出后或 60 秒后发送,以先到者为准。

CorrelationData 对象有一个 CompletableFuture,你可以使用它来获取结果,而不是在模板上使用 ConfirmCallback。以下示例展示了如何配置一个 CorrelationData 实例:

CorrelationData cd1 = new CorrelationData();
this.templateWithConfirmsEnabled.convertAndSend("exchange", queue.getName(), "foo", cd1);
assertTrue(cd1.getFuture().get(10, TimeUnit.SECONDS).isAck());
ReturnedMessage = cd1.getReturn();
...
java

由于它是一个 CompletableFuture<Confirm>,你可以在结果准备好时使用 get() 获取结果,或者使用 whenComplete() 进行异步回调。Confirm 对象是一个简单的 bean,包含两个属性:ackreason(用于 nack 实例)。对于由 broker 生成的 nack 实例,reason 不会被填充。对于由框架生成的 nack 实例(例如,在仍有未完成的 ack 实例时关闭连接),reason 会被填充。

此外,当同时启用了确认和返回时,如果消息无法路由到任何队列,CorrelationDatareturn 属性会被填充为返回的消息。可以保证在 future 被设置为 ack 之前,返回的消息属性已经被设置。CorrelationData.getReturn() 返回一个 ReturnMessage,其包含以下属性:

  • message(返回的消息)

  • replyCode

  • replyText

  • exchange

  • routingKey

另请参阅Scoped Operations,这是一个更简单的机制,用于等待发布者确认。

作用域操作

通常情况下,使用模板时,会从缓存中检出(或创建)一个 Channel,用于操作,然后将其返回到缓存中以供重用。在多线程环境中,不能保证下一个操作会使用相同的通道。然而,有时你可能希望更精确地控制通道的使用,并确保一系列操作都在同一个通道上执行。

从 2.0 版本开始,提供了一个名为 invoke 的新方法,带有 OperationsCallback。在回调范围内对提供的 RabbitOperations 参数执行的任何操作都使用相同的专用 Channel,该 Channel 将在结束时关闭(不会返回到缓存)。如果该通道是 PublisherCallbackChannel,则在收到所有确认后将其返回到缓存(请参阅 相关的发布者确认和返回)。

@FunctionalInterface
public interface OperationsCallback<T> {

T doInRabbit(RabbitOperations operations);

}
java

一个你可能需要这样做的例子是,如果你希望在底层的 Channel 上使用 waitForConfirms() 方法。由于 Channel 通常是缓存和共享的,正如之前讨论的那样,Spring API 之前并未暴露这个方法。现在,RabbitTemplate 提供了 waitForConfirms(long timeout)waitForConfirmsOrDie(long timeout) 方法,它们将委托给在 OperationsCallback 范围内使用的专用 Channel。出于显而易见的原因,这些方法不能在该范围之外使用。

请注意,更高层次的抽象允许你将确认与请求关联起来,这在其他地方有提供(参见相关发布者确认和返回)。如果你只想等待直到代理确认交付,可以使用以下示例中展示的技术:

Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
messages.forEach(m -> t.convertAndSend(ROUTE, m));
t.waitForConfirmsOrDie(10_000);
return true;
});
java

如果你希望 RabbitAdmin 操作在 OperationsCallback 的范围内通过相同的 channel 调用,那么 RabbitAdmin 必须使用与 invoke 操作相同的 RabbitTemplate 来构造。

备注

如果模板操作已经在现有事务的范围内执行,那么前面的讨论就没有意义了——例如,当在事务监听器容器线程上运行并在事务模板上执行操作时。在这种情况下,操作将在该通道上执行,并在线程返回到容器时提交。在这种情况下,不需要使用 invoke

以这种方式使用确认时,为将确认与请求关联而设置的大部分基础设施并不是真正需要的(除非还启用了返回)。从 2.2 版本开始,连接工厂支持一个名为 publisherConfirmType 的新属性。当将其设置为 ConfirmType.SIMPLE 时,可以避免使用这些基础设施,并且确认处理可以更加高效。

此外,RabbitTemplate 会在发送的消息 MessageProperties 中设置 publisherSequenceNumber 属性。如果你想检查(或记录或以其他方式使用)特定的确认信息,可以使用重载的 invoke 方法来实现,如下例所示:

public <T> T invoke(OperationsCallback<T> action, com.rabbitmq.client.ConfirmCallback acks,
com.rabbitmq.client.ConfirmCallback nacks);
java
备注

这些 ConfirmCallback 对象(用于 acknack 实例)是 Rabbit 客户端的回调,而不是模板的回调。

以下示例记录了 acknack 实例:

Collection<?> messages = getMessagesToSend();
Boolean result = this.template.invoke(t -> {
messages.forEach(m -> t.convertAndSend(ROUTE, m));
t.waitForConfirmsOrDie(10_000);
return true;
}, (tag, multiple) -> {
log.info("Ack: " + tag + ":" + multiple);
}, (tag, multiple) -> {
log.info("Nack: " + tag + ":" + multiple);
}));
java
important

作用域操作绑定到线程。有关多线程环境中严格顺序的讨论,请参阅多线程环境中的严格消息顺序

多线程环境中的严格消息顺序

作用域操作 中的讨论仅适用于在同一线程上执行的操作。

考虑以下情况:

  • thread-1 向队列发送一条消息,并将工作交给 thread-2

  • thread-2 向同一个队列发送一条消息

由于 RabbitMQ 的异步特性以及使用了缓存的通道(channel),无法保证同一个通道会被重复使用,因此消息在队列中的到达顺序也无法得到保证。(在大多数情况下,消息会按顺序到达,但乱序交付的概率并不为零)。为了解决这种用例,你可以使用一个大小为 1 的有界通道缓存(配合 channelCheckoutTimeout),以确保消息始终在同一个通道上发布,从而保证顺序。要实现这一点,如果你对连接工厂有其他用途(例如消费者),你应该为模板使用一个专用的连接工厂,或者将模板配置为使用主连接工厂中嵌入的发布者连接工厂(参见使用独立连接)。

这可以通过一个简单的 Spring Boot 应用程序来最好地说明:

@SpringBootApplication
public class Application {

private static final Logger log = LoggerFactory.getLogger(Application.class);

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}

@Bean
TaskExecutor exec() {
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
exec.setCorePoolSize(10);
return exec;
}

@Bean
CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory("localhost");
CachingConnectionFactory publisherCF = (CachingConnectionFactory) ccf.getPublisherConnectionFactory();
publisherCF.setChannelCacheSize(1);
publisherCF.setChannelCheckoutTimeout(1000L);
return ccf;
}

@RabbitListener(queues = "queue")
void listen(String in) {
log.info(in);
}

@Bean
Queue queue() {
return new Queue("queue");
}

@Bean
public ApplicationRunner runner(Service service, TaskExecutor exec) {
return args -> {
exec.execute(() -> service.mainService("test"));
};
}

}

@Component
class Service {

private static final Logger LOG = LoggerFactory.getLogger(Service.class);

private final RabbitTemplate template;

private final TaskExecutor exec;

Service(RabbitTemplate template, TaskExecutor exec) {
template.setUsePublisherConnection(true);
this.template = template;
this.exec = exec;
}

void mainService(String toSend) {
LOG.info("Publishing from main service");
this.template.convertAndSend("queue", toSend);
this.exec.execute(() -> secondaryService(toSend.toUpperCase()));
}

void secondaryService(String toSend) {
LOG.info("Publishing from secondary service");
this.template.convertAndSend("queue", toSend);
}

}
java

尽管发布操作在两个不同的线程上执行,但它们都将使用相同的通道,因为缓存被限制为单个通道。

从 2.3.7 版本开始,ThreadChannelConnectionFactory 支持使用 prepareContextSwitchswitchContext 方法将一个线程的通道转移到另一个线程。第一个方法返回一个上下文,该上下文被传递给调用第二个方法的第二个线程。一个线程可以绑定一个非事务性通道或一个事务性通道(或每种一个);除非使用两个连接工厂,否则不能单独转移它们。示例如下:

@SpringBootApplication
public class Application {

private static final Logger log = LoggerFactory.getLogger(Application.class);

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}

@Bean
TaskExecutor exec() {
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
exec.setCorePoolSize(10);
return exec;
}

@Bean
ThreadChannelConnectionFactory tccf() {
ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
rabbitConnectionFactory.setHost("localhost");
return new ThreadChannelConnectionFactory(rabbitConnectionFactory);
}

@RabbitListener(queues = "queue")
void listen(String in) {
log.info(in);
}

@Bean
Queue queue() {
return new Queue("queue");
}

@Bean
public ApplicationRunner runner(Service service, TaskExecutor exec) {
return args -> {
exec.execute(() -> service.mainService("test"));
};
}

}

@Component
class Service {

private static final Logger LOG = LoggerFactory.getLogger(Service.class);

private final RabbitTemplate template;

private final TaskExecutor exec;

private final ThreadChannelConnectionFactory connFactory;

Service(RabbitTemplate template, TaskExecutor exec,
ThreadChannelConnectionFactory tccf) {

this.template = template;
this.exec = exec;
this.connFactory = tccf;
}

void mainService(String toSend) {
LOG.info("Publishing from main service");
this.template.convertAndSend("queue", toSend);
Object context = this.connFactory.prepareSwitchContext();
this.exec.execute(() -> secondaryService(toSend.toUpperCase(), context));
}

void secondaryService(String toSend, Object threadContext) {
LOG.info("Publishing from secondary service");
this.connFactory.switchContext(threadContext);
this.template.convertAndSend("queue", toSend);
this.connFactory.closeThreadChannel();
}

}
java
important

一旦调用了 prepareSwitchContext,如果当前线程执行任何更多的操作,它们将在新的通道上执行。当不再需要线程绑定的通道时,关闭它非常重要。

消息集成

从 1.4 版本开始,RabbitMessagingTemplate(基于 RabbitTemplate 构建)提供了与 Spring Framework 消息抽象(即 org.springframework.messaging.Message)的集成。这使你可以使用 spring-messagingMessage<?> 抽象来发送和接收消息。这种抽象也被其他 Spring 项目使用,例如 Spring Integration 和 Spring 的 STOMP 支持。涉及两个消息转换器:一个用于在 spring-messagingMessage<?> 和 Spring AMQP 的 Message 抽象之间进行转换,另一个用于在 Spring AMQP 的 Message 抽象和底层 RabbitMQ 客户端库所需的格式之间进行转换。默认情况下,消息的有效负载由提供的 RabbitTemplate 实例的消息转换器进行转换。或者,你可以注入一个自定义的 MessagingMessageConverter,并使用其他有效负载转换器,如下例所示:

MessagingMessageConverter amqpMessageConverter = new MessagingMessageConverter();
amqpMessageConverter.setPayloadConverter(myPayloadConverter);
rabbitMessagingTemplate.setAmqpMessageConverter(amqpMessageConverter);
java

已验证用户 ID

从 1.6 版本开始,模板现在支持 user-id-expression(在使用 Java 配置时为 userIdExpression)。如果发送了一条消息,用户 ID 属性将在评估此表达式后设置(如果尚未设置)。评估的根对象是要发送的消息。

以下示例展示了如何使用 user-id-expression 属性:

<rabbit:template ... user-id-expression="'guest'" />

<rabbit:template ... user-id-expression="@myConnectionFactory.username" />
xml

第一个例子是一个字面量表达式。第二个例子从应用程序上下文中的一个连接工厂 bean 中获取 username 属性。

使用单独的连接

从 2.0.2 版本开始,你可以将 usePublisherConnection 属性设置为 true,以便在可能的情况下使用与监听器容器不同的连接。这是为了避免当生产者因任何原因被阻塞时,消费者也被阻塞。连接工厂为此目的维护了第二个内部连接工厂;默认情况下,它与主工厂的类型相同,但如果你希望为发布使用不同的工厂类型,可以显式设置。如果 Rabbit 模板在由监听器容器启动的事务中运行,无论此设置如何,都将使用容器的通道。

important

通常情况下,你不应该将 RabbitAdmin 与一个将此属性设置为 true 的模板一起使用。请使用接受连接工厂的 RabbitAdmin 构造函数。如果你使用接受模板的其他构造函数,请确保模板的此属性为 false。这是因为,通常管理员用于为监听器容器声明队列。使用将此属性设置为 true 的模板意味着独占队列(如 AnonymousQueue)将在与监听器容器使用的连接不同的连接上声明。在这种情况下,容器将无法使用这些队列。