跳到主要内容

ZeroMQ 支持

QWen Plus 中英对照 ZeroMQ Support

Spring Integration 提供了组件以支持应用程序中的 ZeroMQ 通信。该实现基于广受支持的 JeroMQ 库的 Java API。所有组件都封装了 ZeroMQ 套接字生命周期,并在内部为它们管理线程,使与这些组件的交互成为无锁且线程安全的。

你需要将这个依赖项添加到你的项目中:

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-zeromq</artifactId>
<version>6.4.2</version>
</dependency>
xml

ZeroMQ 代理

ZeroMqProxy 是 Spring 友好的内置 ZMQ.proxy() 函数 的包装器。它封装了套接字生命周期和线程管理。此代理的客户端仍然可以使用标准的 ZeroMQ 套接字连接和交互 API。除了标准的 ZContext 外,它还需要一个知名的 ZeroMQ 代理模式之一:SUB/PUB、PULL/PUSH 或 ROUTER/DEALER。这样,为代理的前端和后端使用适当的 ZeroMQ 套接字类型对。详情请参阅 ZeroMqProxy.Type

ZeroMqProxy 实现了 SmartLifecycle,以创建、绑定和配置套接字,并在一个专用线程中从 Executor(如果有的话)启动 ZMQ.proxy()。前端和后端套接字的绑定是通过 tcp:// 协议在所有可用的网络接口上进行的,使用提供的端口。否则,它们将绑定到随机端口,这些端口可以通过相应的 getFrontendPort()getBackendPort() API 方法稍后获取。

控制套接字以 SocketType.PAIR 暴露,并通过线程间传输在 "inproc://" + beanName + ".control" 地址上;可以通过 getControlAddress() 获取。它应该与同一个应用程序中的另一个 SocketType.PAIR 套接字一起使用,以发送 ZMQ.PROXY_TERMINATEZMQ.PROXY_PAUSE 和/或 ZMQ.PROXY_RESUME 命令。当调用 stop() 时,ZeroMqProxy 执行 ZMQ.PROXY_TERMINATE 命令,以终止 ZMQ.proxy() 循环并优雅地关闭所有绑定的套接字。

setExposeCaptureSocket(boolean) 选项会导致此组件绑定一个额外的线程间套接字,使用 SocketType.PUB 捕获并发布前端和后端套接字之间所有通信,就像 ZMQ.proxy() 实现所描述的那样。此套接字绑定到 "inproc://" + beanName + ".capture" 地址,并不期望有任何特定的订阅用于过滤。

前端和后端套接字可以通过附加属性进行自定义,例如读/写超时或安全性。分别可以通过 setFrontendSocketConfigurer(Consumer<ZMQ.Socket>)setBackendSocketConfigurer(Consumer<ZMQ.Socket>) 回调来进行这种自定义。

ZeroMqProxy 可以像这样作为一个简单的 bean 提供:

@Bean
ZeroMqProxy zeroMqProxy() {
ZeroMqProxy proxy = new ZeroMqProxy(CONTEXT, ZeroMqProxy.Type.SUB_PUB);
proxy.setExposeCaptureSocket(true);
proxy.setFrontendPort(6001);
proxy.setBackendPort(6002);
return proxy;
}
java

所有客户端节点都应该通过 tcp:// 连接到此代理的主机,并使用它们感兴趣的相应端口。

ZeroMQ 消息通道

ZeroMqChannel 是一个 SubscribableChannel,它使用一对 ZeroMQ 套接字来连接发布者和订阅者以进行消息交互。它可以工作在 PUB/SUB 模式(默认为 PUSH/PULL 模式);也可以用作本地线程间通道(使用 PAIR 套接字)——在这种情况下,不提供 connectUrl。在分布式模式下,它必须连接到外部管理的 ZeroMQ 代理,在那里它可以与其他连接到同一代理的类似通道交换消息。连接 URL 选项是一个标准的 ZeroMQ 连接字符串,包含协议、主机以及前端和后端套接字的一对端口,端口之间用冒号分隔。为了方便起见,如果在同一应用程序中配置了代理,则可以向通道提供 ZeroMqProxy 实例而不是连接字符串。

