AMQP (RabbitMQ) 支持
Spring Integration 提供了通道适配器,用于通过高级消息队列协议 (AMQP) 接收和发送消息。
你需要将这个依赖添加到你的项目中:
- Maven
- Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-amqp</artifactId>
<version>6.4.2</version>
</dependency>
compile "org.springframework.integration:spring-integration-amqp:6.4.2"
以下适配器可用:
Spring Integration 还提供了一种由 AMQP Exchange 和 Queue 支持的点对点消息通道和发布 - 订阅消息通道。
为了提供 AMQP 支持,Spring Integration 依赖于 Spring AMQP,它将核心 Spring 概念应用于基于 AMQP 的消息解决方案的开发。Spring AMQP 提供了与 Spring JMS 类似的语义。
虽然提供的 AMQP Channel Adapters 仅用于单向消息传递(发送或接收),但 Spring Integration 还提供了入站和出站 AMQP 网关以用于请求-回复操作。
提示:你应该熟悉 Spring AMQP 项目的参考文档。它提供了关于 Spring 与 AMQP 的集成,特别是与 RabbitMQ 的集成的更深入的信息。
部分总结
📄️ 入站通道适配器
以下列表显示了 AMQP 入站通道适配器的可能配置选项:
📄️ 轮询入站通道适配器
版本 5.0.1 引入了一个轮询通道适配器,让你可以按需获取单个消息——例如,使用 MessageSourcePollingTemplate 或轮询器。更多信息,请参见延迟确认轮询消息源。
📄️ 入站网关
入站网关支持入站通道适配器的所有属性(除了 'channel' 被 'request-channel' 替代之外),还有一些额外的属性。以下列表显示了可用的属性:
📄️ 入站端点确认模式
默认情况下,入站端点使用 AUTO 确认模式,这意味着当下游集成流完成时(或通过使用 QueueChannel 或 ExecutorChannel 将消息传递给另一个线程时),容器会自动确认消息。将模式设置为 NONE 会配置消费者完全不使用确认(代理在消息发送后立即自动确认消息)。将模式设置为 MANUAL 允许用户代码在处理过程中的其他某个时间点确认消息。为了支持这一点,在这种模式下,端点分别在 amqp\_channel 和 amqp\_deliveryTag 头中提供 Channel 和 deliveryTag。
📄️ 出站端点
以下 outbound 端点有许多类似的配置选项。从 5.2 版开始,增加了 confirm-timeout。通常情况下,当启用发布者确认时,代理会快速返回一个 ack(或 nack),这将被发送到适当的通道。如果在收到确认之前通道已关闭,Spring AMQP 框架将合成一个 nack。“缺失”的 ack 不应该发生,但是,如果你设置了这个属性,端点将定期检查它们,如果在没有收到确认的情况下时间到期,将合成一个 nack。
📄️ 输出通道适配器
以下示例显示了 AMQP 外发通道适配器的可用属性:
📄️ 出站网关
以下列表显示了 AMQP 外发网关的可能属性:
📄️ 异步 outbound 网关
上一节讨论的网关是同步的,也就是说,发送线程会一直挂起直到收到回复(或发生超时)。Spring Integration 4.3 版本添加了一个异步网关,它使用了来自 Spring AMQP 的 AsyncRabbitTemplate。当消息被发送时,线程在发送操作完成后立即返回,并且当消息被接收时,回复会在模板的监听器容器线程上发送。这在网关由轮询线程调用时非常有用。线程会被释放并且可用于框架中的其他任务。
📄️ 发布确认和返回的替代机制
当连接工厂配置为发布确认和返回时,上述部分讨论了配置消息通道以异步接收确认和返回。从 5.4 版开始,有一种额外的机制,通常更容易使用。
📄️ 入站消息转换
传入的消息,到达通道适配器或网关后,使用消息转换器将其转换为 spring-messaging Message\<?> 负载。默认情况下,使用 SimpleMessageConverter,它处理 Java 序列化和文本。头部默认使用 DefaultHeaderMapper.inboundMapper() 进行映射。如果发生转换错误且没有定义错误通道,则异常会被抛给容器并由监听器容器的错误处理器处理。默认错误处理器将转换错误视为致命错误,并且消息将被拒绝(如果队列配置了死信交换,则会被路由到死信交换)。如果定义了错误通道,则 ErrorMessage 负载是一个 ListenerExecutionFailedException,具有 failedMessage(无法转换的 Spring AMQP 消息)和原因属性。如果容器的确认模式是 AUTO(默认值),并且错误流在不抛出异常的情况下消费错误,则原始消息将被确认。如果错误流抛出异常,则异常类型与容器的错误处理器一起确定消息是否重新入队。如果容器配置为 AcknowledgeMode.MANUAL,则负载是一个 ManualAckListenerExecutionFailedException,具有额外的 channel 和 deliveryTag 属性。这使得错误流可以调用 basicAck 或 basicNack(或 basicReject)来控制消息的处理方式。
📄️ outboundMessageConversion
Spring AMQP 1.4 引入了 ContentTypeDelegatingMessageConverter,其中实际的转换器是根据传入的内容类型消息属性选择的。这可以被入站端点使用。
📄️ Outbound 用户 ID
Spring AMQP 1.6 引入了一种机制,允许为 outbound 消息指定默认的用户 ID。一直以来都可以设置 AmqpHeaders.USER\_ID 头信息,现在它优先于默认值。这对于消息接收者可能是有用的。对于 inbound 消息,如果消息发布者设置了该属性,则它会在 AmqpHeaders.RECEIVED\_USER\_ID 头信息中提供。请注意,RabbitMQ 会验证用户 ID 是否是连接的实际用户 ID 或连接是否允许模拟。
📄️ 延迟消息交换
Spring AMQP 支持 RabbitMQ 延迟消息交换插件。对于传入的消息,x-delay 头被映射到 AmqpHeaders.RECEIVED\_DELAY 头。设置 AMQPHeaders.DELAY 头会导致在传出消息中相应的 x-delay 头被设置。您还可以在传出端点上指定 delay 和 delayExpression 属性(在使用 XML 配置时为 delay-expression)。这些属性优先于 AmqpHeaders.DELAY 头。
📄️ 基于 AMQP 的消息通道
有两种消息通道实现可供选择。一种是点对点,另一种是发布-订阅。这两种通道都提供了大量配置属性,用于底层的 AmqpTemplate 和 SimpleMessageListenerContainer(如本章前面针对通道适配器和网关所示)。但是,这里展示的示例配置最少。可以查阅 XML 模式以查看可用的属性。
📄️ AMQP 消息头
Spring Integration AMQP 适配器自动映射所有 AMQP 属性和头信息。(这与 4.3 版本有所不同 - 之前,只有标准头信息被映射)。默认情况下,这些属性是通过使用 DefaultAmqpHeaderMapper 复制到 Spring Integration MessageHeaders 和从 MessageHeaders 复制的。
📄️ 严格的消息顺序
本节描述了入站和出站消息的消息排序。
📄️ AMQP 示例
要试验 AMQP 适配器,可以查看位于 Spring Integration 示例 git 仓库中的示例,地址为 https://github.com/SpringSource/spring-integration-samples
📄️ RabbitMQ Stream 队列支持
Version 6.0 引入了对 RabbitMQ Stream Queues 的支持。