跳到主要内容
版本:7.0.2

ZeroMQ 支持

DeepSeek V3 中英对照 ZeroMQ Support

Spring Integration 提供了用于在应用程序中支持 ZeroMQ 通信的组件。该实现基于 JeroMQ 库中备受支持的 Java API。所有组件都封装了 ZeroMQ 套接字的生命周期,并在内部为其管理线程,使得与这些组件的交互是无锁且线程安全的。

此依赖项为项目所需:

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-zeromq</artifactId>
<version>7.0.2</version>
</dependency>

ZeroMQ 代理

ZeroMqProxy 是一个对内置 ZMQ.proxy() 函数 的 Spring 友好封装。它封装了套接字生命周期和线程管理。此代理的客户端仍可使用标准的 ZeroMQ 套接字连接和交互 API。除了标准的 ZContext 外,它还需要一种已知的 ZeroMQ 代理模式:SUB/PUB、PULL/PUSH 或 ROUTER/DEALER。这样,代理的前端和后端会使用相应的一对 ZeroMQ 套接字类型。详情请参阅 ZeroMqProxy.Type

ZeroMqProxy 实现了 SmartLifecycle 接口,用于创建、绑定和配置套接字,并在一个来自 Executor(如果存在)的专用线程中启动 ZMQ.proxy()。前端和后端套接字的绑定是通过 tcp:// 协议在提供的端口上绑定到所有可用的网络接口上完成的。否则,它们将绑定到随机端口,这些端口稍后可以通过相应的 getFrontendPort()getBackendPort() API 方法获取。

控制套接字以 SocketType.PAIR 类型暴露,并通过线程间传输绑定在 "inproc://" + beanName + ".control" 地址上;可通过 getControlAddress() 方法获取。它应与同一应用程序中的另一个 SocketType.PAIR 套接字配合使用,以发送 ZMQ.PROXY_TERMINATEZMQ.PROXY_PAUSE 和/或 ZMQ.PROXY_RESUME 命令。当调用 stop() 方法以终止其生命周期时,ZeroMqProxy 会执行 ZMQ.PROXY_TERMINATE 命令,从而优雅地终止 ZMQ.proxy() 循环并关闭所有绑定的套接字。

setExposeCaptureSocket(boolean) 选项使该组件绑定一个额外的线程间套接字,其类型为 SocketType.PUB,用于捕获并发布前端与后端套接字之间的所有通信,正如 ZMQ.proxy() 实现所述。此套接字绑定到 "inproc://" + beanName + ".capture" 地址,并且不期望任何特定的订阅来进行过滤。

前端和后端套接字可以通过额外属性进行自定义,例如读写超时或安全设置。这种自定义功能分别通过 setFrontendSocketConfigurer(Consumer<ZMQ.Socket>)setBackendSocketConfigurer(Consumer<ZMQ.Socket>) 回调函数实现。

ZeroMqProxy 可以作为简单的 bean 提供,如下所示:

@Bean
ZeroMqProxy zeroMqProxy() {
ZeroMqProxy proxy = new ZeroMqProxy(CONTEXT, ZeroMqProxy.Type.SUB_PUB);
proxy.setExposeCaptureSocket(true);
proxy.setFrontendPort(6001);
proxy.setBackendPort(6002);
return proxy;
}

所有客户端节点都应通过 tcp:// 协议连接到该代理的主机,并使用各自感兴趣的相应端口。

ZeroMQ 消息通道

ZeroMqChannel 是一个 SubscribableChannel,它使用一对 ZeroMQ 套接字来连接发布者和订阅者,以实现消息交互。它可以在 PUB/SUB 模式下工作(默认为 PUSH/PULL);也可以用作本地线程间通道(使用 PAIR 套接字)——在这种情况下不提供 connectUrl。在分布式模式下,它必须连接到外部管理的 ZeroMQ 代理,并可以通过该代理与其他连接到同一代理的类似通道交换消息。连接 URL 选项是一个标准的 ZeroMQ 连接字符串,包含协议、主机以及一对用冒号分隔的端口,分别用于 ZeroMQ 代理的前端和后端套接字。为了方便起见,如果代理与通道配置在同一应用程序中,可以向通道提供 ZeroMqProxy 实例,而不是连接字符串。

