跳到主要内容

消息通道实现

QWen Plus 中英对照 Message Channel Implementations

Spring Integration 提供了不同的消息通道实现。以下各节将简要描述每一种。

PublishSubscribeChannel

PublishSubscribeChannel 实现会将其接收到的任何 Message 广播给所有已订阅的处理程序。这通常用于发送事件消息,其主要作用是通知(不同于文档消息,后者通常旨在由单个处理程序处理)。请注意,PublishSubscribeChannel 仅用于发送。由于在其 send(Message) 方法被调用时直接向其订阅者广播,消费者无法轮询消息(它不实现 PollableChannel,因此没有 receive() 方法)。相反,任何订阅者本身必须是一个 MessageHandler,并且依次调用订阅者的 handleMessage(Message) 方法。

在 3.0 版本之前,在没有订阅者的情况下调用 PublishSubscribeChannelsend 方法会返回 false。当与 MessagingTemplate 一起使用时,会抛出 MessageDeliveryException。从 3.0 版本开始,行为已更改为只要存在最少数量的订阅者(并且成功处理消息),send 就始终被视为成功的。此行为可以通过设置 minSubscribers 属性来修改,默认值为 0

备注

如果您使用 TaskExecutor,则仅使用正确数量的订阅者的存在来进行此确定,因为消息的实际处理是异步执行的。

QueueChannel

QueueChannel 实现包装了一个队列。与 PublishSubscribeChannel 不同,QueueChannel 具有点对点语义。换句话说,即使该通道有多个消费者,发送到该通道的任何 Message 也只会被其中一个消费者接收。它提供了一个默认的无参构造函数(提供基本上无限制的容量 Integer.MAX_VALUE),以及一个接受队列容量的构造函数,如下所示:

public QueueChannel(int capacity)
java

一个尚未达到其容量限制的通道会将其消息存储在其内部队列中,send(Message<?>) 方法会立即返回,即使没有接收者准备好处理该消息。如果队列已满,则发送者会被阻塞,直到队列中有可用空间。或者,如果您使用带有额外超时参数的 send 方法,队列会一直阻塞,直到有可用空间或超时时间到期,以先发生者为准。同样地,如果有消息在队列中,receive() 调用会立即返回;但是,如果队列为空,则 receive 调用可能会一直阻塞,直到有消息可用或超时(如果提供),以先发生者为准。在这两种情况下,可以通过传递 0 的超时值来强制立即返回,而不考虑队列的状态。但是请注意,调用没有 timeout 参数版本的 send()receive() 会无限期阻塞。

PriorityChannel

虽然 QueueChannel 强制执行先进先出 (FIFO) 顺序,但 PriorityChannel 是一种替代实现,它允许根据优先级对通道中的消息进行排序。默认情况下,优先级由每条消息中的 priority 头确定。然而,对于自定义优先级确定逻辑,可以向 PriorityChannel 构造函数提供一个类型为 Comparator<Message<?>> 的比较器。

RendezvousChannel

RendezvousChannel 实现了“直接交接”场景,其中发送者会阻塞,直到另一方调用通道的 receive() 方法。另一方会阻塞,直到发送者发送消息。内部实现与 QueueChannel 非常相似,区别在于它使用了 SynchronousQueueBlockingQueue 的零容量实现)。这在发送者和接收者在不同线程中运行的情况下工作得很好,但异步地将消息放入队列是不合适的。换句话说,使用 RendezvousChannel 时,发送者知道某个接收者已经接受了消息;而使用 QueueChannel 时,消息会被存储到内部队列中,并且可能永远不会被接收。

提示

请记住,默认情况下所有这些基于队列的通道只在内存中存储消息。当需要持久性时,您可以在 'queue' 元素中提供一个 'message-store' 属性来引用持久化的 MessageStore 实现,或者用由持久化代理支持的本地通道替换它,例如基于 JMS 的通道或通道适配器。后一种选项允许您利用任何 JMS 提供商的消息持久化实现,如 JMS 支持 中所述。但是,当不需要在队列中进行缓冲时,最简单的做法是依赖于 DirectChannel,这将在下一节中讨论。

