Hazelcast 支持
Spring Integration 提供了通道适配器和其他实用组件,用于与内存数据网格 Hazelcast 进行交互。
你需要将这个依赖项添加到你的项目中:
- Maven
- Gradle
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-hazelcast</artifactId>
    <version>6.4.2</version>
</dependency>
compile "org.springframework.integration:spring-integration-hazelcast:6.4.2"
Hazelcast 组件的 XML 命名空间和 schemaLocation 定义为:
xmlns:int-hazelcast="http://www.springframework.org/schema/integration/hazelcast"
xsi:schemaLocation="http://www.springframework.org/schema/integration/hazelcast
          https://www.springframework.org/schema/integration/hazelcast/spring-integration-hazelcast.xsd"
Hazelcast 事件驱动的传入通道适配器
Hazelcast 提供了分布式数据结构,例如:
- 
com.hazelcast.map.IMap
- 
com.hazelcast.multimap.MultiMap
- 
com.hazelcast.collection.IList
- 
com.hazelcast.collection.ISet
- 
com.hazelcast.collection.IQueue
- 
com.hazelcast.topic.ITopic
- 
com.hazelcast.replicatedmap.ReplicatedMap
它还提供了事件监听器,以便监听对这些数据结构的修改。
- 
com.hazelcast.core.EntryListener<K, V>
- 
com.hazelcast.collection.ItemListener
- 
com.hazelcast.topic.MessageListener
Hazelcast 事件驱动的传入通道适配器监听相关的缓存事件,并将事件消息发送到定义的通道。它支持 XML 和 JavaConfig 驱动的配置。
XML 配置 :
<int-hazelcast:inbound-channel-adapter channel="mapChannel"
                      cache="map"
                      cache-events="UPDATED, REMOVED"
                      cache-listening-policy="SINGLE" />
Hazelcast 事件驱动的传入通道适配器需要以下属性:
- 
channel: 指定消息发送的通道;
- 
cache: 指定监听的分布式对象引用。它是必填属性;
- 
cache-events: 指定要监听的缓存事件。它是可选属性,默认值是ADDED。其支持的值如下:
- 
IMap和MultiMap支持的缓存事件类型:ADDED,REMOVED,UPDATED,EVICTED,EVICT_ALL和CLEAR_ALL;
- 
ReplicatedMap支持的缓存事件类型:ADDED,REMOVED,UPDATED,EVICTED;
- 
IList,ISet和IQueue支持的缓存事件类型:ADDED,REMOVED。ITopic没有缓存事件类型。
- 
cache-listening-policy: 指定缓存监听策略为SINGLE或ALL。它是可选属性,默认值是SINGLE。每个 Hazelcast 入站通道适配器监听相同的缓存对象且具有相同的 cache-events 属性时,可以接收单个事件消息或所有事件消息。如果设置为ALL,所有监听相同缓存对象且具有相同 cache-events 属性的 Hazelcast 入站通道适配器将接收所有事件消息。如果设置为SINGLE,它们将接收唯一的事件消息。
一些配置示例:
<int:channel id="mapChannel"/>
<int-hazelcast:inbound-channel-adapter channel="mapChannel"
                              cache="map"
                              cache-events="UPDATED, REMOVED" />
<bean id="map" factory-bean="instance" factory-method="getMap">
    <constructor-arg value="map"/>
</bean>
<bean id="instance" class="com.hazelcast.core.Hazelcast"
            factory-method="newHazelcastInstance">
    <constructor-arg>
        <bean class="com.hazelcast.config.Config" />
    </constructor-arg>
</bean>
<int-hazelcast:inbound-channel-adapter channel="multiMapChannel"
                              cache="multiMap"
                              cache-events="ADDED, REMOVED, CLEAR_ALL" />
<bean id="multiMap" factory-bean="instance" factory-method="getMultiMap">
    <constructor-arg value="multiMap"/>
</bean>
<int-hazelcast:inbound-channel-adapter  channel="listChannel"
                               cache="list"
                               cache-events="ADDED, REMOVED"
                               cache-listening-policy="ALL" />
<bean id="list" factory-bean="instance" factory-method="getList">
    <constructor-arg value="list"/>