发送和接收套接字都在各自独立的线程中进行管理,使得该通道具有良好的并发友好性。这样,我们就可以在不同的线程中向ZeroMqChannel发布和消费消息,而无需进行同步操作。

默认情况下,ZeroMqChannel 使用 EmbeddedHeadersJsonMessageMapper,通过 Jackson JSON 处理器将 Message(包括头部信息)序列化为 byte[] 或从 byte[] 反序列化。此逻辑可通过 setMessageMapper(BytesMessageMapper) 进行配置。

发送和接收套接字可以通过相应的 setSendSocketConfigurer(Consumer<ZMQ.Socket>)setSubscribeSocketConfigurer(Consumer<ZMQ.Socket>) 回调函数,针对任何选项(读/写超时、安全性等)进行自定义配置。

ZeroMqChannel 的内部逻辑基于响应式流,通过 Project Reactor 的 FluxMono 操作符实现。这提供了更简便的线程控制,并允许在通道上进行无锁的并发发布和消费。本地 PUB/SUB 逻辑通过 Flux.publish() 操作符实现,使得该通道的所有本地订阅者都能接收到相同的发布消息,这与 PUB 套接字的分布式订阅者行为一致。

以下是一个 ZeroMqChannel 配置的简单示例:

@Bean
ZeroMqChannel zeroMqPubSubChannel(ZContext context) {
ZeroMqChannel channel = new ZeroMqChannel(context, true);
channel.setConnectUrl("tcp://localhost:6001:6002");
channel.setConsumeDelay(Duration.ofMillis(100));
return channel;
}

ZeroMQ 入站通道适配器

ZeroMqMessageProducer 是一个具有响应式语义的 MessageProducerSupport 实现。它以非阻塞方式持续从 ZeroMQ 套接字读取数据,并将消息发布到一个无限的 Flux 流中,该流由 FluxMessageChannel 订阅,或者当输出通道不是响应式时,在 start() 方法中显式订阅。当套接字上没有接收到数据时,会在下一次读取尝试之前应用 consumeDelay(默认为 1 秒)。

ZeroMqMessageProducer 仅支持 SocketType.PAIRSocketType.PULLSocketType.SUB 类型。该组件可以连接到远程套接字,或通过指定或随机端口绑定到 TCP 协议。启动此组件并绑定 ZeroMQ 套接字后,可通过 getBoundPort() 获取实际端口。套接字选项(例如安全设置或写入超时)可通过 setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer) 回调进行配置。

如果 receiveRaw 选项设置为 true,从套接字接收的 ZMsg 会以原始形式作为所生成 Message 的有效载荷发送:由下游流程负责解析和转换 ZMsg。否则,将使用 InboundMessageMapper 将接收的数据转换为 Message。如果接收到的 ZMsg 是多帧的,第一帧将被视为该 ZeroMQ 消息发布到的 ZeroMqHeaders.TOPIC 头部。

如果 unwrapTopic 选项设置为 false,则传入消息被视为由两个帧组成:主题和 ZeroMQ 消息。否则,默认情况下,ZMsg 被视为由三个帧组成:第一个帧包含主题,最后一个帧包含消息,中间是一个空帧。

使用 SocketType.SUB 时,ZeroMqMessageProducer 会使用提供的 topics 选项进行订阅;默认情况下订阅所有主题。订阅可以在运行时通过 subscribeToTopics()unsubscribeFromTopics() 这两个 @ManagedOperation 进行调整。

以下是 ZeroMqMessageProducer 的配置示例:

