跳到主要内容
版本:7.0.2

Hazelcast 支持

DeepSeek V3 中英对照 Hazelcast Support

Spring Integration 提供了通道适配器和其他实用组件,用于与内存数据网格 Hazelcast 进行交互。

此依赖项为项目所需:

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-hazelcast</artifactId>
<version>7.0.2</version>
</dependency>

Hazelcast 组件的 XML 命名空间和 schemaLocation 定义如下:

xmlns:int-hazelcast="http://www.springframework.org/schema/integration/hazelcast"
xsi:schemaLocation="http://www.springframework.org/schema/integration/hazelcast
https://www.springframework.org/schema/integration/hazelcast/spring-integration-hazelcast.xsd"

Hazelcast 事件驱动入站通道适配器

Hazelcast 提供分布式数据结构,例如:

  • com.hazelcast.map.IMap

  • com.hazelcast.multimap.MultiMap

  • com.hazelcast.collection.IList

  • com.hazelcast.collection.ISet

  • com.hazelcast.collection.IQueue

  • com.hazelcast.topic.ITopic

  • com.hazelcast.replicatedmap.ReplicatedMap

它还提供了事件监听器,以便监听对这些数据结构所做的修改。

  • com.hazelcast.core.EntryListener<K, V>

  • com.hazelcast.collection.ItemListener

  • com.hazelcast.topic.MessageListener

Hazelcast 事件驱动入站通道适配器监听相关缓存事件,并将事件消息发送到已定义的通道。它支持 XML 和 JavaConfig 两种驱动配置方式。

XML 配置 :

<int-hazelcast:inbound-channel-adapter channel="mapChannel"
cache="map"
cache-events="UPDATED, REMOVED"
cache-listening-policy="SINGLE" />

Hazelcast 事件驱动入站通道适配器需要以下属性:

  • channel: 指定消息发送到的通道;

  • cache: 指定被监听的分布式对象引用。这是一个必需属性;

  • cache-events: 指定监听的缓存事件。这是一个可选属性,其默认值为 ADDED。其支持的值如下:

  • 针对 IMapMultiMap 支持的缓存事件类型:ADDEDREMOVEDUPDATEDEVICTEDEVICT_ALLCLEAR_ALL

  • 针对 ReplicatedMap 支持的缓存事件类型:ADDEDREMOVEDUPDATEDEVICTED

  • 针对 IListISetIQueue 支持的缓存事件类型:ADDEDREMOVEDITopic 没有缓存事件类型。

  • cache-listening-policy: 将缓存监听策略指定为 SINGLEALL。这是一个可选属性,其默认值为 SINGLE。每个监听相同缓存对象且具有相同 cache-events 属性的 Hazelcast 入站通道适配器可以接收单个事件消息或所有事件消息。如果设置为 ALL,则所有监听相同缓存对象且具有相同 cache-events 属性的 Hazelcast 入站通道适配器都将接收到所有事件消息。如果设置为 SINGLE,则它们将接收到唯一的事件消息。

一些配置示例:

<int:channel id="mapChannel"/>

<int-hazelcast:inbound-channel-adapter channel="mapChannel"
cache="map"
cache-events="UPDATED, REMOVED" />

<bean id="map" factory-bean="instance" factory-method="getMap">
<constructor-arg value="map"/>
</bean>

<bean id="instance" class="com.hazelcast.core.Hazelcast"
factory-method="newHazelcastInstance">
<constructor-arg>
<bean class="com.hazelcast.config.Config" />
</constructor-arg>
</bean>
<int-hazelcast:inbound-channel-adapter channel="multiMapChannel"
cache="multiMap"
cache-events="ADDED, REMOVED, CLEAR_ALL" />

<bean id="multiMap" factory-bean="instance" factory-method="getMultiMap">
<constructor-arg value="multiMap"/>
</bean>
<int-hazelcast:inbound-channel-adapter  channel="listChannel"
cache="list"
cache-events="ADDED, REMOVED"
cache-listening-policy="ALL" />

<bean id="list" factory-bean="instance" factory-method="getList">
<constructor-arg value="list"/>
</bean>
<int-hazelcast:inbound-channel-adapter channel="setChannel" cache="set" />

<bean id="set" factory-bean="instance" factory-method="getSet">
<constructor-arg value="set"/>
</bean>
<int-hazelcast:inbound-channel-adapter  channel="queueChannel"
cache="queue"
cache-events="REMOVED"
cache-listening-policy="ALL" />

