跳到主要内容

使用协议适配器

QWen Plus 中英对照 Using Protocol Adapters

到目前为止,所有示例都说明了 DSL 如何通过使用 Spring Integration 编程模型来支持消息传递架构。然而,我们还没有进行任何真正的集成。这样做需要通过 HTTP、JMS、AMQP、TCP、JDBC、FTP、SMTP 等访问远程资源或访问本地文件系统。Spring Integration 支持所有这些以及更多。理想情况下,DSL 应该为所有这些提供一流的支持,但实现所有这些并跟上 Spring Integration 新适配器的添加是一个令人生畏的任务。因此,预期是 DSL 将不断赶上 Spring Integration。

因此,我们提供了高级 API 来无缝定义特定协议的消息传递。我们通过工厂和构建器模式以及 lambda 表达式来实现这一点。你可以将工厂类视为“命名空间工厂”,因为它们在具体协议特定的 Spring Integration 模块组件中扮演着与 XML 命名空间相同的角色。目前,Spring Integration Java DSL 支持 AmqpFeedJmsFiles(S)FtpHttpJPAMongoDbTCP/UDPMailWebFluxScripts 命名空间工厂。以下示例展示了如何使用其中三个(AmqpJmsMail):

@Bean
public IntegrationFlow amqpFlow() {
return IntegrationFlow.from(Amqp.inboundGateway(this.rabbitConnectionFactory, queue()))
.transform("hello "::concat)
.transform(String.class, String::toUpperCase)
.get();
}

@Bean
public IntegrationFlow jmsOutboundGatewayFlow() {
return IntegrationFlow.from("jmsOutboundGatewayChannel")
.handle(Jms.outboundGateway(this.jmsConnectionFactory)
.replyContainer(c ->
c.concurrentConsumers(3)
.sessionTransacted(true))
.requestDestination("jmsPipelineTest"))
.get();
}

@Bean
public IntegrationFlow sendMailFlow() {
return IntegrationFlow.from("sendMailChannel")
.handle(Mail.outboundAdapter("localhost")
.port(smtpPort)
.credentials("user", "pw")
.protocol("smtp")
.javaMailProperties(p -> p.put("mail.debug", "true")),
e -> e.id("sendMailEndpoint"))
.get();
}
java

前面的例子展示了如何将“命名空间工厂”用作内联适配器声明。然而,你可以从 @Bean 定义中使用它们,以使 IntegrationFlow 方法链更易读。

备注

在我们着手处理其他命名空间工厂之前,我们正在征求社区的反馈。我们也非常感谢任何关于我们应该优先支持哪些适配器和网关的建议。

你可以在本参考手册的协议特定章节中找到更多 Java DSL 示例。

所有其他协议通道适配器可以配置为通用 bean 并连接到 IntegrationFlow,如下例所示:

@Bean
public QueueChannelSpec wrongMessagesChannel() {
return MessageChannels
.queue()
.wireTap("wrongMessagesWireTapChannel");
}

@Bean
public IntegrationFlow xpathFlow(MessageChannel wrongMessagesChannel) {
return IntegrationFlow.from("inputChannel")
.filter(new StringValueTestXPathMessageSelector("namespace-uri(/*)", "my:namespace"),
e -> e.discardChannel(wrongMessagesChannel))
.log(LoggingHandler.Level.ERROR, "test.category", m -> m.getHeaders().getId())
.route(xpathRouter(wrongMessagesChannel))
.get();
}

@Bean
public AbstractMappingMessageRouter xpathRouter(MessageChannel wrongMessagesChannel) {
XPathRouter router = new XPathRouter("local-name(/*)");
router.setEvaluateAsString(true);
router.setResolutionRequired(false);
router.setDefaultOutputChannel(wrongMessagesChannel);
router.setChannelMapping("Tags", "splittingChannel");
router.setChannelMapping("Tag", "receivedChannel");
return router;
}
java