AMQP
高级消息队列协议(AMQP)是一个与平台无关的、面向消息中间件的线级协议。Spring AMQP 项目将 Spring 核心概念应用于基于 AMQP 的消息解决方案的开发中。Spring Boot 通过 RabbitMQ 提供了多种便利来处理 AMQP,包括 spring-boot-starter-amqp
启动器。
RabbitMQ 支持
RabbitMQ 是一个基于 AMQP 协议的轻量级、可靠、可扩展且可移植的消息代理。Spring 使用 RabbitMQ 通过 AMQP 协议进行通信。
RabbitMQ 的配置通过 spring.rabbitmq.*
的外部配置属性来控制。例如,你可以在 application.properties
中声明以下部分:
- Properties
- YAML
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=secret
spring:
rabbitmq:
host: "localhost"
port: 5672
username: "admin"
password: "secret"
或者,你也可以使用 addresses
属性来配置相同的连接:
- Properties
- YAML
spring.rabbitmq.addresses=amqp://admin:secret@localhost
spring:
rabbitmq:
addresses: "amqp://admin:secret@localhost"
当以这种方式指定地址时,host
和 port
属性会被忽略。如果地址使用 amqps
协议,SSL 支持会自动启用。
有关更多支持的基于属性的配置选项,请参阅 RabbitProperties。要配置 Spring AMQP 使用的 RabbitMQ ConnectionFactory 的低级细节,请定义一个 ConnectionFactoryCustomizer Bean。
如果上下文中存在一个 ConnectionNameStrategy Bean,它将自动用于命名由自动配置的 CachingConnectionFactory 创建的连接。
要对 RabbitTemplate 进行应用程序范围内的附加自定义,请使用 RabbitTemplateCustomizer bean。
详情请参阅 理解 AMQP,RabbitMQ 使用的协议。
发送消息
Spring 的 AmqpTemplate 和 AmqpAdmin 会自动配置,您可以直接将它们自动注入到您自己的 Bean 中,如下例所示:
- Java
- Kotlin
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
private final AmqpAdmin amqpAdmin;
private final AmqpTemplate amqpTemplate;
public MyBean(AmqpAdmin amqpAdmin, AmqpTemplate amqpTemplate) {
this.amqpAdmin = amqpAdmin;
this.amqpTemplate = amqpTemplate;
}
// ...
public void someMethod() {
this.amqpAdmin.getQueueInfo("someQueue");
}
public void someOtherMethod() {
this.amqpTemplate.convertAndSend("hello");
}
}
import org.springframework.amqp.core.AmqpAdmin
import org.springframework.amqp.core.AmqpTemplate
import org.springframework.stereotype.Component
@Component
class MyBean(private val amqpAdmin: AmqpAdmin, private val amqpTemplate: AmqpTemplate) {
// ...
fun someMethod() {
amqpAdmin.getQueueInfo("someQueue")
}
fun someOtherMethod() {
amqpTemplate.convertAndSend("hello")
}
}
RabbitMessagingTemplate 可以以类似的方式注入。如果定义了 MessageConverter bean,它会自动与自动配置的 AmqpTemplate 关联。
如有必要,任何定义为 bean 的 Queue 将自动用于在 RabbitMQ 实例上声明相应的队列。
要重试操作,你可以在 AmqpTemplate 上启用重试功能(例如,在代理连接丢失的情况下):
- Properties
- YAML
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=2s
spring:
rabbitmq:
template:
retry:
enabled: true
initial-interval: "2s"
默认情况下,重试机制是禁用的。你也可以通过声明一个 RabbitRetryTemplateCustomizer bean 来自定义 RetryTemplate 的实现。
如果你需要创建更多的 RabbitTemplate 实例,或者你想要覆盖默认的配置,Spring Boot 提供了一个 RabbitTemplateConfigurer Bean,你可以使用它来初始化一个 RabbitTemplate,并使用与自动配置相同的工厂设置。
向流发送消息
要向特定流发送消息,请指定流的名称,如下例所示:
- Properties
- YAML
spring.rabbitmq.stream.name=my-stream
spring:
rabbitmq:
stream:
name: "my-stream"
如果定义了 MessageConverter、StreamMessageConverter 或 ProducerCustomizer Bean,它们会自动与自动配置的 RabbitStreamTemplate 关联。
如果你需要创建更多的 RabbitStreamTemplate 实例,或者你想要覆盖默认的配置,Spring Boot 提供了一个 RabbitStreamTemplateConfigurer bean,你可以使用它来初始化一个 RabbitStreamTemplate,并且它的设置将与自动配置所使用的工厂相同。
接收消息
当 Rabbit 基础设施存在时,任何 Bean 都可以通过 @RabbitListener 注解来创建一个监听端点。如果未定义 RabbitListenerContainerFactory,则会自动配置一个默认的 SimpleRabbitListenerContainerFactory,并且你可以通过 spring.rabbitmq.listener.type
属性切换到直接容器。如果定义了 MessageConverter 或 MessageRecoverer Bean,它们将自动与默认工厂关联。
以下示例组件在 someQueue
队列上创建了一个监听器端点:
- Java
- Kotlin
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@RabbitListener(queues = "someQueue")
public void processMessage(String content) {
// ...
}
}
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
@Component
class MyBean {
@RabbitListener(queues = ["someQueue"])
fun processMessage(content: String?) {
// ...
}
}
更多详情请参阅 @EnableRabbit。
如果你需要创建更多的 RabbitListenerContainerFactory 实例,或者你想要覆盖默认的配置,Spring Boot 提供了 SimpleRabbitListenerContainerFactoryConfigurer 和 DirectRabbitListenerContainerFactoryConfigurer,你可以使用它们来初始化 SimpleRabbitListenerContainerFactory 和 DirectRabbitListenerContainerFactory,并且这些工厂的配置与自动配置使用的工厂配置相同。
选择哪种容器类型并不重要。这两个 bean 是由自动配置暴露出来的。
例如,以下配置类公开了另一个使用特定 MessageConverter 的工厂:
- Java
- Kotlin
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration(proxyBeanMethods = false)
public class MyRabbitConfiguration {
@Bean
public SimpleRabbitListenerContainerFactory myFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
ConnectionFactory connectionFactory = getCustomConnectionFactory();
configurer.configure(factory, connectionFactory);
factory.setMessageConverter(new MyMessageConverter());
return factory;
}
private ConnectionFactory getCustomConnectionFactory() {
return ...
}
}
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
@Configuration(proxyBeanMethods = false)
class MyRabbitConfiguration {
@Bean
fun myFactory(configurer: SimpleRabbitListenerContainerFactoryConfigurer): SimpleRabbitListenerContainerFactory {
val factory = SimpleRabbitListenerContainerFactory()
val connectionFactory = getCustomConnectionFactory()
configurer.configure(factory, connectionFactory)
factory.setMessageConverter(MyMessageConverter())
return factory
}
fun getCustomConnectionFactory() : ConnectionFactory? {
return ...
}
}
然后你可以在任何使用 @RabbitListener 注解的方法中使用该工厂,如下所示:
- Java
- Kotlin
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MyBean {
@RabbitListener(queues = "someQueue", containerFactory = "myFactory")
public void processMessage(String content) {
// ...
}
}
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
@Component
class MyBean {
@RabbitListener(queues = ["someQueue"], containerFactory = "myFactory")
fun processMessage(content: String?) {
// ...
}
}
你可以启用重试机制来处理监听器抛出异常的情况。默认情况下,使用的是 RejectAndDontRequeueRecoverer,但你可以自定义一个 MessageRecoverer。当重试次数用尽时,消息会被拒绝,如果代理配置了死信交换器,消息会被丢弃或路由到死信交换器。默认情况下,重试机制是禁用的。你还可以通过声明一个 RabbitRetryTemplateCustomizer bean 来自定义 RetryTemplate。
默认情况下,如果重试被禁用并且监听器抛出异常,消息将无限期地重试。你可以通过两种方式修改此行为:将 defaultRequeueRejected
属性设置为 false
,这样将不会尝试重新投递;或者抛出 AmqpRejectAndDontRequeueException 来表明消息应被拒绝。后者是在启用重试且达到最大投递次数时使用的机制。