注解驱动的监听器端点
异步接收消息的最简单方法是使用带有注解的监听器端点基础设施。简而言之,它允许你将一个托管 bean 的方法暴露为 Rabbit 监听器端点。以下示例展示了如何使用 @RabbitListener
注解:
@Component
public class MyService {
@RabbitListener(queues = "myQueue")
public void processOrder(String data) {
...
}
}
前面示例的核心思想是,每当名为 myQueue
的队列上有消息可用时,就会相应地调用 processOrder
方法(在这种情况下,调用时会传入消息的有效负载)。
注解的端点基础设施在幕后为每个注解方法创建一个消息监听器容器,通过使用 RabbitListenerContainerFactory
来实现。
在前面的例子中,myQueue
必须已经存在并且绑定到某个 exchange。只要应用程序上下文中存在 RabbitAdmin
,队列就可以自动声明和绑定。
可以为注解属性(如 queues
等)指定属性占位符(${some.property}
)或 SpEL 表达式(#{someExpression}
)。有关为什么可能使用 SpEL 而不是属性占位符的示例,请参见监听多个队列。以下清单展示了如何声明 Rabbit 监听器的三个示例:
@Component
public class MyService {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "myQueue", durable = "true"),
exchange = @Exchange(value = "auto.exch", ignoreDeclarationExceptions = "true"),
key = "orderRoutingKey")
)
public void processOrder(Order order) {
...
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue,
exchange = @Exchange(value = "auto.exch"),
key = "invoiceRoutingKey")
)
public void processInvoice(Invoice invoice) {
...
}
@RabbitListener(queuesToDeclare = @Queue(name = "${my.queue}", durable = "true"))
public String handleWithSimpleDeclare(String data) {
...
}
}
在第一个示例中,队列 myQueue
与交换器(如果需要)一起自动声明为持久队列,并使用路由键绑定到交换器。在第二个示例中,声明并绑定了一个匿名(独占、自动删除)队列;队列名称由框架使用 Base64UrlNamingStrategy
生成。你不能使用这种技术声明由代理命名的队列;它们需要作为 bean 定义进行声明;请参阅 容器与代理命名的队列。可以提供多个 QueueBinding
条目,使监听器能够监听多个队列。在第三个示例中,如果需要,声明了一个从属性 my.queue
中获取名称的队列,并使用队列名称作为路由键与默认交换器进行默认绑定。
自 2.0 版本起,@Exchange
注解支持任何交换类型,包括自定义类型。更多信息请参阅 AMQP 概念。
当你需要更高级的配置时,可以使用常规的 @Bean
定义。
注意第一个示例中交换器上的 ignoreDeclarationExceptions
。这允许,例如,绑定到一个可能具有不同设置(如 internal
)的现有交换器。默认情况下,现有交换器的属性必须匹配。
从 2.0 版本开始,你现在可以使用多个路由键将队列绑定到交换机,如下例所示:
...
key = { "red", "yellow" }
...
你也可以在 @QueueBinding
注解中为队列、交换机和绑定指定参数,如下例所示:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "auto.headers", autoDelete = "true",
arguments = @Argument(name = "x-message-ttl", value = "10000",
type = "java.lang.Integer")),
exchange = @Exchange(value = "auto.headers", type = ExchangeTypes.HEADERS, autoDelete = "true"),
arguments = {
@Argument(name = "x-match", value = "all"),
@Argument(name = "thing1", value = "somevalue"),
@Argument(name = "thing2")
})
)
public String handleWithHeadersExchange(String foo) {
...
}
请注意,x-message-ttl
参数被设置为 10 秒,用于队列。由于参数类型不是 String
,我们必须指定其类型——在这种情况下是 Integer
。与所有此类声明一样,如果队列已经存在,参数必须与队列上的参数匹配。对于 header 交换机,我们将绑定参数设置为匹配那些具有 thing1
头设置为 somevalue
的消息,并且 thing2
头必须存在且具有任何值。x-match
参数意味着这两个条件都必须满足。
参数名称、值和类型可以是属性占位符(${…}
)或 SpEL 表达式(#{…}
)。name
必须解析为 String
类型。type
的表达式必须解析为 Class
或类的全限定名。value
必须解析为可以通过 DefaultConversionService
转换为指定类型的值(例如前面示例中的 x-message-ttl
)。
如果一个名称解析为 null
或空字符串 String
,则该 @Argument
将被忽略。
章节摘要
📄️ 元注解
有时候,你可能希望为多个监听器使用相同的配置。为了减少样板配置,你可以使用元注解来创建自己的监听器注解。以下示例展示了如何做到这一点:
📄️ 启用监听器端点注解
要启用对 @RabbitListener 注解的支持,你可以在你的 @Configuration 类中添加 @EnableRabbit。以下示例展示了如何做到这一点:
📄️ 注解方法的消息转换
在调用监听器之前,管道中有两个转换步骤。第一步使用 MessageConverter 将传入的 Spring AMQP 消息转换为 Spring-messaging 消息。当调用目标方法时,如果需要,消息的有效负载会被转换为方法参数类型。
📄️ 将自定义的 HandlerMethodArgumentResolver 添加到 @RabbitListener
从版本 2.3.7 开始,您可以添加自己的 HandlerMethodArgumentResolver 并解析自定义方法参数。您只需要实现 RabbitListenerConfigurer 并使用 RabbitListenerEndpointRegistrar 类中的 setCustomMethodArgumentResolvers() 方法即可。
📄️ 程序化端点注册
RabbitListenerEndpoint 提供了一个 Rabbit 端点的模型,并负责为该模型配置容器。除了通过 @RabbitListener 注解检测到的端点外,该基础设施还允许你以编程方式配置端点。以下示例展示了如何做到这一点:
📄️ 注解端点方法签名
到目前为止,我们一直在端点中注入一个简单的 String,但实际上它可以有一个非常灵活的方法签名。以下示例将其重写为通过自定义标头注入 Order:
📄️ @RabbitListener @Payload 验证
从 2.3.7 版本开始,现在更容易添加一个 Validator 来验证 @RabbitListener 和 @RabbitHandler 的 @Payload 参数。现在,你可以简单地将验证器添加到注册器本身。
📄️ 监听多个队列
当你使用 queues 属性时,可以指定关联的容器可以监听多个队列。你可以使用 @Header 注解来使接收消息的队列名称对 POJO 方法可用。以下示例展示了如何做到这一点:
📄️ 回复管理
MessageListenerAdapter 现有的支持已经允许你的方法具有非 void 返回类型。在这种情况下,调用的结果会被封装在一个消息中,发送到原始消息的 ReplyToAddress 头中指定的地址,或者发送到监听器上配置的默认地址。你可以通过使用消息抽象中的 @SendTo 注解来设置该默认地址。
📄️ 回复内容类型
如果你使用的是复杂的消息转换器,例如 ContentTypeDelegatingMessageConverter,你可以通过在监听器上设置 replyContentType 属性来控制回复的内容类型。这使得转换器能够为回复选择合适的委托转换器。
📄️ 多方法监听器
从 1.5.0 版本开始,你可以在类级别上指定 @RabbitListener 注解。结合新的 @RabbitHandler 注解,这允许单个监听器根据传入消息的有效负载类型调用不同的方法。通过一个示例可以最好地说明这一点:
📄️ @Repeatable @RabbitListener
从 1.6 版本开始,@RabbitListener 注解被标记为 @Repeatable。这意味着该注解可以在同一个被注解的元素(方法或类)上多次出现。在这种情况下,会为每个注解创建一个独立的监听器容器,每个容器都会调用相同的监听器 @Bean。可重复注解可以与 Java 8 或更高版本一起使用。
📄️ 代理 @RabbitListener 和泛型
如果你的服务打算被代理(例如,在 @Transactional 的情况下),当接口具有泛型参数时,你应该记住一些注意事项。考虑以下示例:
📄️ 处理异常
默认情况下,如果带注解的监听器方法抛出异常,异常会被抛给容器,消息会被重新排队并重新投递、丢弃,或者路由到死信交换器,这取决于容器和 broker 的配置。不会向发送者返回任何内容。
📄️ 容器管理
为注解创建的容器不会注册到应用程序上下文中。你可以通过在 RabbitListenerEndpointRegistry bean 上调用 getListenerContainers() 方法来获取所有容器的集合。然后你可以遍历这个集合,例如,停止或启动所有容器,或者在注册表本身上调用 Lifecycle 方法,这将调用每个容器的操作。