跳到主要内容

AMQP (RabbitMQ) 支持

QWen Plus 中英对照 AMQP Support AMQP (RabbitMQ) Support

Spring Integration 提供了通道适配器,用于通过高级消息队列协议 (AMQP) 接收和发送消息。

你需要将这个依赖添加到你的项目中:

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-amqp</artifactId>
<version>6.4.2</version>
</dependency>
xml

以下适配器可用:

Spring Integration 还提供了一种由 AMQP Exchange 和 Queue 支持的点对点消息通道和发布 - 订阅消息通道。

为了提供 AMQP 支持,Spring Integration 依赖于 Spring AMQP,它将核心 Spring 概念应用于基于 AMQP 的消息解决方案的开发。Spring AMQP 提供了与 Spring JMS 类似的语义。

虽然提供的 AMQP Channel Adapters 仅用于单向消息传递(发送或接收),但 Spring Integration 还提供了入站和出站 AMQP 网关以用于请求-回复操作。

提示:你应该熟悉 Spring AMQP 项目的参考文档。它提供了关于 Spring 与 AMQP 的集成,特别是与 RabbitMQ 的集成的更深入的信息。

部分总结

📄️ 入站端点确认模式

默认情况下,入站端点使用 AUTO 确认模式,这意味着当下游集成流完成时(或通过使用 QueueChannel 或 ExecutorChannel 将消息传递给另一个线程时),容器会自动确认消息。将模式设置为 NONE 会配置消费者完全不使用确认(代理在消息发送后立即自动确认消息)。将模式设置为 MANUAL 允许用户代码在处理过程中的其他某个时间点确认消息。为了支持这一点,在这种模式下,端点分别在 amqp\_channel 和 amqp\_deliveryTag 头中提供 Channel 和 deliveryTag。

📄️ 出站端点

以下 outbound 端点有许多类似的配置选项。从 5.2 版开始,增加了 confirm-timeout。通常情况下,当启用发布者确认时,代理会快速返回一个 ack(或 nack),这将被发送到适当的通道。如果在收到确认之前通道已关闭,Spring AMQP 框架将合成一个 nack。“缺失”的 ack 不应该发生,但是,如果你设置了这个属性,端点将定期检查它们,如果在没有收到确认的情况下时间到期,将合成一个 nack。

📄️ 异步 outbound 网关

上一节讨论的网关是同步的,也就是说,发送线程会一直挂起直到收到回复(或发生超时)。Spring Integration 4.3 版本添加了一个异步网关,它使用了来自 Spring AMQP 的 AsyncRabbitTemplate。当消息被发送时,线程在发送操作完成后立即返回,并且当消息被接收时,回复会在模板的监听器容器线程上发送。这在网关由轮询线程调用时非常有用。线程会被释放并且可用于框架中的其他任务。

📄️ 入站消息转换

传入的消息,到达通道适配器或网关后,使用消息转换器将其转换为 spring-messaging Message\<?> 负载。默认情况下,使用 SimpleMessageConverter,它处理 Java 序列化和文本。头部默认使用 DefaultHeaderMapper.inboundMapper() 进行映射。如果发生转换错误且没有定义错误通道,则异常会被抛给容器并由监听器容器的错误处理器处理。默认错误处理器将转换错误视为致命错误,并且消息将被拒绝(如果队列配置了死信交换,则会被路由到死信交换)。如果定义了错误通道,则 ErrorMessage 负载是一个 ListenerExecutionFailedException,具有 failedMessage(无法转换的 Spring AMQP 消息)和原因属性。如果容器的确认模式是 AUTO(默认值),并且错误流在不抛出异常的情况下消费错误,则原始消息将被确认。如果错误流抛出异常,则异常类型与容器的错误处理器一起确定消息是否重新入队。如果容器配置为 AcknowledgeMode.MANUAL,则负载是一个 ManualAckListenerExecutionFailedException,具有额外的 channel 和 deliveryTag 属性。这使得错误流可以调用 basicAck 或 basicNack(或 basicReject)来控制消息的处理方式。