跳到主要内容

路由器实现

QWen Plus 中英对照 Router Implementations

由于基于内容的路由通常需要一些特定领域的逻辑,大多数用例需要 Spring Integration 提供的委托给 POJO 的选项,可以通过使用 XML 命名空间支持或注解来实现。这两种方式将在后面讨论。然而,我们首先介绍几种满足常见需求的实现。

PayloadTypeRouter

一个 PayloadTypeRouter 根据有效载荷类型映射将消息发送到定义的通道,如下例所示:

<bean id="payloadTypeRouter"
class="org.springframework.integration.router.PayloadTypeRouter">
<property name="channelMapping">
<map>
<entry key="java.lang.String" value-ref="stringChannel"/>
<entry key="java.lang.Integer" value-ref="integerChannel"/>
</map>
</property>
</bean>
xml

PayloadTypeRouter 的配置也受 Spring Integration 提供的命名空间支持(见 [命名空间支持](../configuration/namespace.md)),这实际上通过将 <router/> 配置及其对应的实现(使用 <bean/> 元素定义)组合成一个更简洁的配置元素来简化配置。以下示例展示了与上述配置等效的 PayloadTypeRouter 配置,但使用了命名空间支持:

<int:payload-type-router input-channel="routingChannel">
<int:mapping type="java.lang.String" channel="stringChannel" />
<int:mapping type="java.lang.Integer" channel="integerChannel" />
</int:payload-type-router>
xml

以下示例展示了用 Java 配置的等效路由器:

@ServiceActivator(inputChannel = "routingChannel")
@Bean
public PayloadTypeRouter router() {
PayloadTypeRouter router = new PayloadTypeRouter();
router.setChannelMapping(String.class.getName(), "stringChannel");
router.setChannelMapping(Integer.class.getName(), "integerChannel");
return router;
}
java

当使用 Java DSL 时,有两种选择。

首先,您可以像前面的例子中所示定义路由对象:

@Bean
public IntegrationFlow routerFlow1() {
return IntegrationFlow.from("routingChannel")
.route(router())
.get();
}

public PayloadTypeRouter router() {
PayloadTypeRouter router = new PayloadTypeRouter();
router.setChannelMapping(String.class.getName(), "stringChannel");
router.setChannelMapping(Integer.class.getName(), "integerChannel");
return router;
}
java

请注意,路由器可以是,但不一定是 @Bean。如果它不是 @Bean,流程也会注册它。

其次,你可以在 DSL 流程本身内定义路由函数,如下例所示:

@Bean
public IntegrationFlow routerFlow2() {
return IntegrationFlow.from("routingChannel")
.<Object, Class<?>>route(Object::getClass, m -> m
.channelMapping(String.class, "stringChannel")
.channelMapping(Integer.class, "integerChannel"))
.get();
}
java

HeaderValueRouter

一个 HeaderValueRouter 根据单独的头部值映射将消息发送到通道。当创建一个 HeaderValueRouter 时,它会被初始化为要评估的头部名称。头部的值可能是以下两种情况之一:

  • 任意值

  • 通道名称

如果它是任意值,则需要额外的映射将这些标题值映射到通道名称。否则,不需要额外的配置。

Spring Integration 提供了一个简单的基于命名空间的 XML 配置来配置 HeaderValueRouter。以下示例演示了在需要将标题值映射到通道时 HeaderValueRouter 的配置:

<int:header-value-router input-channel="routingChannel" header-name="testHeader">
<int:mapping value="someHeaderValue" channel="channelA" />
<int:mapping value="someOtherHeaderValue" channel="channelB" />
</int:header-value-router>
xml

在解析过程中,前面示例中定义的路由器可能会遇到通道解析失败,导致异常。如果你想抑制此类异常,并将未解析的消息发送到默认输出通道(通过 default-output-channel 属性标识),请将 resolution-required 设置为 false

通常情况下,标题值未明确映射到通道的消息会被发送到 default-output-channel。但是,当标题值被映射到一个通道名称而该通道无法解析时,将 resolution-required 属性设置为 false 会导致此类消息路由到 default-output-channel