</bean>
<int-hazelcast:inbound-channel-adapter channel="setChannel" cache="set" />
<bean id="set" factory-bean="instance" factory-method="getSet">
    <constructor-arg value="set"/>
</bean>
<int-hazelcast:inbound-channel-adapter  channel="queueChannel"
                               cache="queue"
                               cache-events="REMOVED"
                               cache-listening-policy="ALL" />
<bean id="queue" factory-bean="instance" factory-method="getQueue">
    <constructor-arg value="queue"/>
</bean>
<int-hazelcast:inbound-channel-adapter channel="topicChannel" cache="topic" />
<bean id="topic" factory-bean="instance" factory-method="getTopic">
    <constructor-arg value="topic"/>
</bean>
<int-hazelcast:inbound-channel-adapter channel="replicatedMapChannel"
                              cache="replicatedMap"
                              cache-events="ADDED, UPDATED, REMOVED"
                              cache-listening-policy="SINGLE"  />
<bean id="replicatedMap" factory-bean="instance" factory-method="getReplicatedMap">
    <constructor-arg value="replicatedMap"/>
</bean>
Java 配置示例:
以下示例显示了 DistributedMap 的配置。相同的配置也可以用于其他分布式数据结构(IMap、MultiMap、ReplicatedMap、IList、ISet、IQueue 和 ITopic):
@Bean
public PollableChannel distributedMapChannel() {
    return new QueueChannel();
}
@Bean
public IMap<Integer, String> distributedMap() {
    return hazelcastInstance().getMap("Distributed_Map");
}
@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}
@Bean
public HazelcastEventDrivenMessageProducer hazelcastEventDrivenMessageProducer() {
    final HazelcastEventDrivenMessageProducer producer = new HazelcastEventDrivenMessageProducer(distributedMap());
    producer.setOutputChannel(distributedMapChannel());
    producer.setCacheEventTypes("ADDED,REMOVED,UPDATED,CLEAR_ALL");
    producer.setCacheListeningPolicy(CacheListeningPolicyType.SINGLE);
    return producer;
}
Hazelcast 持续查询入站通道适配器
Hazelcast 持续查询使监听对特定映射条目执行的修改成为可能。Hazelcast 持续查询输入通道适配器是一种事件驱动的通道适配器,它根据定义的谓词监听相关的分布式映射事件。
- Java
- XML
@Bean
public PollableChannel cqDistributedMapChannel() {
    return new QueueChannel();
}
@Bean
public IMap<Integer, String> cqDistributedMap() {
    return hazelcastInstance().getMap("CQ_Distributed_Map");
}
@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}
@Bean
public HazelcastContinuousQueryMessageProducer hazelcastContinuousQueryMessageProducer() {
    final HazelcastContinuousQueryMessageProducer producer =
        new HazelcastContinuousQueryMessageProducer(cqDistributedMap(), "surname=TestSurname");
    producer.setOutputChannel(cqDistributedMapChannel());
    producer.setCacheEventTypes("UPDATED");
    producer.setIncludeValue(false);
    return producer;
}
<int:channel id="cqMapChannel"/>
<int-hazelcast:cq-inbound-channel-adapter
                channel="cqMapChannel"
                cache="cqMap"
                cache-events="UPDATED, REMOVED"
                predicate="name=TestName AND surname=TestSurname"
                include-value="true"
                cache-listening-policy="SINGLE"/>
<bean id="cqMap" factory-bean="instance" factory-method="getMap">
    <constructor-arg value="cqMap"/>
</bean>
<bean id="instance" class="com.hazelcast.core.Hazelcast"
            factory-method="newHazelcastInstance">
    <constructor-arg>
        <bean class="com.hazelcast.config.Config" />
    </constructor-arg>
