使用 Spring AMQP
本章探讨了使用 Spring AMQP 开发应用程序时作为核心组件的接口和类。
章节摘要
📄️ AMQP 抽象
Spring AMQP 由两个模块组成(每个模块在发行版中由一个 JAR 表示):spring-amqp 和 spring-rabbit。spring-amqp 模块包含 org.springframework.amqp.core 包。在该包中,你可以找到表示核心 AMQP “模型”的类。我们的目标是提供不依赖于任何特定 AMQP 代理实现或客户端库的通用抽象。最终用户代码可以在不同厂商的实现之间更具可移植性,因为它可以仅针对抽象层进行开发。这些抽象随后由特定代理的模块实现,例如 spring-rabbit。目前只有 RabbitMQ 的实现。然而,这些抽象已经在 .NET 中使用 Apache Qpid 和 RabbitMQ 进行了验证。由于 AMQP 在协议级别上运行,原则上,你可以将 RabbitMQ 客户端与支持相同协议版本的任何代理一起使用,但目前我们不会测试任何其他代理。
📄️ 连接与资源管理
尽管我们在前一节描述的 AMQP 模型是通用的,适用于所有实现,但当我们深入到资源管理时,细节则特定于代理(broker)的实现。因此,在本节中,我们将重点介绍仅存在于我们的“spring-rabbit”模块中的代码,因为目前 RabbitMQ 是唯一受支持的实现。
📄️ 添加自定义客户端连接属性
CachingConnectionFactory 现在允许你访问底层的连接工厂,以便设置自定义客户端属性等。以下示例展示了如何实现这一点:
📄️ AmqpTemplate
与 Spring 框架及相关项目提供的许多其他高级抽象一样,Spring AMQP 提供了一个扮演核心角色的“模板”。定义主要操作的接口称为 AmqpTemplate。这些操作涵盖了发送和接收消息的通用行为。换句话说,它们并不特定于任何实现——因此名称中带有“AMQP”。另一方面,该接口的实现与 AMQP 协议的实现紧密相关。与 JMS(本身是一个接口级 API)不同,AMQP 是一个线路级协议。该协议的实现提供了自己的客户端库,因此模板接口的每个实现都依赖于特定的客户端库。目前,只有一个实现:RabbitTemplate。在接下来的示例中,我们经常使用 AmqpTemplate。然而,当您查看配置示例或任何实例化模板或调用 setter 方法的代码片段时,您可以看到实现类型(例如 RabbitTemplate)。
📄️ 发送消息
在发送消息时,您可以使用以下任意一种方法:
🗃️ 接收消息
14 个项目
📄️ 容器与代理命名队列
虽然更推荐使用 AnonymousQueue 实例作为自动删除队列,但从 2.1 版本开始,你可以在监听器容器中使用代理命名的队列。以下示例展示了如何实现这一点:
📄️ 消息转换器
AmqpTemplate 还定义了几种用于发送和接收消息的方法,这些方法委托给 MessageConverter。MessageConverter 为每个方向提供了一个方法:一个用于转换为 Message,另一个用于从 Message 转换。请注意,在转换为 Message 时,除了对象之外,您还可以提供属性。object 参数通常对应于 Message 的主体。以下代码展示了 MessageConverter 接口的定义:
📄️ 修改消息 - 压缩及其他
存在多个扩展点。它们允许你在消息发送到 RabbitMQ 之前或接收之后立即对消息进行一些处理。
📄️ 请求/回复消息传递
AmqpTemplate 还提供了多种 sendAndReceive 方法,这些方法接受与之前描述的单向发送操作(exchange、routingKey 和 Message)相同的参数选项。这些方法在请求-回复场景中非常有用,因为它们在发送之前处理必要的 reply-to 属性的配置,并且可以在内部为此目的创建的独占队列上监听回复消息。
📄️ 配置 Broker
AMQP 规范描述了如何使用该协议在代理上配置队列、交换器和绑定。这些操作(从 0.8 规范及更高版本中是可移植的)存在于 org.springframework.amqp.core 包中的 AmqpAdmin 接口中。该类的 RabbitMQ 实现是位于 org.springframework.amqp.rabbit.core 包中的 RabbitAdmin。
📄️ Broker 事件监听器
当启用 Event Exchange 插件时,如果你在应用程序上下文中添加了一个类型为 BrokerEventListener 的 bean,它会将选定的 broker 事件作为 BrokerEvent 实例发布,这些事件可以通过普通的 Spring ApplicationListener 或 @EventListener 方法进行消费。事件由 broker 发布到一个名为 amq.rabbitmq.event 的主题交换器(topic exchange),每个事件类型都有不同的路由键。监听器使用事件键,这些键用于将一个匿名队列(AnonymousQueue)绑定到交换器,以便监听器仅接收选定的事件。由于这是一个主题交换器,因此可以使用通配符(以及显式请求特定事件),如下例所示:
📄️ 延迟消息交换
版本 1.6 引入了对 Delayed Message Exchange 插件的支持。
📄️ RabbitMQ REST API
当管理插件启用时,RabbitMQ 服务器会暴露一个 REST API 用于监控和配置代理。现在为该 API 提供了一个 Java 绑定。com.rabbitmq.http.client.Client 是一个标准的、即时的、因此也是阻塞的 API。它基于 Spring Web 模块及其 RestTemplate 实现。另一方面,com.rabbitmq.http.client.ReactorNettyClient 是一个基于 Reactor Netty 项目的响应式、非阻塞的实现。
📄️ 异常处理
在使用 RabbitMQ Java 客户端时,许多操作可能会抛出受检异常(checked exceptions)。例如,在很多情况下可能会抛出 IOException 实例。RabbitTemplate、SimpleMessageListenerContainer 以及其他 Spring AMQP 组件会捕获这些异常,并将其转换为 AmqpException 层次结构中的某个异常。这些异常定义在 org.springframework.amqp 包中,而 AmqpException 是该层次结构的基类。
📄️ 事务
Spring Rabbit 框架支持在同步和异步使用场景中进行自动事务管理,提供了多种不同的语义,这些语义可以通过声明式的方式进行选择,这对于熟悉 Spring 事务的现有用户来说是非常熟悉的。这使得许多(如果不是大多数)常见的消息传递模式易于实现。
📄️ 消息监听器容器配置
在配置 SimpleMessageListenerContainer (SMLC) 和 DirectMessageListenerContainer (DMLC) 时,有许多选项与事务和服务质量相关,其中一些选项会相互影响。适用于 SMLC、DMLC 或 StreamListenerContainer (StLC)(参见 使用 RabbitMQ 流插件)的属性在相应的列中用勾号标记。有关帮助您决定哪种容器适合您的应用程序的信息,请参见 选择容器。
📄️ 监听器并发
默认情况下,监听器容器会启动一个消费者,该消费者从队列中接收消息。
📄️ 独占消费者
从 1.3 版本开始,你可以为监听器容器配置一个独占的消费者。这会阻止其他容器从队列中消费消息,直到当前消费者被取消。此类容器的并发度必须为 1。
📄️ 监听器容器队列
版本 1.3 引入了许多改进,用于处理监听器容器中的多个队列。
📄️ 弹性:从错误和 Broker 故障中恢复
Spring AMQP 提供的一些关键(也是最受欢迎的)高级功能与在协议错误或代理失败时的恢复和自动重新连接有关。在本指南中,我们已经看到了所有相关的组件,但在这里将它们全部整合在一起,并分别介绍这些功能和恢复场景将会有所帮助。
📄️ 多 Broker(或集群)支持
版本 2.3 在单个应用程序与多个代理或代理集群之间的通信中增加了更多的便利性。在消费者端的主要好处是,基础设施可以自动将自动声明的队列与适当的代理关联起来。
📄️ 调试
Spring AMQP 提供了广泛的日志记录功能,特别是在 DEBUG 级别。