以下示例显示了用 Java 配置的等效路由器:

@ServiceActivator(inputChannel = "routingChannel")
@Bean
public HeaderValueRouter router() {
HeaderValueRouter router = new HeaderValueRouter("testHeader");
router.setChannelMapping("someHeaderValue", "channelA");
router.setChannelMapping("someOtherHeaderValue", "channelB");
return router;
}
java

当使用 Java DSL 时,有两种选择。首先,您可以像前面的示例中所示定义路由器对象:

@Bean
public IntegrationFlow routerFlow1() {
return IntegrationFlow.from("routingChannel")
.route(router())
.get();
}

public HeaderValueRouter router() {
HeaderValueRouter router = new HeaderValueRouter("testHeader");
router.setChannelMapping("someHeaderValue", "channelA");
router.setChannelMapping("someOtherHeaderValue", "channelB");
return router;
}
java

请注意,路由器可以是,但不一定是 @Bean。如果它不是一个 @Bean,流程也会注册它。

其次,你可以在 DSL 流程本身内定义路由功能,如下例所示:

@Bean
public IntegrationFlow routerFlow2() {
return IntegrationFlow.from("routingChannel")
.route(Message.class, m -> m.getHeaders().get("testHeader", String.class),
m -> m
.channelMapping("someHeaderValue", "channelA")
.channelMapping("someOtherHeaderValue", "channelB"),
e -> e.id("headerValueRouter"))
.get();
}
java

配置中不需要将标题值映射到通道名称,因为标题值本身代表通道名称。以下示例显示了一个不需要将标题值映射到通道名称的路由器:

<int:header-value-router input-channel="routingChannel" header-name="testHeader"/>
xml
备注

自从 Spring Integration 2.1,解析通道的行为更加明确。例如,如果您省略了 default-output-channel 属性,则路由器无法解析至少一个有效的通道,并且任何通道名称解析失败都会通过将 resolution-required 设置为 false 而被忽略,这时会抛出一个 MessageDeliveryException

基本上,默认情况下,路由器必须能够成功地将消息路由到至少一个通道。如果您确实想要丢弃消息,则还必须将 default-output-channel 设置为 nullChannel

RecipientListRouter

在计算机领域中,RecipientListRouter 是一种路由器实现,用于将消息分发给多个接收者。

一个 RecipientListRouter 将每个接收到的消息发送到静态定义的消息通道列表。以下示例创建了一个 RecipientListRouter

<bean id="recipientListRouter"
class="org.springframework.integration.router.RecipientListRouter">
<property name="channels">
<list>
<ref bean="channel1"/>
<ref bean="channel2"/>
<ref bean="channel3"/>
</list>
</property>
</bean>
xml

Spring Integration 还为 RecipientListRouter 配置提供了命名空间支持(详见 命名空间支持),如下例所示:

<int:recipient-list-router id="customRouter" input-channel="routingChannel"
timeout="1234"
ignore-send-failures="true"
apply-sequence="true">
<int:recipient channel="channel1"/>
<int:recipient channel="channel2"/>
</int:recipient-list-router>
xml

以下示例展示了用 Java 配置的等效路由器:

@ServiceActivator(inputChannel = "routingChannel")
@Bean
public RecipientListRouter router() {
RecipientListRouter router = new RecipientListRouter();
router.setSendTimeout(1_234L);
router.setIgnoreSendFailures(true);
router.setApplySequence(true);
router.addRecipient("channel1");
router.addRecipient("channel2");
router.addRecipient("channel3");
return router;
}
java

以下示例展示了使用 Java DSL 配置的等效路由器:

@Bean
public IntegrationFlow routerFlow() {
return IntegrationFlow.from("routingChannel")
.routeToRecipients(r -> r
.applySequence(true)
.ignoreSendFailures(true)
.recipient("channel1")
.recipient("channel2")
.recipient("channel3")
.sendTimeout(1_234L))
.get();
}
java
备注

