连接与资源管理
尽管我们在前一节中描述的 AMQP 模型是通用的,适用于所有实现,但在我们进入资源管理时,细节则特定于 broker 实现。因此,在本节中,我们将重点放在仅存在于 “spring-rabbit” 模块中的代码上,因为目前 RabbitMQ 是唯一受支持的实现。
管理连接到 RabbitMQ 代理的核心组件是 ConnectionFactory
接口。ConnectionFactory
实现类的职责是提供一个 org.springframework.amqp.rabbit.connection.Connection
的实例,它是 com.rabbitmq.client.Connection
的封装。
选择连接工厂
有三种连接工厂可供选择
-
PooledChannelConnectionFactory
(池化通道连接工厂) -
ThreadChannelConnectionFactory
(线程通道连接工厂) -
CachingConnectionFactory
(缓存连接工厂)
前两个是在 2.3 版本中添加的。
在大多数使用场景中,应该使用 CachingConnectionFactory
。如果你希望确保严格的消息顺序,而不需要使用 Scoped Operations,则可以使用 ThreadChannelConnectionFactory
。PooledChannelConnectionFactory
类似于 CachingConnectionFactory
,它使用单个连接和一个通道池。它的实现更简单,但不支持相关的发布者确认。
所有三种工厂都支持简单的发布者确认。
从 2.3.2 版本开始,在配置 RabbitTemplate
使用独立连接时,你可以将发布连接工厂配置为不同的类型。默认情况下,发布工厂与主工厂类型相同,并且主工厂上设置的任何属性也会传播到发布工厂。
从 3.1 版本开始,AbstractConnectionFactory
包含了 connectionCreatingBackOff
属性,该属性支持在连接模块中实现退避策略。目前,在 createChannel()
的行为中支持处理达到 channelMax
限制时发生的异常,实现了基于尝试次数和间隔的退避策略。
PooledChannelConnectionFactory
该工厂管理一个单一连接和两个基于 Apache Pool2 的通道池。其中一个池用于事务性通道,另一个用于非事务性通道。这些池是采用默认配置的 GenericObjectPool
;提供了一个回调函数来配置这些池;有关更多信息,请参阅 Apache 文档。
要使用此工厂,必须在类路径中包含 Apache commons-pool2
jar。
@Bean
PooledChannelConnectionFactory pcf() throws Exception {
ConnectionFactory rabbitConnectionFactory = new ConnectionFactory();
rabbitConnectionFactory.setHost("localhost");
PooledChannelConnectionFactory pcf = new PooledChannelConnectionFactory(rabbitConnectionFactory);
pcf.setPoolConfigurer((pool, tx) -> {
if (tx) {
// configure the transactional pool
}
else {
// configure the non-transactional pool
}
});
return pcf;
}
ThreadChannelConnectionFactory
该工厂管理一个单一连接和两个 ThreadLocal
,一个用于事务性通道,另一个用于非事务性通道。该工厂确保在同一线程上的所有操作使用相同的通道(只要它保持打开状态)。这有助于实现严格的消息顺序,而无需使用作用域操作。为了避免内存泄漏,如果你的应用程序使用了许多短生命周期的线程,你必须调用工厂的 closeThreadChannel()
来释放通道资源。从版本 2.3.7 开始,一个线程可以将其通道转移给另一个线程。更多信息请参见多线程环境中的严格消息顺序。
CachingConnectionFactory
提供的第三个实现是 CachingConnectionFactory
,默认情况下,它建立了一个可以被应用程序共享的单一连接代理。由于使用 AMQP 进行消息传递的“工作单元”实际上是一个“通道”(在某些方面,这类似于 JMS 中连接和会话之间的关系),因此可以共享连接。连接实例提供了一个 createChannel
方法。CachingConnectionFactory
实现支持这些通道的缓存,并根据它们是否是事务性的来维护单独的通道缓存。在创建 CachingConnectionFactory
实例时,可以通过构造函数提供 'hostname'。你还应该提供 'username' 和 'password' 属性。要配置通道缓存的大小(默认值为 25),可以调用 setChannelCacheSize()
方法。
从 1.3 版本开始,你可以配置 CachingConnectionFactory
来缓存连接以及仅缓存通道。在这种情况下,每次调用 createConnection()
都会创建一个新的连接(或从缓存中获取一个空闲的连接)。关闭连接会将其返回到缓存中(如果缓存大小尚未达到上限)。在此类连接上创建的通道也会被缓存。在某些环境中,使用独立的连接可能会很有用,例如与负载均衡器结合使用时,从 HA 集群中消费消息,连接到不同的集群成员等。要缓存连接,请将 cacheMode
设置为 CacheMode.CONNECTION
。
这并不限制连接的数量。相反,它指定了允许有多少空闲的打开连接。
从版本 1.5.5 开始,提供了一个名为 connectionLimit
的新属性。当设置此属性时,它将限制允许的总连接数。当设置后,如果达到限制,将使用 channelCheckoutTimeLimit
等待连接变为空闲状态。如果超时,将抛出 AmqpTimeoutException
。
当缓存模式为 CONNECTION
时,不支持队列等的自动声明(参见自动声明交换机、队列和绑定)。
此外,截至本文撰写时,amqp-client
库默认会为每个连接创建一个固定大小的线程池(默认大小:Runtime.getRuntime().availableProcessors() * 2
个线程)。当使用大量连接时,您应考虑在 CachingConnectionFactory
上设置自定义的 executor
。这样,所有连接可以共享同一个执行器,并且其线程也可以共享。执行器的线程池应该是无界的,或者根据预期使用情况进行适当设置(通常,至少每个连接一个线程)。如果在每个连接上创建多个通道,池的大小会影响并发性,因此可变(或简单的缓存)线程池执行器将是最合适的选择。
重要的是要理解,缓存大小(默认情况下)并不是一个限制,而仅仅是可以缓存的通道数量。例如,缓存大小为 10 时,实际上可以有任意数量的通道在使用。如果使用了超过 10 个通道,并且它们都被返回到缓存中,那么其中 10 个会进入缓存,其余的则会被物理关闭。
从 1.6 版本开始,默认的 channel 缓存大小已从 1 增加到 25。在高流量、多线程环境中,较小的缓存意味着 channel 会以高速率被创建和关闭。增加默认缓存大小可以避免这种开销。你应该通过 RabbitMQ Admin UI 监控正在使用的 channel,如果发现有许多 channel 被创建和关闭,考虑进一步增加缓存大小。缓存仅在需要时增长(以适应应用程序的并发需求),因此这一变化不会影响现有的低流量应用程序。
从 1.4.2 版本开始,CachingConnectionFactory
引入了一个名为 channelCheckoutTimeout
的属性。当该属性的值大于零时,channelCacheSize
将限制在单个连接上可以创建的通道数量。如果达到了这个限制,调用线程将会阻塞,直到有可用的通道或达到超时时间。如果超时,将会抛出 AmqpTimeoutException
异常。
框架内使用的通道(例如,RabbitTemplate
)会被可靠地返回到缓存中。如果你在框架外创建通道(例如,通过直接访问连接并调用 createChannel()
),你必须确保它们被可靠地返回(通过关闭),可能是在 finally
块中,以避免通道耗尽。
以下示例展示了如何创建一个新的 connection
:
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("somehost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = connectionFactory.createConnection();
使用 XML 时,配置可能如下例所示:
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
</bean>
还有一个仅在框架的单元测试代码中可用的 SingleConnectionFactory
实现。它比 CachingConnectionFactory
更简单,因为它不缓存通道,但由于其性能和弹性不足,不适合在简单测试之外的实际使用。如果你出于某种原因需要实现自己的 ConnectionFactory
,AbstractConnectionFactory
基类可能是一个不错的起点。
可以通过使用 rabbit 命名空间快速方便地创建一个 ConnectionFactory
,如下所示:
<rabbit:connection-factory id="connectionFactory"/>
在大多数情况下,这种方法更可取,因为框架可以为你选择最佳默认值。创建的实例是一个 CachingConnectionFactory
。请记住,通道的默认缓存大小为 25。如果你希望缓存更多的通道,可以通过设置 channelCacheSize
属性来设置一个更大的值。在 XML 中,它将如下所示:
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<constructor-arg value="somehost"/>
<property name="username" value="guest"/>
<property name="password" value="guest"/>
<property name="channelCacheSize" value="50"/>
</bean>
此外,在命名空间中,你可以添加 channel-cache-size
属性,如下所示:
<rabbit:connection-factory
id="connectionFactory" channel-cache-size="50"/>
默认的缓存模式是 CHANNEL
,但你可以配置为缓存连接。在以下示例中,我们使用 connection-cache-size
:
<rabbit:connection-factory
id="connectionFactory" cache-mode="CONNECTION" connection-cache-size="25"/>
你可以通过使用命名空间来提供 host
和 port
属性,如下所示:
<rabbit:connection-factory
id="connectionFactory" host="somehost" port="5672"/>
或者,如果在集群环境中运行,你可以使用 addresses
属性,如下所示:
<rabbit:connection-factory
id="connectionFactory" addresses="host1:5672,host2:5672" address-shuffle-mode="RANDOM"/>
有关 address-shuffle-mode
的信息,请参见连接到集群。
以下是一个自定义线程工厂的示例,该工厂将线程名称前缀设置为 rabbitmq-
:
<rabbit:connection-factory id="multiHost" virtual-host="/bar" addresses="host1:1234,host2,host3:4567"
thread-factory="tf"
channel-cache-size="10" username="user" password="password" />
<bean id="tf" class="org.springframework.scheduling.concurrent.CustomizableThreadFactory">
<constructor-arg value="rabbitmq-" />
</bean>
地址解析器
从 2.1.15 版本开始,你现在可以使用 AddressResolver
来解析连接地址。这将覆盖 addresses
和 host/port
属性的任何设置。
命名连接
从 1.7 版本开始,提供了一个 ConnectionNameStrategy
,用于注入到 AbstractionConnectionFactory
中。生成的名称用于特定应用程序的目标 RabbitMQ 连接的标识。如果 RabbitMQ 服务器支持,连接名称将显示在管理 UI 中。此值不必唯一,也不能用作连接标识符——例如,在 HTTP API 请求中。此值应该是人类可读的,并且是 ClientProperties
中 connection_name
键的一部分。你可以使用一个简单的 Lambda,如下所示:
connectionFactory.setConnectionNameStrategy(connectionFactory -> "MY_CONNECTION");
ConnectionFactory
参数可用于通过某些逻辑来区分目标连接名称。默认情况下,使用 AbstractConnectionFactory
的 beanName
、表示对象的十六进制字符串以及内部计数器来生成 connection_name
。<rabbit:connection-factory>
命名空间组件还提供了 connection-name-strategy
属性。
SimplePropertyValueConnectionNameStrategy
的一个实现将连接名称设置为应用程序属性。您可以将其声明为 @Bean
并将其注入到连接工厂中,如下例所示:
@Bean
public SimplePropertyValueConnectionNameStrategy cns() {
return new SimplePropertyValueConnectionNameStrategy("spring.application.name");
}
@Bean
public ConnectionFactory rabbitConnectionFactory(ConnectionNameStrategy cns) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
...
connectionFactory.setConnectionNameStrategy(cns);
return connectionFactory;
}
该属性必须存在于应用程序上下文的 Environment
中。
当使用 Spring Boot 及其自动配置的连接工厂时,你只需要声明 ConnectionNameStrategy
@Bean
。Spring Boot 会自动检测该 bean 并将其注入到工厂中。
阻塞连接与资源限制
连接可能会因为与 Memory Alarm 对应的 broker 的交互而被阻塞。从 2.0 版本开始,org.springframework.amqp.rabbit.connection.Connection
可以配置 com.rabbitmq.client.BlockedListener
实例,以便在连接被阻塞和解锁时收到通知。此外,AbstractConnectionFactory
通过其内部的 BlockedListener
实现分别发出 ConnectionBlockedEvent
和 ConnectionUnblockedEvent
。这些事件允许你提供应用程序逻辑,以适当地应对 broker 上的问题,并(例如)采取一些纠正措施。
当应用程序配置了单个 CachingConnectionFactory
时(这是 Spring Boot 自动配置的默认行为),当连接被 Broker 阻塞时,应用程序将停止工作。而当连接被 Broker 阻塞时,其任何客户端都会停止工作。如果我们在同一个应用程序中同时有生产者和消费者,当生产者阻塞连接(因为 Broker 上没有更多资源)而消费者无法释放它们(因为连接被阻塞)时,可能会导致死锁。为了缓解这个问题,我们建议使用另一个具有相同选项的独立 CachingConnectionFactory
实例——一个用于生产者,一个用于消费者。对于在消费者线程上执行的事务性生产者来说,无法使用独立的 CachingConnectionFactory
,因为它们应该复用与消费者事务相关联的 Channel
。
从版本 2.0.2 开始,RabbitTemplate
提供了一个配置选项,可以自动使用第二个连接工厂,除非正在使用事务。更多信息请参见使用独立连接。发布者连接的 ConnectionNameStrategy
与主策略相同,只是在调用方法的结果后附加了 .publisher
。
从 1.7.7 版本开始,提供了一个 AmqpResourceNotAvailableException
,当 SimpleConnection.createChannel()
无法创建 Channel
时(例如,因为达到了 channelMax
限制并且缓存中没有可用的通道),会抛出此异常。你可以在 RetryPolicy
中使用此异常,以便在一定的退避时间后恢复操作。
配置底层客户端连接工厂
CachingConnectionFactory
使用了 Rabbit 客户端的 ConnectionFactory
实例。在设置 CachingConnectionFactory
的等效属性时,许多配置属性会被传递(例如 host
、port
、userName
、password
、requestedHeartBeat
和 connectionTimeout
)。要设置其他属性(例如 clientProperties
),你可以定义一个 Rabbit 工厂的实例,并通过使用 CachingConnectionFactory
的适当构造函数来提供对该实例的引用。当使用命名空间时(如前所述),你需要在 connection-factory
属性中提供对已配置工厂的引用。为了方便起见,提供了一个工厂 bean 来帮助在 Spring 应用上下文中配置连接工厂,如下一节所述。
<rabbit:connection-factory
id="connectionFactory" connection-factory="rabbitConnectionFactory"/>
4.0.x 客户端默认启用了自动恢复功能。虽然 Spring AMQP 兼容此功能,但它有自己的恢复机制,通常不需要客户端的恢复功能。我们建议禁用 amqp-client
的自动恢复功能,以避免在代理可用但连接尚未恢复时出现 AutoRecoverConnectionNotCurrentlyOpenException
异常。例如,当在 RabbitTemplate
中配置了 RetryTemplate
时,即使切换到集群中的另一个代理,你也可能会注意到此异常。由于自动恢复的连接是基于定时器恢复的,使用 Spring AMQP 的恢复机制可能会更快地恢复连接。从 1.7.1 版本开始,Spring AMQP 默认禁用 amqp-client
的自动恢复功能,除非你显式创建自己的 RabbitMQ 连接工厂并将其提供给 CachingConnectionFactory
。由 RabbitConnectionFactoryBean
创建的 RabbitMQ ConnectionFactory
实例也默认禁用了此选项。
RabbitConnectionFactoryBean
和配置 SSL
从 1.4 版本开始,提供了一个方便的 RabbitConnectionFactoryBean
,通过依赖注入来方便地配置底层客户端连接工厂的 SSL 属性。其他 setter 方法委托给底层工厂。以前,您必须以编程方式配置 SSL 选项。以下示例展示了如何配置 RabbitConnectionFactoryBean
:
@Bean
RabbitConnectionFactoryBean rabbitConnectionFactory() {
RabbitConnectionFactoryBean factoryBean = new RabbitConnectionFactoryBean();
factoryBean.setUseSSL(true);
factoryBean.setSslPropertiesLocation(new ClassPathResource("secrets/rabbitSSL.properties"));
return factoryBean;
}
@Bean
CachingConnectionFactory connectionFactory(ConnectionFactory rabbitConnectionFactory) {
CachingConnectionFactory ccf = new CachingConnectionFactory(rabbitConnectionFactory);
ccf.setHost("...");
// ...
return ccf;
}
spring.rabbitmq.ssl.enabled:true
spring.rabbitmq.ssl.keyStore=...
spring.rabbitmq.ssl.keyStoreType=jks
spring.rabbitmq.ssl.keyStorePassword=...
spring.rabbitmq.ssl.trustStore=...
spring.rabbitmq.ssl.trustStoreType=jks
spring.rabbitmq.ssl.trustStorePassword=...
spring.rabbitmq.host=...
...
<rabbit:connection-factory id="rabbitConnectionFactory"
connection-factory="clientConnectionFactory"
host="${host}"
port="${port}"
virtual-host="${vhost}"
username="${username}" password="${password}" />
<bean id="clientConnectionFactory"
class="org.springframework.amqp.rabbit.connection.RabbitConnectionFactoryBean">
<property name="useSSL" value="true" />
<property name="sslPropertiesLocation" value="classpath:secrets/rabbitSSL.properties"/>
</bean>
请参阅 RabbitMQ 文档 获取有关配置 SSL 的信息。要连接 SSL 而不进行证书验证,请省略 keyStore
和 trustStore
配置。下一个示例展示了如何提供密钥和信任库配置。
sslPropertiesLocation
属性是一个 Spring Resource
,指向一个包含以下键的属性文件:
keyStore=file:/secret/keycert.p12
trustStore=file:/secret/trustStore
keyStore.passPhrase=secret
trustStore.passPhrase=secret
keyStore
和 truststore
是 Spring Resources
,指向存储位置。通常,操作系统会保护这个属性文件,应用程序只有读取权限。
从 Spring AMQP 版本 1.5 开始,您可以直接在工厂 bean 上设置这些属性。如果同时提供了离散属性和 sslPropertiesLocation
,后者的属性将覆盖离散值。
从 2.0 版本开始,默认情况下会验证服务器证书,因为这样更安全。如果出于某些原因你希望跳过此验证,请将工厂 bean 的 skipServerCertificateValidation
属性设置为 true
。从 2.1 版本开始,RabbitConnectionFactoryBean
现在默认会调用 enableHostnameVerification()
。要恢复之前的行为,请将 enableHostnameVerification
属性设置为 false
。
从 2.2.5 版本开始,工厂 bean 将默认始终使用 TLS v1.2;在此之前,它在某些情况下使用 v1.1,而在其他情况下使用 v1.2(取决于其他属性)。如果出于某些原因需要使用 v1.1,请设置 sslAlgorithm
属性:setSslAlgorithm("TLSv1.1")
。
连接到集群
要连接到集群,请在 CachingConnectionFactory
上配置 addresses
属性:
@Bean
public CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory();
ccf.setAddresses("host1:5672,host2:5672,host3:5672");
return ccf;
}
从 3.0 版本开始,底层连接工厂在建立新连接时,将尝试通过随机选择一个地址来连接到主机。要恢复到之前从第一个地址到最后一个地址依次尝试连接的行为,请将 addressShuffleMode
属性设置为 AddressShuffleMode.NONE
。
从 2.3 版本开始,新增了 INORDER
洗牌模式,这意味着在创建连接后,第一个地址会被移动到末尾。如果您希望在所有节点上消费所有分片的数据,您可能希望将此模式与 RabbitMQ 分片插件 一起使用,并配合 CacheMode.CONNECTION
和适当的并发设置。
@Bean
public CachingConnectionFactory ccf() {
CachingConnectionFactory ccf = new CachingConnectionFactory();
ccf.setAddresses("host1:5672,host2:5672,host3:5672");
ccf.setAddressShuffleMode(AddressShuffleMode.INORDER);
return ccf;
}
路由连接工厂
从 1.3 版本开始,引入了 AbstractRoutingConnectionFactory
。该工厂提供了一种机制来配置多个 ConnectionFactory
的映射,并在运行时通过某个 lookupKey
确定目标 ConnectionFactory
。通常情况下,实现会检查线程绑定的上下文。为了方便起见,Spring AMQP 提供了 SimpleRoutingConnectionFactory
,它从 SimpleResourceHolder
中获取当前线程绑定的 lookupKey
。以下示例展示了如何在 XML 和 Java 中配置 SimpleRoutingConnectionFactory
:
<bean id="connectionFactory"
class="org.springframework.amqp.rabbit.connection.SimpleRoutingConnectionFactory">
<property name="targetConnectionFactories">
<map>
<entry key="#{connectionFactory1.virtualHost}" ref="connectionFactory1"/>
<entry key="#{connectionFactory2.virtualHost}" ref="connectionFactory2"/>
</map>
</property>
</bean>
<rabbit:template id="template" connection-factory="connectionFactory" />
public class MyService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void service(String vHost, String payload) {
SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), vHost);
rabbitTemplate.convertAndSend(payload);
SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
}
}
使用后解绑资源非常重要。有关更多信息,请参阅 AbstractRoutingConnectionFactory
的 JavaDoc。
从 1.4 版本开始,RabbitTemplate
支持 SpEL 的 sendConnectionFactorySelectorExpression
和 receiveConnectionFactorySelectorExpression
属性,这些属性在每次 AMQP 协议交互操作(send
、sendAndReceive
、receive
或 receiveAndReply
)时进行评估,解析为提供的 AbstractRoutingConnectionFactory
的 lookupKey
值。你可以在表达式中使用 bean 引用,例如 @vHostResolver.getVHost(#root)
。对于 send
操作,要发送的消息是根评估对象。对于 receive
操作,queueName
是根评估对象。
路由算法如下:如果选择器表达式为 null
或求值为 null
,或者提供的 ConnectionFactory
不是 AbstractRoutingConnectionFactory
的实例,则一切照常工作,依赖于提供的 ConnectionFactory
实现。如果求值结果不为 null
,但没有为该 lookupKey
找到目标 ConnectionFactory
,并且 AbstractRoutingConnectionFactory
配置了 lenientFallback = true
,也会发生相同的情况。在 AbstractRoutingConnectionFactory
的情况下,它会基于 determineCurrentLookupKey()
回退到其 routing
实现。然而,如果 lenientFallback = false
,则会抛出 IllegalStateException
。
命名空间支持还在 <rabbit:template>
组件上提供了 send-connection-factory-selector-expression
和 receive-connection-factory-selector-expression
属性。
从版本 1.4 开始,你可以在监听器容器中配置一个路由连接工厂。在这种情况下,队列名称列表将用作查找键。例如,如果你使用 setQueueNames("thing1", "thing2")
配置容器,查找键将是 [thing1,thing2]
(注意键中没有空格)。
从 1.6.9 版本开始,您可以通过在监听器容器上使用 setLookupKeyQualifier
来为查找键添加限定符。这样做的目的是,例如,可以监听具有相同名称但位于不同虚拟主机中的队列(在这种情况下,您将为每个虚拟主机配置一个连接工厂)。
例如,如果查找键限定符为 thing1
,并且容器监听队列 thing2
,那么你可以使用 thing1[thing2]
作为查找键来注册目标连接工厂。
目标(以及默认的,如果提供的话)连接工厂必须在发布者确认和返回设置上保持一致。请参阅发布者确认和返回。
从版本 2.4.4 开始,可以禁用此验证。如果你有需要在 confirms
和 returns
之间值不相等的情况,可以使用 AbstractRoutingConnectionFactory#setConsistentConfirmsReturns
来关闭验证。请注意,添加到 AbstractRoutingConnectionFactory
的第一个连接工厂将决定 confirms
和 returns
的通用值。
在某些情况下,您可能希望检查某些消息的确认/返回结果,而忽略其他消息。例如:
@Bean
public RabbitTemplate rabbitTemplate() {
final com.rabbitmq.client.ConnectionFactory cf = new com.rabbitmq.client.ConnectionFactory();
cf.setHost("localhost");
cf.setPort(5672);
CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(cf);
cachingConnectionFactory.setPublisherConfirmType(CachingConnectionFactory.ConfirmType.CORRELATED);
PooledChannelConnectionFactory pooledChannelConnectionFactory = new PooledChannelConnectionFactory(cf);
final Map<Object, ConnectionFactory> connectionFactoryMap = new HashMap<>(2);
connectionFactoryMap.put("true", cachingConnectionFactory);
connectionFactoryMap.put("false", pooledChannelConnectionFactory);
final AbstractRoutingConnectionFactory routingConnectionFactory = new SimpleRoutingConnectionFactory();
routingConnectionFactory.setConsistentConfirmsReturns(false);
routingConnectionFactory.setDefaultTargetConnectionFactory(pooledChannelConnectionFactory);
routingConnectionFactory.setTargetConnectionFactories(connectionFactoryMap);
final RabbitTemplate rabbitTemplate = new RabbitTemplate(routingConnectionFactory);
final Expression sendExpression = new SpelExpressionParser().parseExpression(
"messageProperties.headers['x-use-publisher-confirms'] ?: false");
rabbitTemplate.setSendConnectionFactorySelectorExpression(sendExpression);
}
这样,带有 x-use-publisher-confirms: true
头信息的消息将通过缓存连接发送,你可以确保消息的传递。有关确保消息传递的更多信息,请参阅 Publisher Confirms and Returns。
队列亲和性与 LocalizedQueueConnectionFactory
在集群中使用 HA 队列时,为了获得最佳性能,您可能需要连接到主队列所在的物理代理。可以将 CachingConnectionFactory
配置为使用多个代理地址。这是为了实现故障转移,客户端会根据配置的 AddressShuffleMode
顺序尝试连接。LocalizedQueueConnectionFactory
使用管理插件提供的 REST API 来确定哪个节点是队列的主节点。然后,它创建(或从缓存中检索)一个仅连接到该节点的 CachingConnectionFactory
。如果连接失败,则会确定新的主节点,消费者将连接到该节点。LocalizedQueueConnectionFactory
配置了一个默认的连接工厂,以防无法确定队列的物理位置,在这种情况下,它会像往常一样连接到集群。
LocalizedQueueConnectionFactory
是一个 RoutingConnectionFactory
,而 SimpleMessageListenerContainer
使用队列名称作为查找键,如上面 Routing Connection Factory 部分所讨论的那样。
出于这个原因(使用队列名称进行查找),LocalizedQueueConnectionFactory
只能在容器配置为监听单个队列时使用。
必须在每个节点上启用 RabbitMQ 管理插件。
此连接工厂适用于长连接,例如 SimpleMessageListenerContainer
使用的连接。它不适用于短连接,例如与 RabbitTemplate
一起使用,因为在建立连接之前调用 REST API 会产生开销。此外,对于发布操作,队列是未知的,消息无论如何都会发布到所有集群成员,因此查找节点的逻辑几乎没有价值。
以下示例配置展示了如何配置工厂:
@Autowired
private ConfigurationProperties props;
@Bean
public CachingConnectionFactory defaultConnectionFactory() {
CachingConnectionFactory cf = new CachingConnectionFactory();
cf.setAddresses(this.props.getAddresses());
cf.setUsername(this.props.getUsername());
cf.setPassword(this.props.getPassword());
cf.setVirtualHost(this.props.getVirtualHost());
return cf;
}
@Bean
public LocalizedQueueConnectionFactory queueAffinityCF(
@Qualifier("defaultConnectionFactory") ConnectionFactory defaultCF) {
return new LocalizedQueueConnectionFactory(defaultCF,
StringUtils.commaDelimitedListToStringArray(this.props.getAddresses()),
StringUtils.commaDelimitedListToStringArray(this.props.getAdminUris()),
StringUtils.commaDelimitedListToStringArray(this.props.getNodes()),
this.props.getVirtualHost(), this.props.getUsername(), this.props.getPassword(),
false, null);
}
请注意,前三个参数分别是 addresses
、adminUris
和 nodes
的数组。这些参数是位置对应的,也就是说,当容器尝试连接到队列时,它会使用 admin API 来确定哪个节点是该队列的主节点,并连接到与该节点在同一数组位置中的地址。
从 3.0 版本开始,RabbitMQ 不再使用 http-client
来访问 Rest API。默认情况下,如果 spring-webflux
在类路径上,则使用 Spring Webflux 的 WebClient
;否则使用 RestTemplate
。
要将 WebFlux
添加到类路径中:
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
</dependency>
compile 'org.springframework.amqp:spring-rabbit'
你也可以通过实现 LocalizedQueueConnectionFactory.NodeLocator
并重写其 createClient
、restCall
以及可选的 close
方法来使用其他 REST 技术。
lqcf.setNodeLocator(new NodeLocator<MyClient>() {
@Override
public MyClient createClient(String userName, String password) {
...
}
@Override
public HashMap<String, Object> restCall(MyClient client, URI uri) {
...
});
});
该框架提供了 WebFluxNodeLocator
和 RestTemplateNodeLocator
,默认选项如上所述。
发布者确认与返回
通过将 CachingConnectionFactory
的属性 publisherConfirmType
设置为 ConfirmType.CORRELATED
并将 publisherReturns
属性设置为 true
,可以支持已确认(带相关性)和返回的消息。
当这些选项被设置时,由工厂创建的 Channel
实例会被包装在一个 PublisherCallbackChannel
中,该通道用于促进回调。当获得这样的通道时,客户端可以向 Channel
注册一个 PublisherCallbackChannel.Listener
。PublisherCallbackChannel
实现包含将确认或返回路由到适当监听器的逻辑。这些功能将在以下部分进一步解释。
另请参阅 相关发布者确认和返回 以及 作用域操作 中的 simplePublisherConfirms
。
有关更多背景信息,请参阅 RabbitMQ 团队的博客文章 Introducing Publisher Confirms。
连接与通道监听器
连接工厂支持注册 ConnectionListener
和 ChannelListener
实现。这使您可以接收与连接和通道相关的事件的通知。(RabbitAdmin
使用 ConnectionListener
在连接建立时执行声明 - 有关更多信息,请参阅 自动声明交换器、队列和绑定)。以下清单显示了 ConnectionListener
接口的定义:
@FunctionalInterface
public interface ConnectionListener {
void onCreate(Connection connection);
default void onClose(Connection connection) {
}
default void onShutDown(ShutdownSignalException signal) {
}
}
从 2.0 版本开始,org.springframework.amqp.rabbit.connection.Connection
对象可以配置 com.rabbitmq.client.BlockedListener
实例,以便在连接被阻塞和解锁时收到通知。以下示例展示了 ChannelListener
接口的定义:
@FunctionalInterface
public interface ChannelListener {
void onCreate(Channel channel, boolean transactional);
default void onShutDown(ShutdownSignalException signal) {
}
}
请参阅发布是异步的——如何检测成功和失败了解可能需要注册 ChannelListener
的场景。
记录通道关闭事件
版本 1.5 引入了一种机制,使用户能够控制日志记录级别。
AbstractConnectionFactory
使用默认策略来记录通道关闭日志,如下所示:
-
正常通道关闭(200 OK)不会被记录。
-
如果通道由于被动队列声明失败而关闭,它会在 DEBUG 级别记录。
-
如果通道由于独占消费者条件导致
basic.consume
被拒绝而关闭,它会在 DEBUG 级别记录(自 3.1 版本起,之前为 INFO 级别)。 -
所有其他情况都会在 ERROR 级别记录。
要修改此行为,您可以将自定义的 ConditionalExceptionLogger
注入到 CachingConnectionFactory
的 closeExceptionLogger
属性中。
此外,AbstractConnectionFactory.DefaultChannelCloseLogger
现在被公开,允许对其进行子类化。
另请参阅 消费者事件。
运行时缓存属性
从 1.6 版本开始,CachingConnectionFactory
现在通过 getCacheProperties()
方法提供缓存统计信息。这些统计信息可用于调整缓存,以在生产环境中优化其性能。例如,高水位标记可用于确定是否应增加缓存大小。如果它等于缓存大小,您可能需要考虑进一步增加。下表描述了 CacheMode.CHANNEL
的属性:
表 1. CacheMode.CHANNEL 的缓存属性
属性 | 含义 |
---|---|
connectionName | 由 ConnectionNameStrategy 生成的连接名称。 |
channelCacheSize | 当前配置的允许空闲的最大通道数。 |
localPort | 连接的本地端口(如果可用)。这可以用于与 RabbitMQ 管理界面上的连接和通道进行关联。 |
idleChannelsTx | 当前空闲(缓存)的事务性通道数。 |
idleChannelsNotTx | 当前空闲(缓存)的非事务性通道数。 |
idleChannelsTxHighWater | 曾经同时空闲(缓存)的事务性通道的最大数量。 |
idleChannelsNotTxHighWater | 曾经同时空闲(缓存)的非事务性通道的最大数量。 |
下表描述了 CacheMode.CONNECTION
的属性:
表 2. CacheMode.CONNECTION 的缓存属性
属性 | 含义 |
---|---|
connectionName:<localPort> | 由 ConnectionNameStrategy 生成的连接名称。 |
openConnections | 表示与 broker 连接的连接对象数量。 |
channelCacheSize | 当前配置的允许空闲的最大通道数。 |
connectionCacheSize | 当前配置的允许空闲的最大连接数。 |
idleConnections | 当前空闲的连接数。 |
idleConnectionsHighWater | 曾经并发空闲的最大连接数。 |
idleChannelsTx:<localPort> | 当前空闲的(缓存的)事务通道数。您可以使用属性名称中的 localPort 部分与 RabbitMQ 管理界面上的连接和通道进行关联。 |
idleChannelsNotTx:<localPort> | 当前空闲的(缓存的)非事务通道数。您可以使用属性名称中的 localPort 部分与 RabbitMQ 管理界面上的连接和通道进行关联。 |
idleChannelsTxHighWater:<localPort> | 曾经并发空闲的(缓存的)事务通道的最大数量。您可以使用属性名称中的 localPort 部分与 RabbitMQ 管理界面上的连接和通道进行关联。 |
idleChannelsNotTxHighWater:<localPort> | 曾经并发空闲的(缓存的)非事务通道的最大数量。您可以使用属性名称中的 localPort 部分与 RabbitMQ 管理界面上的连接和通道进行关联。 |
cacheMode
属性(CHANNEL
或 CONNECTION
)也包含在内。
图 1. JVisualVM 示例
RabbitMQ 自动连接/拓扑恢复
自 Spring AMQP 的第一个版本以来,该框架就在代理(broker)故障时提供了自己的连接和通道恢复机制。此外,正如在配置代理中所讨论的那样,当连接重新建立时,RabbitAdmin
会重新声明任何基础设施 bean(队列等)。因此,它不依赖于 amqp-client
库现在提供的自动恢复功能。amqp-client
默认启用了自动恢复功能。这两种恢复机制之间存在一些不兼容性,因此,默认情况下,Spring 将底层 RabbitMQ connectionFactory
的 automaticRecoveryEnabled
属性设置为 false
。即使该属性为 true
,Spring 也会通过立即关闭任何恢复的连接来有效地禁用它。
默认情况下,只有在失败后重新连接时定义为 bean 的元素(队列、交换机、绑定)才会被重新声明。有关如何更改此行为,请参阅恢复自动删除声明。