注解支持
除了通过XML命名空间配置消息端点外,您还可以使用注解。首先,Spring Integration提供了类级别的@MessageEndpoint作为构造型注解,这意味着它本身已通过Spring的@Component注解进行标注,因此能够被Spring的组件扫描机制自动识别为Bean定义。
同样重要的是各种方法级别的注解。它们表明被注解的方法能够处理消息。以下示例展示了类级别和方法级别的注解:
@MessageEndpoint
public class FooService {
@ServiceActivator
public void processMessage(Message message) {
...
}
}
方法“处理”消息的确切含义取决于具体的注解。Spring Integration 中可用的注解包括:
-
@Aggregator(参见 聚合器) -
@Filter(参见 过滤器) -
@Router(参见 路由器) -
@ServiceActivator(参见 服务激活器) -
@Splitter(参见 拆分器) -
@Transformer(参见 转换器) -
@InboundChannelAdapter(参见 通道适配器) -
@BridgeFrom(参见 使用 Java 配置配置桥接) -
@BridgeTo(参见 使用 Java 配置配置桥接) -
@MessagingGateway(参见 消息网关) -
@IntegrationComponentScan(参见 配置与 @EnableIntegration)
如果你将 XML 配置与注解结合使用,则不需要 @MessageEndpoint 注解。如果你想从 <service-activator/> 元素的 ref 属性配置一个 POJO 引用,可以仅提供方法级别的注解。在这种情况下,即使 <service-activator/> 元素上没有方法级别的属性,注解也能防止歧义。
在大多数情况下,带注解的处理器方法不应将 Message 类型作为其参数。相反,方法参数类型可以与消息的有效负载类型匹配,如下例所示:
public class ThingService {
@ServiceActivator
public void bar(Thing thing) {
...
}
}
当方法参数需要从 MessageHeaders 中的值映射时,另一种选择是使用参数级别的 @Header 注解。通常,使用 Spring Integration 注解的方法可以接受 Message 本身、消息负载或标头值(通过 @Header)作为参数。实际上,方法可以接受这些参数的组合,如下例所示:
public class ThingService {
@ServiceActivator
public void otherThing(String payload, @Header("x") int valueX, @Header("y") int valueY) {
...
}
}
你也可以使用 @Headers 注解将所有消息头作为 Map 提供,如下例所示:
public class ThingService {
@ServiceActivator
public void otherThing(String payload, @Headers Map<String, Object> headerMap) {
...
}
}
注解的值也可以是一个 SpEL 表达式(例如 someHeader.toUpperCase()),这在您希望在注入前操作标头值时非常有用。它还提供了一个可选的 required 属性,用于指定该属性值是否必须在标头中可用。required 属性的默认值为 true。
对于这些注解中的多个,当消息处理方法返回非空值时,端点会尝试发送回复。这在两种配置选项(命名空间和注解)中是一致的,即使用此类端点的输出通道(如果可用),并将 REPLY_CHANNEL 消息头值作为备选方案。
端点上输出通道与回复通道消息头的结合,使得可以采用管道式处理方式,即多个组件拥有输出通道,而最终组件允许将回复消息转发到回复通道(如原始请求消息中所指定)。换句话说,最终组件依赖于原始发送方提供的信息,因此能够动态支持任意数量的客户端。这是返回地址模式的一个示例。
除了此处展示的示例外,这些注解还支持 inputChannel 和 outputChannel 属性,如下例所示:
@Service
public class ThingService {
@ServiceActivator(inputChannel="input", outputChannel="output")
public void otherThing(String payload, @Headers Map<String, Object> headerMap) {
...
}
}
处理这些注解会创建与相应XML组件相同的bean——AbstractEndpoint实例和MessageHandler实例(对于入站通道适配器则是MessageSource实例)。参见@Bean方法上的注解。bean名称根据以下模式生成:[组件名].[方法名].[首字母小写的注解类短名]。在前述示例中,AbstractEndpoint的bean名称为thingService.otherThing.serviceActivator,而MessageHandler(MessageSource)bean的名称则是在此基础上额外添加.handler(.source)后缀。此类名称可以通过在这些消息注解旁使用@EndpointId注解进行自定义。MessageHandler实例(MessageSource实例)也符合消息历史记录的追踪条件。
从版本4.0开始,所有消息注解都提供 SmartLifecycle 选项(autoStartup 和 phase),允许在应用程序上下文初始化时控制端点的生命周期。它们的默认值分别为 true 和 0。要更改端点的状态(例如 start() 或 stop()),您可以通过使用 BeanFactory(或自动装配)获取端点 bean 的引用并调用相应方法。或者,您也可以向控制总线发送命令消息。为此,您应使用前文提到的 beanName。
在解析完提到的注解后(当没有配置特定的通道 Bean 时),通道会自动创建,并且相应的消费者端点会在上下文初始化的末尾附近声明为 Bean。这些 Bean 可以在其他服务中自动装配,但必须标记为 @Lazy 注解,因为在正常的自动装配处理过程中,这些定义通常还不可用。
@Autowired
@Lazy
@Qualifier("someChannel")
MessageChannel someChannel;
...
@Bean
Thing1 dependsOnSPCA(@Qualifier("someInboundAdapter") @Lazy SourcePollingChannelAdapter someInboundAdapter) {
...
}
从 6.0 版本开始,所有消息注解现在都是 @Repeatable 的,因此可以在同一个服务方法上声明多个相同类型的注解,其含义是:根据这些注解的重复次数创建相应数量的端点:
@Transformer(inputChannel = "inputChannel1", outputChannel = "outputChannel1")
@Transformer(inputChannel = "inputChannel2", outputChannel = "outputChannel2")
public String transform(String input) {
return input.toUpperCase();
}
使用 @Poller 注解
在 Spring Integration 4.0 之前,消息传递注解要求 inputChannel 必须引用一个 SubscribableChannel。对于 PollableChannel 实例,需要使用 <int:bridge/> 元素来配置 <int:poller/>,从而使复合端点成为一个 PollingConsumer。从 4.0 版本开始,引入了 @Poller 注解,允许直接在消息传递注解上配置 poller 属性,如下例所示:
public class AnnotationService {
@Transformer(inputChannel = "input", outputChannel = "output",
poller = @Poller(maxMessagesPerPoll = "${poller.maxMessagesPerPoll}", fixedDelay = "${poller.fixedDelay}"))
public String handle(String payload) {
...
}
}
@Poller 注解仅提供简单的 PollerMetadata 选项。您可以使用属性占位符配置 @Poller 注解的属性(maxMessagesPerPoll、fixedDelay、fixedRate 和 cron)。此外,从版本 5.1 开始,还为 PollingConsumer 提供了 receiveTimeout 选项。如果需要提供更多轮询选项(例如 transaction、advice-chain、error-handler 等),您应将 PollerMetadata 配置为通用 bean,并将其 bean 名称用作 @Poller 的 value 属性。在这种情况下,不允许使用其他属性(必须在 PollerMetadata bean 上指定)。请注意,如果 inputChannel 是 PollableChannel 且未配置 @Poller,则将使用默认的 PollerMetadata(如果它存在于应用程序上下文中)。要通过 @Configuration 注解声明默认轮询器,请使用类似于以下示例的代码:
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata defaultPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setTrigger(new PeriodicTrigger(10));
return pollerMetadata;
}
以下示例展示了如何使用默认轮询器:
public class AnnotationService {
@Transformer(inputChannel = "aPollableChannel", outputChannel = "output")
public String handle(String payload) {
...
}
}
以下示例展示了如何使用命名轮询器:
@Bean
public PollerMetadata myPoller() {
PollerMetadata pollerMetadata = new PollerMetadata();
pollerMetadata.setTrigger(new PeriodicTrigger(1000));
return pollerMetadata;
}
以下示例展示了一个使用默认轮询器的端点:
public class AnnotationService {
@Transformer(inputChannel = "aPollableChannel", outputChannel = "output"
poller = @Poller("myPoller"))
public String handle(String payload) {
...
}
}
从 4.3.3 版本开始,@Poller 注解新增了 errorChannel 属性,以便更轻松地配置底层的 MessagePublishingErrorHandler。该属性的作用与 <poller> XML 组件中的 error-channel 相同。更多信息请参阅端点命名空间支持。
消息注解上的 poller() 属性与 reactive() 属性互斥。更多信息请参阅下一节。
使用 @Reactive 注解
ReactiveStreamsConsumer 自 5.0 版本起就已存在,但仅当端点的输入通道为 FluxMessageChannel(或任何 org.reactivestreams.Publisher 实现)时才被应用。从 5.3 版本开始,当目标消息处理器为 ReactiveMessageHandler 时,无论输入通道类型如何,框架也会创建其实例。自 5.5 版本起,所有消息传递注解都引入了 @Reactive 子注解(类似于上述的 @Poller)。它接受一个可选的 Function<? super Flux<Message<?>>, ? extends Publisher<Message<?>>> bean 引用,并且无论输入通道类型和消息处理器如何,都会将目标端点转换为 ReactiveStreamsConsumer 实例。该函数通过 Flux.transform() 操作符使用,以对来自输入通道的响应式流源应用一些自定义操作(如 publishOn()、doOnNext()、log()、retry() 等)。
以下示例演示了如何独立于最终订阅者和生产者,更改从输入通道到该 DirectChannel 的发布线程:
@Bean
public Function<Flux<?>, Flux<?>> publishOnCustomizer() {
return flux -> flux.publishOn(Schedulers.parallel());
}
@ServiceActivator(inputChannel = "directChannel", reactive = @Reactive("publishOnCustomizer"))
public void handleReactive(String payload) {
...
}
消息注解上的 reactive() 属性与 poller() 属性互斥。更多信息请参阅使用 @Poller 注解和响应式流支持。
使用 @InboundChannelAdapter 注解
版本 4.0 引入了 @InboundChannelAdapter 方法级注解。它基于 MethodInvokingMessageSource 为被注解的方法生成一个 SourcePollingChannelAdapter 集成组件。该注解是 <int:inbound-channel-adapter> XML 组件的对应物,并具有相同的限制:方法不能有参数,且返回类型不能是 void。它有两个属性:value(必需的 MessageChannel bean 名称)和 poller(可选的 @Poller 注解,如前文所述)。如果需要提供一些 MessageHeaders,请使用 Message<?> 返回类型,并使用 MessageBuilder 来构建 Message<?>。使用 MessageBuilder 可以配置 MessageHeaders。以下示例展示了如何使用 @InboundChannelAdapter 注解:
@InboundChannelAdapter("counterChannel")
public Integer count() {
return this.counter.incrementAndGet();
}
@InboundChannelAdapter(value = "fooChannel", poller = @Poller(fixed-rate = "5000"))
public String foo() {
return "foo";
}
版本 4.3 为 value 注解属性引入了 channel 别名,以提高源代码的可读性。此外,目标 MessageChannel bean 在 SourcePollingChannelAdapter 中通过提供的名称(由 outputChannelName 选项设置)在首次 receive() 调用时解析,而非在初始化阶段。这允许“延迟绑定”逻辑:从消费者角度来看,目标 MessageChannel bean 的创建和注册时间略晚于 @InboundChannelAdapter 的解析阶段。
第一个示例要求默认轮询器已在应用程序上下文的其他位置声明。
使用 @MessagingGateway 注解
请参阅 @MessagingGateway 注解。
使用 @IntegrationComponentScan 注解
标准的 Spring Framework @ComponentScan 注解不会扫描接口上的 @Component 原型注解。为了克服这一限制并允许配置 @MessagingGateway(参见 @MessagingGateway 注解),我们引入了 @IntegrationComponentScan 机制。该注解必须与 @Configuration 注解一同使用,并可自定义其扫描选项,例如 basePackages 和 basePackageClasses。在这种情况下,所有发现的带有 @MessagingGateway 注解的接口都会被解析并注册为 GatewayProxyFactoryBean 实例。所有其他基于类的组件则由标准的 @ComponentScan 进行解析。