跳到主要内容
版本:7.0.2

严格消息排序

DeepSeek V3 中英对照 Strict Message Ordering

本节介绍入站和出站消息的消息排序。

入站

如果您需要确保入站消息的严格顺序,必须将入站监听器容器的 prefetchCount 属性配置为 1。这是因为,如果某条消息处理失败并重新投递,它会在现有预取消息之后到达。自 Spring AMQP 2.0 版本起,为提升性能,prefetchCount 的默认值已设为 250。严格顺序要求会以降低性能为代价。

出站

考虑以下集成流程:

@Bean
public IntegrationFlow flow(RabbitTemplate template) {
return IntegrationFlow.from(Gateway.class)
.splitWith(s -> s.delimiters(","))
.<String, String>transform(String::toUpperCase)
.handle(Amqp.outboundAdapter(template).routingKey("rk"))
.get();
}

假设我们向网关发送消息 ABC。虽然消息 ABC 很可能按顺序发送,但这并不能保证。这是因为模板会为每次发送操作从缓存中“借用”一个通道,而无法保证每次消息都使用同一个通道。一种解决方案是在分割器之前启动一个事务,但在 RabbitMQ 中事务开销很大,可能会使性能降低数百倍。

为了更高效地解决此问题,从5.1版本开始,Spring Integration 提供了 BoundRabbitChannelAdvice,它是一个 HandleMessageAdvice。请参阅处理消息通知。当在拆分器之前应用时,它能确保所有下游操作都在同一通道上执行,并且(可选地)可以等待所有已发送消息的发布者确认被接收(如果连接工厂配置了确认机制)。以下示例展示了如何使用 BoundRabbitChannelAdvice

@Bean
public IntegrationFlow flow(RabbitTemplate template) {
return IntegrationFlow.from(Gateway.class)
.splitWith(s -> s.delimiters(",")
.advice(new BoundRabbitChannelAdvice(template, Duration.ofSeconds(10))))
.<String, String>transform(String::toUpperCase)
.handle(Amqp.outboundAdapter(template).routingKey("rk"))
.get();
}

请注意,建议(advice)和出站适配器(outbound adapter)中使用了相同的 RabbitTemplate(它实现了 RabbitOperations)。该建议在模板的 invoke 方法内运行下游流,因此所有操作都在同一个通道上执行。如果提供了可选的超时参数,当流程完成时,建议会调用 waitForConfirmsOrDie 方法;如果在指定时间内未收到确认,该方法将抛出异常。

important

下游流程中不得出现线程切换(QueueChannelExecutorChannel 及其他类似组件)。