跳到主要内容

严格的消息顺序

QWen Plus 中英对照 Strict Message Ordering

本节描述了入站和出站消息的消息排序。

入站

如果你需要严格保证入站消息的顺序,必须将入站监听器容器的 prefetchCount 属性配置为 1。这是因为,如果一条消息失败并被重新发送,它会在现有的预取消息之后到达。自 Spring AMQP 2.0 版本以来,prefetchCount 默认值为 250,以提高性能。严格排序的要求会以降低性能为代价。

出站

考虑以下集成流:

@Bean
public IntegrationFlow flow(RabbitTemplate template) {
return IntegrationFlow.from(Gateway.class)
.splitWith(s -> s.delimiters(","))
.<String, String>transform(String::toUpperCase)
.handle(Amqp.outboundAdapter(template).routingKey("rk"))
.get();
}
java

假设我们向网关发送消息 ABC。虽然很可能消息 ABC 是按顺序发送的,但并不能保证。这是因为模板为每次发送操作“借用”了一个来自缓存的通道,并不能保证每个消息都使用相同的通道。一种解决方案是在拆分器之前启动一个事务,但在 RabbitMQ 中事务代价高昂,可能会使性能降低数百倍。

为了解决这个问题,从 5.1 版本开始,Spring Integration 提供了 BoundRabbitChannelAdvice,它是一个 HandleMessageAdvice。请参阅处理消息建议。当在拆分器之前应用时,它确保所有下游操作都在同一个通道上执行,并且可以选择等待所有已发送消息的发布确认(如果连接工厂配置了确认)。以下示例展示了如何使用 BoundRabbitChannelAdvice

@Bean
public IntegrationFlow flow(RabbitTemplate template) {
return IntegrationFlow.from(Gateway.class)
.splitWith(s -> s.delimiters(",")
.advice(new BoundRabbitChannelAdvice(template, Duration.ofSeconds(10))))
.<String, String>transform(String::toUpperCase)
.handle(Amqp.outboundAdapter(template).routingKey("rk"))
.get();
}
java

请注意,相同的 RabbitTemplate(它实现了 RabbitOperations)在建议和 outbound 适配器中使用。建议在模板的 invoke 方法内运行下游流程,以便所有操作都在同一通道上运行。如果提供了可选的超时时间,当流程完成时,建议会调用 waitForConfirmsOrDie 方法,如果在指定的时间内未收到确认信息,则会抛出异常。

important

下游流程中(QueueChannelExecutorChannel 等)绝对不能有线程脱手。