发送和接收套接字都在它们自己的专用线程中进行管理,这使得该通道对并发友好。这样,我们可以从不同的线程发布和消费 ZeroMqChannel 而不需要同步。

默认情况下,ZeroMqChannel 使用 EmbeddedJsonHeadersMessageMapper 通过 Jackson JSON 处理程序将 Message(包括头信息)从/到 byte[] 进行(反)序列化。可以通过 setMessageMapper(BytesMessageMapper) 配置此逻辑。

发送和接收套接字可以通过各自的 setSendSocketConfigurer(Consumer<ZMQ.Socket>)setSubscribeSocketConfigurer(Consumer<ZMQ.Socket>) 回调函数进行自定义,以设置任何选项(读/写超时、安全性等)。

ZeroMqChannel 的内部逻辑基于通过 Project Reactor FluxMono 操作符实现的反应式流。这提供了更简单的线程控制,并允许无锁的并发发布和从通道消费。本地 PUB/SUB 逻辑实现为 Flux.publish() 操作符,以允许所有订阅此通道的本地订阅者接收相同的已发布消息,就像分布式的 PUB 套接字订阅者一样。

以下是一个简单的 ZeroMqChannel 配置示例:

@Bean
ZeroMqChannel zeroMqPubSubChannel(ZContext context) {
ZeroMqChannel channel = new ZeroMqChannel(context, true);
channel.setConnectUrl("tcp://localhost:6001:6002");
channel.setConsumeDelay(Duration.ofMillis(100));
return channel;
}
java

ZeroMQ 入站通道适配器

ZeroMqMessageProducer 是一个具有反应式语义的 MessageProducerSupport 实现。它以非阻塞的方式不断从 ZeroMQ 套接字读取数据,并将消息发布到一个无限的 Flux,该 Flux 要么由 FluxMessageChannel 订阅,要么在 start() 方法中显式订阅,如果输出通道不是反应式的。当套接字上没有收到数据时,在下一次读取尝试之前会应用一个 consumeDelay(默认为 1 秒)。

仅支持 SocketType.PAIRSocketType.PULLSocketType.SUB。此组件可以连接到远程套接字或绑定到 TCP 协议,使用提供的或随机的端口。在该组件启动并且 ZeroMQ 套接字绑定之后,可以通过 getBoundPort() 获取实际端口。可以通过 setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer) 回调来配置套接字选项(例如,安全设置或写入超时)。

如果 receiveRaw 选项设置为 true,从套接字消费的 ZMsg 将原样作为生成的 Message 的有效载荷发送:由下游流来解析和转换该 ZMsg。否则,使用 InboundMessageMapper 将消费的数据转换为 Message。如果收到的 ZMsg 是多帧的,则第一帧被视为此 ZeroMQ 消息发布的 ZeroMqHeaders.TOPIC 标头。

如果 unwrapTopic 选项设置为 false,则认为传入的消息由两个帧组成:主题和 ZeroMQ 消息。否则,默认情况下,ZMsg 被认为由三个帧组成:第一个帧包含主题,最后一个帧包含消息,在中间有一个空帧。

使用 SocketType.SUB 时,ZeroMqMessageProducer 使用提供的 topics 选项进行订阅;默认订阅所有主题。可以在运行时使用 subscribeToTopics()unsubscribeFromTopics() @ManagedOperation 调整订阅。

以下是 ZeroMqMessageProducer 配置的示例:

@Bean
ZeroMqMessageProducer zeroMqMessageProducer(ZContext context, MessageChannel outputChannel) {
ZeroMqMessageProducer messageProducer = new ZeroMqMessageProducer(context, SocketType.SUB);
messageProducer.setOutputChannel(outputChannel);
messageProducer.setTopics("some");
messageProducer.setReceiveRaw(true);
messageProducer.setBindPort(7070);
messageProducer.setConsumeDelay(Duration.ofMillis(100));
return messageProducer;
}
java

