配置 Broker
AMQP 规范描述了如何使用该协议在代理上配置队列、交换器和绑定。这些操作(可移植自 0.8 规范及更高版本)存在于 org.springframework.amqp.core
包中的 AmqpAdmin
接口中。该类的 RabbitMQ 实现是位于 org.springframework.amqp.rabbit.core
包中的 RabbitAdmin
。
AmqpAdmin
接口基于使用 Spring AMQP 领域抽象,如下列表所示:
public interface AmqpAdmin {
// Exchange Operations
void declareExchange(Exchange exchange);
void deleteExchange(String exchangeName);
// Queue Operations
Queue declareQueue();
String declareQueue(Queue queue);
void deleteQueue(String queueName);
void deleteQueue(String queueName, boolean unused, boolean empty);
void purgeQueue(String queueName, boolean noWait);
// Binding Operations
void declareBinding(Binding binding);
void removeBinding(Binding binding);
Properties getQueueProperties(String queueName);
}
另请参见 Scoped Operations。
getQueueProperties()
方法返回关于队列的一些有限信息(消息数量和消费者数量)。返回的属性键值在 RabbitAdmin
中作为常量提供(QUEUE_NAME
、QUEUE_MESSAGE_COUNT
和 QUEUE_CONSUMER_COUNT
)。RabbitMQ REST API 在 QueueInfo
对象中提供了更多信息。
无参的 declareQueue()
方法会在代理上定义一个队列,其名称是自动生成的。这个自动生成的队列的额外属性是 exclusive=true
、autoDelete=true
和 durable=false
。
declareQueue(Queue queue)
方法接收一个 Queue
对象,并返回声明的队列的名称。如果提供的 Queue
的 name
属性是一个空字符串(""
),代理(broker)将使用生成的名称来声明队列。该名称会返回给调用者,并且也会添加到 Queue
的 actualName
属性中。你可以通过直接调用 RabbitAdmin
来以编程方式使用此功能。当在应用上下文中以声明方式定义队列时,如果使用管理员的自动声明功能,你可以将 name
属性设置为 ""
(空字符串)。然后代理会创建名称。从 2.1 版本开始,监听器容器可以使用这种类型的队列。有关更多信息,请参阅 容器和代理命名的队列。
这与 AnonymousQueue
形成对比,在 AnonymousQueue
中,框架会生成一个唯一的(UUID
)名称,并将 durable
设置为 false
,将 exclusive
和 autoDelete
设置为 true
。一个 name
属性为空(或缺失)的 <rabbit:queue/>
总是会创建一个 AnonymousQueue
。
请参见 AnonymousQueue 以了解为什么 AnonymousQueue
优于代理生成的队列名称以及如何控制名称的格式。从 2.1 版本开始,匿名队列在声明时默认将参数 Queue.X_QUEUE_LEADER_LOCATOR
设置为 client-local
。这确保了队列在应用程序连接的节点上声明。声明式队列必须具有固定的名称,因为它们可能在上下文的其他地方被引用——例如在以下示例所示的监听器中:
<rabbit:listener-container>
<rabbit:listener ref="listener" queue-names="#{someQueue.name}" />
</rabbit:listener-container>
请参阅自动声明交换器、队列和绑定。
该接口的 RabbitMQ 实现是 RabbitAdmin
,当使用 Spring XML 进行配置时,示例如下:
<rabbit:connection-factory id="connectionFactory"/>
<rabbit:admin id="amqpAdmin" connection-factory="connectionFactory"/>
当 CachingConnectionFactory
的缓存模式为 CHANNEL
(默认值)时,RabbitAdmin
实现会自动延迟声明在同一个 ApplicationContext
中声明的队列、交换机和绑定。这些组件在打开到代理的 Connection
时会立即声明。有一些命名空间特性使这非常方便 —— 例如,在 Stocks 示例应用程序中,我们有以下内容:
<rabbit:queue id="tradeQueue"/>
<rabbit:queue id="marketDataQueue"/>
<fanout-exchange name="broadcast.responses"
xmlns="http://www.springframework.org/schema/rabbit">
<bindings>
<binding queue="tradeQueue"/>
</bindings>
</fanout-exchange>
<topic-exchange name="app.stock.marketdata"
xmlns="http://www.springframework.org/schema/rabbit">
<bindings>
<binding queue="marketDataQueue" pattern="${stocks.quote.pattern}"/>
</bindings>
</topic-exchange>
在前面的例子中,我们使用了匿名队列(实际上,在内部,这些队列只是由框架生成的名称,而不是由 broker 生成的)并通过 ID 引用它们。我们也可以使用显式名称声明队列,这些名称也作为它们在上下文中的 bean 定义的标识符。以下示例配置了一个具有显式名称的队列:
<rabbit:queue name="stocks.trade.queue"/>
你可以同时提供 id
和 name
属性。这允许你通过一个独立于队列名称的 ID 来引用队列(例如,在绑定中)。它还允许使用标准的 Spring 特性(例如属性占位符和队列名称的 SpEL 表达式)。当你使用名称作为 bean 标识符时,这些特性是不可用的。
队列可以通过额外的参数进行配置 —— 例如,x-message-ttl
。当你使用命名空间支持时,这些参数以参数名称/参数值对的 Map
形式提供,通过使用 <rabbit:queue-arguments>
元素来定义。以下示例展示了如何实现这一点:
<rabbit:queue name="withArguments">
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="myDLX"/>
<entry key="x-dead-letter-routing-key" value="dlqRK"/>
</rabbit:queue-arguments>
</rabbit:queue>
默认情况下,参数被视为字符串。对于其他类型的参数,必须提供类型。以下示例展示了如何指定类型:
<rabbit:queue name="withArguments">
<rabbit:queue-arguments value-type="java.lang.Long">
<entry key="x-message-ttl" value="100"/>
</rabbit:queue-arguments>
</rabbit:queue>
当提供混合类型的参数时,必须为每个条目元素提供类型。以下示例展示了如何做到这一点:
<rabbit:queue name="withArguments">
<rabbit:queue-arguments>
<entry key="x-message-ttl">
<value type="java.lang.Long">100</value>
</entry>
<entry key="x-dead-letter-exchange" value="myDLX"/>
<entry key="x-dead-letter-routing-key" value="dlqRK"/>
</rabbit:queue-arguments>
</rabbit:queue>
从 Spring Framework 3.2 开始,可以更简洁地声明如下:
<rabbit:queue name="withArguments">
<rabbit:queue-arguments>
<entry key="x-message-ttl" value="100" value-type="java.lang.Long"/>
<entry key="x-ha-policy" value="all"/>
</rabbit:queue-arguments>
</rabbit:queue>
当你使用 Java 配置时,Queue.X_QUEUE_LEADER_LOCATOR
参数通过 Queue
类上的 setLeaderLocator()
方法作为一等属性被支持。从 2.1 版本开始,匿名队列默认将此属性设置为 client-local
。这确保了队列在应用程序连接的节点上声明。
RabbitMQ 代理不允许声明具有不匹配参数的队列。例如,如果一个 queue
已经存在且没有 time to live
参数,而你尝试声明它时使用了(例如)key="x-message-ttl" value="100"
,则会抛出异常。
默认情况下,当发生任何异常时,RabbitAdmin
会立即停止处理所有声明。这可能会导致下游问题,例如监听器容器无法初始化,因为另一个队列(在错误队列之后定义的)未声明。
可以通过在 RabbitAdmin
实例上设置 ignore-declaration-exceptions
属性为 true
来修改此行为。此选项指示 RabbitAdmin
记录异常并继续声明其他元素。使用 Java 配置 RabbitAdmin
时,此属性称为 ignoreDeclarationExceptions
。这是一个全局设置,适用于所有元素。队列、交换机和绑定也有类似的属性,仅适用于这些元素。
在 1.6 版本之前,此属性仅在通道发生 IOException
时生效,例如当前属性与期望属性不匹配时。现在,此属性对任何异常都生效,包括 TimeoutException
和其他异常。
此外,任何声明异常都会导致发布一个 DeclarationExceptionEvent
,这是一个 ApplicationEvent
,可以被上下文中的任何 ApplicationListener
消费。该事件包含对管理员、正在声明的元素以及 Throwable
的引用。
Headers Exchange
从 1.3 版本开始,你可以配置 HeadersExchange
以匹配多个头部信息。你还可以指定是匹配任意一个头部还是所有头部必须匹配。以下示例展示了如何实现这一点:
<rabbit:headers-exchange name="headers-test">
<rabbit:bindings>
<rabbit:binding queue="bucket">
<rabbit:binding-arguments>
<entry key="foo" value="bar"/>
<entry key="baz" value="qux"/>
<entry key="x-match" value="all"/>
</rabbit:binding-arguments>
</rabbit:binding>
</rabbit:bindings>
</rabbit:headers-exchange>
从 1.6 版本开始,你可以使用 internal
标志(默认为 false
)来配置 Exchanges
,这样的 Exchange
会通过 RabbitAdmin
在 Broker 上正确配置(如果应用程序上下文中存在 RabbitAdmin
)。如果交换器的 internal
标志为 true
,RabbitMQ 将不允许客户端使用该交换器。这对于死信交换器或交换器到交换器的绑定非常有用,在这种情况下,你不希望发布者直接使用该交换器。
要了解如何使用 Java 配置 AMQP 基础设施,请查看 Stock 示例应用程序,其中有一个 @Configuration
类 AbstractStockRabbitConfiguration
,它又有 RabbitClientConfiguration
和 RabbitServerConfiguration
子类。以下清单展示了 AbstractStockRabbitConfiguration
的代码:
@Configuration
public abstract class AbstractStockAppRabbitConfiguration {
@Bean
public CachingConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory =
new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
return connectionFactory;
}
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setMessageConverter(jsonMessageConverter());
configureRabbitTemplate(template);
return template;
}
@Bean
public Jackson2JsonMessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public TopicExchange marketDataExchange() {
return new TopicExchange("app.stock.marketdata");
}
// additional code omitted for brevity
}
在 Stock 应用程序中,服务器是通过以下 @Configuration
类进行配置的:
@Configuration
public class RabbitServerConfiguration extends AbstractStockAppRabbitConfiguration {
@Bean
public Queue stockRequestQueue() {
return new Queue("app.stock.request");
}
}
这是整个 @Configuration
类继承链的终点。最终的结果是,在应用程序启动时,TopicExchange
和 Queue
被声明到 broker 中。在服务器配置中,并没有将 TopicExchange
绑定到队列,因为这是在客户端应用程序中完成的。然而,股票请求队列会自动绑定到 AMQP 默认的 exchange。这种行为是由规范定义的。
客户端 @Configuration
类稍微有趣一些。它的声明如下:
@Configuration
public class RabbitClientConfiguration extends AbstractStockAppRabbitConfiguration {
@Value("${stocks.quote.pattern}")
private String marketDataRoutingKey;
@Bean
public Queue marketDataQueue() {
return amqpAdmin().declareQueue();
}
/**
* Binds to the market data exchange.
* Interested in any stock quotes
* that match its routing key.
*/
@Bean
public Binding marketDataBinding() {
return BindingBuilder.bind(
marketDataQueue()).to(marketDataExchange()).with(marketDataRoutingKey);
}
// additional code omitted for brevity
}
客户端通过 AmqpAdmin
上的 declareQueue()
方法声明另一个队列。它将这个队列绑定到市场数据交换器(market data exchange),并使用从属性文件中外部化的路由模式(routing pattern)进行绑定。
队列和交换机的构建器 API
版本 1.6 引入了一个方便的流式 API,用于在使用 Java 配置时配置 Queue
和 Exchange
对象。以下示例展示了如何使用它:
@Bean
public Queue queue() {
return QueueBuilder.nonDurable("foo")
.autoDelete()
.exclusive()
.withArgument("foo", "bar")
.build();
}
@Bean
public Exchange exchange() {
return ExchangeBuilder.directExchange("foo")
.autoDelete()
.internal()
.withArgument("foo", "bar")
.build();
}
有关更多信息,请参阅 org.springframework.amqp.core.QueueBuilder 和 org.springframework.amqp.core.ExchangeBuilder 的 Javadoc。
从 2.0 版本开始,ExchangeBuilder
现在默认创建持久化的 exchanges,以与各个 AbstractExchange
类上的简单构造函数保持一致。要使用 builder 创建一个非持久化的 exchange,请在调用 .build()
之前使用 .durable(false)
。不再提供不带参数的 durable()
方法。
版本 2.2 引入了流畅的 API 来添加“众所周知的”交换器和队列参数…
@Bean
public Queue allArgs1() {
return QueueBuilder.nonDurable("all.args.1")
.ttl(1000)
.expires(200_000)
.maxLength(42)
.maxLengthBytes(10_000)
.overflow(Overflow.rejectPublish)
.deadLetterExchange("dlx")
.deadLetterRoutingKey("dlrk")
.maxPriority(4)
.lazy()
.leaderLocator(LeaderLocator.minLeaders)
.singleActiveConsumer()
.build();
}
@Bean
public DirectExchange ex() {
return ExchangeBuilder.directExchange("ex.with.alternate")
.durable(true)
.alternate("alternate")
.build();
}
声明交换器、队列和绑定的集合
你可以将 Declarable
对象的集合(Queue
、Exchange
和 Binding
)包装在 Declarables
对象中。RabbitAdmin
会在应用上下文中检测这些 bean(以及离散的 Declarable
bean),并在建立连接时(初始连接或连接失败后)在代理上声明这些包含的对象。以下示例展示了如何做到这一点:
@Configuration
public static class Config {
@Bean
public CachingConnectionFactory cf() {
return new CachingConnectionFactory("localhost");
}
@Bean
public RabbitAdmin admin(ConnectionFactory cf) {
return new RabbitAdmin(cf);
}
@Bean
public DirectExchange e1() {
return new DirectExchange("e1", false, true);
}
@Bean
public Queue q1() {
return new Queue("q1", false, false, true);
}
@Bean
public Binding b1() {
return BindingBuilder.bind(q1()).to(e1()).with("k1");
}
@Bean
public Declarables es() {
return new Declarables(
new DirectExchange("e2", false, true),
new DirectExchange("e3", false, true));
}
@Bean
public Declarables qs() {
return new Declarables(
new Queue("q2", false, false, true),
new Queue("q3", false, false, true));
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public Declarables prototypes() {
return new Declarables(new Queue(this.prototypeQueueName, false, false, true));
}
@Bean
public Declarables bs() {
return new Declarables(
new Binding("q2", DestinationType.QUEUE, "e2", "k2", null),
new Binding("q3", DestinationType.QUEUE, "e3", "k3", null));
}
@Bean
public Declarables ds() {
return new Declarables(
new DirectExchange("e4", false, true),
new Queue("q4", false, false, true),
new Binding("q4", DestinationType.QUEUE, "e4", "k4", null));
}
}
在 2.1 版本之前,您可以通过定义 Collection<Declarable>
类型的 bean 来声明多个 Declarable
实例。在某些情况下,这可能会导致不希望的副作用,因为管理员必须遍历所有 Collection<?>
类型的 bean。
版本 2.2 在 Declarables
中新增了 getDeclarablesByType
方法;这可以作为一个便利工具使用,例如在声明监听器容器 bean 时。
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
Declarables mixedDeclarables, MessageListener listener) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueues(mixedDeclarables.getDeclarablesByType(Queue.class).toArray(new Queue[0]));
container.setMessageListener(listener);
return container;
}
条件声明
默认情况下,所有的队列、交换器和绑定都由应用程序上下文中的所有 RabbitAdmin
实例声明(假设它们的 auto-startup="true"
)。
从 2.1.9 版本开始,RabbitAdmin
新增了一个属性 explicitDeclarationsOnly
(默认值为 false
);当该属性设置为 true
时,admin 只会声明那些明确配置为由该 admin 声明的 bean。
从 1.2 版本开始,你可以有条件地声明这些元素。这在应用程序连接到多个代理并且需要指定特定元素应与哪些代理一起声明时特别有用。
表示这些元素的类实现了 Declarable
接口,该接口有两个方法:shouldDeclare()
和 getDeclaringAdmins()
。RabbitAdmin
使用这些方法来确定特定实例是否应该实际处理其 Connection
上的声明。
属性可以作为命名空间中的属性使用,如下例所示:
<rabbit:admin id="admin1" connection-factory="CF1" />
<rabbit:admin id="admin2" connection-factory="CF2" />
<rabbit:admin id="admin3" connection-factory="CF3" explicit-declarations-only="true" />
<rabbit:queue id="declaredByAdmin1AndAdmin2Implicitly" />
<rabbit:queue id="declaredByAdmin1AndAdmin2" declared-by="admin1, admin2" />
<rabbit:queue id="declaredByAdmin1Only" declared-by="admin1" />
<rabbit:queue id="notDeclaredByAllExceptAdmin3" auto-declare="false" />
<rabbit:direct-exchange name="direct" declared-by="admin1, admin2">
<rabbit:bindings>
<rabbit:binding key="foo" queue="bar"/>
</rabbit:bindings>
</rabbit:direct-exchange>
默认情况下,auto-declare
属性为 true
,如果未提供 declared-by
(或为空),则所有 RabbitAdmin
实例都会声明该对象(前提是 admin 的 auto-startup
属性为 true
,这是默认值,并且 admin 的 explicit-declarations-only
属性为 false)。
同样地,你可以使用基于 Java 的 @Configuration
来实现相同的效果。在下面的示例中,组件由 admin1
声明,而不是由 admin2
声明:
@Bean
public RabbitAdmin admin1() {
return new RabbitAdmin(cf1());
}
@Bean
public RabbitAdmin admin2() {
return new RabbitAdmin(cf2());
}
@Bean
public Queue queue() {
Queue queue = new Queue("foo");
queue.setAdminsThatShouldDeclare(admin1());
return queue;
}
@Bean
public Exchange exchange() {
DirectExchange exchange = new DirectExchange("bar");
exchange.setAdminsThatShouldDeclare(admin1());
return exchange;
}
@Bean
public Binding binding() {
Binding binding = new Binding("foo", DestinationType.QUEUE, exchange().getName(), "foo", null);
binding.setAdminsThatShouldDeclare(admin1());
return binding;
}
关于 id
和 name
属性的说明
<rabbit:queue/>
和 <rabbit:exchange/>
元素上的 name
属性反映了代理中实体的名称。对于队列,如果省略了 name
,则会创建一个匿名队列(参见 匿名队列)。
在 2.0 之前的版本中,name
也被注册为 bean 名称的别名(类似于 <bean/>
元素上的 name
)。
这导致了两个问题:
-
它阻止了声明一个具有相同名称的队列和交换器。
-
如果别名包含 SpEL 表达式(
#{…}
),则不会解析该别名。
从版本 2.0 开始,如果你声明这些元素时同时使用了 id
和 name
属性,name
将不再被声明为 bean 名称的别名。如果你希望声明一个队列和一个交换器使用相同的 name
,你必须提供一个 id
。
如果元素仅具有 name
属性,则不会有任何变化。该 bean 仍然可以通过 name
进行引用——例如,在绑定声明中。然而,如果名称包含 SpEL,则仍然无法引用它——你必须提供一个 id
用于引用目的。
AnonymousQueue
一般来说,当你需要一个具有唯一名称、独占且自动删除的队列时,我们建议你使用 AnonymousQueue
,而不是由代理定义的队列名称(使用 ""
作为 Queue
名称会导致代理生成队列名称)。
这是因为:
-
队列实际上是在与代理(broker)建立连接时声明的。这是在 bean 创建并连接在一起之后很久才发生的。使用队列的 bean 需要知道其名称。事实上,当应用程序启动时,代理可能甚至没有运行。
-
如果由于某种原因与代理的连接丢失,管理员会使用相同的名称重新声明
AnonymousQueue
。如果我们使用代理声明的队列,队列名称将会改变。
你可以控制 AnonymousQueue
实例使用的队列名称格式。
默认情况下,队列名称以 spring.gen-
为前缀,后跟 UUID
的 base64 表示形式——例如:spring.gen-MRBv9sqISkuCiPfOYfpo4g
。
你可以在构造函数参数中提供一个 AnonymousQueue.NamingStrategy
实现。以下示例展示了如何实现这一点:
@Bean
public Queue anon1() {
return new AnonymousQueue();
}
@Bean
public Queue anon2() {
return new AnonymousQueue(new AnonymousQueue.Base64UrlNamingStrategy("something-"));
}
@Bean
public Queue anon3() {
return new AnonymousQueue(AnonymousQueue.UUIDNamingStrategy.DEFAULT);
}
第一个 bean 生成的队列名称以 spring.gen-
为前缀,后跟 UUID
的 base64 表示形式 —— 例如:spring.gen-MRBv9sqISkuCiPfOYfpo4g
。第二个 bean 生成的队列名称以 something-
为前缀,后跟 UUID
的 base64 表示形式。第三个 bean 生成的名称仅使用 UUID(不进行 base64 转换)—— 例如,f20c818a-006b-4416-bf91-643590fedb0e
。
base64 编码使用了 RFC 4648 中的“URL 和文件名安全字母表”。尾部的填充字符(=
)会被移除。
您可以提供自己的命名策略,从而可以在队列名称中包含其他信息(例如应用程序名称或客户端主机)。
在使用 XML 配置时,您可以指定命名策略。<rabbit:queue>
元素上有一个 naming-strategy
属性,用于引用实现了 AnonymousQueue.NamingStrategy
的 bean。以下示例展示了如何以各种方式指定命名策略:
<rabbit:queue id="uuidAnon" />
<rabbit:queue id="springAnon" naming-strategy="uuidNamer" />
<rabbit:queue id="customAnon" naming-strategy="customNamer" />
<bean id="uuidNamer" class="org.springframework.amqp.core.AnonymousQueue.UUIDNamingStrategy" />
<bean id="customNamer" class="org.springframework.amqp.core.AnonymousQueue.Base64UrlNamingStrategy">
<constructor-arg value="custom.gen-" />
</bean>
第一个示例创建的名称类似于 spring.gen-MRBv9sqISkuCiPfOYfpo4g
。第二个示例创建的名称包含一个 UUID 的字符串表示。第三个示例创建的名称类似于 custom.gen-MRBv9sqISkuCiPfOYfpo4g
。
你也可以提供自己的命名策略 bean。
从版本 2.1 开始,匿名队列在声明时默认将参数 Queue.X_QUEUE_LEADER_LOCATOR
设置为 client-local
。这确保了队列会在应用程序连接的节点上声明。你可以在构造实例后通过调用 queue.setLeaderLocator(null)
恢复到之前的行为。
恢复自动删除声明
通常情况下,RabbitAdmin
只会恢复在应用上下文中声明为 bean 的队列/交换器/绑定;如果任何这样的声明是自动删除的,当连接丢失时,它们将被代理移除。当连接重新建立时,admin
会重新声明这些实体。通常,通过调用 admin.declareQueue(…)
、admin.declareExchange(…)
和 admin.declareBinding(…)
创建的实体不会被恢复。
从 2.4 版本开始,admin 新增了一个属性 redeclareManualDeclarations
;当该属性为 true
时,admin 除了会恢复应用上下文中的 beans,还会恢复这些实体。
如果调用了 deleteQueue(…)
、deleteExchange(…)
或 removeBinding(…)
,则不会执行单个声明的恢复。当队列和交换器被删除时,相关的绑定将从可恢复实体中移除。
最后,调用 resetAllManualDeclarations()
将阻止恢复任何先前声明的实体。