<bean id="queue" factory-bean="instance" factory-method="getQueue">
<constructor-arg value="queue"/>
</bean>
<int-hazelcast:inbound-channel-adapter channel="topicChannel" cache="topic" />

<bean id="topic" factory-bean="instance" factory-method="getTopic">
<constructor-arg value="topic"/>
</bean>
<int-hazelcast:inbound-channel-adapter channel="replicatedMapChannel"
cache="replicatedMap"
cache-events="ADDED, UPDATED, REMOVED"
cache-listening-policy="SINGLE" />

<bean id="replicatedMap" factory-bean="instance" factory-method="getReplicatedMap">
<constructor-arg value="replicatedMap"/>
</bean>

Java 配置示例:

以下示例展示了 DistributedMap 的配置。相同的配置也可用于其他分布式数据结构(IMapMultiMapReplicatedMapIListISetIQueueITopic):

@Bean
public PollableChannel distributedMapChannel() {
return new QueueChannel();
}

@Bean
public IMap<Integer, String> distributedMap() {
return hazelcastInstance().getMap("Distributed_Map");
}

@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}

@Bean
public HazelcastEventDrivenMessageProducer hazelcastEventDrivenMessageProducer() {
final HazelcastEventDrivenMessageProducer producer = new HazelcastEventDrivenMessageProducer(distributedMap());
producer.setOutputChannel(distributedMapChannel());
producer.setCacheEventTypes("ADDED,REMOVED,UPDATED,CLEAR_ALL");
producer.setCacheListeningPolicy(CacheListeningPolicyType.SINGLE);

return producer;
}

Hazelcast 持续查询入站通道适配器

Hazelcast 连续查询功能支持监听特定映射条目的修改操作。Hazelcast 连续查询入站通道适配器是一种事件驱动的通道适配器,它会根据定义的谓词监听相关的分布式映射事件。

@Bean
public PollableChannel cqDistributedMapChannel() {
return new QueueChannel();
}

@Bean
public IMap<Integer, String> cqDistributedMap() {
return hazelcastInstance().getMap("CQ_Distributed_Map");
}

@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}

@Bean
public HazelcastContinuousQueryMessageProducer hazelcastContinuousQueryMessageProducer() {
final HazelcastContinuousQueryMessageProducer producer =
new HazelcastContinuousQueryMessageProducer(cqDistributedMap(), "surname=TestSurname");
producer.setOutputChannel(cqDistributedMapChannel());
producer.setCacheEventTypes("UPDATED");
producer.setIncludeValue(false);

return producer;
}

它支持以下六个属性:

  • channel: 指定消息发送的目标通道;

  • cache: 指定被监听的分布式 Map 引用。必需属性;

  • cache-events: 指定监听的缓存事件。可选属性,默认值为 ADDED。支持的值包括 ADDEDREMOVEDUPDATEDEVICTEDEVICT_ALLCLEAR_ALL

  • predicate: 指定一个谓词,用于监听对特定 Map 条目执行的修改。必需属性;

  • include-value: 指定是否在连续查询结果中包含 valueoldValue。可选属性,默认值为 true

  • cache-listening-policy: 指定缓存监听策略为 SINGLEALL。可选属性,默认值为 SINGLE。每个监听相同缓存对象且具有相同 cache-events 属性的 Hazelcast CQ 入站通道适配器可以接收单个事件消息或所有事件消息。如果设置为 ALL,则所有监听相同缓存对象且具有相同 cache-events 属性的 Hazelcast CQ 入站通道适配器都将收到所有事件消息。如果设置为 SINGLE,则它们将收到唯一的事件消息。

Hazelcast 集群监控入站通道适配器

Hazelcast 集群监视器支持监听集群上执行的修改操作。Hazelcast 集群监视器入站通道适配器是一个事件驱动的通道适配器,可监听相关的成员资格、分布式对象、迁移、生命周期和客户端事件:

@Bean
public PollableChannel eventChannel() {
return new QueueChannel();
}

@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}

@Bean
public HazelcastClusterMonitorMessageProducer hazelcastClusterMonitorMessageProducer() {
HazelcastClusterMonitorMessageProducer producer = new HazelcastClusterMonitorMessageProducer(hazelcastInstance());
producer.setOutputChannel(eventChannel());
producer.setMonitorEventTypes("DISTRIBUTED_OBJECT");

return producer;
}