</bean>
它支持六个属性,如下:
- 
channel: 指定消息发送的通道;
- 
cache: 指定被监听的分布式 Map 引用。必填项;
- 
cache-events: 指定要监听的缓存事件。可选属性,默认值为ADDED。支持的值有ADDED、REMOVED、UPDATED、EVICTED、EVICT_ALL和CLEAR_ALL;
- 
predicate: 指定一个谓词以监听对特定映射条目执行的修改。必填项;
- 
include-value: 指定是否在连续查询结果中包含值和 oldValue。可选,默认为true;
- 
cache-listening-policy: 指定缓存监听策略为SINGLE或ALL。可选,默认值为SINGLE。每个 Hazelcast CQ 入站通道适配器监听相同的缓存对象且具有相同的 cache-events 属性时,可以接收单个事件消息或所有事件消息。如果设置为ALL,所有监听相同缓存对象且具有相同 cache-events 属性的 Hazelcast CQ 入站通道适配器将接收所有事件消息。如果设置为SINGLE,它们将只接收唯一的事件消息。
Hazelcast 集群监控入站通道适配器
Hazelcast 集群监视器支持监听对集群执行的修改。Hazelcast 集群监视器 inbound 通道适配器是一个事件驱动的通道适配器,它监听相关的成员身份、分布式对象、迁移、生命周期和客户端事件:
- Java
- XML
@Bean
public PollableChannel eventChannel() {
    return new QueueChannel();
}
@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}
@Bean
public HazelcastClusterMonitorMessageProducer hazelcastClusterMonitorMessageProducer() {
    HazelcastClusterMonitorMessageProducer producer = new HazelcastClusterMonitorMessageProducer(hazelcastInstance());
    producer.setOutputChannel(eventChannel());
    producer.setMonitorEventTypes("DISTRIBUTED_OBJECT");
    return producer;
}
<int:channel id="monitorChannel"/>
<int-hazelcast:cm-inbound-channel-adapter
                 channel="monitorChannel"
                 hazelcast-instance="instance"
                 monitor-types="MEMBERSHIP, DISTRIBUTED_OBJECT" />
<bean id="instance" class="com.hazelcast.core.Hazelcast"
            factory-method="newHazelcastInstance">
    <constructor-arg>
        <bean class="com.hazelcast.config.Config" />
    </constructor-arg>
</bean>
它支持以下三个属性:
- 
channel: 指定消息发送的通道;
- 
hazelcast-instance: 指定用于监听集群事件的 Hazelcast 实例引用。这是一个必填属性;
- 
monitor-types: 指定要监听的监控类型。这是一个可选属性,默认值为MEMBERSHIP。支持的值有MEMBERSHIP、DISTRIBUTED_OBJECT、MIGRATION、LIFECYCLE和CLIENT。
Hazelcast 分布式 SQL 入站通道适配器
Hazelcast 允许在分布式映射上运行分布式查询。Hazelcast 分布式 SQL 入站通道适配器是轮询入站通道适配器。它运行定义的 distributed-sql 命令,并根据迭代类型返回结果。
- Java
- XML
@Bean
public PollableChannel dsDistributedMapChannel() {
    return new QueueChannel();
}
@Bean
public IMap<Integer, String> dsDistributedMap() {
    return hazelcastInstance().getMap("DS_Distributed_Map");
}
@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}
@Bean
@InboundChannelAdapter(value = "dsDistributedMapChannel", poller = @Poller(maxMessagesPerPoll = "1"))
public HazelcastDistributedSQLMessageSource hazelcastDistributedSQLMessageSource() {
    final HazelcastDistributedSQLMessageSource messageSource =
        new HazelcastDistributedSQLMessageSource(dsDistributedMap(),
            "name='TestName' AND surname='TestSurname'");
    messageSource.setIterationType(DistributedSQLIterationType.ENTRY);
    return messageSource;
}
<int:channel id="dsMapChannel"/>
<int-hazelcast:ds-inbound-channel-adapter
            channel="dsMapChannel"
            cache="dsMap"
            iteration-type="ENTRY"
            distributed-sql="active=false OR age >= 25 OR name = 'TestName'">
    <int:poller fixed-delay="100"/>
</int-hazelcast:ds-inbound-channel-adapter>
<bean id="dsMap" factory-bean="instance" factory-method="getMap">
    <constructor-arg value="dsMap"/>
</bean>
<bean id="instance" class="com.hazelcast.core.Hazelcast"
            factory-method="newHazelcastInstance">
    <constructor-arg>
        <bean class="com.hazelcast.config.Config" />
    </constructor-arg>
