接收消息
消息接收总是比发送稍微复杂一些。接收 Message
有两种方式。较简单的方式是使用轮询方法调用,一次轮询一个 Message
。更复杂但更常见的方法是注册一个侦听器,按需异步接收 Messages
。我们将在接下来的两个小节中分别举例说明这两种方式。
章节小结
📄️ 轮询消费者
AmqpTemplate 本身可以用于轮询式的消息接收。默认情况下,如果没有可用的消息,它会立即返回 null,不会进行阻塞。从 1.5 版本开始,你可以设置一个 receiveTimeout(以毫秒为单位),接收方法会阻塞最多这么长时间,等待消息的到来。如果值小于零,则表示无限期阻塞(或者至少阻塞到与代理的连接丢失为止)。1.6 版本引入了接收方法的变体,允许在每次调用时传递超时时间。
📄️ 异步消费者
之前的 prefetch 默认值为 1,这可能导致高效消费者的利用率不足。从 2.0 版本开始,默认的 prefetch 值现在为 250,这在大多数常见场景下应该能够保持消费者的忙碌状态,从而提高吞吐量。
📄️ 批量消息
批量消息(由生产者创建)会被监听器容器自动拆分为单条消息(使用 springBatchFormat 消息头)。如果批量中的任何一条消息被拒绝,整个批次的消息都会被拒绝。有关批处理的更多信息,请参阅 批处理。
📄️ 消费者事件
每当监听器(消费者)遇到某种故障时,容器会发布应用程序事件。事件 ListenerContainerConsumerFailedEvent 具有以下属性:
📄️ 消费者标签
你可以提供一个策略来生成消费者标签。默认情况下,消费者标签是由代理(broker)生成的。以下代码展示了 ConsumerTagStrategy 接口的定义:
🗃️ 注解驱动的监听器端点
15 个项目
📄️ 使用批量处理的 @RabbitListener
在接收到一批消息时,通常由容器执行解批处理,并且每次调用监听器时只传递一条消息。从版本 2.2 开始,你可以配置监听器容器工厂和监听器,以便在一次调用中接收整个批处理的消息。只需设置工厂的 batchListener 属性,并将方法的有效负载参数设置为 List 或 Collection 类型即可:
📄️ 使用容器工厂
Listener container factories 的引入是为了支持 @RabbitListener 注解以及通过 RabbitListenerEndpointRegistry 注册容器,正如在 Programmatic Endpoint Registration 部分所讨论的那样。
📄️ 异步 @RabbitListener 返回类型
@RabbitListener(和 @RabbitHandler)方法可以指定异步返回类型 CompletableFuture\<?> 和 Mono\<?>,从而允许异步发送回复。ListenableFuture\<?> 不再受支持;它已被 Spring Framework 弃用。
📄️ 线程与异步消费者
多个不同的线程参与了异步消费者的处理。
📄️ 选择一个容器
版本 2.0 引入了 DirectMessageListenerContainer(DMLC)。之前,只有 SimpleMessageListenerContainer(SMLC)可用。SMLC 使用内部队列和每个消费者专用的线程。如果容器配置为监听多个队列,则使用相同的消费者线程来处理所有队列。并发性由 concurrentConsumers 和其他属性控制。当消息从 RabbitMQ 客户端到达时,客户端线程通过队列将消息传递给消费者线程。这种架构是必要的,因为在早期版本的 RabbitMQ 客户端中,无法实现多个并发传递。较新版本的客户端改进了线程模型,现在可以支持并发性。这使得引入 DMLC 成为可能,现在监听器直接在 RabbitMQ 客户端线程上调用。因此,它的架构实际上比 SMLC 更“简单”。然而,这种方法有一些限制,SMLC 的某些功能在 DMLC 中不可用。此外,并发性由 consumersPerQueue(以及客户端库的线程池)控制。concurrentConsumers 和相关的属性在此容器中不可用。
📄️ 检测空闲的异步消费者
虽然异步消费者效率很高,但它们存在一个问题,即如何检测它们何时处于空闲状态——如果在一段时间内没有消息到达,用户可能希望采取一些措施。
📄️ Micrometer 集成
从 2.2 版本开始,如果在类路径上检测到 Micrometer,并且应用程序上下文中存在一个 MeterRegistry(或者恰好有一个被注解为 @Primary 的 MeterRegistry,例如在使用 Spring Boot 时),监听器容器将自动为监听器创建和更新 Micrometer Timer。可以通过将容器属性 micrometerEnabled 设置为 false 来禁用这些计时器。
📄️ Micrometer 观察
从 3.0 版本开始,RabbitTemplate 和监听器容器现在支持使用 Micrometer 进行观测。