跳到主要内容

AMQP

DeepSeek V3 中英对照 AMQP

高级消息队列协议(AMQP)是一个与平台无关的、面向消息中间件的线级协议。Spring AMQP 项目将 Spring 核心概念应用于基于 AMQP 的消息解决方案的开发中。Spring Boot 通过 RabbitMQ 提供了多种便利来处理 AMQP,包括 spring-boot-starter-amqp 启动器。

RabbitMQ 支持

RabbitMQ 是一个基于 AMQP 协议的轻量级、可靠、可扩展且可移植的消息代理。Spring 使用 RabbitMQ 通过 AMQP 协议进行通信。

RabbitMQ 的配置通过 spring.rabbitmq.* 的外部配置属性来控制。例如,你可以在 application.properties 中声明以下部分:

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret
properties

或者,你也可以使用 addresses 属性来配置相同的连接:

spring.rabbitmq.addresses=amqp://admin:secret@localhost
properties
备注

当以这种方式指定地址时,hostport 属性会被忽略。如果地址使用 amqps 协议,SSL 支持会自动启用。

有关更多支持的基于属性的配置选项,请参阅 RabbitProperties。要配置 Spring AMQP 使用的 RabbitMQ ConnectionFactory 的低级细节,请定义一个 ConnectionFactoryCustomizer Bean。

如果上下文中存在一个 ConnectionNameStrategy Bean,它将自动用于命名由自动配置的 CachingConnectionFactory 创建的连接。

要对 RabbitTemplate 进行应用程序范围内的附加自定义,请使用 RabbitTemplateCustomizer bean。

发送消息

Spring 的 AmqpTemplateAmqpAdmin 会自动配置,您可以直接将它们自动注入到您自己的 Bean 中,如下例所示:

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

private final AmqpAdmin amqpAdmin;

private final AmqpTemplate amqpTemplate;

public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
this.amqpAdmin = amqpAdmin;
this.amqpTemplate = amqpTemplate;
}

// ...

public void someMethod() {
this.amqpAdmin.getQueueInfo("someQueue");
}

public void someOtherMethod() {
this.amqpTemplate.convertAndSend("hello");
}

}
java
备注

RabbitMessagingTemplate 可以以类似的方式注入。如果定义了 MessageConverter bean,它会自动与自动配置的 AmqpTemplate 关联。

如有必要,任何定义为 bean 的 Queue 将自动用于在 RabbitMQ 实例上声明相应的队列。

要重试操作,你可以在 AmqpTemplate 上启用重试功能(例如,在代理连接丢失的情况下):

spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=2s
properties

默认情况下,重试机制是禁用的。你也可以通过声明一个 RabbitRetryTemplateCustomizer bean 来自定义 RetryTemplate 的实现。

如果你需要创建更多的 RabbitTemplate 实例,或者你想要覆盖默认的配置,Spring Boot 提供了一个 RabbitTemplateConfigurer Bean,你可以使用它来初始化一个 RabbitTemplate,并使用与自动配置相同的工厂设置。

向流发送消息

要向特定流发送消息,请指定流的名称,如下例所示:

spring.rabbitmq.stream.name=my-stream
properties

如果定义了 MessageConverterStreamMessageConverterProducerCustomizer Bean,它们会自动与自动配置的 RabbitStreamTemplate 关联。

如果你需要创建更多的 RabbitStreamTemplate 实例,或者你想要覆盖默认的配置,Spring Boot 提供了一个 RabbitStreamTemplateConfigurer bean,你可以使用它来初始化一个 RabbitStreamTemplate,并且它的设置将与自动配置所使用的工厂相同。

接收消息

当 Rabbit 基础设施存在时,任何 Bean 都可以通过 @RabbitListener 注解来创建一个监听端点。如果未定义 RabbitListenerContainerFactory,则会自动配置一个默认的 SimpleRabbitListenerContainerFactory,并且你可以通过 spring.rabbitmq.listener.type 属性切换到直接容器。如果定义了 MessageConverterMessageRecoverer Bean,它们将自动与默认工厂关联。

以下示例组件在 someQueue 队列上创建了一个监听器端点:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

@RabbitListener(queues = "someQueue")
public void processMessage(String content) {
// ...
}

}
java
提示

更多详情请参阅 @EnableRabbit

如果你需要创建更多的 RabbitListenerContainerFactory 实例,或者你想要覆盖默认的配置,Spring Boot 提供了 SimpleRabbitListenerContainerFactoryConfigurerDirectRabbitListenerContainerFactoryConfigurer,你可以使用它们来初始化 SimpleRabbitListenerContainerFactoryDirectRabbitListenerContainerFactory,并且这些工厂的配置与自动配置使用的工厂配置相同。

提示

选择哪种容器类型并不重要。这两个 bean 是由自动配置暴露出来的。

例如,以下配置类公开了另一个使用特定 MessageConverter 的工厂:

import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration(proxyBeanMethods = false)
public class MyRabbitConfiguration {

@Bean
public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
ConnectionFactory connectionFactory = getCustomConnectionFactory();
configurer.configure(factory, connectionFactory);
factory.setMessageConverter(new MyMessageConverter());
return factory;
}

private ConnectionFactory getCustomConnectionFactory() {
return ...
}

}
java

然后你可以在任何使用 @RabbitListener 注解的方法中使用该工厂,如下所示:

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyBean {

@RabbitListener(queues = "someQueue", containerFactory = "myFactory")
public void processMessage(String content) {
// ...
}

}
java

你可以启用重试机制来处理监听器抛出异常的情况。默认情况下,使用的是 RejectAndDontRequeueRecoverer,但你可以自定义一个 MessageRecoverer。当重试次数用尽时,消息会被拒绝,如果代理配置了死信交换器,消息会被丢弃或路由到死信交换器。默认情况下,重试机制是禁用的。你还可以通过声明一个 RabbitRetryTemplateCustomizer bean 来自定义 RetryTemplate

important

默认情况下,如果重试被禁用并且监听器抛出异常,消息将无限期地重试。你可以通过两种方式修改此行为:将 defaultRequeueRejected 属性设置为 false,这样将不会尝试重新投递;或者抛出 AmqpRejectAndDontRequeueException 来表明消息应被拒绝。后者是在启用重试且达到最大投递次数时使用的机制。