</bean>
它需要一个 poller 并支持四个属性:
- 
channel: 指定消息发送的通道。它是必填属性;
- 
cache: 指定查询的分布式IMap引用。它是必填属性;
- 
iteration-type: 指定结果类型。分布式 SQL 可以在EntrySet、KeySet、LocalKeySet或Values上运行。这是一个可选属性,默认值为VALUE。支持的值为 ``ENTRY,KEY,LOCAL_KEY和VALUE;
- 
distributed-sql: 指定 SQL 语句的 where 子句。它是必填属性。
Hazelcast 外发通道适配器
Hazelcast 外发通道适配器监听其定义的通道,并将传入的消息写入相关的分布式缓存。它期望有一个 cache、cache-expression 或 HazelcastHeaders.CACHE_NAME 用于分布式对象定义。支持的分布式对象有:IMap、MultiMap、ReplicatedMap、IList、ISet、IQueue 和 ITopic。
- Java
- XML
@Bean
public MessageChannel distributedMapChannel() {
    return new DirectChannel();
}
@Bean
public IMap<Integer, String> distributedMap() {
    return hzInstance().getMap("Distributed_Map");
}
@Bean
public HazelcastInstance hzInstance() {
    return Hazelcast.newHazelcastInstance();
}
@Bean
@ServiceActivator(inputChannel = "distributedMapChannel")
public HazelcastCacheWritingMessageHandler hazelcastCacheWritingMessageHandler() {
    HazelcastCacheWritingMessageHandler handler = new HazelcastCacheWritingMessageHandler();
    handler.setDistributedObject(distributedMap());
    handler.setKeyExpression(new SpelExpressionParser().parseExpression("payload.id"));
    handler.setExtractPayload(true);
    return handler;
}
<int-hazelcast:outbound-channel-adapter channel="mapChannel"
                    cache-expression="headers['CACHE_HEADER']"
                    key-expression="payload.key"
                    extract-payload="true"/>
