Spring Integration 框架概述
Spring Integration 扩展了 Spring 编程模型,以支持著名的企业集成模式。它能够在基于 Spring 的应用程序中实现轻量级消息传递,并通过声明式适配器支持与外部系统的集成。这些适配器在 Spring 对远程调用、消息传递和调度的支持之上提供了更高层次的抽象。
Spring Integration 的主要目标是为构建企业集成解决方案提供一个简单的模型,同时保持关注点分离,这对于生成可维护、可测试的代码至关重要。
Spring Integration 概述
本章节将对 Spring Integration 的核心概念与组件进行高层级介绍,同时提供若干编程技巧以帮助您充分发挥 Spring Integration 的潜力。
背景
Spring Framework 的一个核心理念是控制反转(IoC)。从最广泛的意义上讲,这意味着框架代表其上下文内管理的组件处理职责。组件本身因此得到简化,因为它们从这些职责中解脱出来。例如,依赖注入解除了组件定位或创建其依赖项的责任。同样,面向切面编程通过将通用的横切关注点模块化为可重用的切面,使业务组件从这些关注点中解脱出来。在每种情况下,最终结果都是一个更易于测试、理解、维护和扩展的系统。
此外,Spring框架及其生态系统为企业级应用开发提供了一套全面的编程模型。开发者受益于该模型的一致性,特别是它基于久经考验的最佳实践,例如面向接口编程以及优先使用组合而非继承。Spring简化的抽象层和强大的支持库不仅提升了开发者的生产力,同时增强了代码的可测试性与可移植性。
Spring Integration 的动机与这些目标和原则相同。它将 Spring 编程模型扩展到消息传递领域,并基于 Spring 现有的企业集成支持,提供了更高层次的抽象。它支持消息驱动架构,其中控制反转应用于运行时关注点,例如某些业务逻辑应在何时运行以及响应应发送到哪里。它支持消息的路由和转换,因此可以集成不同的传输方式和数据格式,而不会影响可测试性。换句话说,消息传递和集成关注点由框架处理。业务组件进一步与基础设施隔离,开发人员得以从复杂的集成职责中解脱出来。
作为Spring编程模型的扩展,Spring Integration提供了多种配置选项,包括注解、支持命名空间的XML、通用"bean"元素的XML以及直接使用底层API。该API基于定义良好的策略接口和非侵入式的委托适配器。Spring Integration的设计灵感源于对Spring内部常见模式与Gregor Hohpe和Bobby Woolf在《企业集成模式》(Addison Wesley,2004)中描述的经典模式之间紧密关联的认知。阅读过该书的开发者能够立即熟悉Spring Integration的概念和术语。
目标与原则
Spring Integration 的动机源于以下目标:
-
提供实现复杂企业集成解决方案的简单模型。
-
在基于Spring的应用程序中促进异步、消息驱动的行为。
-
为现有Spring用户提供直观、渐进式的采用方式。
Spring Integration 遵循以下原则:
-
组件应松散耦合以实现模块化和可测试性。
-
框架应强制执行业务逻辑与集成逻辑之间的关注点分离。
-
扩展点本质上应是抽象的(但在明确定义的边界内),以促进重用和可移植性。
主要组件
从垂直视角来看,分层架构有助于关注点分离,而基于接口的层间契约促进了松耦合。基于Spring的应用程序通常采用这种设计方式,Spring框架及其生态体系为企业应用全栈遵循这一最佳实践提供了坚实基础。消息驱动架构引入了横向视角,但同样的目标仍然适用。正如"分层架构"是极其通用且抽象的范式,消息系统通常遵循同样抽象的"管道-过滤器"模型。"过滤器"代表能够生产或消费消息的任何组件,"管道"则在过滤器之间传输消息,使组件本身保持松耦合。需要特别注意的是,这两种高层范式并不互斥。支撑"管道"的底层消息基础设施仍应封装在层中,其契约通过接口定义。同样,"过滤器"本身应在逻辑上位于应用程序服务层之上的层级中进行管理,通过与服务层接口交互的方式实现通信——这与Web层的工作方式高度相似。
消息
在 Spring Integration 中,消息是一个通用的包装器,用于包装任何 Java 对象,并结合了框架在处理该对象时使用的元数据。它由有效负载(payload)和头部(headers)组成。有效负载可以是任何类型,而头部则保存了通常所需的信息,例如 ID、时间戳、关联 ID 和返回地址。头部还用于在连接的传输之间传递值。例如,当从接收到的文件创建消息时,文件名可能会存储在头部中,以便下游组件访问。同样,如果消息的内容最终要通过出站邮件适配器发送,则各种属性(收件人、发件人、抄送、主题等)可以由上游组件配置为消息头部值。开发人员还可以在头部中存储任意的键值对。