RendezvousChannel 对于实现请求-回复操作也非常有用。发送者可以创建一个临时的、匿名的 RendezvousChannel 实例,并在构建 Message 时将其设置为 'replyChannel' 头。在发送该 Message 后,发送者可以立即调用 receive(可选地提供一个超时值),以便在等待回复 Message 时阻塞。这与许多 Spring Integration 请求-回复组件内部使用的实现非常相似。

DirectChannel

DirectChannel 具有点对点语义,但在其他方面更类似于 PublishSubscribeChannel,而不像前面描述的任何基于队列的通道实现。它实现了 SubscribableChannel 接口而不是 PollableChannel 接口,因此它会直接将消息分派给订阅者。然而,作为一个点对点通道,它与 PublishSubscribeChannel 不同之处在于它将每个 Message 发送到单个订阅的 MessageHandler

除了是最简单的点对点通道选项之外,它的一个重要特性是允许单个线程执行通道“两端”的操作。例如,如果一个处理器订阅了 DirectChannel,那么向该通道发送一条 Message 将直接在发送者的线程中触发该处理器的 handleMessage(Message) 方法调用,在 send() 方法调用返回之前。

提供具有这种行为的通道实现的关键动机是支持必须跨越通道的事务,同时仍然受益于通道所提供的抽象和松耦合。如果在事务的作用域内调用 send() 方法,处理程序调用的结果(例如,更新数据库记录)将在确定该事务的最终结果(提交或回滚)中起作用。

备注

由于 DirectChannel 是最简单的选项,并且不会增加调度和管理轮询器线程所需的任何额外开销,因此它是 Spring Integration 中的默认通道类型。一般的想法是为应用程序定义通道,考虑哪些通道需要提供缓冲或限制输入,并将这些通道修改为基于队列的 PollableChannels。同样,如果一个通道需要广播消息,它不应该是一个 DirectChannel,而应该是一个 PublishSubscribeChannel。稍后,我们将展示如何配置这些通道。

DirectChannel 内部委托给消息调度器来调用其订阅的消息处理程序,并且该调度器可以具有由 load-balancerload-balancer-ref 属性(二者互斥)暴露的负载均衡策略。负载均衡策略用于帮助消息调度器确定当多个消息处理程序订阅同一个通道时,如何在消息处理程序之间分发消息。为了方便起见,load-balancer 属性暴露了一个枚举值列表,指向 LoadBalancingStrategy 的现有实现。可用的值只有 round-robin (以轮询方式在处理程序之间进行负载均衡)和 none (用于明确禁用负载均衡的情况)。其他策略实现可能会在未来版本中添加。但是,自 3.0 版本以来,您可以提供自己的 LoadBalancingStrategy 实现,并使用 load-balancer-ref 属性注入它,该属性应指向实现 LoadBalancingStrategy 的 bean,如下例所示:

FixedSubscriberChannel 是一个 SubscribableChannel,它仅支持一个不能取消订阅的 MessageHandler 订阅者。当不涉及其他订阅者且不需要通道拦截器时,这对于高吞吐量性能用例非常有用。

<int:channel id="lbRefChannel">
<int:dispatcher load-balancer-ref="lb"/>
</int:channel>

<bean id="lb" class="foo.bar.SampleLoadBalancingStrategy"/>
xml

请注意 load-balancerload-balancer-ref 属性是互斥的。

负载均衡还与布尔属性 failover 协同工作。如果 failover 值为 true(默认值),则调度器会在前面的处理器抛出异常时,回退到任何后续的处理器(必要时)。顺序由处理器本身定义的可选顺序值决定,或者如果不存在这样的值,则由处理器订阅的顺序决定。

如果某种情况要求调度器始终尝试调用第一个处理器,然后每次发生错误时按照相同的固定顺序序列进行回退,则不应提供任何负载均衡策略。换句话说,即使没有启用负载均衡,调度器仍然支持 failover 布尔属性。然而,在没有负载均衡的情况下,处理器的调用总是从第一个开始,根据它们的顺序。例如,当有明确的主要、次要、第三级等定义时,这种方法效果很好。在使用命名空间支持时,任何端点上的 order 属性决定了顺序。

备注

请记住,负载均衡和 failover 仅在通道有多个订阅的消息处理程序时适用。当使用命名空间支持时,这意味着多个端点共享在 input-channel 属性中定义的同一个通道引用。

从 5.2 版本开始,当 failover 为 true 时,当前处理器的失败连同失败的消息会在配置的情况下以 debuginfo 级别进行记录。

