跳到主要内容

AMQP 抽象

DeepSeek V3 中英对照 AMQP Abstractions

Spring AMQP 由两个模块组成(每个模块在发行版中由一个 JAR 表示):spring-amqpspring-rabbitspring-amqp 模块包含 org.springframework.amqp.core 包。在该包中,你可以找到表示核心 AMQP “模型”的类。我们的意图是提供不依赖于任何特定 AMQP 代理实现或客户端库的通用抽象。最终用户代码可以更具可移植性,因为它可以仅针对抽象层进行开发。这些抽象随后由特定于代理的模块实现,例如 spring-rabbit。目前只有 RabbitMQ 的实现。然而,除了 RabbitMQ 之外,这些抽象已经在 .NET 中使用 Apache Qpid 进行了验证。由于 AMQP 在协议级别上运行,原则上,你可以将 RabbitMQ 客户端与任何支持相同协议版本的代理一起使用,但目前我们不会测试任何其他代理。

本概述假定您已经熟悉 AMQP 规范的基础知识。如果不熟悉,请查看其他资源中列出的资料。

Message

0-9-1 AMQP 规范并未定义 Message 类或接口。相反,在执行诸如 basicPublish() 等操作时,内容作为字节数组参数传递,而附加属性则作为单独的参数传递。Spring AMQP 定义了一个 Message 类,作为更通用的 AMQP 领域模型表示的一部分。Message 类的目的是将消息体和属性封装在一个实例中,从而使 API 更加简洁。以下示例展示了 Message 类的定义:

public class Message {

private final MessageProperties messageProperties;

private final byte[] body;

public Message(byte[] body, MessageProperties messageProperties) {
this.body = body;
this.messageProperties = messageProperties;
}

public byte[] getBody() {
return this.body;
}

public MessageProperties getMessageProperties() {
return this.messageProperties;
}
}
java

MessageProperties 接口定义了多个常见属性,例如 messageIdtimestampcontentType 等。你还可以通过调用 setHeader(String key, Object value) 方法,使用用户自定义的 headers 来扩展这些属性。

important

从版本 1.5.71.6.111.7.42.0.0 开始,如果消息体是一个序列化的 Serializable Java 对象,在执行 toString() 操作(例如在日志消息中)时,默认情况下将不再反序列化该对象。这是为了防止不安全的反序列化。默认情况下,只有 java.utiljava.lang 类会被反序列化。要恢复到之前的行为,可以通过调用 Message.addAllowedListPatterns(…​) 来添加允许的类/包模式。支持简单的 * 通配符,例如 com.something.*, *.MyClass。无法反序列化的消息体在日志消息中将以 byte[<size>] 的形式表示。

交换

Exchange 接口代表一个 AMQP 交换器(Exchange),消息生产者(Message Producer)将消息发送到交换器。在代理(broker)的虚拟主机(virtual host)中,每个交换器都有一个唯一的名称以及其他一些属性。以下示例展示了 Exchange 接口:

public interface Exchange {
String getName();
String getType();
boolean isDurable();
boolean isAutoDelete();
Map<String, Object> getArguments();
}
java

在这个接口中:

  • getName() 返回交换器的名称。
  • getType() 返回交换器的类型,例如 directtopicfanoutheaders
  • isDurable() 指示交换器是否是持久的,即是否在代理重启后仍然存在。
  • isAutoDelete() 指示交换器是否在没有绑定的队列时自动删除。
  • getArguments() 返回交换器的其他参数,这些参数可以用于自定义交换器的行为。
public interface Exchange {

String getName();

String getExchangeType();

boolean isDurable();

boolean isAutoDelete();

Map<String, Object> getArguments();

}
java

如你所见,Exchange 还有一个 'type',由 ExchangeTypes 中定义的常量表示。基本类型包括:directtopicfanoutheaders。在核心包中,你可以找到每种类型的 Exchange 接口的实现。这些 Exchange 类型在处理与队列的绑定时行为有所不同。例如,Direct 类型的 Exchange 允许队列通过固定的路由键(通常是队列的名称)进行绑定。Topic 类型的 Exchange 支持带有路由模式的绑定,这些模式可以包括 '*' 和 '#' 通配符,分别表示“恰好一个”和“零个或多个”。Fanout 类型的 Exchange 会将消息发布到所有绑定到它的队列中,而不考虑任何路由键。有关这些及其他 Exchange 类型的更多信息,请参阅 AMQP Exchanges