图 1. 消息
消息通道
消息通道代表了管道-过滤器架构中的“管道”。生产者向通道发送消息,消费者从通道接收消息。因此,消息通道解耦了消息组件,同时也为消息的拦截和监控提供了便利点。

图 2. 消息通道
消息通道可以遵循点对点或发布-订阅语义。对于点对点通道,每个发送到通道的消息最多只能被一个消费者接收。而发布-订阅通道则尝试将每个消息广播给通道上的所有订阅者。Spring Integration 同时支持这两种模型。
"点对点"和"发布-订阅"定义了每条消息最终能被多少消费者接收的两种模式,但还有另一个重要考量因素:通道是否应该缓冲消息?在 Spring Integration 中,可轮询通道能够将消息缓冲到队列中。缓冲的优势在于可以限制入站消息的流量,从而防止消费者过载。然而,正如其名称所示,这也带来了一些复杂性,因为只有在配置了轮询器的情况下,消费者才能从此类通道接收消息。另一方面,连接到可订阅通道的消费者则完全是消息驱动的。消息通道实现详细讨论了 Spring Integration 中可用的各种通道实现。
消息端点
Spring Integration 的主要目标之一是通过控制反转来简化企业集成解决方案的开发。这意味着您无需直接实现消费者和生产者,甚至无需构建消息或在消息通道上调用发送或接收操作。相反,您应该能够专注于基于普通对象实现的特定领域模型。然后,通过提供声明式配置,您可以将领域特定代码“连接”到 Spring Integration 提供的消息传递基础设施。负责这些连接的组件是消息端点。这并不意味着您必须将现有应用程序代码直接连接起来。任何现实世界中的企业集成解决方案都需要一定数量的代码来处理集成关注点,例如路由和转换。重要的是实现集成逻辑与业务逻辑之间的关注点分离。换句话说,就像 Web 应用程序的模型-视图-控制器 (MVC) 范式一样,目标应该是提供一个精简但专用的层,将入站请求转换为服务层调用,然后将服务层返回值转换为出站回复。下一节将概述处理这些职责的消息端点类型,并且在后续章节中,您将看到 Spring Integration 的声明式配置选项如何以非侵入式的方式使用这些端点。
消息端点
消息端点(Message Endpoint)代表了管道-过滤器架构中的“过滤器”。如前所述,端点的核心作用是以非侵入的方式将应用程序代码连接到消息框架。换句话说,理想情况下应用程序代码不应感知消息对象或消息通道。这类似于 MVC 范式中的控制器角色。正如控制器处理 HTTP 请求,消息端点处理消息;正如控制器映射到 URL 模式,消息端点映射到消息通道。两者的目标相同:将应用程序代码与基础设施隔离。这些概念及后续所有模式在《企业集成模式》一书中有详细讨论。在此,我们仅对 Spring Integration 支持的主要端点类型及其相关角色进行高层描述。后续章节将详细阐述并提供示例代码及配置示例。
消息转换器
消息转换器负责转换消息的内容或结构,并返回修改后的消息。最常见的转换器类型可能是将消息的有效载荷从一种格式转换为另一种格式(例如从XML转换为java.lang.String)。同样,转换器也可以添加、删除或修改消息的头部值。
消息过滤器
消息过滤器决定一条消息是否应被传递至输出通道。这仅需一个布尔测试方法,该方法可检查特定的负载内容类型、属性值、标头是否存在或其他条件。若消息被接受,则发送至输出通道;若未被接受,则被丢弃(或在更严格的实现中,可抛出 Exception)。消息过滤器常与发布-订阅通道结合使用,其中多个消费者可能接收同一消息,并利用过滤器的标准来缩小待处理消息的范围。
请注意,不要将管道-过滤器架构模式中通用的“过滤器”概念与这种特定的端点类型混淆,后者选择性地缩小两个通道之间流动的消息范围。管道-过滤器概念中的“过滤器”更接近于Spring Integration的消息端点:任何可以连接到消息通道以发送或接收消息的组件。
消息路由器
消息路由器负责决定消息接下来应发送到哪个或哪些通道(如果有的话)。通常,该决策基于消息的内容或消息头中可用的元数据。消息路由器常被用作服务激活器或其他能够发送回复消息的端点上静态配置输出通道的动态替代方案。同样,消息路由器为多个订阅者使用的反应式消息过滤器提供了主动式替代方案,如前所述。

