跳到主要内容

检测空闲的异步消费者

DeepSeek V3 中英对照 Detecting Idle Asynchronous Consumers

虽然高效,但异步消费者的一个问题是检测它们何时处于空闲状态——用户可能希望在一段时间内没有消息到达时采取一些措施。

从 1.6 版本开始,现在可以配置监听器容器在没有消息传递一段时间后发布 ListenerContainerIdleEvent。当容器处于空闲状态时,每隔 idleEventInterval 毫秒会发布一个事件。

要配置此功能,请在容器上设置 idleEventInterval。以下示例展示了如何在 XML 和 Java 中为 SimpleMessageListenerContainerSimpleRabbitListenerContainerFactory 进行设置:

<rabbit:listener-container connection-factory="connectionFactory"
...
idle-event-interval="60000"
...
>
<rabbit:listener id="container1" queue-names="foo" ref="myListener" method="handle" />
</rabbit:listener-container>
xml
@Bean
public SimpleMessageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
...
container.setIdleEventInterval(60000L);
...
return container;
}
java
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(rabbitConnectionFactory());
factory.setIdleEventInterval(60000L);
...
return factory;
}
java

在这些情况下,容器空闲时每分钟发布一次事件。

事件消费

你可以通过实现 ApplicationListener 来捕获空闲事件 —— 既可以实现一个通用的监听器,也可以实现一个仅接收此特定事件的监听器。你也可以使用 Spring Framework 4.2 中引入的 @EventListener

以下示例将 @RabbitListener@EventListener 组合到一个类中。你需要理解的是,应用程序监听器会接收到所有容器的事件,因此如果你希望根据哪个容器处于空闲状态来采取特定操作,可能需要检查监听器的 ID。你也可以为此目的使用 @EventListenercondition 条件。

事件具有四个属性:

  • source: 监听器容器实例

  • id: 监听器 ID(或容器 bean 名称)

  • idleTime: 事件发布时容器已空闲的时间

  • queueNames: 容器监听的队列名称

以下示例展示了如何使用 @RabbitListener@EventListener 注解创建监听器:

public class Listener {

@RabbitListener(id="someId", queues="#{queue.name}")
public String listen(String foo) {
return foo.toUpperCase();
}

@EventListener(condition = "event.listenerId == 'someId'")
public void onApplicationEvent(ListenerContainerIdleEvent event) {
...
}

}
java
important

事件监听器会看到所有容器的事件。因此,在前面的示例中,我们根据监听器 ID 来缩小接收的事件范围。

警告

如果你希望使用空闲事件来停止监听器容器,你不应该在调用监听器的线程上调用 container.stop()。这样做总是会导致延迟和不必要的日志消息。相反,你应该将事件传递给另一个线程,然后由该线程来停止容器。