Apache Camel 支持
Spring Integration 提供了与在同一应用程序上下文中声明的 Apache Camel 端点进行通信的 API 和配置。
你需要将这个依赖项添加到你的项目中:
- Maven
- Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-camel</artifactId>
<version>6.4.2</version>
</dependency>
compile "org.springframework.integration:spring-integration-camel:6.4.2"
Spring Integration 和 Apache Camel 实现了企业集成模式,并提供了一种方便的方式来组合它们,但这两个项目在其 API 和抽象实现上采用了不同的方法。Spring Integration 完全依赖于 Spring Core 的依赖注入容器。它使用了许多其他 Spring 项目(如 Spring Data、Spring AMQP、Spring for Apache Kafka 等)来实现其通道适配器。它还使用 MessageChannel
抽象作为开发者在组合集成流时需要了解的一等公民。相比之下,Apache Camel 没有提供消息通道的一等公民抽象,而是建议通过内部交换来组合其路由,这些交换对 API 是隐藏的。此外,在 Spring 应用程序中使用它时,还需要一些额外的依赖和配置。
即使对于最终的企业集成解决方案而言,其各个部分如何实现并不重要,但开发人员的体验和高生产力仍然需要考虑在内。因此,开发人员可能会基于多种原因选择一个框架而不是另一个,或者如果某些目标系统的支持存在差距,则可能会选择两个框架。Spring Integration 和 Apache Camel 应用程序可以通过许多外部协议进行交互,这些协议实现了通道适配器。例如,一个 Spring Integration 流可以将记录发布到 Apache Kafka 主题,该主题由消费者端的 Apache Camel 终端消费。或者,一个 Apache Camel 路由可以将数据写入 SFTP 文件目录,该目录由 Spring Integration 的 SFTP Inbound Channel Adapter 进行轮询。或者,在同一个 Spring 应用程序上下文中,它们可以通过 ApplicationEvent
抽象 进行通信。
为了使开发过程更简单,并避免不必要的网络跳转,Apache Camel 提供了一个 模块,通过消息通道与 Spring Integration 进行通信。只需要从应用程序上下文中引用一个 MessageChannel
,即可发送或消费消息。当 Apache Camel 路由是消息流的发起者,而 Spring Integration 仅作为解决方案的一部分起辅助作用时,这种方法效果很好。
为了提供类似的开发人员体验,Spring Integration 现在提供了一个通道适配器来调用 Apache Camel 端点,并可选择等待回复。由于从 Spring Integration API 和抽象的角度来看,订阅 MessageChannel
以消费 Apache Camel 消息已经足够,因此没有提供入站通道适配器。
Apache Camel 外发通道适配器
CamelMessageHandler
是 AbstractReplyProducingMessageHandler
的一个实现,可以在单向(默认)和请求-回复模式下工作。它使用 org.apache.camel.ProducerTemplate
来发送(或发送并接收)到一个 org.apache.camel.Endpoint
。交互模式可以通过 ExchangePattern
选项进行控制(该选项可以通过 SpEL 表达式在运行时针对请求消息进行评估)。目标 Apache Camel 端点可以显式配置,或者作为 SpEL 表达式在运行时进行评估。否则,它将回退到在 ProducerTemplate
上提供的 defaultEndpoint
。除了指定端点外,还可以提供内联的显式 LambdaRouteBuilder
,例如,调用没有 Spring Integration 通道适配器支持的 Apache Camel 组件。
此外,可以提供一个 HeaderMapper<org.apache.camel.Message>
(CamelHeaderMapper
是默认实现),以确定在 Spring Integration 和 Apache Camel 消息之间映射哪些标题。默认情况下,所有标题都会被映射。
CamelMessageHandler
支持 async
模式,调用 ProducerTemplate.asyncSend()
并生成一个 CompletableFuture
用于回复处理(如果有的话)。
exchangeProperties
可以通过 SpEL 表达式进行自定义,该表达式必须求值为一个 Map
。
如果未提供 ProducerTemplate
,则通过从应用程序上下文中解析的 CamelContext
bean 创建它。
@Bean
@ServiceActivator(inputChannel = "sendToCamel")
CamelMessageHandler camelService(ProducerTemplate producerTemplate) {
CamelHeaderMapper headerMapper = new CamelHeaderMapper();
headerMapper.setOutboundHeaderNames("");
headerMapper.setInboundHeaderNames("testHeader");
CamelMessageHandler camelMessageHandler = new CamelMessageHandler(producerTemplate);
camelMessageHandler.setEndpointUri("direct:simple");
camelMessageHandler.setExchangePatternExpression(spelExpressionParser.parseExpression("headers.exchangePattern"));
camelMessageHandler.setHeaderMapper(headerMapper);
return camelMessageHandler;
}
对于 Java DSL 流定义,此通道适配器可以通过 Camel
工厂提供的几种变体进行配置:
@Bean
IntegrationFlow camelFlow() {
return f -> f
.handle(Camel.gateway().endpointUri("direct:simple"))
.handle(Camel.route(this::camelRoute))
.handle(Camel.handler().endpointUri("log:com.mycompany.order?level=WARN"));
}
private void camelRoute(RouteBuilder routeBuilder) {
routeBuilder.from("direct:inbound").transform(routeBuilder.simple("${body.toUpperCase()}"));
}