它支持以下三个属性:

  • channel: 指定消息发送的目标通道;

  • hazelcast-instance: 指定用于监听集群事件的 Hazelcast 实例引用。这是一个必需属性;

  • monitor-types: 指定监听的监控器类型。这是一个可选属性,默认值为 MEMBERSHIP。支持的值为 MEMBERSHIPDISTRIBUTED_OBJECTMIGRATIONLIFECYCLECLIENT

Hazelcast 分布式 SQL 入站通道适配器

Hazelcast 允许在分布式映射上运行分布式查询。Hazelcast 分布式 SQL 入站通道适配器是一种轮询式入站通道适配器。它运行已定义的分布式 SQL 命令,并根据迭代类型返回结果。

@Bean
public PollableChannel dsDistributedMapChannel() {
return new QueueChannel();
}

@Bean
public IMap<Integer, String> dsDistributedMap() {
return hazelcastInstance().getMap("DS_Distributed_Map");
}

@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}

@Bean
@InboundChannelAdapter(value = "dsDistributedMapChannel", poller = @Poller(maxMessagesPerPoll = "1"))
public HazelcastDistributedSQLMessageSource hazelcastDistributedSQLMessageSource() {
final HazelcastDistributedSQLMessageSource messageSource =
new HazelcastDistributedSQLMessageSource(dsDistributedMap(),
"name='TestName' AND surname='TestSurname'");
messageSource.setIterationType(DistributedSQLIterationType.ENTRY);

return messageSource;
}

它需要一个轮询器并支持四个属性:

  • channel:指定消息发送到的通道。这是一个必需属性;

  • cache:指定被查询的分布式 IMap 引用。这是一个必需属性;

  • iteration-type:指定结果类型。分布式 SQL 可以在 EntrySetKeySetLocalKeySetValues 上运行。这是一个可选属性,默认值为 VALUE。支持的值为 ENTRY`、`KEYLOCAL_KEYVALUE

  • distributed-sql:指定 SQL 语句的 where 子句。这是一个必需属性。

Hazelcast 出站通道适配器

Hazelcast 出站通道适配器监听其定义的通道,并将传入的消息写入相关的分布式缓存。它期望通过 cachecache-expressionHazelcastHeaders.CACHE_NAME 之一来定义分布式对象。支持的分布式对象包括:IMapMultiMapReplicatedMapIListISetIQueueITopic

@Bean
public MessageChannel distributedMapChannel() {
return new DirectChannel();
}

@Bean
public IMap<Integer, String> distributedMap() {
return hzInstance().getMap("Distributed_Map");
}

@Bean
public HazelcastInstance hzInstance() {
return Hazelcast.newHazelcastInstance();
}

@Bean
@ServiceActivator(inputChannel = "distributedMapChannel")
public HazelcastCacheWritingMessageHandler hazelcastCacheWritingMessageHandler() {
HazelcastCacheWritingMessageHandler handler = new HazelcastCacheWritingMessageHandler();
handler.setDistributedObject(distributedMap());
handler.setKeyExpression(new SpelExpressionParser().parseExpression("payload.id"));
handler.setExtractPayload(true);
return handler;
}

它需要以下属性:

  • channel: 指定消息发送到的通道;

  • cache: 指定分布式对象引用。可选;

  • cache-expression: 通过 Spring 表达式语言 (SpEL) 指定分布式对象。可选;

  • key-expression: 通过 Spring 表达式语言 (SpEL) 指定键值对的键。可选,且仅对 IMapMultiMapReplicatedMap 分布式数据结构是必需的。

  • extract-payload: 指定是发送整个消息还是仅发送有效负载。可选属性,默认值为 true。如果为 true,则仅将有效负载写入分布式对象。否则,将通过转换消息头和有效负载来写入整个消息。

通过在消息头中设置分布式对象名称,可以通过同一通道将消息写入不同的分布式对象。如果未定义 cachecache-expression 属性,则必须在请求 Message 中设置 HazelcastHeaders.CACHE_NAME 消息头。

Hazelcast 消息存储

对于分布式消息状态管理,例如用于持久化的 QueueChannel 或跟踪 Aggregator 消息组,我们提供了 HazelcastMessageStore 实现:

@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}