这里的 'apply-sequence' 标志与发布订阅通道中的效果相同,并且,与发布订阅通道一样,默认情况下在 recipient-list-router 上是禁用的。更多信息,请参阅 PublishSubscribeChannel 配置

在配置 RecipientListRouter 时,另一个便捷的选择是使用 Spring 表达式语言 (SpEL) 作为各个接收方通道的选择器。这样做类似于在 'chain' 的开头使用过滤器来充当“选择性消费者”。然而,在这种情况下,所有内容都被简洁地结合到路由器的配置中,如下例所示:

<int:recipient-list-router id="customRouter" input-channel="routingChannel">
<int:recipient channel="channel1" selector-expression="payload.equals('foo')"/>
<int:recipient channel="channel2" selector-expression="headers.containsKey('bar')"/>
</int:recipient-list-router>
xml

在上述配置中,通过 selector-expression 属性标识的 SpEL 表达式会被评估以确定此接收者是否应包含在给定输入消息的接收者列表中。表达式的评估结果必须是 boolean。如果未定义此属性,则该通道始终在接收者列表中。

RecipientListRouterManagement

从 4.1 版本开始,RecipientListRouter 提供了多个操作,可以在运行时动态地操作接收者。这些管理操作通过 RecipientListRouterManagement 类的 @ManagedResource 注解提供。可以通过使用 Control Bus 以及使用 JMX 来使用它们,如下例所示:

<control-bus input-channel="controlBus"/>

<recipient-list-router id="simpleRouter" input-channel="routingChannelA">
<recipient channel="channel1"/>
</recipient-list-router>

<channel id="channel2"/>
xml
Message<?> addRecipientCommandMessage =
MessageBuilder.withPayload("'simpleRouter.handler'.addRecipient")
.setHeader(IntegrationMessageHeaderAccessor.CONTROL_BUS_ARGUMENTS, List.of("channel2"))
.build();
java

从应用程序启动时,simpleRouter 仅有 channel1 这一个接收者。但在执行 addRecipient 命令后,添加了 channel2 接收者。这是一个“对消息的某部分注册兴趣”的用例,在某些时间段内我们可能对接收来自路由器的消息感兴趣,因此我们订阅了 recipient-list-router ,并在某个时刻决定取消订阅。

由于 <recipient-list-router> 的运行时管理操作,可以从一开始就配置为不包含任何 <recipient>。在这种情况下,当没有匹配的消息接收者时,RecipientListRouter 的行为是相同的。如果配置了 defaultOutputChannel,则消息将发送到那里。否则,将抛出 MessageDeliveryException

XPath 路由器

XPath 路由器是 XML 模块的一部分。参见 使用 XPath 路由 XML 消息

路由和错误处理

Spring Integration 还提供了一种特殊类型的路由器,称为 ErrorMessageExceptionTypeRouter,用于路由错误消息(定义为 payloadThrowable 实例的消息)。ErrorMessageExceptionTypeRouterPayloadTypeRouter 类似。实际上,它们几乎完全相同。唯一的区别在于,当 PayloadTypeRouter 遍历有效负载实例的类层次结构(例如,payload.getClass().getSuperclass())以查找最具体的类型和通道映射时,ErrorMessageExceptionTypeRouter 则遍历“异常原因”的层次结构(例如,payload.getCause())以查找最具体的 Throwable 类型或通道映射,并使用 mappingClass.isInstance(cause)cause 匹配到该类或任何父类。

important

在这种情况下,通道映射顺序很重要。因此,如果有需求为 IllegalArgumentException 获取映射,而不是为 RuntimeException 获取映射,则必须首先在路由器上配置后者。

备注

从 4.3 版本开始,ErrorMessageExceptionTypeRouter 在初始化阶段加载所有映射类,以便在出现 ClassNotFoundException 时快速失败。

以下示例显示了 ErrorMessageExceptionTypeRouter 的样本配置:

@Bean
public IntegrationFlow someFlow() {
return f -> f
.routeByException(r -> r
.channelMapping(IllegalArgumentException.class, "illegalChannel")
.channelMapping(NullPointerException.class, "npeChannel")
.defaultOutputChannel("defaultChannel"));
}
java