跳到主要内容
版本:7.0.2

Apache Camel 支持

DeepSeek V3 中英对照 Apache Camel Support

Spring Integration 提供了一个 API 和配置,用于与同一应用上下文中声明的 Apache Camel 端点进行通信。

此依赖项为项目所需:

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-camel</artifactId>
<version>7.0.2</version>
</dependency>

Spring Integration 与 Apache Camel 均实现了企业集成模式,并提供了便捷的组合方式,但这两个项目在 API 和抽象实现上采用了不同的方法。Spring Integration 完全依赖于 Spring Core 的依赖注入容器,并利用许多其他 Spring 项目(如 Spring Data、Spring AMQP、Spring for Apache Kafka 等)来实现其通道适配器。此外,它将 MessageChannel 抽象作为一等公民,开发者在组合集成流时需要明确意识到其存在。另一方面,Apache Camel 并未提供消息通道的一等公民抽象,而是建议通过内部交换机制来组合其路由,这些机制在 API 层面是隐藏的。此外,若要在 Spring 应用程序中使用 Apache Camel,还需要一些额外的依赖项和配置

即使对于最终的企业集成解决方案而言,其各部分如何实现并不重要,开发者的体验和高生产力仍是需要考虑的因素。因此,开发者可能出于多种原因选择某一框架而非另一框架,或者在某些目标系统支持存在差距时同时使用两者。Spring Integration 和 Apache Camel 应用程序可以通过许多外部协议进行交互,它们为这些协议实现了通道适配器。例如,Spring Integration 流可以将记录发布到 Apache Kafka 主题,而该主题由消费者端的 Apache Camel 端点消费。或者,Apache Camel 路由可以将数据写入 SFTP 目录中的文件,然后由 Spring Integration 的 SFTP 入站通道适配器轮询该文件。或者,在同一 Spring 应用程序上下文中,它们可以通过 ApplicationEvent 进行通信。

为了简化开发流程并避免不必要的网络跳转,Spring Integration 现在提供了一个通道适配器,用于调用 Apache Camel 端点,并可选择等待回复。目前没有入站通道适配器,因为使用 Apache Camel 的 Bean Binding 足以调用 Spring 应用上下文中的任何 Bean,包括 消息网关

Apache Camel 的出站通道适配器

CamelMessageHandler 是一个 AbstractReplyProducingMessageHandler 实现,可以在单向(默认)和请求-应答两种模式下工作。它使用 org.apache.camel.ProducerTemplate 来发送(或发送并接收)消息到 org.apache.camel.Endpoint。交互模式可以通过 ExchangePattern 选项进行控制(该选项可以在运行时通过 SpEL 表达式根据请求消息进行评估)。目标 Apache Camel 端点可以显式配置,也可以配置为在运行时评估的 SpEL 表达式。否则,它将回退到 ProducerTemplate 上提供的 defaultEndpoint。除了指定端点外,还可以提供内联的、显式的 LambdaRouteBuilder,例如,用于调用 Spring Integration 中不支持通道适配器的 Apache Camel 组件。

此外,可以提供一个 HeaderMapper<org.apache.camel.Message>CamelHeaderMapper 是默认实现),用于确定在 Spring Integration 和 Apache Camel 消息之间映射哪些头部。默认情况下,所有头部都会被映射。

CamelMessageHandler 支持异步模式,通过调用 ProducerTemplate.asyncSend() 并生成 CompletableFuture 来处理回复(如果有的话)。

exchangeProperties 可通过 SpEL 表达式进行自定义,该表达式必须解析为 Map 类型。

如果未提供 ProducerTemplate,则会通过从应用程序上下文中解析出的 CamelContext bean 来创建它。

@Bean
@ServiceActivator(inputChannel = "sendToCamel")
CamelMessageHandler camelService(ProducerTemplate producerTemplate) {
CamelHeaderMapper headerMapper = new CamelHeaderMapper();
headerMapper.setOutboundHeaderNames("");
headerMapper.setInboundHeaderNames("testHeader");

CamelMessageHandler camelMessageHandler = new CamelMessageHandler(producerTemplate);
camelMessageHandler.setEndpointUri("direct:simple");
camelMessageHandler.setExchangePatternExpression(spelExpressionParser.parseExpression("headers.exchangePattern"));
camelMessageHandler.setHeaderMapper(headerMapper);
return camelMessageHandler;
}

对于Java DSL流程定义,该通道适配器可以通过Camel工厂提供的几种变体进行配置:

@Bean
IntegrationFlow camelFlow() {
return f -> f
.handle(Camel.gateway().endpointUri("direct:simple"))
.handle(Camel.route(this::camelRoute))
.handle(Camel.handler().endpointUri("log:com.mycompany.order?level=WARN"));
}

private void camelRoute(RouteBuilder routeBuilder) {
routeBuilder.from("direct:inbound").transform(routeBuilder.simple("${body.toUpperCase()}"));
}