消息通道实现
Spring Integration 提供了不同的消息通道实现。以下各节将简要描述每一种实现。
PublishSubscribeChannel
PublishSubscribeChannel 实现会将发送给它的任何 Message 广播给所有已订阅的处理器。这通常用于发送事件消息,其主要作用是通知(与文档消息相反,文档消息通常由单个处理器处理)。请注意,PublishSubscribeChannel 仅用于发送。由于它在调用其 send(Message) 方法时直接向其订阅者广播,因此消费者无法轮询消息(它未实现 PollableChannel,因此没有 receive() 方法)。相反,任何订阅者本身必须是 MessageHandler,并且会依次调用订阅者的 handleMessage(Message) 方法。
在 3.0 版本之前,对没有订阅者的 PublishSubscribeChannel 调用 send 方法会返回 false。当与 MessagingTemplate 结合使用时,会抛出 MessageDeliveryException。从 3.0 版本开始,行为已发生变化,只要至少存在最低数量的订阅者(并且成功处理消息),send 操作就始终被视为成功。可以通过设置 minSubscribers 属性来修改此行为,该属性默认值为 0。
如果使用 TaskExecutor,则仅根据正确数量的订阅者是否存在来进行此判断,因为消息的实际处理是异步执行的。
QueueChannel
QueueChannel 实现包装了一个队列。与 PublishSubscribeChannel 不同,QueueChannel 具有点对点语义。换句话说,即使该通道有多个消费者,其中也只有一个消费者应接收发送到该通道的任何 Message。它提供了一个默认的无参数构造函数(提供本质上无界的容量 Integer.MAX_VALUE)以及一个接受队列容量的构造函数,如下所示:
public QueueChannel(int capacity)
一个尚未达到容量限制的通道会将消息存储在其内部队列中,即使没有接收方准备好处理消息,send(Message<?>) 方法也会立即返回。如果队列已达到容量,发送方将阻塞,直到队列中有可用空间。或者,如果您使用带有额外超时参数的 send 方法,队列将阻塞,直到队列中有可用空间或超时时间结束,以先发生者为准。类似地,如果队列中有可用消息,receive() 调用会立即返回;但如果队列为空,则 receive 调用可能会阻塞,直到有消息可用或提供的超时时间结束。无论哪种情况,都可以通过传递超时值 0 来强制立即返回,而不考虑队列的状态。但请注意,不带 timeout 参数的 send() 和 receive() 版本会无限期阻塞。
PriorityChannel
QueueChannel 强制实施先进先出(FIFO)排序,而 PriorityChannel 则是一种替代实现,允许根据优先级对通道内的消息进行排序。默认情况下,优先级由每个消息中的 priority 头部决定。然而,对于自定义的优先级确定逻辑,可以向 PriorityChannel 构造函数提供一个类型为 Comparator<Message<?>> 的比较器。
RendezvousChannel
RendezvousChannel 实现了“直接交接”场景,发送方会阻塞直到另一方调用通道的 receive() 方法。另一方则会阻塞直到发送方发送消息。在内部实现上,该实现与 QueueChannel 非常相似,区别在于它使用了 SynchronousQueue(一种零容量的 BlockingQueue 实现)。这种机制在发送方和接收方运行于不同线程但异步地将消息放入队列并不合适的场景下表现良好。换句话说,使用 RendezvousChannel 时,发送方能够确认某个接收方已接收消息;而使用 QueueChannel 时,消息可能仅被存储到内部队列中,且存在永远无法被接收的风险。
请记住,默认情况下所有这些基于队列的通道都仅在内存中存储消息。当需要持久化时,您可以在 queue 元素内提供 message-store 属性来引用一个持久的 MessageStore 实现,或者您可以用一个由持久化代理支持的通道来替换本地通道,例如 JMS 支持的通道或通道适配器。后一种选项允许您利用任何 JMS 提供程序的消息持久化实现,如 JMS 支持 中所述。然而,当队列中的缓冲不是必需时,最简单的方法是依赖下一节将讨论的 DirectChannel。
RendezvousChannel 在实现请求-回复操作时同样非常有用。发送方可以创建一个临时的匿名 RendezvousChannel 实例,然后在构建 Message 时将其设置为 'replyChannel' 头部。发送该 Message 后,发送方可以立即调用 receive 方法(可选择提供超时值),以便在等待回复 Message 时阻塞。这与 Spring Integration 中许多请求-回复组件内部使用的实现方式非常相似。
DirectChannel
DirectChannel 具有点对点语义,但除此之外,它比之前描述的任何基于队列的通道实现都更类似于 PublishSubscribeChannel。它实现了 SubscribableChannel 接口而非 PollableChannel 接口,因此它将消息直接分派给订阅者。然而,作为一个点对点通道,它与 PublishSubscribeChannel 的不同之处在于,它将每条 Message 发送给单个已订阅的 MessageHandler。
除了是最简单的点对点通道选项外,其最重要的特性之一是允许单个线程在通道的"两端"执行操作。例如,若某个处理器订阅了 DirectChannel,则向该通道发送 Message 时,会在发送方的线程中直接触发该处理器的 handleMessage(Message) 方法调用,而这一过程发生在 send() 方法调用返回之前。
提供具有此行为的通道实现的关键动机在于支持必须跨越通道的事务,同时仍能受益于通道提供的抽象和松耦合特性。如果在事务范围内调用 send() 方法,则处理程序的调用结果(例如更新数据库记录)将参与决定该事务的最终结果(提交或回滚)。
由于 DirectChannel 是最简单的选项,并且不会增加调度和管理轮询器线程所需的任何额外开销,因此它是 Spring Integration 中的默认通道类型。总体思路是为应用程序定义通道,考虑哪些需要提供缓冲或限制输入,并将这些通道修改为基于队列的 PollableChannels。同样,如果通道需要广播消息,则不应使用 DirectChannel,而应使用 PublishSubscribeChannel。稍后我们将展示如何配置这些通道中的每一种。
DirectChannel 内部委托消息分发器来调用其订阅的消息处理器,该分发器可通过 load-balancer 或 load-balancer-ref 属性(两者互斥)暴露负载均衡策略。当多个消息处理器订阅同一通道时,负载均衡策略被消息分发器用于帮助确定消息如何在消息处理器之间分配。为方便起见,load-balancer 属性公开了一个枚举值,指向 LoadBalancingStrategy 的现有实现。目前仅提供 round-robin(在处理器间轮询负载均衡)和 none(用于明确禁用负载均衡的情况)两种可用值。其他策略实现可能在未来的版本中添加。然而,从版本 3.0 开始,您可以提供自己的 LoadBalancingStrategy 实现,并通过 load-balancer-ref 属性注入它,该属性应指向实现 LoadBalancingStrategy 的 bean,如下例所示:
FixedSubscriberChannel 是一种 SubscribableChannel,它仅支持单个 MessageHandler 订阅者,且该订阅者无法取消订阅。这在涉及高吞吐量性能场景且无需其他订阅者或通道拦截器时非常有用。
<int:channel id="lbRefChannel">
<int:dispatcher load-balancer-ref="lb"/>
</int:channel>
<bean id="lb" class="foo.bar.SampleLoadBalancingStrategy"/>
请注意,load-balancer 和 load-balancer-ref 属性是互斥的。
负载均衡功能还与布尔属性 failover 协同工作。如果 failover 值为 true(默认值),当先前的处理器抛出异常时,调度器会回退到任何后续的处理器(根据需要)。顺序由处理器自身定义的可选 order 值决定,如果不存在该值,则按照处理器订阅的顺序确定。
如果某种情况要求调度器在每次发生错误时总是尝试调用第一个处理程序,然后按照相同的固定顺序回退,则不应提供负载均衡策略。换句话说,即使未启用负载均衡,调度器仍支持 failover 布尔属性。然而,在没有负载均衡的情况下,处理程序的调用始终从第一个开始,按照它们的顺序进行。例如,当明确定义了主、次、第三等处理程序时,这种方法效果很好。在使用命名空间支持时,任何端点上的 order 属性决定了顺序。
请注意,负载均衡和 failover 仅在通道拥有多个已订阅的消息处理器时适用。在使用命名空间支持时,这意味着多个端点共享 input-channel 属性中定义的同一通道引用。
从版本5.2开始,当 failover 为 true 时,当前处理程序的故障以及失败的消息将根据配置分别在 debug 或 info 级别下记录。
ExecutorChannel
ExecutorChannel 是一种点对点通道,支持与 DirectChannel 相同的调度器配置(负载均衡策略和 failover 布尔属性)。这两种调度通道类型的关键区别在于,ExecutorChannel 委托给 TaskExecutor 实例来执行调度。这意味着发送方法通常不会阻塞,但也意味着处理程序的调用可能不会在发送者的线程中发生。因此,它不支持跨越发送者和接收处理程序的事务。
发送方有时可能会阻塞。例如,当使用带有限制客户端拒绝策略的 TaskExecutor(例如 ThreadPoolExecutor.CallerRunsPolicy)时,只要线程池达到最大容量且执行器的工作队列已满,发送方的线程就可能随时执行该方法。由于这种情况仅会以不可预测的方式发生,因此不应依赖它来处理事务。
PartitionedChannel
从版本 6.1 开始,提供了一个 PartitionedChannel 实现。这是 AbstractExecutorChannel 的一个扩展,代表点对点分发逻辑,其中实际消费在特定线程上处理,该线程由从发送到此通道的消息中评估出的分区键决定。此通道类似于上面提到的 ExecutorChannel,但不同之处在于,具有相同分区键的消息总是在同一线程中处理,从而保持顺序。它不需要外部 TaskExecutor,但可以使用自定义的 ThreadFactory 进行配置(例如 Thread.ofVirtual().name("partition-", 0).factory())。该工厂用于为每个分区将单线程执行器填充到 MessageDispatcher 委托中。默认情况下,使用 IntegrationMessageHeaderAccessor.CORRELATION_ID 消息头作为分区键。此通道可以配置为一个简单的 bean:
@Bean
PartitionedChannel somePartitionedChannel() {
return new PartitionedChannel(3, (message) -> message.getHeaders().get("partitionKey"));
}
该通道将拥有 3 个分区——即专用线程;它将使用 partitionKey 头部来确定消息将在哪个分区中处理。更多信息请参阅 PartitionedChannel 类的 Javadocs。
FluxMessageChannel
FluxMessageChannel 是一个 org.reactivestreams.Publisher 实现,用于将发送的消息“下沉”到内部的 reactor.core.publisher.Flux 中,以供下游的响应式订阅者按需消费。此通道实现既不是 SubscribableChannel,也不是 PollableChannel,因此只有 org.reactivestreams.Subscriber 实例可用于从此通道消费,以遵循响应式流的背压特性。另一方面,FluxMessageChannel 实现了 ReactiveStreamsSubscribableChannel,其 subscribeTo(Publisher<Message<?>>) 契约允许从响应式源发布者接收事件,从而将响应式流桥接到集成流中。为了实现整个集成流的完全响应式行为,必须在流中的所有端点之间放置这样的通道。
有关与 Reactive Streams 交互的更多信息,请参阅 Reactive Streams 支持。
作用域通道
Spring Integration 1.0 曾提供 ThreadLocalChannel 实现,但该实现已在 2.0 版本中移除。现在处理相同需求的更通用方式是为通道添加 scope 属性。该属性的值可以是上下文中可用范围的名称。例如,在 Web 环境中某些范围是可用的,并且任何自定义范围实现都可以在上下文中注册。以下示例展示了将线程本地范围应用于通道的情况,包括范围本身的注册:
<int:channel id="threadScopedChannel" scope="thread">
<int:queue />
</int:channel>
<bean class="org.springframework.beans.factory.config.CustomScopeConfigurer">
<property name="scopes">
<map>
<entry key="thread" value="org.springframework.context.support.SimpleThreadScope" />
</map>
</property>
</bean>
在前一个示例中定义的通道同样在内部委托给一个队列,但该通道被绑定到当前线程,因此队列的内容也相应地受到绑定。这样一来,发送到该通道的线程稍后可以接收这些相同的消息,但其他线程将无法访问它们。虽然线程作用域的通道很少需要,但在使用 DirectChannel 实例来强制单线程操作,但任何回复消息应发送到“终端”通道的情况下,它们可能很有用。如果该终端通道是线程作用域的,原始发送线程可以从终端通道收集其回复。
现在,由于任何通道都可以被限定范围,除了线程本地(thread-Local)之外,你还可以定义自己的范围。