AMQP (RabbitMQ) 支持
Spring Integration 提供了通过使用高级消息队列协议(AMQP)来接收和发送消息的通道适配器。
此依赖项为项目所需:
- Maven
- Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-amqp</artifactId>
<version>7.0.2</version>
</dependency>
compile "org.springframework.integration:spring-integration-amqp:7.0.2"
以下适配器可用:
Spring Integration 还提供了基于 AMQP 交换机和队列的点对点消息通道和发布-订阅消息通道。
为了提供AMQP支持,Spring Integration依赖于(Spring AMQP),后者将Spring核心概念应用于基于AMQP的消息解决方案开发。Spring AMQP提供了与(Spring JMS)类似的语义。
虽然提供的 AMQP 通道适配器仅用于单向消息传递(发送或接收),但 Spring Integration 也为请求-应答操作提供了入站和出站 AMQP 网关。
提示:您应该熟悉 Spring AMQP 项目的参考文档。它提供了关于 Spring 与 AMQP(特别是 RabbitMQ)集成的更深入信息。
章节总结
📄️ 入站通道适配器
以下清单展示了 AMQP 入站通道适配器的可能配置选项:
📄️ 轮询入站通道适配器
版本 5.0.1 引入了轮询通道适配器,允许您按需获取单个消息——例如,通过 MessageSourcePollingTemplate 或轮询器。更多信息请参阅可轮询消息源的延迟确认。
📄️ 入站网关
入站网关支持入站通道适配器上的所有属性(除了 channel 被替换为 request-channel 外),以及一些额外的属性。以下列表展示了可用的属性:
📄️ 入站端点确认模式
默认情况下,入站端点使用 AUTO 确认模式,这意味着当下游集成流完成时(或通过使用 QueueChannel 或 ExecutorChannel 将消息传递给另一个线程时),容器会自动确认消息。将模式设置为 NONE 会配置消费者,使其完全不使用确认机制(代理在消息发送后立即自动确认)。将模式设置为 MANUAL 允许用户代码在处理过程中的其他时间点确认消息。为支持此功能,在此模式下,端点会分别在 amqp\_channel 和 amqp\_deliveryTag 头部中提供 Channel 和 deliveryTag。
📄️ 出站端点
以下出站端点具有许多相似的配置选项。从版本 5.2 开始,新增了 confirm-timeout 配置。通常情况下,当启用发布者确认机制时,代理会快速返回一个确认(或否定确认),该确认将被发送到相应的通道。如果在收到确认之前通道被关闭,Spring AMQP 框架将合成一个否定确认。\"缺失\"的确认通常不应发生,但如果设置了此属性,端点将定期检查这些确认,并在超过指定时间仍未收到确认时合成一个否定确认。
📄️ 出站通道适配器
以下示例展示了 AMQP 出站通道适配器的可用属性:
📄️ 出站网关
以下列表展示了 AMQP 出站网关的可能属性:
📄️ 异步出站网关
上一节讨论的网关是同步的,发送线程会一直挂起直到收到回复(或发生超时)。Spring Integration 4.3 版本新增了异步网关,它使用了 Spring AMQP 中的 AsyncRabbitTemplate。当消息发送时,线程在发送操作完成后立即返回;当消息被接收时,回复会在模板的监听器容器线程上发送。这在轮询器线程调用网关时非常有用,线程会被释放并可用于框架中的其他任务。
📄️ 发布者确认和返回的替代机制
当连接工厂配置为支持发布者确认和返回时,以上章节讨论了如何配置消息通道以异步接收确认和返回信息。从版本 5.4 开始,提供了一个通常更易于使用的额外机制。
📄️ 入站消息转换
入站消息到达通道适配器或网关时,会通过消息转换器转换为 spring-messaging 的 Message\<?> 负载。默认使用 SimpleMessageConverter,该转换器可处理 Java 序列化和文本数据。头部映射默认通过 DefaultHeaderMapper.inboundMapper() 实现。若发生转换错误且未定义错误通道,异常将抛给容器并由监听器容器的错误处理器处理。默认错误处理器将转换错误视为致命错误,消息将被拒绝(若队列配置了死信交换器,则路由至死信交换器)。若定义了错误通道,ErrorMessage 的负载将是包含 failedMessage(无法转换的 Spring AMQP 消息)和 cause 属性的 ListenerExecutionFailedException。若容器确认模式为 AUTO(默认值)且错误流在未抛出异常的情况下消费了错误,原始消息将被确认。若错误流抛出异常,异常类型与容器的错误处理器将共同决定消息是否重新入队。若容器配置为 AcknowledgeMode.MANUAL,负载将是包含额外属性 channel 和 deliveryTag 的 ManualAckListenerExecutionFailedException。这使得错误流能够通过调用 basicAck、basicNack(或 basicReject)来控制消息的处置方式。
📄️ 出站消息转换
Spring AMQP 1.4 引入了 ContentTypeDelegatingMessageConverter,它会根据传入消息的 content type 属性选择实际的转换器。入站端点可以使用此功能。
📄️ 出站用户 ID
Spring AMQP 1.6版本引入了一种机制,允许为出站消息指定默认用户ID。一直以来都可以设置AmqpHeaders.USER\_ID头部,现在该头部将优先于默认值使用。这对消息接收方可能很有用。对于入站消息,如果消息发布者设置了该属性,则会在AmqpHeaders.RECEIVED\_USER\_ID头部中提供。请注意,RabbitMQ会验证该用户ID是否为连接的实际用户ID,或者连接是否允许模拟操作。
📄️ 延迟消息交换
Spring AMQP 支持 RabbitMQ 延迟消息交换插件。对于入站消息,x-delay 头部会被映射到 AmqpHeaders.RECEIVED\_DELAY 头部。设置 AMQPHeaders.DELAY 头部会导致在出站消息中设置相应的 x-delay 头部。您也可以在出站端点上指定 delay 和 delayExpression 属性(使用 XML 配置时为 delay-expression)。这些属性的优先级高于 AmqpHeaders.DELAY 头部。
📄️ AMQP 支持的消息通道
目前有两种消息通道实现可供选择。一种是点对点通道,另一种是发布-订阅通道。这两种通道都为底层的 AmqpTemplate 和 SimpleMessageListenerContainer 提供了丰富的配置属性(如本章前面针对通道适配器和网关所示)。不过,我们在此展示的示例仅包含最基础的配置。您可以通过查看 XML 模式来了解所有可用的属性。
📄️ AMQP 消息头
Spring Integration AMQP适配器会自动映射所有AMQP属性和头部信息(这是自4.3版本以来的变化——此前仅映射标准头部)。默认情况下,这些属性会通过DefaultAmqpHeaderMapper在Spring Integration消息头部之间进行双向复制。
📄️ 严格的消息顺序
本节介绍入站和出站消息的消息排序。
📄️ AMQP 示例
要试验 AMQP 适配器,请查看 Spring Integration 示例 git 仓库中提供的示例,地址为 https://github.com/SpringSource/spring-integration-samples
📄️ RabbitMQ 流队列支持
版本 6.0 引入了对 RabbitMQ 流式队列的支持。
📄️ AMQP 1.0 支持
从 7.0 版本开始,Spring Integration 提供了用于 RabbitMQ AMQP 1.0 支持的通道适配器。这些通道适配器基于 org.springframework.amqp:spring-rabbitmq-client 库。