它需要以下属性:
- 
channel: 指定消息发送的通道;
- 
cache: 指定分布式对象引用。可选;
- 
cache-expression: 通过 Spring 表达式语言 (SpEL) 指定分布式对象。可选;
- 
key-expression: 通过 Spring 表达式语言 (SpEL) 指定键值对中的键。仅对于IMap、MultiMap和ReplicatedMap分布式数据结构是可选且必需的。
- 
extract-payload: 指定是否发送整个消息或仅发送负载。可选属性,默认为true。如果为 true,则仅写入负载到分布式对象。否则,将通过转换消息头和负载来写入整个消息。
通过在头部设置分布式对象名称,消息可以通过相同的通道写入不同的分布式对象。如果未定义 cache 或 cache-expression 属性,则必须在请求的 Message 中设置 HazelcastHeaders.CACHE_NAME 头部。
Hazelcast 领导者选举
如果需要领导者选举(例如,对于高可用的消息消费者,只有 一个节点应该接收消息),可以使用基于 Hazelcast 的 LeaderInitiator:
@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}
@Bean
public LeaderInitiator initiator() {
    return new LeaderInitiator(hazelcastInstance());
}
当一个节点被选为领导者时,它将向所有应用程序监听器发送一个 OnGrantedEvent。
Hazelcast 消息存储
对于分布式消息状态管理,例如对于持久化的 QueueChannel 或跟踪 Aggregator 消息组,提供了 HazelcastMessageStore 实现:
@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}
@Bean
public MessageGroupStore messageStore() {
    return new HazelcastMessageStore(hazelcastInstance());
}
默认情况下,使用 SPRING_INTEGRATION_MESSAGE_STORE IMap 作为键/值来存储消息和组。可以向 HazelcastMessageStore 提供任何自定义的 IMap。
Hazelcast 元数据存储
使用支持的 Hazelcast IMap 可以实现 ListenableMetadataStore。默认创建的地图名称为 SPRING_INTEGRATION_METADATA_STORE,可以对其进行自定义。
@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}
@Bean
public MetadataStore metadataStore() {
    return new HazelcastMetadataStore(hazelcastInstance());
}
HazelcastMetadataStore 实现了 ListenableMetadataStore,它允许你注册自己的 MetadataStoreListener 类型的监听器,通过 addListener(MetadataStoreListener callback) 来监听事件。
Hazelcast 锁注册表
可以使用支持 Hazelcast 分布式 ILock 的后端来实现 LockRegistry:
@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}
@Bean
public LockRegistry lockRegistry() {
    return new HazelcastLockRegistry(hazelcastInstance());
}
当与共享的 MessageGroupStore (例如 Aggregator 存储管理)一起使用时,HazelcastLockRegistry 可以用于在多个应用程序实例之间提供此功能,使得只有一个实例可以同时操作该组。
对于所有分布式操作,必须在 HazelcastInstance 上启用 CP 子系统。
使用 Hazelcast 的消息通道
Hazelcast IQueue 和 ITopic 分布式对象本质上是消息传递原始类型,可以与 Spring Integration 核心组件一起使用而无需在此 Hazelcast 模块中进行额外的实现。
QueueChannel 可以由任何 java.util.Queue 提供,包括上述的 Hazelcast 分布式 IQueue:
@Bean
PollableChannel hazelcastQueueChannel(HazelcastInstance hazelcastInstance) {
    return new QueueChannel(hazelcastInstance.getQueue("springIntegrationQueue"));
}
将此配置放置在应用程序的 Hazelcast 集群的多个节点上,会使 QueueChannel 分布式化,并且只有一个节点能够从该 IQueue 中轮询到单个 Message。这类似于 PollableJmsChannel、PollableKafkaChannel 或 PollableAmqpChannel 的工作方式。
如果生产者端不是一个 Spring Integration 应用程序,则无法配置 QueueChannel,因此会使用普通的 Hazelcast IQueue API 来生成数据。在这种情况下,在消费者端使用 QueueChannel 方法是错误的:必须改用 入站通道适配器 解决方案:
@Bean
public IQueue<String> myStringHzQueue(HazelcastInstance hazelcastInstance) {
    return hazelcastInstance.getQueue("springIntegrationQueue");
}
@Bean
@InboundChannelAdapter(channel = "stringValuesFromHzQueueChannel")
Supplier<String> fromHzIQueueSource(IQueue<String> myStringHzQueue) {
    return myStringHzQueue::poll;
}
Hazelcast 中的 ITopic 抽象与 JMS 中的 Topic 具有类似的语义:所有订阅者接收已发布的消息。通过一对简单的 MessageChannel bean,这种机制作为开箱即用的功能得到支持:
@Bean
public ITopic<Message<?>> springIntegrationTopic(HazelcastInstance hazelcastInstance,
        MessageChannel fromHazelcastTopicChannel) {
    ITopic<Message<?>> topic = hazelcastInstance.getTopic("springIntegrationTopic");
	topic.addMessageListener(m -> fromHazelcastTopicChannel.send(m.getMessageObject()));
	return topic;
}
@Bean
public MessageChannel publishToHazelcastTopicChannel(ITopic<Message<?>> springIntegrationTopic) {
    return new FixedSubscriberChannel(springIntegrationTopic::publish);
}
@Bean
public MessageChannel fromHazelcastTopicChannel() {
    return new DirectChannel();
}
FixedSubscriberChannel 是 DirectChannel 的一种优化变体,在初始化时需要一个 MessageHandler。由于 MessageHandler 是一个函数式接口,因此可以为 handleMessage 方法提供一个简单的 lambda 表达式。当消息发送到 publishToHazelcastTopicChannel 时,它只是发布到 Hazelcast ITopic 上。com.hazelcast.topic.MessageListener 也是一个函数式接口,因此可以为 ITopic#addMessageListener 提供一个 lambda 表达式。因此,订阅 fromHazelcastTopicChannel 的客户端将消费所有发送到上述 ITopic 的消息。
一个 ExecutorChannel 可以使用 IExecutorService 进行配置。例如,通过相应的配置可以实现集群范围的单例:
@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance(
                new Config()
                    .addExecutorConfig(new ExecutorConfig()
                         .setName("singletonExecutor")
                         .setPoolSize(1)));
}
@Bean
public MessageChannel hazelcastSingletonExecutorChannel(HazelcastInstance hazelcastInstance) {
    return new ExecutorChannel(hazelcastInstance.getExecutorService("singletonExecutor"));
}