跳到主要内容
版本:7.0.2

基于 AMQP 的消息通道

DeepSeek V3 中英对照 AMQP-backed Message Channels

目前有两种消息通道实现可供选择。一种是点对点通道,另一种是发布-订阅通道。这两种通道都为底层的AmqpTemplateSimpleMessageListenerContainer提供了丰富的配置属性(如本章前面针对通道适配器和网关所示)。不过,我们在此展示的示例仅包含最基础的配置。如需查看所有可用属性,请查阅XML模式定义。

点对点通道可能如下例所示:

<int-amqp:channel id="p2pChannel"/>

在前述示例的内部实现中,会声明一个名为 si.p2pChannelQueue,并且该通道会向这个 Queue 发送消息(技术上来说,是通过向一个无名的直连交换机发送消息,并使用与该 Queue 名称匹配的路由键)。此通道还会在该 Queue 上注册一个消费者。如果你希望通道是“可轮询的”而非消息驱动的,请将 message-driven 标志设置为 false,如下例所示:

<int-amqp:channel id="p2pPollableChannel"  message-driven="false"/>

一个发布-订阅通道可能如下所示:

<int-amqp:publish-subscribe-channel id="pubSubChannel"/>

在底层实现中,前述示例会声明一个名为 si.fanout.pubSubChannel 的扇出交换机,而该通道会向此扇出交换机发送消息。同时,该通道还会声明一个由服务器命名的、排他的、自动删除的、非持久化的 Queue,并将其绑定到该扇出交换机,同时在该 Queue 上注册一个消费者以接收消息。发布-订阅通道不支持“可轮询”选项,它必须是消息驱动的。

从版本4.1开始,AMQP支持的消息通道(结合 channel-transacted)支持 template-channel-transacted,以分离 AbstractMessageListenerContainerRabbitTemplatetransactional 配置。请注意,在此之前,channel-transacted 默认是 true。现在,对于 AbstractMessageListenerContainer,其默认值已改为 false

在 4.3 版本之前,AMQP 支持的通道仅支持带有 Serializable 负载和消息头的消息。整个消息会被转换(序列化)并发送到 RabbitMQ。现在,你可以将 extract-payload 属性(或在使用 Java 配置时使用 setExtractPayload() 方法)设置为 true。当此标志为 true 时,消息负载会被转换,消息头也会以类似于使用通道适配器的方式进行映射。这种配置使得 AMQP 支持的通道能够与非可序列化的负载一起使用(可能使用其他消息转换器,例如 Jackson2JsonMessageConverter)。有关默认映射消息头的更多信息,请参阅 AMQP 消息头。你可以通过提供使用 outbound-header-mapperinbound-header-mapper 属性的自定义映射器来修改映射。现在,你还可以指定 default-delivery-mode,用于在没有 amqp_deliveryMode 消息头时设置投递模式。默认情况下,Spring AMQP 的 MessageProperties 使用 PERSISTENT 投递模式。

important

与其他持久化支持的通道一样,AMQP 支持的通道旨在提供消息持久化以避免消息丢失。它们并非用于将工作分发到其他对等应用程序。为此目的,请改用通道适配器。

important

从版本 5.0 开始,可轮询通道现在会按照指定的 receiveTimeout(默认为 1 秒)阻塞轮询线程。在此之前,与其他 PollableChannel 实现不同,如果没有可用消息,无论接收超时设置如何,线程都会立即返回调度器。阻塞比使用 basicGet() 检索消息(无超时)开销稍大,因为需要为每条消息创建一个消费者。要恢复之前的行为,请将轮询器的 receiveTimeout 设置为 0。

使用 Java 配置进行配置

以下示例展示了如何使用 Java 配置来配置通道:

@Bean
public AmqpChannelFactoryBean pollable(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean();
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("foo");
factoryBean.setPubSub(false);
return factoryBean;
}

@Bean
public AmqpChannelFactoryBean messageDriven(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("bar");
factoryBean.setPubSub(false);
return factoryBean;
}

@Bean
public AmqpChannelFactoryBean pubSub(ConnectionFactory connectionFactory) {
AmqpChannelFactoryBean factoryBean = new AmqpChannelFactoryBean(true);
factoryBean.setConnectionFactory(connectionFactory);
factoryBean.setQueueName("baz");
factoryBean.setPubSub(false);
return factoryBean;
}

使用 Java DSL 进行配置

以下示例展示了如何使用 Java DSL 配置通道:

@Bean
public IntegrationFlow pollableInFlow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.pollableChannel(connectionFactory)
.queueName("foo"))
...
.get();
}

@Bean
public IntegrationFlow messageDrivenInFow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.channel(connectionFactory)
.queueName("bar"))
...
.get();
}

@Bean
public IntegrationFlow pubSubInFlow(ConnectionFactory connectionFactory) {
return IntegrationFlow.from(...)
...
.channel(Amqp.publishSubscribeChannel(connectionFactory)
.queueName("baz"))
...
.get();
}