@Bean
public MessageGroupStore messageStore() {
return new HazelcastMessageStore(hazelcastInstance());
}

默认情况下,SPRING_INTEGRATION_MESSAGE_STORE IMap 用于以键/值形式存储消息和组。任何自定义的 IMap 都可以提供给 HazelcastMessageStore

Hazelcast 元数据存储

ListenableMetadataStore 的一个实现可通过底层的 Hazelcast IMap 来使用。默认创建的映射名称为 SPRING_INTEGRATION_METADATA_STORE,该名称可以进行自定义。

@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}

@Bean
public MetadataStore metadataStore() {
return new HazelcastMetadataStore(hazelcastInstance());
}

HazelcastMetadataStore 实现了 ListenableMetadataStore 接口,允许您注册自己的 MetadataStoreListener 类型监听器,通过 addListener(MetadataStoreListener callback) 方法来监听事件。

使用 Hazelcast 的消息通道

Hazelcast 的 IQueueITopic 分布式对象本质上是消息传递原语,可以直接与 Spring Integration 核心组件配合使用,无需在此 Hazelcast 模块中进行额外实现。

QueueChannel 可以通过任何 java.util.Queue 提供支持,包括前面提到的 Hazelcast 分布式 IQueue

@Bean
PollableChannel hazelcastQueueChannel(HazelcastInstance hazelcastInstance) {
return new QueueChannel(hazelcastInstance.getQueue("springIntegrationQueue"));
}

将此配置放置在应用程序的Hazelcast集群中的多个节点上,将使QueueChannel成为分布式队列,并且只有一个节点能够从该IQueue中轮询单个Message。其工作原理类似于PollableJmsChannelPollableKafkaChannelPollableAmqpChannel

如果生产者端不是 Spring Integration 应用,则无法配置 QueueChannel,因此需要使用原生的 Hazelcast IQueue API 来生成数据。在这种情况下,消费者端采用 QueueChannel 的方式是错误的:必须改用入站通道适配器方案:

@Bean
public IQueue<String> myStringHzQueue(HazelcastInstance hazelcastInstance) {
return hazelcastInstance.getQueue("springIntegrationQueue");
}

@Bean
@InboundChannelAdapter(channel = "stringValuesFromHzQueueChannel")
Supplier<String> fromHzIQueueSource(IQueue<String> myStringHzQueue) {
return myStringHzQueue::poll;
}

Hazelcast 中的 ITopic 抽象与 JMS 中的 Topic 具有相似的语义:所有订阅者都会收到发布的消息。通过一对简单的 MessageChannel bean,该机制作为开箱即用的功能得到支持:

@Bean
public ITopic<Message<?>> springIntegrationTopic(HazelcastInstance hazelcastInstance,
MessageChannel fromHazelcastTopicChannel) {

ITopic<Message<?>> topic = hazelcastInstance.getTopic("springIntegrationTopic");
topic.addMessageListener(m -> fromHazelcastTopicChannel.send(m.getMessageObject()));
return topic;
}

@Bean
public MessageChannel publishToHazelcastTopicChannel(ITopic<Message<?>> springIntegrationTopic) {
return new FixedSubscriberChannel(springIntegrationTopic::publish);
}

@Bean
public MessageChannel fromHazelcastTopicChannel() {
return new DirectChannel();
}

FixedSubscriberChannelDirectChannel 的优化变体,它需要在初始化时指定一个 MessageHandler。由于 MessageHandler 是一个函数式接口,因此可以为 handleMessage 方法提供一个简单的 lambda 表达式。当消息被发送到 publishToHazelcastTopicChannel 时,它会被直接发布到 Hazelcast 的 ITopic 上。com.hazelcast.topic.MessageListener 同样是一个函数式接口,因此可以为 ITopic#addMessageListener 提供一个 lambda 表达式。这样,订阅 fromHazelcastTopicChannel 的消费者将接收到发送到该 ITopic 的所有消息。

ExecutorChannel 可以配置一个 IExecutorService。例如,通过相应的配置可以实现集群范围内的单例:

@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance(
new Config()
.addExecutorConfig(new ExecutorConfig()
.setName("singletonExecutor")
.setPoolSize(1)));
}

@Bean
public MessageChannel hazelcastSingletonExecutorChannel(HazelcastInstance hazelcastInstance) {
return new ExecutorChannel(hazelcastInstance.getExecutorService("singletonExecutor"));
}