入站通道适配器
以下清单展示了 AMQP 入站通道适配器的可用配置选项:
- Java DSL
- Java
- XML
@Bean
public IntegrationFlow amqpInbound(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(Amqp.inboundAdapter(connectionFactory, "aName"))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
@Bean
public MessageChannel amqpInputChannel() {
return new DirectChannel();
}
@Bean
public AmqpInboundChannelAdapter inbound(SimpleMessageListenerContainer listenerContainer,
@Qualifier("amqpInputChannel") MessageChannel channel) {
AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer);
adapter.setOutputChannel(channel);
return adapter;
}
@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container =
new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames("aName");
container.setConcurrentConsumers(2);
// ...
return container;
}
@Bean
@ServiceActivator(inputChannel = "amqpInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
<int-amqp:inbound-channel-adapter
id="inboundAmqp" // <1>
channel="inboundChannel" // <2>
queue-names="si.test.queue" // <3>
acknowledge-mode="AUTO" // <4>
advice-chain="" // <5>
channel-transacted="" // <6>
concurrent-consumers="" // <7>
connection-factory="" // <8>
error-channel="" // <9>
expose-listener-channel="" // <10>
header-mapper="" // <11>
mapped-request-headers="" // <12>
listener-container="" // <13>
message-converter="" // <14>
message-properties-converter="" // <15>
phase="" // <16>
prefetch-count="" // <17>
receive-timeout="" // <18>
recovery-interval="" // <19>
missing-queues-fatal="" // <20>
shutdown-timeout="" // <21>
task-executor="" // <22>
transaction-attribute="" // <23>
transaction-manager="" // <24>
batch-size="" // <25>
consumers-per-queue // <26>
batch-mode="MESSAGES"/> // <27>
此适配器的唯一 ID。可选。
转换后的消息应发送到的消息通道。必需。
应从中消费消息的 AMQP 队列名称(逗号分隔的列表)。必需。
MessageListenerContainer的确认模式。设置为MANUAL时,传递标签和通道将分别在消息头amqp_deliveryTag和amqp_channel中提供。用户应用程序负责确认。NONE表示不进行确认(autoAck)。AUTO表示当下游流完成时,适配器的容器会进行确认。可选(默认为 AUTO)。请参阅 入站端点确认模式。用于处理与此入站通道适配器相关的横切行为的额外 AOP 通知。可选。
指示此组件创建的通道是否具有事务性的标志。如果为 true,则告知框架使用事务性通道,并根据操作结果(成功则提交,失败则回滚)结束所有操作(发送或接收),异常信号表示回滚。可选(默认为 false)。
指定要创建的并发消费者数量。默认值为
1。我们建议增加并发消费者数量以扩展从队列接收消息的消费能力。但请注意,一旦注册了多个消费者,任何顺序保证都将丢失。通常,对于低流量队列使用一个消费者。当设置了 'consumers-per-queue' 时不允许设置此属性。可选。指向 RabbitMQ
ConnectionFactory的 Bean 引用。可选(默认为connectionFactory)。错误消息应发送到的消息通道。可选。
监听器通道(com.rabbitmq.client.Channel)是否暴露给已注册的
ChannelAwareMessageListener。可选(默认为 true)。接收 AMQP 消息时要使用的
AmqpHeaderMapper的引用。可选。默认情况下,只有标准的 AMQP 属性(例如contentType)会被复制到 Spring Integration 的MessageHeaders中。AMQPMessageProperties中的任何用户自定义头默认情况下不会被默认的DefaultAmqpHeaderMapper复制到消息中。如果提供了request-header-names,则不允许提供此属性。要从 AMQP 请求映射到
MessageHeaders的 AMQP 头名称的逗号分隔列表。只有在未提供 'header-mapper' 引用时才能提供此属性。此列表中的值也可以是用于匹配头名称的简单模式(例如 "*" 或 "thing1*, thing2" 或 "*something")。用于接收 AMQP 消息的
AbstractMessageListenerContainer的引用。如果提供了此属性,则不应提供与监听器容器配置相关的任何其他属性。换句话说,通过设置此引用,您必须对监听器容器配置负全部责任。唯一的例外是MessageListener本身。由于这实际上是此通道适配器实现的核心职责,因此引用的监听器容器必须尚未拥有自己的MessageListener。可选。接收 AMQP 消息时要使用的
MessageConverter。可选。接收 AMQP 消息时要使用的
MessagePropertiesConverter。可选。指定底层
AbstractMessageListenerContainer应启动和停止的阶段。启动顺序从低到高,关闭顺序则相反。默认情况下,此值为Integer.MAX_VALUE,意味着此容器尽可能晚启动并尽可能早停止。可选。告知 AMQP 代理在单个请求中向每个消费者发送多少条消息。通常,您可以将此值设置得较高以提高吞吐量。它应大于或等于事务大小(请参阅此列表后面的
batch-size属性)。可选(默认为1)。接收超时时间(毫秒)。可选(默认为
1000)。指定底层
AbstractMessageListenerContainer恢复尝试之间的间隔(毫秒)。可选(默认为5000)。如果为 'true' 且代理上没有任何队列可用,容器在启动期间会抛出致命异常,并且如果在容器运行时删除了队列(在尝试被动声明队列三次之后),容器会停止。如果为
false,容器不会抛出异常,而是进入恢复模式,根据recovery-interval尝试重新启动。可选(默认为true)。底层
AbstractMessageListenerContainer停止后,在强制关闭 AMQP 连接之前等待工作线程的时间(毫秒)。如果关闭信号到来时有任何工作线程处于活动状态,只要它们能在此超时时间内完成处理,就允许它们完成。否则,连接将被关闭,消息将保持未确认状态(如果通道是事务性的)。可选(默认为5000)。默认情况下,底层
AbstractMessageListenerContainer使用SimpleAsyncTaskExecutor实现,该实现为每个任务启动一个新线程并异步运行它。默认情况下,并发线程数不受限制。请注意,此实现不重用线程。考虑使用线程池化的TaskExecutor实现作为替代方案。可选(默认为SimpleAsyncTaskExecutor)。默认情况下,底层
AbstractMessageListenerContainer创建DefaultTransactionAttribute的新实例(它采用 EJB 方法在运行时异常时回滚,但不回滚已检查异常)。可选(默认为DefaultTransactionAttribute)。在底层
AbstractMessageListenerContainer上设置对外部PlatformTransactionManager的 Bean 引用。事务管理器与channel-transacted属性协同工作。如果在框架发送或接收消息时已有事务在进行中且channelTransacted标志为true,则消息传递事务的提交或回滚将推迟到当前事务结束时进行。如果channelTransacted标志为false,则消息传递操作不应用事务语义(它是自动确认的)。更多信息,请参阅 Spring AMQP 事务。可选。告知
SimpleMessageListenerContainer在单个请求中处理多少条消息。为获得最佳效果,它应小于或等于prefetch-count中设置的值。当设置了 'consumers-per-queue' 时不允许设置此属性。可选(默认为1)。指示底层监听器容器应为
DirectMessageListenerContainer而不是默认的SimpleMessageListenerContainer。更多信息,请参阅 Spring AMQP 参考手册。当容器的
consumerBatchEnabled为true时,决定适配器如何在消息有效负载中呈现消息批次。设置为MESSAGES(默认)时,有效负载是List<Message<?>>,其中每条消息都有从传入的 AMQPMessage映射而来的头,有效负载是转换后的body。设置为EXTRACT_PAYLOADS时,有效负载是List<?>,其中元素是从 AMQPMessage正文转换而来。EXTRACT_PAYLOADS_WITH_HEADERS与EXTRACT_PAYLOADS类似,但此外,每条消息的头从MessageProperties映射到相应索引处的List<Map<String, Object>>中;头名称为AmqpInboundChannelAdapter.CONSOLIDATED_HEADERS。
container
请注意,当使用 XML 配置外部容器时,不能使用 Spring AMQP 命名空间来定义容器。这是因为命名空间要求至少有一个 <listener/> 元素。在此环境中,监听器是适配器内部的。因此,你必须使用常规的 Spring <bean/> 定义来定义容器,如下例所示:
<bean id="container"
class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory" />
<property name="queueNames" value="aName.queue" />
<property name="defaultRequeueRejected" value="false"/>
</bean>
尽管 Spring Integration 对 JMS 和 AMQP 的支持相似,但存在重要差异。JMS 入站通道适配器在底层使用 JmsDestinationPollingSource 并期望配置轮询器。而 AMQP 入站通道适配器使用 AbstractMessageListenerContainer 并且是消息驱动的。在这方面,它更类似于 JMS 消息驱动的通道适配器。
从版本5.5开始,AmqpInboundChannelAdapter 可以配置一个 org.springframework.amqp.rabbit.retry.MessageRecoverer 策略,该策略在内部调用重试操作时用于 RecoveryCallback。更多信息请参阅 setMessageRecoverer() 的 JavaDocs。
@Publisher 注解也可以与 @RabbitListener 结合使用:
@Configuration
@EnableIntegration
@EnableRabbit
@EnablePublisher
public static class ContextConfiguration {
@Bean
QueueChannel fromRabbitViaPublisher() {
return new QueueChannel();
}
@RabbitListener(queuesToDeclare = @Queue("publisherQueue"))
@Publisher("fromRabbitViaPublisher")
@Payload("#args.payload.toUpperCase()")
public void consumeForPublisher(String payload) {
}
}
默认情况下,@Publisher AOP 拦截器处理的是方法调用的返回值。然而,@RabbitListener 方法的返回值会被视为 AMQP 回复消息。因此,这种方法无法与 @Publisher 同时使用,所以对于这种组合,推荐使用带有针对方法参数的相应 SpEL 表达式的 @Payload 注解。有关 @Publisher 的更多信息,请参阅基于注解的配置部分。
当在监听器容器中使用独占或单活跃消费者时,建议将容器属性 forceStop 设置为 true。这样可以避免出现竞态条件:在容器停止后,另一个消费者可能在此实例完全停止之前就开始消费消息。
批量消息
有关批量消息的更多信息,请参阅 Spring AMQP 文档。
要使用Spring Integration生成批量消息,只需使用BatchingRabbitTemplate配置出站端点。
当接收到批量消息时,默认情况下,监听器容器会提取每个片段消息,适配器将为每个片段生成一个 Message<?>。从版本 5.2 开始,如果容器的 deBatchingEnabled 属性设置为 false,则解批处理将由适配器执行,并生成一个 Message<List<?>>,其负载为片段负载的列表(如果适用,则在转换后)。
默认的 BatchingStrategy 是 SimpleBatchingStrategy,但可以在适配器上覆盖此设置。
当重试操作需要恢复时,必须将 org.springframework.amqp.rabbit.retry.MessageBatchRecoverer 与批次一起使用。