从 3.2 版本开始,为了方便在应用程序配置阶段使用,引入了 ConsistentHashExchange 类型。它提供了像 x-consistent-hash 这样的选项来作为交换类型。允许配置 hash-headerhash-property 交换定义参数。在代理上必须启用相应的 RabbitMQ rabbitmq_consistent_hash_exchange 插件。关于 Consistent Hash Exchange 的目的、逻辑和行为的更多信息,请参阅 RabbitMQ 官方文档

备注

AMQP 规范还要求任何 broker 都必须提供一个没有名称的“默认”直连交换机。所有声明的队列都会绑定到这个默认的 Exchange,并使用它们的名称作为路由键。你可以在 AmqpTemplate 中了解更多关于 Spring AMQP 中默认 Exchange 的用法。

队列

Queue 类表示消息消费者从中接收消息的组件。与各种 Exchange 类一样,我们的实现旨在成为这种核心 AMQP 类型的抽象表示。以下列表展示了 Queue 类:

public class Queue  {

private final String name;

private volatile boolean durable;

private volatile boolean exclusive;

private volatile boolean autoDelete;

private volatile Map<String, Object> arguments;

/**
* The queue is durable, non-exclusive and non auto-delete.
*
* @param name the name of the queue.
*/
public Queue(String name) {
this(name, true, false, false);
}

// Getters and Setters omitted for brevity

}
java

请注意,构造函数接收队列名称作为参数。根据具体实现,管理模板可能提供生成唯一命名队列的方法。这类队列在作为“回复到”地址或其他临时场景中非常有用。因此,自动生成队列的 exclusiveautoDelete 属性通常都会被设置为 true

备注

有关使用命名空间支持声明队列(包括队列参数)的信息,请参阅 配置 Broker 中的队列部分。

绑定

鉴于生产者发送消息到交换器(exchange)而消费者从队列(queue)接收消息,将队列与交换器绑定的绑定(bindings)对于通过消息传递连接这些生产者和消费者至关重要。在 Spring AMQP 中,我们定义了一个 Binding 类来表示这些连接。本节将回顾将队列绑定到交换器的基本选项。

你可以将一个队列绑定到一个 DirectExchange,并指定一个固定的路由键,如下例所示:

new Binding(someQueue, someDirectExchange, "foo.bar");
java

你可以使用路由模式将队列绑定到 TopicExchange,如下例所示:

new Binding(someQueue, someTopicExchange, "foo.*");
java

你可以将一个队列绑定到一个 FanoutExchange 上,而不需要指定路由键,如下例所示:

new Binding(someQueue, someFanoutExchange);
java

我们还提供了一个 BindingBuilder 来简化“流畅 API”风格的编写,如下例所示:

Binding b = BindingBuilder.bind(someQueue).to(someTopicExchange).with("foo.*");
java
备注

为了清晰起见,前面的示例展示了 BindingBuilder 类,但使用静态导入 'bind()' 方法时,这种风格效果很好。

Binding 类的实例本身仅保存有关连接的数据。换句话说,它不是一个“活跃”的组件。然而,正如你稍后在 配置 Broker 中将会看到的,AmqpAdmin 类可以使用 Binding 实例来实际触发 Broker 上的绑定操作。同样,正如你可以在同一部分中看到的,你可以在 @Configuration 类中使用 Spring 的 @Bean 注解来定义 Binding 实例。还有一个方便的基类,可以进一步简化生成 AMQP 相关的 bean 定义的方法,并识别队列、交换器和绑定,以便在应用程序启动时在 AMQP Broker 上声明它们。

AmqpTemplate 也定义在核心包中。作为实际 AMQP 消息传递中涉及的主要组件之一,它在其专属章节中有详细讨论(参见 AmqpTemplate)。