ExecutorChannel

ExecutorChannel 是一个点对点通道,支持与 DirectChannel 相同的调度程序配置(负载均衡策略和 failover 布尔属性)。这两种调度通道类型之间的关键区别在于 ExecutorChannel 委托给 TaskExecutor 的实例来执行分发。这意味着发送方法通常不会阻塞,但也意味着处理程序调用可能不会发生在发送者的线程中。因此,它不支持跨越发送者和接收处理程序的事务。

警告

发送者有时可能会被阻塞。例如,在使用带有节流客户端拒绝策略(如 ThreadPoolExecutor.CallerRunsPolicy)的 TaskExecutor 时,当线程池达到最大容量且执行器的工作队列已满时,发送者的线程可以随时执行该方法。由于这种情况只会以不可预测的方式发生,因此你不应该依赖它来进行事务处理。

PartitionedChannel

从 6.1 版本开始,提供了一个 PartitionedChannel 实现。这是 AbstractExecutorChannel 的扩展,表示点对点调度逻辑,其中实际的消费是在特定线程上处理的,该线程由发送到此通道的消息评估的分区键决定。这个通道类似于上面提到的 ExecutorChannel,但不同之处在于具有相同分区键的消息总是在同一个线程中处理,从而保持顺序。它不需要外部 TaskExecutor,但可以配置自定义 ThreadFactory(例如 Thread.ofVirtual().name("partition-", 0).factory())。此工厂用于为每个分区填充单线程执行器到 MessageDispatcher 委托。默认情况下,使用消息头 IntegrationMessageHeaderAccessor.CORRELATION_ID 作为分区键。这个通道可以作为一个简单的 bean 进行配置:

@Bean
PartitionedChannel somePartitionedChannel() {
return new PartitionedChannel(3, (message) -> message.getHeaders().get("partitionKey"));
}
java

该通道将有 3 个分区 - 专用线程;将使用 partitionKey 标头确定消息将在哪个分区中处理。有关更多信息,请参阅 PartitionedChannel 类的 Javadoc。

FluxMessageChannel

FluxMessageChannelorg.reactivestreams.Publisher 的一个实现,用于将发送的消息“下沉”到内部的 reactor.core.publisher.Flux 中,以便下游的响应式订阅者按需消费。这种通道实现既不是 SubscribableChannel,也不是 PollableChannel,因此只有 org.reactivestreams.Subscriber 实例可以用于从该通道消费消息,并遵循响应式流的背压特性。另一方面,FluxMessageChannel 实现了 ReactiveStreamsSubscribableChannel 及其 subscribeTo(Publisher<Message<?>>) 合约,允许从响应式源发布者接收事件,将响应式流桥接到集成流中。为了在整个集成流中实现完全的响应式行为,这样的通道必须放置在流中的所有端点之间。

有关与 Reactive Streams 交互的更多信息,请参阅Reactive Streams 支持

作用域通道

Spring Integration 1.0 提供了一个 ThreadLocalChannel 实现,但从 2.0 开始已将其移除。现在处理相同需求的更通用方法是向通道添加一个 scope 属性。该属性的值可以是上下文中可用的作用域名称。例如,在 Web 环境中,某些作用域是可用的,并且任何自定义作用域实现都可以注册到上下文中。以下示例展示了线程本地作用域如何应用于通道,包括作用域本身的注册:

<int:channel id="threadScopedChannel" scope="thread">
<int:queue />
</int:channel>

<bean class="org.springframework.beans.factory.config.CustomScopeConfigurer">
<property name="scopes">
<map>
<entry key="thread" value="org.springframework.context.support.SimpleThreadScope" />
</map>
</property>
</bean>
xml

在前面的例子中定义的通道也在内部委托给一个队列,但该通道绑定到当前线程,因此队列的内容也是类似的绑定。这样,发送消息到该通道的线程可以在之后接收这些相同的消息,但其他线程将无法访问它们。虽然线程作用域的通道很少需要,但在使用 DirectChannel 实例来强制执行单个线程操作的情况下,而任何回复消息应发送到一个“终端”通道时,它们可能是有用的。如果该终端通道是线程作用域的,则原始发送线程可以从终端通道收集其回复。

现在,由于任何通道都可以具有作用域,除了 thread-Local 之外,你还可以定义自己的作用域。