@Bean
ZeroMqMessageProducer zeroMqMessageProducer(ZContext context, MessageChannel outputChannel) {
ZeroMqMessageProducer messageProducer = new ZeroMqMessageProducer(context, SocketType.SUB);
messageProducer.setOutputChannel(outputChannel);
messageProducer.setTopics("some");
messageProducer.setReceiveRaw(true);
messageProducer.setBindPort(7070);
messageProducer.setConsumeDelay(Duration.ofMillis(100));
return messageProducer;
}

ZeroMQ 出站通道适配器

ZeroMqMessageHandler 是一个 ReactiveMessageHandler 实现,用于将发布消息发送到 ZeroMQ 套接字。仅支持 SocketType.PAIRSocketType.PUSHSocketType.PUB。该组件可以连接到远程套接字,或通过提供的端口或随机端口绑定到 TCP 协议。实际端口可在组件启动且 ZeroMQ 套接字绑定后通过 getBoundPort() 获取。

当使用 SocketType.PUB 时,topicExpression 会针对请求消息进行评估,如果结果不为空,则会将一个主题帧注入到 ZeroMQ 消息中。订阅方(SocketType.SUB)必须先接收主题帧,然后才能解析实际数据。

如果 wrapTopic 选项设置为 false,ZeroMQ 消息帧将在注入的主题(如果存在)之后发送。默认情况下,会在主题和消息之间发送一个额外的空帧。

当请求消息的有效载荷为 ZMsg 时,不会进行任何转换或主题提取:ZMsg 会按原样发送到套接字中,并且不会被销毁以便后续可能的重用。否则,将使用 OutboundMessageMapper<byte[]> 将请求消息(或其有效载荷)转换为 ZeroMQ 帧进行发布。默认情况下,会使用一个配置了 ConfigurableCompositeMessageConverterConvertingBytesMessageMapper。套接字选项(例如安全性或写入超时)可以通过 setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer) 回调进行配置。

以下是 ZeroMqMessageHandler 连接到套接字的配置示例:

@Bean
@ServiceActivator(inputChannel = "zeroMqPublisherChannel")
ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) {
ZeroMqMessageHandler messageHandler =
new ZeroMqMessageHandler(context, "tcp://localhost:6060", SocketType.PUB);
messageHandler.setTopicExpression(
new FunctionExpression<Message<?>>((message) -> message.getHeaders().get("topic")));
messageHandler.setMessageMapper(new EmbeddedHeadersJsonMessageMapper());
}

以下是 ZeroMqMessageHandler 的配置示例,它绑定到指定的端口:

@Bean
@ServiceActivator(inputChannel = "zeroMqPublisherChannel")
ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) {
ZeroMqMessageHandler messageHandler =
new ZeroMqMessageHandler(context, 7070, SocketType.PUB);
messageHandler.setTopicExpression(
new FunctionExpression<Message<?>>((message) -> message.getHeaders().get("topic")));
messageHandler.setMessageMapper(new EmbeddedHeadersJsonMessageMapper());
}

ZeroMQ Java DSL 支持

spring-integration-zeromq 通过 ZeroMq 工厂和上述组件的 IntegrationComponentSpec 实现,提供了一个便捷的 Java DSL 流式 API。

这是 ZeroMqChannel 的 Java DSL 示例:

.channel(ZeroMq.zeroMqChannel(this.context)
.connectUrl("tcp://localhost:6001:6002")
.consumeDelay(Duration.ofMillis(100)))
}

ZeroMQ Java DSL 的入站通道适配器为:

IntegrationFlow.from(
ZeroMq.inboundChannelAdapter(this.context, SocketType.SUB)
.connectUrl("tcp://localhost:9000")
.topics("someTopic")
.receiveRaw(true)
.consumeDelay(Duration.ofMillis(100)))
}

ZeroMQ Java DSL 的出站通道适配器为:

.handle(ZeroMq.outboundChannelAdapter(this.context, "tcp://localhost:9001", SocketType.PUB)
.topicFunction(message -> message.getHeaders().get("myTopic")))
}