注解驱动的监听器端点
接收异步消息的最简单方法是使用带注释的监听器端点(annotated listener endpoint)基础设施。简而言之,它允许你将一个管理Bean(managed bean)的方法暴露为JMS监听器端点。以下示例展示了如何使用这种方法:
@Component
public class MyService {
@JmsListener(destination = "myDestination")
public void processOrder(String data) { ... }
}
前面示例的思路是,每当有消息出现在jakarta.jms.Destination的myDestination上时,就会相应地调用processOrder方法(在这种情况下,该方法会处理JMS消息的内容,类似于MessageListenerAdapter所提供的功能)。
带注解的端点基础设施会为每个带注解的方法在后台创建一个消息监听器容器,这是通过使用JmsListenerContainerFactory来实现的。这样的容器并不会注册到应用程序上下文中,但可以通过使用JmsListenerEndpointRegistrybean轻松地找到它以进行管理操作。
@JmsListener 是 Java 8 中的一个可重复使用的注解,因此你可以通过在该方法上添加额外的 @JmsListener 声明,将多个 JMS 目的地与该方法关联起来。
启用监听器端点注释
要支持@JmsListener注解,你可以在其中一个@Configuration类上添加@EnableJms,如下例所示:
- Java
- Kotlin
- Xml
@Configuration
@EnableJms
public class JmsConfiguration {
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory,
DestinationResolver destinationResolver) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setDestinationResolver(destinationResolver);
factory.setSessionTransacted(true);
factory.setConcurrency("3-10");
return factory;
}
}
@Configuration
@EnableJms
class JmsConfiguration {
@Bean
fun jmsListenerContainerFactory(connectionFactory: ConnectionFactory, destinationResolver: DestinationResolver) =
DefaultJmsListenerContainerFactory().apply {
setConnectionFactory(connectionFactory)
setDestinationResolver(destinationResolver)
setSessionTransacted(true)
setConcurrency("3-10")
}
}
<jms:annotation-driven/>
<bean id="jmsListenerContainerFactory" class="org.springframework.jms.config.DefaultJmsListenerContainerFactory">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destinationResolver" ref="destinationResolver"/>
<property name="sessionTransacted" value="true"/>
<property name="concurrency" value="3-10"/>
</bean>
默认情况下,基础设施会寻找一个名为 jmsListenerContainerFactory 的 Bean 作为创建消息监听器容器时使用的工厂来源。在这种情况下(忽略 JMS 基础设施的配置),您可以调用 processOrder 方法,设置核心线程池大小为三个线程,最大线程池大小为十个线程。
你可以自定义用于每个注解的监听器容器工厂,或者通过实现 JmsListenerConfigurer 接口来配置一个显式的默认值。只有当至少有一个端点在没有特定容器工厂的情况下注册时,才需要设置默认值。有关详细信息和示例,请参阅实现 JmsListenerConfigurer 的类的 javadoc。
程序化端点注册
JmsListenerEndpoint 提供了一个 JMS 端点的模型,并负责配置该模型的容器。除了通过 JmsListener 注解检测到的端点外,该基础设施还允许你以编程方式来配置其他端点。以下示例展示了如何进行这样的配置:
@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {
@Override
public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
endpoint.setId("myJmsEndpoint");
endpoint.setDestination("anotherQueue");
endpoint.setMessageListener(message -> {
// processing
});
registrar.registerEndpoint(endpoint);
}
}
在前面的例子中,我们使用了SimpleJmsListenerEndpoint,它提供了实际的MessageListener来执行调用。然而,你也可以构建自己的端点变体来描述自定义的调用机制。
请注意,您完全可以跳过使用@JmsListener,而是通过JmsListenerConfigurer以编程方式注册您的端点。
带注释的端点方法签名
到目前为止,我们一直在我们的端点中注入一个简单的String,但实际上它可以具有非常灵活的方法签名。在下面的例子中,我们重写代码以注入带有自定义头的Order:
@Component
public class MyService {
@JmsListener(destination = "myDestination")
public void processOrder(Order order, @Header("order_type") String orderType) {
...
}
}
您可以在JMS监听器端点中注入的主要元素如下:
-
原生的
jakarta.jms.Message或其任何子类(前提是它们与传入的消息类型匹配)。 -
jakarta.jms.Session,用于可选地访问原生 JMS API(例如,发送自定义回复)。 -
org.springframework.messaging.Message,它代表传入的 JMS 消息。注意,该消息同时包含自定义头部和标准头部(由JmsHeaders定义)。 -
带有
@Header注解的方法参数,用于提取特定的头部值,包括标准 JMS 头部。 -
带有
@Headers注解的参数,该参数还必须能赋值给java.util.Map,以便访问所有头部信息。 -
未被任何注解标记的元素(既不是
Message也不是Session类型),被视为有效负载(payload)。你可以通过在参数上添加@Payload注解来明确指定其作为有效负载。此外,你还可以通过添加额外的@Valid注解来启用验证功能。
能够注入Spring的Message抽象层特别有用,因为它可以让我们利用存储在特定传输方式消息中的所有信息,而无需依赖于该传输方式特定的API。以下示例展示了如何实现这一点:
@JmsListener(destination = "myDestination")
public void processOrder(Message<Order> order) { ... }
方法参数的处理由DefaultMessageHandlerMethodFactory提供,你还可以进一步自定义它以支持额外的方法参数。在那里,你也可以自定义转换和验证的支持。
例如,如果我们在处理之前想要确保我们的Order是有效的,我们可以在数据负载上添加@Valid注释,并配置必要的验证器,如下例所示:
@Configuration
@EnableJms
public class AppConfig implements JmsListenerConfigurer {
@Override
public void configureJmsListeners(JmsListenerEndpointRegistrar registrar) {
registrar.setMessageHandlerMethodFactory(myJmsHandlerMethodFactory());
}
@Bean
public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
factory.setValidator(myValidator());
return factory;
}
}
响应管理
MessageListenerAdapter 中现有的支持已经允许方法具有非 void 的返回类型。在这种情况下,方法调用的结果会被封装在 jakarta.jms.Message 对象中,并发送到原始消息的 JMSReplyTo 头部指定的目标地址,或者发送到监听器上配置的默认目标地址。现在,你可以使用消息抽象层中的 @SendTo 注解来设置这个默认目标地址。
假设我们的processOrder方法现在应该返回一个OrderStatus,我们可以编写它来自动发送响应,如下例所示:
@JmsListener(destination = "myDestination")
@SendTo("status")
public OrderStatus processOrder(Order order) {
// order processing
return status;
}
如果你有多个带有@JmsListener注解的方法,你也可以在类级别放置@SendTo注解,以便共享一个默认的回复目的地。
如果你需要以与传输方式无关的方式设置额外的头部信息,你可以返回一个Message对象,方法可以类似于以下示例:
@JmsListener(destination = "myDestination")
@SendTo("status")
public Message<OrderStatus> processOrder(Order order) {
// order processing
return MessageBuilder
.withPayload(status)
.setHeader("code", 1234)
.build();
}
如果你需要在运行时计算响应的目标地址,你可以将你的响应封装在一个JmsResponse实例中,该实例也会提供在运行时使用的目标地址。我们可以如下重构之前的示例:
@JmsListener(destination = "myDestination")
public JmsResponse<Message<OrderStatus>> processOrder(Order order) {
// order processing
Message<OrderStatus> response = MessageBuilder
.withPayload(status)
.setHeader("code", 1234)
.build();
return JmsResponse.forQueue(response, "status");
}
最后,如果你需要为响应指定一些QoS值,例如优先级或生存时间(time to live),你可以相应地配置JmsListenerContainerFactory,如下例所示:
@Configuration
@EnableJms
public class AppConfig {
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory());
QosSettings replyQosSettings = new QosSettings();
replyQosSettings.setPriority(2);
replyQosSettings.setTimeToLive(10000);
factory.setReplyQosSettings(replyQosSettings);
return factory;
}
}