跳到主要内容

基于 AMQP 的消息通道

QWen Plus 中英对照 AMQP-backed Message Channels

有两种消息通道实现可供选择。一种是点对点,另一种是发布-订阅。这两种通道都为底层的 AmqpTemplateSimpleMessageListenerContainer 提供了广泛的配置属性(如本章前面所述的通道适配器和网关)。然而,这里展示的示例具有最小的配置。请查阅 XML 模式以查看可用的属性。

点对点通道可能看起来像以下示例:

<int-amqp:channel id="p2pChannel"/>
xml

在内部,前面的例子会导致声明一个名为 si.p2pChannelQueue,并且这个通道会发送消息到该 Queue(技术上是通过发送到无名称的 direct exchange,并使用与这个 Queue 名称匹配的路由键)。此通道还会在此 Queue 上注册一个消费者。如果你想让通道变为“可轮询”而不是消息驱动的,请提供 message-driven 标志并将其值设置为 false,如下例所示:

<int-amqp:channel id="p2pPollableChannel"  message-driven="false"/>
xml

一个发布-订阅通道可能如下所示:

<int-amqp:publish-subscribe-channel id="pubSubChannel"/>
xml

在内部,前面的例子会导致声明一个名为 si.fanout.pubSubChannel 的扇出交换机,该通道会向此扇出交换机发送消息。此通道还会声明一个服务器命名的独占、自动删除、非持久化 Queue,并将该队列绑定到扇出交换机上,同时在此 Queue 上注册一个消费者以接收消息。对于发布 - 订阅通道,没有“可轮询”的选项。它必须是消息驱动的。

从 4.1 版本开始,AMQP 支持的消息通道(与 channel-transacted 一起使用)支持 template-channel-transacted,以分离 AbstractMessageListenerContainerRabbitTemplatetransactional 配置。请注意,以前 channel-transacted 默认为 true。现在,默认情况下,对于 AbstractMessageListenerContainer,它为 false

在 4.3 版本之前,AMQP 支持的通道只支持 Serializable 类型的有效载荷和消息头。整个消息被转换(序列化)并发送到 RabbitMQ。现在,您可以将 extract-payload 属性(或在使用 Java 配置时使用 setExtractPayload() 方法)设置为 true。当此标志为 true 时,消息有效载荷会被转换,并且消息头会被映射,类似于使用通道适配器时的方式。这种安排允许 AMQP 支持的通道与不可序列化有效载荷一起使用(可能使用其他消息转换器,例如 Jackson2JsonMessageConverter)。有关默认映射头的更多信息,请参阅 AMQP 消息头。您可以通过提供自定义映射器来修改映射,这些映射器使用 outbound-header-mapperinbound-header-mapper 属性。现在您还可以指定一个 default-delivery-mode,它用于在没有 amqp_deliveryMode 头时设置投递模式。默认情况下,Spring AMQP MessageProperties 使用 PERSISTENT 投递模式。

important

与其他基于持久性的通道一样,基于 AMQP 的通道旨在提供消息持久性以避免消息丢失。它们不是为了将工作分发给其他对等应用程序。为此目的,请改用通道适配器。

important

从 5.0 版本开始,轮询通道现在会将轮询线程阻塞指定的 receiveTimeout(默认是 1 秒)。之前,与其他 PollableChannel 实现不同的是,如果没有可用的消息,线程会立即返回调度程序,而不管接收超时设置。阻塞比使用 basicGet() 来检索消息(不带超时)要稍微昂贵一些,因为每次接收消息都需要创建一个消费者。要恢复之前的行为,将轮询器的 receiveTimeout 设置为 0。

使用 Java 配置

以下示例展示了如何使用 Java 配置来配置通道:

@Bean
public AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean();
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("foo");
factoryBean.setPubSub(false);
return factoryBean;
}

@Bean
public AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("bar");
factoryBean.setPubSub(false);
return factoryBean;
}

@Bean
public AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("baz");
factoryBean.setPubSub(false);
return factoryBean;
}
java

使用 Java DSL 进行配置

以下示例展示了如何使用 Java DSL 配置通道:

@Bean
public IntegrationFlow pollableInFlow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.pollableChannel(connectionFactory)
.queueName("foo"))
...
.get();
}

@Bean
public IntegrationFlow messageDrivenInFow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.channel(connectionFactory)
.queueName("bar"))
...
.get();
}

@Bean
public IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.publishSubscribeChannel(connectionFactory)
.queueName("baz"))
...
.get();
}
java