|
此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.4.0! |
Hazelcast 支持
Spring 集成提供了通道适配器和其他 Util 组件,以便与内存中的数据网格 Hazelcast 进行交互。
您需要将此依赖项包含在您的项目中:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-hazelcast</artifactId>
<version>6.4.1-SNAPSHOT</version>
</dependency>
compile "org.springframework.integration:spring-integration-hazelcast:6.4.1-SNAPSHOT"
Hazelcast 组件的 XML 命名空间和 schemaLocation 定义是:
xmlns:int-hazelcast="http://www.springframework.org/schema/integration/hazelcast"
xsi:schemaLocation="http://www.springframework.org/schema/integration/hazelcast
https://www.springframework.org/schema/integration/hazelcast/spring-integration-hazelcast.xsd"
Hazelcast 事件驱动的入站通道适配器
Hazelcast 提供分布式数据结构,例如:
-
com.hazelcast.map.IMap -
com.hazelcast.multimap.MultiMap -
com.hazelcast.collection.IList -
com.hazelcast.collection.ISet -
com.hazelcast.collection.IQueue -
com.hazelcast.topic.ITopic -
com.hazelcast.replicatedmap.ReplicatedMap
它还提供事件侦听器,以便侦听对这些数据结构所做的修改。
-
com.hazelcast.core.EntryListener<K, V> -
com.hazelcast.collection.ItemListener -
com.hazelcast.topic.MessageListener
Hazelcast 事件驱动的入站通道适配器侦听相关的缓存事件,并将事件消息发送到定义的通道。 它支持 XML 和 JavaConfig 驱动的配置。
XML 配置 :
<int-hazelcast:inbound-channel-adapter channel="mapChannel"
cache="map"
cache-events="UPDATED, REMOVED"
cache-listening-policy="SINGLE" />
Hazelcast 事件驱动的入站通道适配器需要以下属性:
-
channel:指定将消息发送到的通道; -
cache:指定要侦听的分布式 Object 引用。 这是一个强制性属性; -
cache-events:指定要侦听的缓存事件。 它是一个可选属性,其默认值为ADDED. 其支持的值如下: -
支持的缓存事件类型
IMap和MultiMap:ADDED,REMOVED,UPDATED,EVICTED,EVICT_ALL和CLEAR_ALL; -
支持的缓存事件类型
ReplicatedMap:ADDED,REMOVED,UPDATED,EVICTED; -
支持的缓存事件类型
IList,ISet和IQueue:ADDED,REMOVED. 没有 的缓存事件类型ITopic. -
cache-listening-policy:将缓存侦听策略指定为SINGLE或ALL. 它是一个可选属性,其默认值为SINGLE. 每个侦听具有相同 cache-events 属性的相同缓存对象的 Hazelcast 入站通道适配器可以接收单个事件消息或所有事件消息。 如果是ALL,所有侦听具有相同 cache-events 属性的同一缓存对象的 Hazelcast 入站通道适配器都将收到所有事件消息。 如果是SINGLE,他们将收到唯一的事件消息。
一些配置示例:
<int:channel id="mapChannel"/>
<int-hazelcast:inbound-channel-adapter channel="mapChannel"
cache="map"
cache-events="UPDATED, REMOVED" />
<bean id="map" factory-bean="instance" factory-method="getMap">
<constructor-arg value="map"/>
</bean>
<bean id="instance" class="com.hazelcast.core.Hazelcast"
factory-method="newHazelcastInstance">
<constructor-arg>
<bean class="com.hazelcast.config.Config" />
</constructor-arg>
</bean>
<int-hazelcast:inbound-channel-adapter channel="multiMapChannel"
cache="multiMap"
cache-events="ADDED, REMOVED, CLEAR_ALL" />
<bean id="multiMap" factory-bean="instance" factory-method="getMultiMap">
<constructor-arg value="multiMap"/>
</bean>
<int-hazelcast:inbound-channel-adapter channel="listChannel"
cache="list"
cache-events="ADDED, REMOVED"
cache-listening-policy="ALL" />
<bean id="list" factory-bean="instance" factory-method="getList">
<constructor-arg value="list"/>
</bean>
<int-hazelcast:inbound-channel-adapter channel="setChannel" cache="set" />
<bean id="set" factory-bean="instance" factory-method="getSet">
<constructor-arg value="set"/>
</bean>
<int-hazelcast:inbound-channel-adapter channel="queueChannel"
cache="queue"
cache-events="REMOVED"
cache-listening-policy="ALL" />
<bean id="queue" factory-bean="instance" factory-method="getQueue">
<constructor-arg value="queue"/>
</bean>
<int-hazelcast:inbound-channel-adapter channel="topicChannel" cache="topic" />
<bean id="topic" factory-bean="instance" factory-method="getTopic">
<constructor-arg value="topic"/>
</bean>
<int-hazelcast:inbound-channel-adapter channel="replicatedMapChannel"
cache="replicatedMap"
cache-events="ADDED, UPDATED, REMOVED"
cache-listening-policy="SINGLE" />
<bean id="replicatedMap" factory-bean="instance" factory-method="getReplicatedMap">
<constructor-arg value="replicatedMap"/>
</bean>
Java 配置示例:
以下示例显示了DistributedMap配置。
相同的配置可以用于其他分布式数据结构(IMap,MultiMap,ReplicatedMap,IList,ISet,IQueue和ITopic):
@Bean
public PollableChannel distributedMapChannel() {
return new QueueChannel();
}
@Bean
public IMap<Integer, String> distributedMap() {
return hazelcastInstance().getMap("Distributed_Map");
}
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public HazelcastEventDrivenMessageProducer hazelcastEventDrivenMessageProducer() {
final HazelcastEventDrivenMessageProducer producer = new HazelcastEventDrivenMessageProducer(distributedMap());
producer.setOutputChannel(distributedMapChannel());
producer.setCacheEventTypes("ADDED,REMOVED,UPDATED,CLEAR_ALL");
producer.setCacheListeningPolicy(CacheListeningPolicyType.SINGLE);
return producer;
}
Hazelcast Continuous Query 入站通道适配器
Hazelcast Continuous Query 允许侦听对特定 map 条目执行的修改。 Hazelcast Continuous Query Inbound Channel Adapter 是一个事件驱动的通道适配器,它根据定义的谓词侦听相关的分布式 map 事件。
-
Java
-
XML
@Bean
public PollableChannel cqDistributedMapChannel() {
return new QueueChannel();
}
@Bean
public IMap<Integer, String> cqDistributedMap() {
return hazelcastInstance().getMap("CQ_Distributed_Map");
}
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public HazelcastContinuousQueryMessageProducer hazelcastContinuousQueryMessageProducer() {
final HazelcastContinuousQueryMessageProducer producer =
new HazelcastContinuousQueryMessageProducer(cqDistributedMap(), "surname=TestSurname");
producer.setOutputChannel(cqDistributedMapChannel());
producer.setCacheEventTypes("UPDATED");
producer.setIncludeValue(false);
return producer;
}
<int:channel id="cqMapChannel"/>
<int-hazelcast:cq-inbound-channel-adapter
channel="cqMapChannel"
cache="cqMap"
cache-events="UPDATED, REMOVED"
predicate="name=TestName AND surname=TestSurname"
include-value="true"
cache-listening-policy="SINGLE"/>
<bean id="cqMap" factory-bean="instance" factory-method="getMap">
<constructor-arg value="cqMap"/>
</bean>
<bean id="instance" class="com.hazelcast.core.Hazelcast"
factory-method="newHazelcastInstance">
<constructor-arg>
<bean class="com.hazelcast.config.Config" />
</constructor-arg>
</bean>
它支持以下 6 个属性:
-
channel:指定将消息发送到的通道; -
cache:指定要侦听的分布式 Map 引用。 命令的; -
cache-events:指定要侦听的缓存事件。 Optional 属性替换为ADDED作为其默认值。 支持的值包括ADDED,REMOVED,UPDATED,EVICTED,EVICT_ALL和CLEAR_ALL; -
predicate:指定一个谓词,用于侦听对特定映射条目执行的修改。 命令的; -
include-value:指定在连续查询结果中包含 value 和 oldValue。 可选true是默认的; -
cache-listening-policy:将缓存侦听策略指定为SINGLE或ALL. 可选,默认值为SINGLE. 每个侦听具有相同 cache-events 属性的相同缓存对象的 Hazelcast CQ 入站通道适配器都可以接收单个事件消息或所有事件消息。 如果是ALL,所有侦听具有相同 cache-events 属性的同一缓存对象的 Hazelcast CQ 入站通道适配器都将收到所有事件消息。 如果是SINGLE,他们将收到唯一的事件消息。
Hazelcast 群集监视器入站通道适配器
Hazelcast Cluster Monitor 支持侦听在集群上执行的修改。 Hazelcast Cluster Monitor Inbound Channel Adapter 是一个事件驱动的通道适配器,它监听相关的 Membership、Distributed Object、Migration、Lifecycle 和 Client 事件:
-
Java
-
XML
@Bean
public PollableChannel eventChannel() {
return new QueueChannel();
}
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public HazelcastClusterMonitorMessageProducer hazelcastClusterMonitorMessageProducer() {
HazelcastClusterMonitorMessageProducer producer = new HazelcastClusterMonitorMessageProducer(hazelcastInstance());
producer.setOutputChannel(eventChannel());
producer.setMonitorEventTypes("DISTRIBUTED_OBJECT");
return producer;
}
<int:channel id="monitorChannel"/>
<int-hazelcast:cm-inbound-channel-adapter
channel="monitorChannel"
hazelcast-instance="instance"
monitor-types="MEMBERSHIP, DISTRIBUTED_OBJECT" />
<bean id="instance" class="com.hazelcast.core.Hazelcast"
factory-method="newHazelcastInstance">
<constructor-arg>
<bean class="com.hazelcast.config.Config" />
</constructor-arg>
</bean>
它支持以下三个属性:
-
channel:指定将消息发送到的通道; -
hazelcast-instance:指定用于侦听集群事件的 Hazelcast 实例引用。 这是一个强制性属性; -
monitor-types:指定要侦听的监视器类型。 它是一个可选属性,具有MEMBERSHIP作为默认值。 支持的值包括MEMBERSHIP,DISTRIBUTED_OBJECT,MIGRATION,LIFECYCLE,CLIENT.
Hazelcast 分布式 SQL 入站通道适配器
Hazelcast 允许在分布式 map 上运行分布式查询。 Hazelcast 分布式 SQL 入站通道适配器是一个轮询入站通道适配器。 它运行定义的 distributed-sql 命令,并根据迭代类型返回结果。
-
Java
-
XML
@Bean
public PollableChannel dsDistributedMapChannel() {
return new QueueChannel();
}
@Bean
public IMap<Integer, String> dsDistributedMap() {
return hazelcastInstance().getMap("DS_Distributed_Map");
}
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
@InboundChannelAdapter(value = "dsDistributedMapChannel", poller = @Poller(maxMessagesPerPoll = "1"))
public HazelcastDistributedSQLMessageSource hazelcastDistributedSQLMessageSource() {
final HazelcastDistributedSQLMessageSource messageSource =
new HazelcastDistributedSQLMessageSource(dsDistributedMap(),
"name='TestName' AND surname='TestSurname'");
messageSource.setIterationType(DistributedSQLIterationType.ENTRY);
return messageSource;
}
<int:channel id="dsMapChannel"/>
<int-hazelcast:ds-inbound-channel-adapter
channel="dsMapChannel"
cache="dsMap"
iteration-type="ENTRY"
distributed-sql="active=false OR age >= 25 OR name = 'TestName'">
<int:poller fixed-delay="100"/>
</int-hazelcast:ds-inbound-channel-adapter>
<bean id="dsMap" factory-bean="instance" factory-method="getMap">
<constructor-arg value="dsMap"/>
</bean>
<bean id="instance" class="com.hazelcast.core.Hazelcast"
factory-method="newHazelcastInstance">
<constructor-arg>
<bean class="com.hazelcast.config.Config" />
</constructor-arg>
</bean>
它需要一个 Poller 并支持四个属性:
-
channel:指定消息发送到的通道。 这是一个强制性属性; -
cache:指定分布的IMap引用。 它是 mandatory 属性; -
iteration-type:指定结果类型。 分布式 SQL 可以在EntrySet,KeySet,LocalKeySet或Values. 它是一个可选属性,具有VALUE是默认值。 支持的值包括ENTRY, `KEY,LOCAL_KEY和VALUE; -
distributed-sql:指定 sql 语句的 where 子句。 这是一个强制性属性。
Hazelcast 出站通道适配器
Hazelcast 出站通道适配器侦听其定义的通道并将传入消息写入相关的分布式缓存。
它期望cache,cache-expression或HazelcastHeaders.CACHE_NAME对于分布式对象定义。
支持的分布式对象包括:IMap,MultiMap,ReplicatedMap,IList,ISet,IQueue和ITopic.
-
Java
-
XML
@Bean
public MessageChannel distributedMapChannel() {
return new DirectChannel();
}
@Bean
public IMap<Integer, String> distributedMap() {
return hzInstance().getMap("Distributed_Map");
}
@Bean
public HazelcastInstance hzInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
@ServiceActivator(inputChannel = "distributedMapChannel")
public HazelcastCacheWritingMessageHandler hazelcastCacheWritingMessageHandler() {
HazelcastCacheWritingMessageHandler handler = new HazelcastCacheWritingMessageHandler();
handler.setDistributedObject(distributedMap());
handler.setKeyExpression(new SpelExpressionParser().parseExpression("payload.id"));
handler.setExtractPayload(true);
return handler;
}
<int-hazelcast:outbound-channel-adapter channel="mapChannel"
cache-expression="headers['CACHE_HEADER']"
key-expression="payload.key"
extract-payload="true"/>
它需要以下属性:
-
channel:指定将消息发送到的通道; -
cache:指定分布式对象引用。 自选; -
cache-expression:通过 Spring 表达式语言 (SpEL) 指定分布式对象。 自选; -
key-expression:通过 Spring 表达式语言 (SpEL) 指定键值对的键。 可选且仅 for requiredIMap,MultiMap和ReplicatedMap分布式数据结构。 -
extract-payload:指定是发送整个消息还是仅发送有效负载。 Optional 属性替换为true是默认值。 如果为 true,则只会将有效负载写入分布式对象。 否则,将通过转换 message header 和 payload 来写入整个消息。
通过在 header 中设置分布式对象名称,可以通过同一通道将消息写入不同的分布式对象。
如果cache或cache-expressionattributes 未定义,则HazelcastHeaders.CACHE_NAME标头必须在请求中设置Message.
Hazelcast 领导人选举
如果需要领导者选举(例如,对于只有一个节点应接收消息的高可用性消息使用者),则基于 Hazelcast 的LeaderInitiator可用于:
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public LeaderInitiator initiator() {
return new LeaderInitiator(hazelcastInstance());
}
当节点被选为 leader 时,它将发送一个OnGrantedEvent分配给所有应用程序侦听器。
Hazelcast 消息存储
对于分布式消息传递状态管理,例如对于持久性QueueChannel或跟踪Aggregator消息组、HazelcastMessageStoreimplementation 是:
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public MessageGroupStore messageStore() {
return new HazelcastMessageStore(hazelcastInstance());
}
默认情况下,SPRING_INTEGRATION_MESSAGE_STORE IMap用于将消息和组存储为键/值。
任何自定义IMap可以提供给HazelcastMessageStore.
Hazelcast 元数据存储
一个ListenableMetadataStore可使用背衬 HazelcastIMap.
默认映射是使用名称创建的SPRING_INTEGRATION_METADATA_STORE可以自定义。
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public MetadataStore metadataStore() {
return new HazelcastMetadataStore(hazelcastInstance());
}
这HazelcastMetadataStore实现ListenableMetadataStore它允许您注册自己的MetadataStoreListener要通过addListener(MetadataStoreListener callback).
Hazelcast Lock 注册表
一个LockRegistry使用底层 Hazelcast 分布式ILock支持:
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance();
}
@Bean
public LockRegistry lockRegistry() {
return new HazelcastLockRegistry(hazelcastInstance());
}
当与共享的MessageGroupStore(例如Aggregatorstore management)、HazelcastLockRegistry可用于跨多个应用程序实例提供此功能,以便一次只有一个实例可以作该组。
对于所有分布式作,必须在HazelcastInstance. |
使用 Hazelcast 的消息通道
Hazelcast酒店IQueue和ITopic分布式对象本质上是消息传递原语,可以与 Spring 集成核心组件一起使用,而无需在此 Hazelcast 模块中进行额外的实现。
这QueueChannel可由任何java.util.Queue,包括提到的 Hazelcast 分布式IQueue:
@Bean
PollableChannel hazelcastQueueChannel(HazelcastInstance hazelcastInstance) {
return new QueueChannel(hazelcastInstance.getQueue("springIntegrationQueue"));
}
将此配置放在应用程序的 Hazelcast 集群中的多个节点上,将使QueueChannel作为分布式节点,并且只有一个节点将能够轮询单个Message从那IQueue.
这类似于PollableJmsChannel,PollableKafkaChannel或PollableAmqpChannel.
如果生产者端不是 Spring 集成应用程序,则无法配置QueueChannel,因此是普通的 HazelcastIQueueAPI 用于生成数据。
在这种情况下,QueueChannel方法在消费者端是错误的:必须使用 Inbound Channel Adapter 解决方案来代替:
@Bean
public IQueue<String> myStringHzQueue(HazelcastInstance hazelcastInstance) {
return hazelcastInstance.getQueue("springIntegrationQueue");
}
@Bean
@InboundChannelAdapter(channel = "stringValuesFromHzQueueChannel")
Supplier<String> fromHzIQueueSource(IQueue<String> myStringHzQueue) {
return myStringHzQueue::poll;
}
这ITopicHazelcast 中的 abstraction 具有与Topic在 JMS 中:所有订阅者都会收到已发布的消息。
搭配一双简单的MessageChannelbean 此机制作为开箱即用的功能受到支持:
@Bean
public ITopic<Message<?>> springIntegrationTopic(HazelcastInstance hazelcastInstance,
MessageChannel fromHazelcastTopicChannel) {
ITopic<Message<?>> topic = hazelcastInstance.getTopic("springIntegrationTopic");
topic.addMessageListener(m -> fromHazelcastTopicChannel.send(m.getMessageObject()));
return topic;
}
@Bean
public MessageChannel publishToHazelcastTopicChannel(ITopic<Message<?>> springIntegrationTopic) {
return new FixedSubscriberChannel(springIntegrationTopic::publish);
}
@Bean
public MessageChannel fromHazelcastTopicChannel() {
return new DirectChannel();
}
这FixedSubscriberChannel是 的优化变体DirectChannel,这需要MessageHandler初始化时。
由于MessageHandler是一个函数式接口,是handleMessagemethod 的 intent 中。
当消息发送到publishToHazelcastTopicChannel它刚刚发布到 Hazelcast 上ITopic.
这com.hazelcast.topic.MessageListener也是一个函数式接口,因此ITopic#addMessageListener可以提供。
因此,订阅fromHazelcastTopicChannel将消耗发送到上述ITopic.
一ExecutorChannel可随附IExecutorService.
例如,使用相应的配置,可以实现集群范围的单例:
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance(
new Config()
.addExecutorConfig(new ExecutorConfig()
.setName("singletonExecutor")
.setPoolSize(1)));
}
@Bean
public MessageChannel hazelcastSingletonExecutorChannel(HazelcastInstance hazelcastInstance) {
return new ExecutorChannel(hazelcastInstance.getExecutorService("singletonExecutor"));
}