使用批处理的 @RabbitListener
当接收到一批消息时,通常由容器执行去批处理操作,并且每次调用监听器时只传递一条消息。从版本 2.2 开始,你可以配置监听器容器工厂和监听器,以便在一次调用中接收整个批次的消息。只需将工厂的 batchListener
属性设置为 true
,并将方法的负载参数设置为 List
或 Collection
类型即可:
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
factory.setBatchListener(true);
return factory;
}
@RabbitListener(queues = "batch.1")
public void listen1(List<Thing> in) {
...
}
// or
@RabbitListener(queues = "batch.2")
public void listen2(List<Message<Thing>> in) {
...
}
将 batchListener
属性设置为 true
会自动关闭工厂创建的容器中的 deBatchingEnabled
容器属性(除非 consumerBatchEnabled
为 true
—— 见下文)。实际上,去批处理的操作从容器移到了监听器适配器,适配器会创建传递给监听器的列表。
一个启用了批处理的工厂不能与多方法监听器一起使用。
从版本 2.2 开始,当逐条接收批量消息时,最后一条消息包含一个布尔头,设置为 true
。你可以通过在监听方法中添加 @Header(AmqpHeaders.LAST_IN_BATCH)
boolean last参数来获取此头信息。该头信息是从
MessageProperties.isLastInBatch() 映射而来的。此外,
AmqpHeaders.BATCH_SIZE` 在每个消息片段中都填充了批量的大小。
此外,SimpleMessageListenerContainer
新增了一个属性 consumerBatchEnabled
。当该属性为 true
时,容器将创建一批消息,最多达到 batchSize
;如果在 receiveTimeout
时间内没有新消息到达,则会传递部分批次的消息。如果接收到生产者创建的批次消息,则会将其拆分为单个消息并添加到消费者端的批次中;因此,实际传递的消息数量可能超过 batchSize
,batchSize
表示从代理接收的消息数量。当 consumerBatchEnabled
为 true
时,deBatchingEnabled
也必须为 true
;容器工厂将强制执行此要求。
@Bean
public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory() {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(rabbitConnectionFactory());
factory.setConsumerTagStrategy(consumerTagStrategy());
factory.setBatchListener(true); // configures a BatchMessageListenerAdapter
factory.setBatchSize(2);
factory.setConsumerBatchEnabled(true);
return factory;
}
当使用 consumerBatchEnabled
与 @RabbitListener
时:
@RabbitListener(queues = "batch.1", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch1(List<Message> amqpMessages) {
...
}
@RabbitListener(queues = "batch.2", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch2(List<org.springframework.messaging.Message<Invoice>> messages) {
...
}
@RabbitListener(queues = "batch.3", containerFactory = "consumerBatchContainerFactory")
public void consumerBatch3(List<Invoice> strings) {
...
}
-
第一个方法接收原始的、未转换的
org.springframework.amqp.core.Message
。 -
第二个方法接收
org.springframework.messaging.Message<?>
,其中包含转换后的 payload 和映射的 headers/properties。 -
第三个方法接收转换后的 payload,但无法访问 headers/properties。
你也可以添加一个 Channel
参数,通常在启用 MANUAL
确认模式时使用。这在第三个示例中并不太有用,因为你无法访问 delivery_tag
属性。
Spring Boot 提供了 consumerBatchEnabled
和 batchSize
的配置属性,但没有提供 batchListener
的配置属性。从版本 3.0 开始,在容器工厂中将 consumerBatchEnabled
设置为 true
也会将 batchListener
设置为 true
。当 consumerBatchEnabled
为 true
时,监听器必须是一个批处理监听器。
从版本 3.0 开始,监听器方法可以消费 Collection<?>
或 List<?>
。
批量模式下的监听器不支持回复,因为批量中的消息与生成的单个回复之间可能没有关联。批量监听器仍然支持异步返回类型。