跳到主要内容

注解驱动的监听器端点

DeepSeek V3 中英对照 Annotation-driven Listener Endpoints

异步接收消息的最简单方法是使用带有注解的监听器端点基础设施。简而言之,它允许你将一个托管 bean 的方法暴露为 Rabbit 监听器端点。以下示例展示了如何使用 @RabbitListener 注解:

@Component
public class MyService {

@RabbitListener(queues = "myQueue")
public void processOrder(String data) {
...
}

}
java

前面示例的核心思想是,每当名为 myQueue 的队列上有消息可用时,就会相应地调用 processOrder 方法(在这种情况下,调用时会传入消息的有效负载)。

注解的端点基础设施在幕后为每个注解方法创建一个消息监听器容器,通过使用 RabbitListenerContainerFactory 来实现。

在前面的例子中,myQueue 必须已经存在并且绑定到某个 exchange。只要应用程序上下文中存在 RabbitAdmin,队列就可以自动声明和绑定。

备注

可以为注解属性(如 queues 等)指定属性占位符(${some.property})或 SpEL 表达式(#{someExpression})。有关为什么可能使用 SpEL 而不是属性占位符的示例,请参见监听多个队列。以下清单展示了如何声明 Rabbit 监听器的三个示例:

@Component
public class MyService {

@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "myQueue", durable = "true"),
exchange = @Exchange(value = "auto.exch", ignoreDeclarationExceptions = "true"),
key = "orderRoutingKey")
)
public void processOrder(Order order) {
...
}

@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(value = "auto.exch"),
key = "invoiceRoutingKey")
)
public void processInvoice(Invoice invoice) {
...
}

@RabbitListener(queuesToDeclare = @Queue(name = "${my.queue}", durable = "true"))
public String handleWithSimpleDeclare(String data) {
...
}

}
java

在第一个示例中,队列 myQueue 与交换器(如果需要)一起自动声明为持久队列,并使用路由键绑定到交换器。在第二个示例中,声明并绑定了一个匿名(独占、自动删除)队列;队列名称由框架使用 Base64UrlNamingStrategy 生成。你不能使用这种技术声明由代理命名的队列;它们需要作为 bean 定义进行声明;请参阅 容器与代理命名的队列。可以提供多个 QueueBinding 条目,使监听器能够监听多个队列。在第三个示例中,如果需要,声明了一个从属性 my.queue 中获取名称的队列,并使用队列名称作为路由键与默认交换器进行默认绑定。

自 2.0 版本起,@Exchange 注解支持任何交换类型,包括自定义类型。更多信息请参阅 AMQP 概念

当你需要更高级的配置时,可以使用常规的 @Bean 定义。

注意第一个示例中交换器上的 ignoreDeclarationExceptions。这允许,例如,绑定到一个可能具有不同设置(如 internal)的现有交换器。默认情况下,现有交换器的属性必须匹配。

从 2.0 版本开始,你现在可以使用多个路由键将队列绑定到交换机,如下例所示:

...
key = { "red", "yellow" }
...
java

你也可以在 @QueueBinding 注解中为队列、交换机和绑定指定参数,如下例所示:

@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "auto.headers", autoDelete = "true",
arguments = @Argument(name = "x-message-ttl", value = "10000",
type = "java.lang.Integer")),
exchange = @Exchange(value = "auto.headers", type = ExchangeTypes.HEADERS, autoDelete = "true"),
arguments = {
@Argument(name = "x-match", value = "all"),
@Argument(name = "thing1", value = "somevalue"),
@Argument(name = "thing2")
})
)
public String handleWithHeadersExchange(String foo) {
...
}
java

请注意,x-message-ttl 参数被设置为 10 秒,用于队列。由于参数类型不是 String,我们必须指定其类型——在这种情况下是 Integer。与所有此类声明一样,如果队列已经存在,参数必须与队列上的参数匹配。对于 header 交换机,我们将绑定参数设置为匹配那些具有 thing1 头设置为 somevalue 的消息,并且 thing2 头必须存在且具有任何值。x-match 参数意味着这两个条件都必须满足。

参数名称、值和类型可以是属性占位符(${…​})或 SpEL 表达式(#{…​})。name 必须解析为 String 类型。type 的表达式必须解析为 Class 或类的全限定名。value 必须解析为可以通过 DefaultConversionService 转换为指定类型的值(例如前面示例中的 x-message-ttl)。

如果一个名称解析为 null 或空字符串 String,则该 @Argument 将被忽略。

章节摘要