Spring Integration框架概述
Spring Integration 提供了对 Spring 编程模型的扩展,以支持知名的 企业集成模式。它在基于 Spring 的应用程序中启用了轻量级消息传递,并通过声明式适配器支持与外部系统的集成。这些适配器在 Spring 对远程处理、消息传递和调度的支持之上提供了更高层次的抽象。
Spring Integration 的主要目标是提供一个简单的模型来构建企业集成解决方案,同时保持关注点的分离,这是生成可维护、可测试代码所必需的。
Spring Integration 概述
本章对 Spring Integration 的核心概念和组件进行了高层次的介绍。它包括一些编程提示,帮助你充分利用 Spring Integration。
背景
Spring 框架的一个关键主题是控制反转(IoC)。从广义上讲,这意味着框架在其上下文中代表组件处理职责。组件本身得到了简化,因为它们被免除了一些职责。例如,依赖注入使组件免除了定位或创建其依赖项的责任。同样,面向方面编程通过将通用的横切关注点模块化为可重用的方面,从而减轻了业务组件的负担。在每种情况下,最终结果都是一个更容易测试、理解、维护和扩展的系统。
此外,Spring 框架和组合为构建企业应用程序提供了全面的编程模型。开发人员受益于该模型的一致性,尤其是它基于成熟的最佳实践,例如按接口编程和优先选择组合而非继承。Spring 的简化抽象和强大的支持库在提高开发人员生产力的同时,也提高了可测试性和可移植性水平。
Spring Integration 是由这些相同的目标和原则驱动的。它将 Spring 编程模型扩展到消息领域,并基于 Spring 现有的企业集成支持提供了一个更高的抽象级别。它支持消息驱动架构,在这种架构中,控制反转适用于运行时关注点,例如特定业务逻辑应在何时运行以及响应应发送到何处。它支持消息的路由和转换,以便不同的传输方式和不同的数据格式可以被集成而不影响可测试性。换句话说,消息和集成的关注点由框架处理。业务组件进一步与基础设施隔离,开发人员也被免除复杂的集成责任。
作为 Spring 编程模型的扩展,Spring Integration 提供了多种配置选项,包括注解、带有命名空间支持的 XML、带有通用 “bean” 元素的 XML 以及直接使用底层 API。该 API 基于定义良好的策略接口和非侵入式委托适配器。Spring Integration 的设计灵感来源于对 Spring 内常见模式与 Gregor Hohpe 和 Bobby Woolf 在 Enterprise Integration Patterns (Addison Wesley,2004)中描述的知名模式之间强烈关联的认可。读过这本书的开发人员应该能够立即熟悉 Spring Integration 的概念和术语。
目标和原则
Spring Integration 的设计目标如下:
-
为实施复杂的企业集成解决方案提供一个简单的模型。
-
在基于 Spring 的应用程序中促进异步、消息驱动的行为。
-
促进现有 Spring 用户的直观、增量采用。
Spring Integration 是由以下原则指导的:
-
组件应松散耦合以实现模块化和可测试性。
-
框架应强制分离业务逻辑和集成逻辑。
-
扩展点应该具有抽象性质(但在明确定义的边界内)以促进重用和可移植性。
主要组件
从垂直的角度来看,分层架构有助于分离关注点,并且基于接口的层间契约促进了松耦合。Spring 为基础的应用程序通常就是这样设计的,Spring 框架和组合提供了一个强大的基础,以遵循这一最佳实践,应用于企业应用程序的整个堆栈。消息驱动的架构增加了水平视角,但这些相同的目标仍然相关。就像“分层架构”是一个非常通用和抽象的范式一样,消息系统通常遵循类似的抽象“管道和过滤器”模型。“过滤器”表示任何能够生成或消费消息的组件,“管道”在过滤器之间传输消息,使得组件本身保持松耦合。需要注意的是,这两个高层次的范式并不是互相排斥的。支持“管道”的底层消息基础设施仍然应该封装在一个定义为接口的层中。同样地,“过滤器”本身应该在逻辑上位于应用程序服务层之上的一个层中进行管理,通过接口与这些服务交互,这与 Web 层的方式大致相同。
消息
在 Spring Integration 中,消息是任何 Java 对象的通用包装,结合了框架在处理该对象时使用的元数据。它由有效负载和头组成。有效负载可以是任何类型,而头包含常用信息,例如 ID、时间戳、相关 ID 和返回地址。头还用于在连接的传输之间传递值。例如,在从接收到的文件创建消息时,文件名可能会存储在头中,以便下游组件访问。同样地,如果消息的内容最终将通过出站邮件适配器发送,则各种属性(如收件人、发件人、抄送、主题等)可能由上游组件配置为消息头值。开发人员还可以在头中存储任意键值对。
图 1. 消息
消息通道
消息通道代表了管道和过滤器架构中的“管道”。生产者将消息发送到通道,消费者从通道接收消息。因此,消息通道解耦了消息组件,并且还为拦截和监控消息提供了一个方便的点。
图 2. 消息通道
消息通道可以遵循点对点或发布-订阅语义。对于点对点通道,每个发送到通道的消息最多只能由一个消费者接收。另一方面,发布-订阅通道会尝试将每个消息广播给通道上的所有订阅者。Spring Integration 支持这两种模型。
而 “点对点” 和 "发布-订阅" 定义了有多少消费者最终接收每条消息的两种选项,还有另一个重要的考虑因素:通道是否应该缓冲消息?在 Spring Integration 中,可轮询通道能够在队列中缓冲消息。缓冲的优势在于它可以通过限制传入的消息来防止消费者过载。然而,顾名思义,这也增加了一些复杂性,因为只有在配置了轮询器的情况下,消费者才能从这样的通道接收消息。另一方面,连接到可订阅通道的消费者是简单的由消息驱动的。消息通道实现 详细讨论了 Spring Integration 中可用的各种通道实现。
消息端点
Spring Integration 的主要目标之一是通过控制反转简化企业集成解决方案的开发。这意味着你不需要直接实现消费者和生产者,甚至不需要构建消息并在消息通道上调用发送或接收操作。相反,你应该能够专注于基于普通对象的具体领域模型。然后,通过提供声明性配置,你可以将特定领域的代码“连接”到 Spring Integration 提供的消息传递基础设施。负责这些连接的组件是消息端点。这并不意味着你应该直接连接现有的应用程序代码。任何现实世界的企业集成解决方案都需要一些关注路由和转换等集成问题的代码。重要的是要在集成逻辑和业务逻辑之间实现关注点分离。换句话说,就像 Web 应用程序的模型-视图-控制器(MVC)范例一样,目标应该是提供一个薄而专门的层,将传入请求转换为服务层调用,然后再将服务层返回值转换为传出回复。下一节概述了处理这些职责的消息端点类型,在接下来的章节中,你可以看到 Spring Integration 的声明性配置选项如何以非侵入的方式使用每一个端点。
消息端点
消息端点表示管道和过滤器架构中的“过滤器”。如前所述,端点的主要作用是以非侵入性的方式将应用程序代码连接到消息传递框架。换句话说,应用程序代码理想情况下不应该意识到消息对象或消息通道。这类似于MVC范式中的控制器角色。就像控制器处理HTTP请求一样,消息端点处理消息。就像控制器映射到URL模式一样,消息端点映射到消息通道。在这两种情况下,目标是相同的:隔离应用程序代码与基础设施。这些概念以及后面的所有模式都在《企业集成模式》一书中进行了详细讨论。在这里,我们仅提供由Spring Integration支持的主要端点类型的高层次描述以及与这些类型相关的作用。随后的章节将详细阐述并提供示例代码和配置示例。
消息转换器
消息转换器负责转换消息的内容或结构,并返回修改后的消息。可能最常见的转换器类型是将消息的有效载荷从一种格式转换为另一种格式(例如,从 XML 到 java.lang.String
)。同样,转换器可以添加、删除或修改消息的标题值。
消息过滤器
消息过滤器确定消息是否应该传递给输出通道。这只需要一个布尔测试方法,该方法可以检查特定的有效载荷内容类型、属性值、是否存在头部或其他条件。如果消息被接受,则发送到输出通道。如果不被接受,则会被丢弃(或者,在更严格的情况下,可以抛出一个 Exception
)。消息过滤器通常与发布 - 订阅通道一起使用,在这种情况下,多个消费者可能会收到相同的消息,并使用过滤器的标准来缩小要处理的消息集。
请注意,不要将管道和过滤器架构模式中通用的 “filter” 与这种特定的端点类型混淆,该端点类型有选择地缩小两个通道之间流动的消息。管道和过滤器概念中的 “filter” 更接近于 Spring Integration 的消息端点:任何可以连接到消息通道以发送或接收消息的组件。
消息路由器
消息路由器负责决定消息接下来应该发送到哪个通道或哪些通道(如果有的话)。通常,该决定基于消息的内容或消息标题中可用的元数据。消息路由器经常被用作服务激活器或其他能够发送回复消息的端点上静态配置输出通道的动态替代方案。同样,消息路由器为多个订阅者使用的反应式消息过滤器提供了主动的替代方案,如前所述。
图 3. 消息路由器
分割器
拆分器是另一种类型的消息端点,其职责是从输入通道接收消息,将该消息拆分为多个消息,并将每个消息发送到输出通道。这通常用于将“复合”有效负载对象分成包含细分有效负载的消息组。
聚合器
基本上是拆分器的镜像,聚合器是一种消息端点,它接收多条消息并将它们组合成一条消息。事实上,聚合器通常是包含拆分器的管道中的下游消费者。从技术上讲,聚合器比拆分器更复杂,因为它需要维护状态(要聚合的消息),决定何时完整的消息组可用,并在必要时超时。此外,在发生超时的情况下,聚合器需要知道是发送部分结果、丢弃它们还是将它们发送到单独的通道。Spring Integration 提供了 CorrelationStrategy
、ReleaseStrategy
以及可配置的超时设置、是否在超时后发送部分结果和一个废弃通道。
服务激活器
服务激活器(Service Activator)是用于将服务实例连接到消息系统的通用端点。必须配置输入消息通道,如果要调用的服务方法可以返回值,则还可以提供输出消息通道。
输出通道是可选的,因为每条消息也可以提供自己的 'Return Address' 标头。此规则同样适用于所有消费者端点。
服务激活器调用某个服务对象的操作来处理请求消息,提取请求消息的有效负载并转换(如果方法不期望接收消息类型参数)。每当服务对象的方法返回值时,该返回值同样会在必要时被转换为回复消息(如果它还不是消息类型)。该回复消息将发送到输出通道。如果没有配置输出通道,则回复将发送到消息的“返回地址”中指定的通道(如果可用的话)。
请求-回复服务激活器端点将目标对象的方法连接到输入和输出消息通道。
图 4. 服务激活器
如前所述,在 消息通道 中,通道可以是可轮询的或可订阅的。在前面的图中,这通过“时钟”符号和实线箭头(轮询)以及虚线箭头(订阅)来表示。
通道适配器
通道适配器是连接消息通道与其他系统或传输的端点。通道适配器可以是入站或出站。通常,通道适配器会在消息与从其他系统接收或发送的任何对象或资源(文件、HTTP 请求、JMS 消息等)之间进行一些映射。根据传输方式的不同,通道适配器还可能填充或提取消息头值。Spring Integration 提供了许多通道适配器,将在后续章节中进行描述。
图 5. 入站通道适配器端点将源系统连接到 MessageChannel
。
消息源可以是可轮询的(例如,POP3)或消息驱动的(例如,IMAP Idle)。在前面的图中,这通过“时钟”符号和实线箭头(轮询)以及虚线箭头(消息驱动)来表示。
图 6. 一个 outbound 通道适配器端点将一个 MessageChannel
连接到目标系统。
如前所述,在 消息通道 中,通道可以是可轮询的或可订阅的。在前面的图中,这通过“时钟”符号和实线箭头(轮询)以及虚线箭头(订阅)来表示。
端点 Bean 名称
消费端点(任何带有 inputChannel
的东西)由两个 bean 组成,即消费者和消息处理程序。消费者有一个对消息处理程序的引用,并在消息到达时调用它。
考虑以下 XML 示例:
<int:service-activator id = "someService" ... />
鉴于前面的例子,bean 的名称如下:
-
消费者:
someService
(id
) -
处理程序:
someService.handler
当使用企业集成模式 (EIP) 注解时,名称取决于几个因素。考虑以下带有注解的 POJO 示例:
@Component
public class SomeComponent {
@ServiceActivator(inputChannel = ...)
public String someMethod(...) {
...
}
}
根据前面的例子,bean 的名称如下:
-
消费者:
someComponent.someMethod.serviceActivator
-
处理程序:
someComponent.someMethod.serviceActivator.handler
从版本 5.0.4 开始,您可以使用 @EndpointId
注解来修改这些名称,如下例所示:
@Component
public class SomeComponent {
@EndpointId("someService")
@ServiceActivator(inputChannel = ...)
public String someMethod(...) {
...
}
}
鉴于前面的例子,bean 的名称如下:
-
消费者:
someService
-
处理程序:
someService.handler
@EndpointId
使用与 XML 配置中的 id
属性创建的名称相同。考虑以下带注解的 bean 示例:
@Configuration
public class SomeConfiguration {
@Bean
@ServiceActivator(inputChannel = ...)
public MessageHandler someHandler() {
...
}
}
鉴于前面的例子,bean 的名称如下:
-
消费者:
someConfiguration.someHandler.serviceActivator
-
处理器:
someHandler
(@Bean
的名称)
从版本 5.0.4 开始,你可以通过使用 @EndpointId
注解来修改这些名称,如下例所示:
@Configuration
public class SomeConfiguration {
@Bean("someService.handler") 1
@EndpointId("someService") 2
@ServiceActivator(inputChannel = ...)
public MessageHandler someHandler() {
...
}
}
处理器:
someService.handler
(bean 名称)消费者:
someService
(端点 ID)
@EndpointId
注解创建的名称与使用 XML 配置的 id
属性创建的名称相同,只要你使用在 @Bean
名称后添加 .handler
的命名约定。
有一种特殊情况会创建第三个 bean:由于架构原因,如果 MessageHandler
@Bean
没有定义 AbstractReplyProducingMessageHandler
,框架会将提供的 bean 包装在一个 ReplyProducingMessageHandlerWrapper
中。这个包装器支持请求处理程序建议处理,并发出正常的“未产生回复”调试日志消息。它的 bean 名称是处理器 bean 名称加上 .wrapper
(当存在 @EndpointId
时——否则,它是正常生成的处理器名称)。
同样,可轮询的消息源 会创建两个 bean,一个 SourcePollingChannelAdapter
(SPCA) 和一个 MessageSource
。
考虑以下 XML 配置:
<int:inbound-channel-adapter id = "someAdapter" ... />
根据前面的 XML 配置,bean 名称如下:
-
SPCA:
someAdapter
(id
) -
处理程序:
someAdapter.source
考虑以下用于定义 @EndpointId
的 Java POJO 配置:
@EndpointId("someAdapter")
@InboundChannelAdapter(channel = "channel3", poller = @Poller(fixedDelay = "5000"))
public String pojoSource() {
...
}
鉴于前面的 Java 配置示例,bean 名称如下:
-
SPCA:
someAdapter
-
处理程序:
someAdapter.source
考虑以下用于定义 @EndpointID
的 Java 配置bean:
@Bean("someAdapter.source")
@EndpointId("someAdapter")
@InboundChannelAdapter(channel = "channel3", poller = @Poller(fixedDelay = "5000"))
public MessageSource<?> source() {
return () -> {
...
};
}
鉴于前面的例子,bean 的名称如下:
-
SPCA:
someAdapter
-
处理程序:
someAdapter.source
(只要你使用在@Bean
名称后添加.source
的约定)
配置和 @EnableIntegration
在整个文档中,你可以看到对在 Spring Integration 流中声明元素的 XML 命名空间支持的引用。这种支持是由一系列命名空间解析器提供的,这些解析器生成适当的 bean 定义以实现特定组件。例如,许多端点由一个 MessageHandler
bean 和一个 ConsumerEndpointFactoryBean
组成,其中注入了处理程序和输入通道名称。
当第一次遇到 Spring Integration 命名空间元素时,框架会自动声明多个bean(一个任务调度器、一个隐式通道创建器等),这些bean用于支持运行时环境。
4.0 版引入了 @EnableIntegration
注解,以允许注册 Spring Integration 基础设施 bean(请参阅 Javadoc)。当仅使用 Java 配置时需要此注解 — 例如与 Spring Boot 或 Spring Integration Messaging 注解支持以及没有 XML 集成配置的 Spring Integration Java DSL 一起使用时。
@EnableIntegration
注解在父上下文中没有 Spring Integration 组件,而两个或多个子上下文使用 Spring Integration 时也非常有用。它允许这些通用组件只在父上下文中声明一次。
@EnableIntegration
注解会向应用程序上下文中注册许多基础架构组件。特别是,它会:
-
注册一些内置的bean,例如
errorChannel
及其LoggingHandler
、用于轮询器的taskScheduler
、jsonPath
SpEL 函数等。 -
添加多个
BeanFactoryPostProcessor
实例以增强BeanFactory
,用于全局和默认的集成环境。 -
添加多个
BeanPostProcessor
实例以增强、转换或包装特定的bean,用于集成目的。 -
添加注解处理器以解析消息注解,并为它们在应用程序上下文中注册组件。
@IntegrationComponentScan
注解也允许类路径扫描。此注解的作用类似于标准 Spring Framework 的 @ComponentScan
注解,但它仅限于特定于 Spring Integration 的组件和注解,而标准的 Spring Framework 组件扫描机制无法触及这些内容。有关示例,请参阅 @MessagingGateway 注解。
@EnablePublisher
注解注册了一个 PublisherAnnotationBeanPostProcessor
bean,并为那些没有提供 channel
属性的 @Publisher
注解配置了 default-publisher-channel
。如果存在多个 @EnablePublisher
注解,它们都必须具有相同的默认通道值。更多信息,请参阅 使用 @Publisher 注解的注解驱动配置。
@GlobalChannelInterceptor
注解已被引入,用于标记用于全局通道拦截的 ChannelInterceptor
bean。此注解是 <int:channel-interceptor>
XML 元素的类似物(参见 全局通道拦截器配置)。@GlobalChannelInterceptor
注解可以放在类级别(带有 @Component
类型注解)或在 @Configuration
类中的 @Bean
方法上。在这两种情况下,bean 必须实现 ChannelInterceptor
。
从 5.1 版本开始,全局通道拦截器适用于动态注册的通道——例如,使用 beanFactory.initializeBean()
或通过 IntegrationFlowContext
(在使用 Java DSL 时)初始化的 bean。以前,在应用程序上下文刷新后创建的 bean 不会应用拦截器。
@IntegrationConverter
注解将 Converter
、GenericConverter
或 ConverterFactory
beans 标记为 integrationConversionService
的候选转换器。此注解是 <int:converter>
XML 元素的类似物(参见 有效负载类型转换)。你可以在类级别(与 @Component
刻面注解一起)或在 @Configuration
类中的 @Bean
方法上放置 @IntegrationConverter
注解。
有关消息注解的更多信息,请参阅 Annotation Support。
编程注意事项
Spring Integration 中的大多数类(除非另有说明)必须在应用程序上下文中声明为单例 bean。这意味着这些类的实例是线程安全的,它们的生命周期和其他组件的连接由 Spring 依赖注入容器管理。工具类和构建器类(如 JacksonJsonUtils
、MessageBuilder
、ExpressionEvalMap
、IntegrationReactiveUtils
等)可以直接在 Java 代码中使用。但是,Java DSL 工厂和 IntegrationComponentSpec
实现的结果仍然必须作为 bean 注册到应用程序上下文中。许多模块中存在的 Session
抽象不是线程安全的,通常由 Factory
模式实现创建,并从线程安全的 Template
模式中使用。例如,请参阅 SftpRemoteFileTemplate
及其与 DefaultSftpSessionFactory
的关系。
你应该尽可能使用普通的 Java 对象 (POJO)(用于目标逻辑中的消息处理),并且只有在绝对必要时才在代码中暴露框架。有关更多信息,请参见 POJO 方法调用。
如果你确实将框架暴露给你的类,有一些需要考虑的问题,特别是在应用程序启动期间:
-
如果你的组件是
ApplicationContextAware
,你通常不应该在setApplicationContext()
方法中使用ApplicationContext
。相反,应该存储一个引用,并将这些用法推迟到上下文生命周期的后期。 -
如果你的组件是
InitializingBean
或使用@PostConstruct
方法,不要从这些初始化方法中发送任何消息。当这些方法被调用时,应用程序上下文尚未初始化,发送此类消息很可能会失败。如果你需要在启动期间发送消息,请实现ApplicationListener
并等待ContextRefreshedEvent
。或者,实现SmartLifecycle
,将你的 bean 放在一个较晚的阶段,并从start()
方法中发送消息。
使用打包的(例如,Shaded)JAR 文件时的注意事项
Spring Integration 通过使用 Spring Framework 的 SpringFactories
机制来加载多个 IntegrationConfigurationInitializer
类,从而引导某些功能。这包括 -core
jar 以及某些其他 jar,如 -http
和 -jmx
。此过程的信息存储在每个 jar 中的 META-INF/spring.factories
文件中。
一些开发人员更喜欢使用知名的工具(如 Apache Maven Shade Plugin)将他们的应用程序及其所有依赖项重新打包到一个 jar 文件中。
默认情况下,shade 插件在生成阴影 jar 时不会合并 spring.factories
文件。
除了 spring.factories
,其他 META-INF
文件(spring.handlers
和 spring.schemas
)也用于 XML 配置。这些文件也需要合并。
Spring Boot 的可执行 jar 机制采用了一种不同的方法,即嵌套 jars,从而保留类路径上的每个 spring.factories
文件。因此,对于使用其默认可执行 jar 格式的 Spring Boot 应用程序,不需要做额外的工作。
即使你不使用 Spring Boot,你仍然可以使用 Boot 提供的工具来增强 shade 插件,通过为上述文件添加转换器。以下示例展示了如何配置插件:
示例 1. pom.xml
...
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<keepDependenciesWithProvidedScope>true</keepDependenciesWithProvidedScope>
<createDependencyReducedPom>true</createDependencyReducedPom>
</configuration>
<dependencies>
<dependency> // <1>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring.boot.version}</version>
</dependency>
</dependencies>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers> // <2>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.handlers</resource>
</transformer>
<transformer
implementation="org.springframework.boot.maven.PropertiesMergingResourceTransformer">
<resource>META-INF/spring.factories</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>META-INF/spring.schemas</resource>
</transformer>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
...
具体来说,
添加
spring-boot-maven-plugin
作为依赖项。配置转换器。
你可以为 ${spring.boot.version}
添加一个属性,或者使用一个明确的版本。
编程技巧和窍门
本节记录了一些从 Spring Integration 获取最大价值的方法。
XML 模式
当使用 XML 配置时,为了避免出现错误的模式验证错误,你应该使用“Spring 意识”的 IDE,例如 Spring Tool Suite (STS)、带有 Spring IDE 插件的 Eclipse 或 IntelliJ IDEA。这些 IDE 知道如何从类路径中解析正确的 XML 模式(通过使用 jar 文件中的 META-INF/spring.schemas
文件)。当使用 STS 或带有插件的 Eclipse 时,你必须在项目上启用 Spring Project Nature
。
托管在互联网上的某些遗留模块(即 1.0 版本中已存在的模块)的架构是 1.0 版本,这是出于兼容性考虑。如果您的集成开发环境 (IDE) 使用这些架构,您可能会看到误报错误。
这些在线模式每种都有一个类似的警告:
此模式适用于 Spring Integration Core 1.0 版本。我们无法将其更新到当前模式,因为这将破坏任何使用 1.0.3 或更低版本的应用程序。对于后续版本,“未加版本的”模式是从类路径解析并从 jar 文件中获取的。请参阅 GitHub:
受影响的模块是
-
core
(spring-integration.xsd
) -
文件
-
HTTP
-
JMS
-
邮件
-
安全
-
流
-
Web服务
-
XML
为 Java 和 DSL 配置查找类名
使用 XML 配置和 Spring Integration 命名空间支持时,XML 解析器隐藏了目标 bean 是如何声明和连接在一起的。对于 Java 配置,了解框架 API 对于目标终端用户应用程序非常重要。
EIP 实现的第一类公民是 Message
、Channel
和 Endpoint
(参见本章前面的主要组件)。它们的实现(合约)是:
前两个简单 enough to understand 如何实施、配置和使用。最后一个需要更多的关注
AbstractEndpoint
在整个 Spring 框架中被广泛用于不同的组件实现。其主要实现包括:
-
EventDrivenConsumer
,在我们订阅SubscribableChannel
以监听消息时使用。 -
PollingConsumer
,在我们从PollableChannel
轮询消息时使用。
当你使用消息注解或 Java DSL 时,你不需要担心这些组件,因为框架会自动使用适当的注解和 BeanPostProcessor
实现来生成它们。当手动构建组件时,你应该使用 ConsumerEndpointFactoryBean
来帮助确定要创建的目标 AbstractEndpoint
消费者实现,这基于提供的 inputChannel
属性。
另一方面,ConsumerEndpointFactoryBean
委托给框架中的另一个重要组件 - org.springframework.messaging.MessageHandler
。这个接口的实现目标是处理由端点从通道消费的消息。Spring Integration 中的所有 EIP 组件都是 MessageHandler
实现(例如,AggregatingMessageHandler
、MessageTransformingHandler
、AbstractMessageSplitter
等)。目标协议的 outbound 适配器(如 FileWritingMessageHandler
、HttpRequestExecutingMessageHandler
、AbstractMqttMessageHandler
等)也是 MessageHandler
实现。当您使用 Java 配置开发 Spring Integration 应用程序时,应该查找 Spring Integration 模块以找到适合用于 @ServiceActivator
配置的适当 MessageHandler
实现。例如,要发送 XMPP 消息(参见 XMPP 支持),你应该配置类似以下内容:
@Bean
@ServiceActivator(inputChannel = "input")
public MessageHandler sendChatMessageHandler(XMPPConnection xmppConnection) {
ChatMessageSendingMessageHandler handler = new ChatMessageSendingMessageHandler(xmppConnection);
DefaultXmppHeaderMapper xmppHeaderMapper = new DefaultXmppHeaderMapper();
xmppHeaderMapper.setRequestHeaderNames("*");
handler.setHeaderMapper(xmppHeaderMapper);
return handler;
}
MessageHandler
实现表示消息流的 outbound 和处理部分。
传入的消息流一侧有其自己的组件,这些组件分为轮询和监听行为。监听(消息驱动)组件很简单,通常只需要一个目标类实现即可准备生成消息。监听组件可以是单向的 MessageProducerSupport
实现,(例如 AbstractMqttMessageDrivenChannelAdapter
和 ImapIdleChannelAdapter
)或请求-回复的 MessagingGatewaySupport
实现(例如 AmqpInboundGateway
和 AbstractWebServiceInboundGateway
)。
轮询入站端点适用于那些不提供监听器 API 或不适合此类行为的协议,包括任何基于文件的协议(如 FTP)、任何数据库(RDBMS 或 NoSQL)等。
这些入站端点由两个组件组成:轮询器配置,用于定期启动轮询任务;以及消息源类,用于从目标协议读取数据并为下游集成流生成消息。轮询器配置的第一个类是 SourcePollingChannelAdapter
。它是 AbstractEndpoint
的另一种实现,但特别用于轮询以启动集成流。通常,使用消息注解或 Java DSL 时,您不必担心这个类。框架会根据 @InboundChannelAdapter
配置或 Java DSL 构建器规范创建一个它的 bean。
消息源组件对目标应用程序开发更为重要,它们都实现了 MessageSource
接口(例如,MongoDbMessageSource
和 AbstractTwitterMessageSource
)。考虑到这一点,我们使用 JDBC 从 RDBMS 表读取数据的配置可能如下所示:
@Bean
@InboundChannelAdapter(value = "fooChannel", poller = @Poller(fixedDelay="5000"))
public MessageSource<?> storedProc(DataSource dataSource) {
return new JdbcPollingChannelAdapter(dataSource, "SELECT * FROM foo where status = 0");
}
你可以在特定的 Spring Integration 模块中找到目标协议所需的所有入站和出站类(在大多数情况下,在相应的包中)。例如,spring-integration-websocket
适配器是:
-
o.s.i.websocket.inbound.WebSocketInboundChannelAdapter
:实现MessageProducerSupport
以监听套接字上的帧并生成通道消息。 -
o.s.i.websocket.outbound.WebSocketOutboundMessageHandler
:单向的AbstractMessageHandler
实现,用于将传入的消息转换为适当的帧并通过 websocket 发送。
如果您熟悉 Spring Integration XML 配置,从 4. 3 版开始,我们在 XSD 元素定义中提供了关于用于声明适配器或网关 bean 的目标类的信息,如下例所示:
<xsd:element name="outbound-async-gateway">
<xsd:annotation>
<xsd:documentation>
Configures a Consumer Endpoint for the 'o.s.i.amqp.outbound.AsyncAmqpOutboundGateway'
that will publish an AMQP Message to the provided Exchange and expect a reply Message.
The sending thread returns immediately; the reply is sent asynchronously; uses 'AsyncRabbitTemplate.sendAndReceive()'.
</xsd:documentation>
</xsd:annotation>
POJO 方法调用
正如在 编程注意事项 中所讨论的,我们建议使用 POJO 编程风格,如下例所示:
@ServiceActivator
public String myService(String payload) { ... }
在这种情况下,框架提取一个 String
负载,调用你的方法,并将结果包装在一条消息中发送给流程中的下一个组件(原始头信息会被复制到新消息中)。实际上,如果你使用 XML 配置,你甚至不需要 @ServiceActivator
注解,如下所示的配对示例所示:
<int:service-activator ... ref="myPojo" method="myService" />
public String myService(String payload) { ... }
只要类的公共方法没有歧义,你可以省略 method
属性。
你也可以在你的POJO方法中获取标题信息,如下例所示:
@ServiceActivator
public String myService(@Payload String payload, @Header("foo") String fooHeader) { ... }
你也可以取消引用消息上的属性,如下例所示:
@ServiceActivator
public String myService(@Payload("payload.foo") String foo, @Header("bar.baz") String barbaz) { ... }
因为各种 POJO 方法调用是可用的,所以在 5.0 之前的版本使用了 SpEL(Spring 表达式语言)来调用这些 POJO 方法。SpEL(即使是解释型的)对于这些操作来说通常已经“足够快”,与方法中通常执行的实际工作相比。然而,从 5.0 版本开始,默认情况下尽可能使用 org.springframework.messaging.handler.invocation.InvocableHandlerMethod
。这种技术通常比解释型 SpEL 执行得更快,并且与其他 Spring 消息项目保持一致。InvocableHandlerMethod
类似于在 Spring MVC 中用于调用控制器方法的技术。有某些方法在使用 SpEL 时仍然总是被调用。例如,包括带有解引用属性的注解参数,如前所述。这是因为 SpEL 具有导航属性路径的能力。
可能还存在一些我们尚未考虑的边缘情况,这些情况也无法使用 InvocableHandlerMethod
实例。因此,我们会自动回退到在这些情况下使用 SpEL。
如果你愿意,你也可以设置你的 POJO 方法,使其始终使用 SpEL,使用 UseSpelInvoker
注解,如下例所示:
@UseSpelInvoker(compilerMode = "IMMEDIATE")
public void bar(String bar) { ... }
如果省略了 compilerMode
属性,那么编译器模式将由 spring.expression.compiler.mode
系统属性决定。有关编译 SpEL 的更多信息,请参阅 SpEL 编译。