跳到主要内容

入站通道适配器

QWen Plus 中英对照 Inbound Channel Adapter

以下列表显示了 AMQP 入站通道适配器的可能配置选项:

@Bean
public IntegrationFlow amqpInbound(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(Amqp.inboundAdapter(connectionFactory, "aName"))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
java
备注

容器

请注意,在使用 XML 配置外部容器时,您不能使用 Spring AMQP 命名空间来定义容器。这是因为命名空间需要至少一个 <listener/> 元素。在这种环境下,监听器是适配器内部的。因此,您必须使用正常的 Spring <bean/> 定义来定义容器,如下例所示:

<bean id="container"
class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="queueNames" value="aName.queue" />
<property name="defaultRequeueRejected" value="false"/>
</bean>
xml
important

尽管 Spring Integration 的 JMS 和 AMQP 支持是相似的,但存在重要差异。JMS 入站通道适配器在内部使用 JmsDestinationPollingSource,并期望配置轮询器。AMQP 入站通道适配器使用 AbstractMessageListenerContainer,并且是由消息驱动的。在这方面,它更类似于 JMS 消息驱动的通道适配器。

从 5.5 版本开始,AmqpInboundChannelAdapter 可以配置为使用 org.springframework.amqp.rabbit.retry.MessageRecoverer 策略,在重试操作内部调用时,该策略会在 RecoveryCallback 中使用。有关更多信息,请参阅 setMessageRecoverer() 的 JavaDocs。

@Publisher 注解也可以与 @RabbitListener 结合使用:

@Configuration
@EnableIntegration
@EnableRabbit
@EnablePublisher
public static class ContextConfiguration {

@Bean
QueueChannel fromRabbitViaPublisher() {
return new QueueChannel();
}

@RabbitListener(queuesToDeclare = @Queue("publisherQueue"))
@Publisher("fromRabbitViaPublisher")
@Payload("#args.payload.toUpperCase()")
public void consumeForPublisher(String payload) {

}

}
java

默认情况下,@Publisher AOP 拦截器处理方法调用的返回值。但是,来自 @RabbitListener 方法的返回值被视为 AMQP 回复消息。因此,这种做法不能与 @Publisher 一起使用,所以推荐使用带有针对方法参数的相应 SpEL 表达式的 @Payload 注解来实现这种组合。有关 @Publisher 的更多信息,请参阅 注解驱动的配置 部分。

important

在使用监听器容器中的独占或单活动消费者时,建议将容器属性 forceStop 设置为 true。这可以防止在停止容器后,另一个消费者在此实例完全停止之前开始消费消息的竞争条件。

批量消息

有关批处理消息的更多信息,请参阅 Spring AMQP 文档

要使用 Spring Integration 生产批处理消息,只需将 outbound 端点配置为使用 BatchingRabbitTemplate

当接收批处理消息时,默认情况下,监听器容器会提取每个片段消息,适配器将为每个片段生成一个 Message<?>。从 5.2 版开始,如果容器的 deBatchingEnabled 属性设置为 false,则取消批处理由适配器执行,并生成一个单一的 Message<List<?>>,其有效负载是片段有效负载的列表(在适当的情况下进行转换后)。

默认的 BatchingStrategySimpleBatchingStrategy,但可以在适配器中覆盖此设置。

备注

当需要对重试操作进行恢复时,必须与批次一起使用 org.springframework.amqp.rabbit.retry.MessageBatchRecoverer