跳到主要内容
版本:7.0.2

入站通道适配器

DeepSeek V3 中英对照 Inbound Channel Adapter

以下清单展示了 AMQP 入站通道适配器的可用配置选项:

@Bean
public IntegrationFlow amqpInbound(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(Amqp.inboundAdapter(connectionFactory, "aName"))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
  • 此适配器的唯一 ID。可选。

  • 转换后的消息应发送到的消息通道。必需。

  • 应从中消费消息的 AMQP 队列名称(逗号分隔的列表)。必需。

  • MessageListenerContainer 的确认模式。设置为 MANUAL 时,传递标签和通道将分别在消息头 amqp_deliveryTagamqp_channel 中提供。用户应用程序负责确认。NONE 表示不进行确认(autoAck)。AUTO 表示当下游流完成时,适配器的容器会进行确认。可选(默认为 AUTO)。请参阅 入站端点确认模式

  • 用于处理与此入站通道适配器相关的横切行为的额外 AOP 通知。可选。

  • 指示此组件创建的通道是否具有事务性的标志。如果为 true,则告知框架使用事务性通道,并根据操作结果(成功则提交,失败则回滚)结束所有操作(发送或接收),异常信号表示回滚。可选(默认为 false)。

  • 指定要创建的并发消费者数量。默认值为 1。我们建议增加并发消费者数量以扩展从队列接收消息的消费能力。但请注意,一旦注册了多个消费者,任何顺序保证都将丢失。通常,对于低流量队列使用一个消费者。当设置了 'consumers-per-queue' 时不允许设置此属性。可选。

  • 指向 RabbitMQ ConnectionFactory 的 Bean 引用。可选(默认为 connectionFactory)。

  • 错误消息应发送到的消息通道。可选。

  • 监听器通道(com.rabbitmq.client.Channel)是否暴露给已注册的 ChannelAwareMessageListener。可选(默认为 true)。

  • 接收 AMQP 消息时要使用的 AmqpHeaderMapper 的引用。可选。默认情况下,只有标准的 AMQP 属性(例如 contentType)会被复制到 Spring Integration 的 MessageHeaders 中。AMQP MessageProperties 中的任何用户自定义头默认情况下不会被默认的 DefaultAmqpHeaderMapper 复制到消息中。如果提供了 request-header-names,则不允许提供此属性。

  • 要从 AMQP 请求映射到 MessageHeaders 的 AMQP 头名称的逗号分隔列表。只有在未提供 'header-mapper' 引用时才能提供此属性。此列表中的值也可以是用于匹配头名称的简单模式(例如 "*" 或 "thing1*, thing2" 或 "*something")。

  • 用于接收 AMQP 消息的 AbstractMessageListenerContainer 的引用。如果提供了此属性,则不应提供与监听器容器配置相关的任何其他属性。换句话说,通过设置此引用,您必须对监听器容器配置负全部责任。唯一的例外是 MessageListener 本身。由于这实际上是此通道适配器实现的核心职责,因此引用的监听器容器必须尚未拥有自己的 MessageListener。可选。

  • 接收 AMQP 消息时要使用的 MessageConverter。可选。

  • 接收 AMQP 消息时要使用的 MessagePropertiesConverter。可选。

  • 指定底层 AbstractMessageListenerContainer 应启动和停止的阶段。启动顺序从低到高,关闭顺序则相反。默认情况下,此值为 Integer.MAX_VALUE,意味着此容器尽可能晚启动并尽可能早停止。可选。

  • 告知 AMQP 代理在单个请求中向每个消费者发送多少条消息。通常,您可以将此值设置得较高以提高吞吐量。它应大于或等于事务大小(请参阅此列表后面的 batch-size 属性)。可选(默认为 1)。

  • 接收超时时间(毫秒)。可选(默认为 1000)。

  • 指定底层 AbstractMessageListenerContainer 恢复尝试之间的间隔(毫秒)。可选(默认为 5000)。

  • 如果为 'true' 且代理上没有任何队列可用,容器在启动期间会抛出致命异常,并且如果在容器运行时删除了队列(在尝试被动声明队列三次之后),容器会停止。如果为 false,容器不会抛出异常,而是进入恢复模式,根据 recovery-interval 尝试重新启动。可选(默认为 true)。

  • 底层 AbstractMessageListenerContainer 停止后,在强制关闭 AMQP 连接之前等待工作线程的时间(毫秒)。如果关闭信号到来时有任何工作线程处于活动状态,只要它们能在此超时时间内完成处理,就允许它们完成。否则,连接将被关闭,消息将保持未确认状态(如果通道是事务性的)。可选(默认为 5000)。

  • 默认情况下,底层 AbstractMessageListenerContainer 使用 SimpleAsyncTaskExecutor 实现,该实现为每个任务启动一个新线程并异步运行它。默认情况下,并发线程数不受限制。请注意,此实现不重用线程。考虑使用线程池化的 TaskExecutor 实现作为替代方案。可选(默认为 SimpleAsyncTaskExecutor)。

  • 默认情况下,底层 AbstractMessageListenerContainer 创建 DefaultTransactionAttribute 的新实例(它采用 EJB 方法在运行时异常时回滚,但不回滚已检查异常)。可选(默认为 DefaultTransactionAttribute)。

  • 在底层 AbstractMessageListenerContainer 上设置对外部 PlatformTransactionManager 的 Bean 引用。事务管理器与 channel-transacted 属性协同工作。如果在框架发送或接收消息时已有事务在进行中且 channelTransacted 标志为 true,则消息传递事务的提交或回滚将推迟到当前事务结束时进行。如果 channelTransacted 标志为 false,则消息传递操作不应用事务语义(它是自动确认的)。更多信息,请参阅 Spring AMQP 事务。可选。

  • 告知 SimpleMessageListenerContainer 在单个请求中处理多少条消息。为获得最佳效果,它应小于或等于 prefetch-count 中设置的值。当设置了 'consumers-per-queue' 时不允许设置此属性。可选(默认为 1)。

  • 指示底层监听器容器应为 DirectMessageListenerContainer 而不是默认的 SimpleMessageListenerContainer。更多信息,请参阅 Spring AMQP 参考手册

  • 当容器的 consumerBatchEnabledtrue 时,决定适配器如何在消息有效负载中呈现消息批次。设置为 MESSAGES(默认)时,有效负载是 List<Message<?>>,其中每条消息都有从传入的 AMQP Message 映射而来的头,有效负载是转换后的 body。设置为 EXTRACT_PAYLOADS 时,有效负载是 List<?>,其中元素是从 AMQP Message 正文转换而来。EXTRACT_PAYLOADS_WITH_HEADERSEXTRACT_PAYLOADS 类似,但此外,每条消息的头从 MessageProperties 映射到相应索引处的 List<Map<String, Object>> 中;头名称为 AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS

备注

container

请注意,当使用 XML 配置外部容器时,不能使用 Spring AMQP 命名空间来定义容器。这是因为命名空间要求至少有一个 <listener/> 元素。在此环境中,监听器是适配器内部的。因此,你必须使用常规的 Spring <bean/> 定义来定义容器,如下例所示:

<bean id="container"
class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="queueNames" value="aName.queue" />
<property name="defaultRequeueRejected" value="false"/>
</bean>
important

尽管 Spring Integration 对 JMS 和 AMQP 的支持相似,但存在重要差异。JMS 入站通道适配器在底层使用 JmsDestinationPollingSource 并期望配置轮询器。而 AMQP 入站通道适配器使用 AbstractMessageListenerContainer 并且是消息驱动的。在这方面,它更类似于 JMS 消息驱动的通道适配器。

从版本5.5开始,AmqpInboundChannelAdapter 可以配置一个 org.springframework.amqp.rabbit.retry.MessageRecoverer 策略,该策略在内部调用重试操作时用于 RecoveryCallback。更多信息请参阅 setMessageRecoverer() 的 JavaDocs。

@Publisher 注解也可以与 @RabbitListener 结合使用:

@Configuration
@EnableIntegration
@EnableRabbit
@EnablePublisher
public static class ContextConfiguration {

@Bean
QueueChannel fromRabbitViaPublisher() {
return new QueueChannel();
}

@RabbitListener(queuesToDeclare = @Queue("publisherQueue"))
@Publisher("fromRabbitViaPublisher")
@Payload("#args.payload.toUpperCase()")
public void consumeForPublisher(String payload) {

}

}

默认情况下,@Publisher AOP 拦截器处理的是方法调用的返回值。然而,@RabbitListener 方法的返回值会被视为 AMQP 回复消息。因此,这种方法无法与 @Publisher 同时使用,所以对于这种组合,推荐使用带有针对方法参数的相应 SpEL 表达式的 @Payload 注解。有关 @Publisher 的更多信息,请参阅基于注解的配置部分。

important

当在监听器容器中使用独占或单活跃消费者时,建议将容器属性 forceStop 设置为 true。这样可以避免出现竞态条件:在容器停止后,另一个消费者可能在此实例完全停止之前就开始消费消息。

批量消息

有关批量消息的更多信息,请参阅 Spring AMQP 文档

要使用Spring Integration生成批量消息,只需使用BatchingRabbitTemplate配置出站端点。

当接收到批量消息时,默认情况下,监听器容器会提取每个片段消息,适配器将为每个片段生成一个 Message<?>。从版本 5.2 开始,如果容器的 deBatchingEnabled 属性设置为 false,则解批处理将由适配器执行,并生成一个 Message<List<?>>,其负载为片段负载的列表(如果适用,则在转换后)。

默认的 BatchingStrategySimpleBatchingStrategy,但可以在适配器上覆盖此设置。

备注

当重试操作需要恢复时,必须将 org.springframework.amqp.rabbit.retry.MessageBatchRecoverer 与批次一起使用。