图 3. 消息路由器
分割器
拆分器是另一种类型的消息端点,其职责是从其输入通道接收消息,将该消息拆分为多个消息,并将每个消息发送到其输出通道。这通常用于将“复合”负载对象拆分为一组包含细分负载的消息。
聚合器
基本上,聚合器是拆分器的镜像,它是一种消息端点,接收多条消息并将它们合并为一条消息。实际上,在包含拆分器的管道中,聚合器通常是下游消费者。从技术上讲,聚合器比拆分器更复杂,因为它需要维护状态(待聚合的消息)、决定何时完整的消息组可用,并在必要时进行超时处理。此外,在超时的情况下,聚合器需要知道是发送部分结果、丢弃它们,还是将它们发送到单独的通道。Spring Integration 提供了 CorrelationStrategy、ReleaseStrategy 以及可配置的超时设置,用于决定超时时是否发送部分结果,以及一个丢弃通道。
服务激活器
服务激活器(Service Activator)是一种通用端点,用于将服务实例连接到消息系统。必须配置输入消息通道,并且如果待调用的服务方法能够返回值,也可以提供输出消息通道。
输出通道是可选的,因为每条消息也可以提供自己的 'Return Address' 头部。同样的规则适用于所有消费者端点。
服务激活器通过调用某个服务对象上的操作来处理请求消息,它会提取请求消息的有效载荷并在必要时进行转换(当方法不期望接收消息类型参数时)。每当服务对象的方法返回一个值时,该返回值同样会在必要时转换为回复消息(如果它本身不是消息类型)。该回复消息将被发送到输出通道。如果未配置输出通道,回复消息将发送至消息中指定的"返回地址"通道(如果存在该地址)。
请求-应答服务激活器端点将目标对象的方法连接到输入和输出消息通道。

图 4. 服务激活器
如前所述,在消息通道中,通道可以是可轮询的或可订阅的。在上图中,这通过“时钟”符号、实线箭头(轮询)和虚线箭头(订阅)来表示。
通道适配器
通道适配器是一种端点,用于将消息通道连接到其他系统或传输方式。通道适配器可以是入站或出站的。通常,通道适配器会在消息与从其他系统接收或发送到其他系统的对象或资源(如文件、HTTP请求、JMS消息等)之间进行某种映射。根据传输方式的不同,通道适配器还可能填充或提取消息头值。Spring Integration 提供了多种通道适配器,这些内容将在后续章节中介绍。

图 5. 入站通道适配器端点将源系统连接到 MessageChannel。
消息源可以是轮询式的(例如,POP3)或消息驱动的(例如,IMAP Idle)。在上图中,这通过“时钟”符号、实线箭头(轮询)和虚线箭头(消息驱动)来表示。