ZeroMQ 外发通道适配器

ZeroMqMessageHandlerReactiveMessageHandler 的一个实现,用于将发布消息生成到 ZeroMQ 套接字中。仅支持 SocketType.PAIRSocketType.PUSHSocketType.PUB。此组件可以连接到远程套接字或绑定到 TCP 协议提供的或随机的端口。在该组件启动并绑定 ZeroMQ 套接字后,可以通过 getBoundPort() 获取实际的端口。

当使用 SocketType.PUB 时,topicExpression 会对请求消息进行求值,以在 ZeroMQ 消息中注入一个主题帧(前提是它不为 null)。订阅者端 (SocketType.SUB) 必须先接收主题帧,然后才能解析实际数据。

如果 wrapTopic 选项设置为 false,则 ZeroMQ 消息帧会在注入的主题之后发送,如果存在的话。默认情况下,在主题和消息之间会发送一个额外的空帧。

当请求消息的有效载荷是 ZMsg 时,不会执行任何转换或主题提取:ZMsg 将原样发送到套接字中,并且不会被销毁以便可能的进一步重用。否则,将使用 OutboundMessageMapper<byte[]> 将请求消息(或只是其有效载荷)转换为 ZeroMQ 帧以发布。默认情况下,会使用一个由 ConfigurableCompositeMessageConverter 提供的 ConvertingBytesMessageMapper。可以通过 setSocketConfigurer(Consumer<ZMQ.Socket> socketConfigurer) 回调来配置套接字选项(例如,安全设置或写入超时)。

以下是 ZeroMqMessageHandler 配置的示例,该配置连接到一个 socket:

@Bean
@ServiceActivator(inputChannel = "zeroMqPublisherChannel")
ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) {
ZeroMqMessageHandler messageHandler =
new ZeroMqMessageHandler(context, "tcp://localhost:6060", SocketType.PUB);
messageHandler.setTopicExpression(
new FunctionExpression<Message<?>>((message) -> message.getHeaders().get("topic")));
messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper());
}
java

以下是 ZeroMqMessageHandler 配置的示例,它绑定到提供的端口:

@Bean
@ServiceActivator(inputChannel = "zeroMqPublisherChannel")
ZeroMqMessageHandler zeroMqMessageHandler(ZContext context) {
ZeroMqMessageHandler messageHandler =
new ZeroMqMessageHandler(context, 7070, SocketType.PUB);
messageHandler.setTopicExpression(
new FunctionExpression<Message<?>>((message) -> message.getHeaders().get("topic")));
messageHandler.setMessageMapper(new EmbeddedJsonHeadersMessageMapper());
}
java

ZeroMQ Java DSL 支持

spring-integration-zeromq 通过 ZeroMq 工厂和 IntegrationComponentSpec 实现为上述组件提供了一个方便的 Java DSL 流式 API。

这是 ZeroMqChannel 的 Java DSL 示例:

.channel(ZeroMq.zeroMqChannel(this.context)
.connectUrl("tcp://localhost:6001:6002")
.consumeDelay(Duration.ofMillis(100)))
}
java

ZeroMQ Java DSL 的入站通道适配器是:

IntegrationFlow.from(
ZeroMq.inboundChannelAdapter(this.context, SocketType.SUB)
.connectUrl("tcp://localhost:9000")
.topics("someTopic")
.receiveRaw(true)
.consumeDelay(Duration.ofMillis(100)))
}
java

ZeroMQ Java DSL 的 outbound channel adapter 是:

.handle(ZeroMq.outboundChannelAdapter(this.context, "tcp://localhost:9001", SocketType.PUB)
.topicFunction(message -> message.getHeaders().get("myTopic")))
}
java