跳到主要内容
版本:7.0.2

AMQP (RabbitMQ) 支持

DeepSeek V3 中英对照 AMQP Support AMQP (RabbitMQ) Support

Spring Integration 提供了通过使用高级消息队列协议(AMQP)来接收和发送消息的通道适配器。

此依赖项为项目所需:

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-amqp</artifactId>
<version>7.0.2</version>
</dependency>

以下适配器可用:

Spring Integration 还提供了基于 AMQP 交换机和队列的点对点消息通道和发布-订阅消息通道。

为了提供AMQP支持,Spring Integration依赖于(Spring AMQP),后者将Spring核心概念应用于基于AMQP的消息解决方案开发。Spring AMQP提供了与(Spring JMS)类似的语义。

虽然提供的 AMQP 通道适配器仅用于单向消息传递(发送或接收),但 Spring Integration 也为请求-应答操作提供了入站和出站 AMQP 网关。

提示:您应该熟悉 Spring AMQP 项目的参考文档。它提供了关于 Spring 与 AMQP(特别是 RabbitMQ)集成的更深入信息。

章节总结

📄️ 入站端点确认模式

默认情况下,入站端点使用 AUTO 确认模式,这意味着当下游集成流完成时(或通过使用 QueueChannel 或 ExecutorChannel 将消息传递给另一个线程时),容器会自动确认消息。将模式设置为 NONE 会配置消费者,使其完全不使用确认机制(代理在消息发送后立即自动确认)。将模式设置为 MANUAL 允许用户代码在处理过程中的其他时间点确认消息。为支持此功能,在此模式下,端点会分别在 amqp\_channel 和 amqp\_deliveryTag 头部中提供 Channel 和 deliveryTag。

📄️ 出站端点

以下出站端点具有许多相似的配置选项。从版本 5.2 开始,新增了 confirm-timeout 配置。通常情况下,当启用发布者确认机制时,代理会快速返回一个确认(或否定确认),该确认将被发送到相应的通道。如果在收到确认之前通道被关闭,Spring AMQP 框架将合成一个否定确认。\"缺失\"的确认通常不应发生,但如果设置了此属性,端点将定期检查这些确认,并在超过指定时间仍未收到确认时合成一个否定确认。

📄️ 入站消息转换

入站消息到达通道适配器或网关时,会通过消息转换器转换为 spring-messaging 的 Message\<?> 负载。默认使用 SimpleMessageConverter,该转换器可处理 Java 序列化和文本数据。头部映射默认通过 DefaultHeaderMapper.inboundMapper() 实现。若发生转换错误且未定义错误通道,异常将抛给容器并由监听器容器的错误处理器处理。默认错误处理器将转换错误视为致命错误,消息将被拒绝(若队列配置了死信交换器,则路由至死信交换器)。若定义了错误通道,ErrorMessage 的负载将是包含 failedMessage(无法转换的 Spring AMQP 消息)和 cause 属性的 ListenerExecutionFailedException。若容器确认模式为 AUTO(默认值)且错误流在未抛出异常的情况下消费了错误,原始消息将被确认。若错误流抛出异常,异常类型与容器的错误处理器将共同决定消息是否重新入队。若容器配置为 AcknowledgeMode.MANUAL,负载将是包含额外属性 channel 和 deliveryTag 的 ManualAckListenerExecutionFailedException。这使得错误流能够通过调用 basicAck、basicNack(或 basicReject)来控制消息的处置方式。