图 6. 出站通道适配器端点将 MessageChannel 连接到目标系统。
正如之前在消息通道中讨论的,通道可以是可轮询的或可订阅的。在上图中,这通过“时钟”符号、实线箭头(轮询)和虚线箭头(订阅)来表示。
端点 Bean 名称
消费端点(任何带有 inputChannel 的组件)由两个 Bean 组成:消费者和消息处理器。消费者持有对消息处理器的引用,并在消息到达时调用它。
考虑以下 XML 示例:
<int:service-activator id = "someService" ... />
根据前面的示例,bean 名称如下:
-
消费者:
someService(id) -
处理器:
someService.handler
在使用企业集成模式(EIP)注解时,名称取决于多个因素。考虑以下带注解的 POJO 示例:
@Component
public class SomeComponent {
@ServiceActivator(inputChannel = ...)
public String someMethod(...) {
...
}
}
根据前面的示例,bean 名称如下:
-
Consumer:
someComponent.someMethod.serviceActivator -
Handler:
someComponent.someMethod.serviceActivator.handler
从 5.0.4 版本开始,你可以使用 @EndpointId 注解来修改这些名称,如下例所示:
@Component
public class SomeComponent {
@EndpointId("someService")
@ServiceActivator(inputChannel = ...)
public String someMethod(...) {
...
}
}
根据前面的示例,bean 名称如下:
-
消费者:
someService -
处理器:
someService.handler
@EndpointId 创建的名称与 XML 配置中 id 属性创建的名称相同。请看以下带注解的 Bean 示例:
@Configuration
public class SomeConfiguration {
@Bean
@ServiceActivator(inputChannel = ...)
public MessageHandler someHandler() {
...
}
}
根据前面的示例,Bean 的名称如下:
-
Consumer:
someConfiguration.someHandler.serviceActivator -
Handler:
someHandler(@Bean名称)
从 5.0.4 版本开始,你可以使用 @EndpointId 注解来修改这些名称,如下例所示:
@Configuration
public class SomeConfiguration {
@Bean("someService.handler") 1
@EndpointId("someService") 2
@ServiceActivator(inputChannel = ...)
public MessageHandler someHandler() {
...
}
}
处理器:
someService.handler(Bean 名称)消费者:
someService(端点 ID)
@EndpointId 注解会创建与 XML 配置中 id 属性相同的名称,前提是您遵循在 @Bean 名称后追加 .handler 的命名约定。
存在一种特殊情况会创建第三个 Bean:出于架构考虑,如果 MessageHandler 的 @Bean 未定义 AbstractReplyProducingMessageHandler,框架会将提供的 Bean 包装在 ReplyProducingMessageHandlerWrapper 中。该包装器支持请求处理器通知处理,并发出正常的“未生成回复”调试日志消息。其 Bean 名称是处理器 Bean 名称加上 .wrapper(当存在 @EndpointId 时——否则使用常规生成的处理器名称)。
类似地,可轮询消息源会创建两个 Bean:一个 SourcePollingChannelAdapter (SPCA) 和一个 MessageSource。
考虑以下 XML 配置:
<int:inbound-channel-adapter id = "someAdapter" ... />
根据前面的 XML 配置,bean 的名称如下:
-
SPCA:
someAdapter(该id) -
处理器:
someAdapter.source
考虑以下用于定义 @EndpointId 的 POJO 的 Java 配置:
@EndpointId("someAdapter")
@InboundChannelAdapter(channel = "channel3", poller = @Poller(fixedDelay = "5000"))
public String pojoSource() {
...
}
根据前面的 Java 配置示例,Bean 的名称如下:
-
SPCA:
someAdapter -
处理器:
someAdapter.source
考虑以下用于定义 @EndpointID 的 Java Bean 配置:
@Bean("someAdapter.source")
@EndpointId("someAdapter")
@InboundChannelAdapter(channel = "channel3", poller = @Poller(fixedDelay = "5000"))
public MessageSource<?> source() {
return () -> {
...
};
}
根据前面的示例,bean 名称如下:
-
SPCA:
someAdapter -
Handler:
someAdapter.source(只要遵循在@Bean名称后追加.source的命名约定)
配置与 @EnableIntegration
在本文档中,您可以看到关于在 Spring Integration 流中声明元素的 XML 命名空间支持的引用。该支持由一系列命名空间解析器提供,这些解析器会生成适当的 bean 定义以实现特定组件。例如,许多端点由一个 MessageHandler bean 和一个 ConsumerEndpointFactoryBean 组成,处理程序和输入通道名称会被注入到该工厂 bean 中。
当首次遇到 Spring Integration 命名空间元素时,框架会自动声明多个用于支持运行时环境的 Bean(任务调度器、隐式通道创建器等)。
版本 4.0 引入了 @EnableIntegration 注解,用于注册 Spring Integration 基础设施 bean(参见 Javadoc)。当仅使用 Java 配置时(例如,配合 Spring Boot 或 Spring Integration 消息注解支持以及 Spring Integration Java DSL,且没有 XML 集成配置),此注解是必需的。
当您拥有一个没有Spring Integration组件的父上下文以及两个或多个使用Spring Integration的子上下文时,@EnableIntegration 注解同样非常有用。它允许这些公共组件仅在父上下文中声明一次。
@EnableIntegration 注解会在应用上下文中注册许多基础设施组件。具体来说,它:
-
注册一些内置的 Bean,例如
errorChannel及其LoggingHandler、用于轮询器的taskScheduler、jsonPathSpEL 函数等。 -
添加多个
BeanFactoryPostProcessor实例,以增强用于全局和默认集成环境的BeanFactory。 -
添加多个
BeanPostProcessor实例,以出于集成目的增强、转换或包装特定的 Bean。 -
添加注解处理器以解析消息传递注解,并在应用上下文中为它们注册组件。
@IntegrationComponentScan 注解同样支持类路径扫描。该注解的作用类似于标准的 Spring Framework @ComponentScan 注解,但仅限于扫描 Spring Integration 特有的组件和注解,这些是标准的 Spring Framework 组件扫描机制无法覆盖的。有关示例,请参阅 @MessagingGateway 注解。
@EnablePublisher 注解会注册一个 PublisherAnnotationBeanPostProcessor bean,并为那些未提供 channel 属性的 @Publisher 注解配置 default-publisher-channel。如果发现多个 @EnablePublisher 注解,它们必须为默认通道设置相同的值。更多信息请参阅使用 @Publisher 注解进行注解驱动配置。
@GlobalChannelInterceptor 注解已被引入,用于标记 ChannelInterceptor bean 以实现全局通道拦截。该注解是 <int:channel-interceptor> XML 元素的对应物(参见全局通道拦截器配置)。@GlobalChannelInterceptor 注解可以放置在类级别(配合 @Component 原型注解使用)或 @Configuration 类中的 @Bean 方法上。无论哪种情况,该 bean 都必须实现 ChannelInterceptor 接口。
从版本5.1开始,全局通道拦截器将应用于动态注册的通道——例如通过使用 beanFactory.initializeBean() 或在使用Java DSL时通过 IntegrationFlowContext 初始化的bean。在此之前,当bean在应用上下文刷新后创建时,拦截器不会被应用。
@IntegrationConverter 注解将 Converter、GenericConverter 或 ConverterFactory bean 标记为 integrationConversionService 的候选转换器。此注解是 <int:converter> XML 元素的对应物(参见负载类型转换)。您可以将 @IntegrationConverter 注解放在类级别(配合 @Component 原型注解使用)或 @Configuration 类中的 @Bean 方法上。
有关消息注解的更多信息,请参阅注解支持。
编程注意事项
Spring Integration 中的大多数类(除非另有说明)必须在应用程序上下文中声明为 bean 且为单例。这意味着这些类的实例是线程安全的,其生命周期以及与其他组件的连接由 Spring 依赖注入容器管理。工具类和构建器类(如 JacksonMessagingUtils、MessageBuilder、ExpressionEvalMap、IntegrationReactiveUtils 等)可以直接在 Java 代码中使用。然而,Java DSL 工厂和 IntegrationComponentSpec 的实现结果仍需作为 bean 注册到应用程序上下文中。存在于许多模块中的 Session 抽象不是线程安全的,通常通过 Factory 模式实现创建,并从线程安全的 Template 模式中使用。例如,可参考 SftpRemoteFileTemplate 及其与 DefaultSftpSessionFactory 的关系。
在可能的情况下,你应该尽量使用普通的Java对象(POJOs)(用于目标逻辑中的消息处理),并且仅在绝对必要时才在代码中暴露框架。更多信息请参见 POJO 方法调用。
如果你确实将框架暴露给类,那么有一些注意事项需要考虑,尤其是在应用程序启动期间:
-
如果你的组件实现了
ApplicationContextAware接口,通常不应在setApplicationContext()方法中使用ApplicationContext。相反,应存储一个引用,并将此类使用推迟到上下文生命周期的后期。 -
如果你的组件是
InitializingBean或使用了@PostConstruct方法,请不要从这些初始化方法中发送任何消息。调用这些方法时,应用程序上下文尚未初始化,发送此类消息很可能会失败。如果你需要在启动期间发送消息,请实现ApplicationListener并等待ContextRefreshedEvent。或者,实现SmartLifecycle,将你的 bean 置于较晚的阶段,并从start()方法中发送消息。
使用打包(例如 Shaded)Jar 时的注意事项
Spring Integration 通过使用 Spring Framework 的 SpringFactories 机制加载多个 IntegrationConfigurationInitializer 类来引导某些功能。这包括 -core 包以及其他一些包,例如 -http 和 -jmx。此过程的信息存储在每个 jar 包的 META-INF/spring.factories 文件中。
一些开发者倾向于使用知名工具(例如 Apache Maven Shade Plugin)将其应用程序及其所有依赖项重新打包为单个 jar 文件。
默认情况下,shade 插件在生成 shaded jar 时不会合并 spring.factories 文件。
除了 spring.factories 之外,其他 META-INF 文件(spring.handlers 和 spring.schemas)用于 XML 配置。这些文件也需要进行合并。
Spring Boot 的可执行 jar 机制 采用了不同的方法,它将 jar 文件嵌套在一起,从而在类路径上保留了每个 spring.factories 文件。因此,对于 Spring Boot 应用程序,如果您使用其默认的可执行 jar 格式,则无需进行其他操作。
即使你不使用 Spring Boot,你仍然可以利用 Boot 提供的工具来增强 shade 插件,为上述文件添加转换器。以下示例展示了如何配置该插件:
示例 1. pom.xml
...
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<keepDependenciesWithProvidedScope>true</keepDependenciesWithProvidedScope>
<createDependencyReducedPom>true</createDependencyReducedPom>
</configuration>
<dependencies>
<dependency> // <1>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring.boot.version}</version>
</dependency>
</dependencies>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers> // <2>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.handlers</resource>
</transformer>
<transformer
implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
<resource>META-INF/spring.factories</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.schemas</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
...
具体来说,
添加
spring-boot-maven-plugin作为依赖项。配置 transformers。
你可以为 ${spring.boot.version} 添加一个属性,或者使用一个明确的版本。
编程技巧与窍门
本节介绍了一些充分利用 Spring Integration 的方法。
XML 模式
在使用 XML 配置时,为避免出现错误的模式验证错误,应使用“支持 Spring”的 IDE,例如 Spring Tool Suite (STS)、安装了 Spring IDE 插件的 Eclipse 或 IntelliJ IDEA。这些 IDE 知道如何从类路径中解析正确的 XML 模式(通过使用 jar 包中的 META-INF/spring.schemas 文件)。在使用 STS 或安装了插件的 Eclipse 时,必须在项目上启用 Spring Project Nature。
由于兼容性考虑,托管在互联网上的某些旧模块(存在于1.0版本中的模块)的架构仍为1.0版本。如果您的IDE使用这些架构,很可能会看到错误的报错信息。
每个在线模式都有类似的警告:
此模式适用于 Spring Integration Core 的 1.0 版本。我们无法将其更新到当前模式,因为这会破坏任何使用 1.0.3 或更低版本的应用。对于后续版本,"无版本"模式将从类路径解析并从 jar 包中获取。请参考 GitHub:
受影响的模块是
-
core(spring-integration.xsd) -
file -
http -
jms -
mail -
security -
stream -
ws -
xml
查找 Java 和 DSL 配置的类名
通过XML配置和Spring集成命名空间支持,XML解析器隐藏了目标bean的声明和连接方式。对于Java配置,理解面向最终用户应用的框架API至关重要。
EIP 实现的一等公民是 Message、Channel 和 Endpoint(参见本章前面的主要组件)。它们的实现(合约)如下:
前两个足够简单,易于理解如何实现、配置和使用。最后一个值得更多关注。
AbstractEndpoint 在 Spring 框架中被广泛用于不同的组件实现。其主要实现包括:
-
EventDrivenConsumer,当我们订阅SubscribableChannel以监听消息时使用。 -
PollingConsumer,当我们从PollableChannel轮询消息时使用。
在使用消息注解或Java DSL时,您无需关心这些组件,因为框架会自动通过适当的注解和BeanPostProcessor实现来生成它们。当手动构建组件时,您应使用ConsumerEndpointFactoryBean来帮助确定要创建的目标AbstractEndpoint消费者实现,这基于提供的inputChannel属性。
另一方面,ConsumerEndpointFactoryBean 委托给框架中的另一个核心组件——org.springframework.messaging.MessageHandler。该接口实现的目标是处理端点从通道消费的消息。Spring Integration 中的所有 EIP 组件都是 MessageHandler 实现(例如 AggregatingMessageHandler、MessageTransformingHandler、AbstractMessageSplitter 等)。目标协议出站适配器(FileWritingMessageHandler、HttpRequestExecutingMessageHandler、AbstractMqttMessageHandler 等)同样也是 MessageHandler 实现。当您使用 Java 配置开发 Spring Integration 应用程序时,应查阅 Spring Integration 模块以找到适合用于 @ServiceActivator 配置的 MessageHandler 实现。例如,要发送 XMPP 消息(参见 XMPP 支持),您应配置类似以下内容:
@Bean
@ServiceActivator(inputChannel = "input")
public MessageHandler sendChatMessageHandler(XMPPConnection xmppConnection) {
ChatMessageSendingMessageHandler handler = new ChatMessageSendingMessageHandler(xmppConnection);
DefaultXmppHeaderMapper xmppHeaderMapper = new DefaultXmppHeaderMapper();
xmppHeaderMapper.setRequestHeaderNames("*");
handler.setHeaderMapper(xmppHeaderMapper);
return handler;
}
MessageHandler 实现代表了消息流的出站和处理部分。
入站消息流侧拥有其自身的组件,这些组件分为轮询与监听两种行为模式。监听(消息驱动)类组件较为简单,通常只需一个目标类实现即可准备就绪以产生消息。监听组件可以是单向的 MessageProducerSupport 实现(例如 AbstractMqttMessageDrivenChannelAdapter 和 ImapIdleChannelAdapter),也可以是请求-应答模式的 MessagingGatewaySupport 实现(例如 AmqpInboundGateway 和 AbstractWebServiceInboundGateway)。
轮询入站端点适用于那些不提供监听器API或不适用于此类行为的协议,包括任何基于文件的协议(如FTP)、任何数据库(RDBMS或NoSQL)以及其他协议。
这些入站端点由两个组件构成:轮询器配置(用于定期启动轮询任务)和消息源类(用于从目标协议读取数据并为下游集成流生成消息)。轮询器配置的第一个类是 SourcePollingChannelAdapter。它是另一个 AbstractEndpoint 实现,但专门用于轮询以启动集成流。通常,在使用消息注解或 Java DSL 时,您无需关心此类。框架会根据 @InboundChannelAdapter 配置或 Java DSL 构建器规范为其生成一个 bean。
消息源组件对于目标应用程序开发更为重要,它们都实现了 MessageSource 接口(例如 MongoDbMessageSource 和 AbstractTwitterMessageSource)。考虑到这一点,我们用于通过 JDBC 从 RDBMS 表中读取数据的配置可能如下所示:
@Bean
@InboundChannelAdapter(value = "fooChannel", poller = @Poller(fixedDelay="5000"))
public MessageSource<?> storedProc(DataSource dataSource) {
return new JdbcPollingChannelAdapter(dataSource, "SELECT * FROM foo where status = 0");
}
您可以在特定的 Spring Integration 模块中找到目标协议所需的所有入站和出站类(大多数情况下位于相应的包中)。例如,spring-integration-websocket 适配器位于:
-
o.s.i.websocket.inbound.WebSocketInboundChannelAdapter: 实现MessageProducerSupport,用于监听套接字上的帧,并向通道生成消息。 -
o.s.i.websocket.outbound.WebSocketOutboundMessageHandler: 单向的AbstractMessageHandler实现,用于将传入的消息转换为适当的帧并通过 WebSocket 发送。
如果你熟悉Spring Integration的XML配置,从版本4.3开始,我们在XSD元素定义中提供了关于使用哪些目标类来声明适配器或网关的bean的信息,如下例所示:
<xsd:element name="outbound-async-gateway">
<xsd:annotation>
<xsd:documentation>
Configures a Consumer Endpoint for the 'o.s.i.amqp.outbound.AsyncAmqpOutboundGateway'
that will publish an AMQP Message to the provided Exchange and expect a reply Message.
The sending thread returns immediately; the reply is sent asynchronously; uses 'AsyncRabbitTemplate.sendAndReceive()'.
</xsd:documentation>
</xsd:annotation>
POJO 方法调用
如编程注意事项中所述,我们建议采用 POJO 编程风格,如下例所示:
@ServiceActivator
public String myService(String payload) { ... }
在这种情况下,框架会提取一个String类型的有效载荷,调用您的方法,并将结果包装成消息发送给流程中的下一个组件(原始头部信息会被复制到新消息中)。实际上,如果使用XML配置,您甚至不需要@ServiceActivator注解,如下列配对示例所示:
<int:service-activator ... ref="myPojo" method="myService" />
public String myService(String payload) { ... }
只要类中的公共方法没有歧义,你就可以省略 method 属性。
你也可以在 POJO 方法中获取头部信息,如下例所示:
@ServiceActivator
public String myService(@Payload String payload, @Header("foo") String fooHeader) { ... }
你也可以解构消息中的属性,如下例所示:
@ServiceActivator
public String myService(@Payload("payload.foo") String foo, @Header("bar.baz") String barbaz) { ... }
由于有多种 POJO 方法调用方式可用,5.0 之前的版本使用 SpEL(Spring 表达式语言)来调用 POJO 方法。与这些方法中通常执行的实际工作相比,SpEL(即使是解释执行)通常“足够快”。然而,从 5.0 版本开始,默认情况下尽可能使用 org.springframework.messaging.handler.invocation.InvocableHandlerMethod。这种技术通常比解释执行的 SpEL 执行速度更快,并且与其他 Spring 消息传递项目保持一致。InvocableHandlerMethod 类似于 Spring MVC 中用于调用控制器方法的技术。某些方法在使用 SpEL 时仍然始终会被调用。例如,如前所述,包含解引用属性的注解参数。这是因为 SpEL 具有导航属性路径的能力。
可能存在一些我们尚未考虑到的其他边缘情况,这些情况同样无法与 InvocableHandlerMethod 实例配合工作。因此,在这些情况下,我们会自动回退到使用 SpEL。
如果你愿意,也可以设置你的 POJO 方法,使其始终使用 SpEL,通过 UseSpelInvoker 注解,如下例所示:
@UseSpelInvoker(compilerMode = "IMMEDIATE")
public void bar(String bar) { ... }
如果省略 compilerMode 属性,则由 spring.expression.compiler.mode 系统属性决定编译器模式。有关已编译 SpEL 的更多信息,请参阅 SpEL 编译。