|
此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring Integration 6.4.0! |
Redis 支持
Spring Integration 2.1 引入了对 Redis 的支持:“一个开源的高级键值存储”。
这种支持以基于 Redis 的MessageStore以及 Redis 通过其PUBLISH,SUBSCRIBE和UNSUBSCRIBE命令。
您需要将此依赖项包含在您的项目中:
-
Maven
-
Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-redis</artifactId>
<version>6.4.1-SNAPSHOT</version>
</dependency>
compile "org.springframework.integration:spring-integration-redis:6.4.1-SNAPSHOT"
您还需要包含 Redis 客户端依赖项,例如 Lettuce。
要下载、安装和运行 Redis,请参阅 Redis 文档。
连接到 Redis
要开始与 Redis 交互,您首先需要连接到它。
Spring 集成使用另一个 Spring 项目 Spring Data Redis 提供的支持,该项目提供了典型的 Spring 结构:ConnectionFactory和Template.
这些抽象简化了与多个 Redis 客户端 Java API 的集成。
目前,Spring Data Redis 支持 Jedis 和 Lettuce。
用RedisConnectionFactory
要连接到 Redis,您可以使用RedisConnectionFactory接口。
下面的清单显示了接口定义:
public interface RedisConnectionFactory extends PersistenceExceptionTranslator {
/**
* Provides a suitable connection for interacting with Redis.
* @return connection for interacting with Redis.
*/
RedisConnection getConnection();
}
以下示例演示如何创建LettuceConnectionFactory在 Java 中:
LettuceConnectionFactory cf = new LettuceConnectionFactory();
cf.afterPropertiesSet();
以下示例演示如何创建LettuceConnectionFactory在 Spring 的 XML 配置中:
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379" />
</bean>
的RedisConnectionFactory提供一组属性,例如 Port 和 Host,您可以根据需要进行设置。
一旦您拥有RedisConnectionFactory中,您可以创建RedisTemplate并注入RedisConnectionFactory.
用RedisTemplate
与 Spring 中的其他模板类(例如JdbcTemplate和JmsTemplate) RedisTemplate是一个帮助程序类,可简化 Redis 数据访问代码。
有关RedisTemplate及其变体(例如StringRedisTemplate) 请参阅 Spring Data Redis 文档。
以下示例演示如何创建RedisTemplate在 Java 中:
RedisTemplate rt = new RedisTemplate<String, Object>();
rt.setConnectionFactory(redisConnectionFactory);
以下示例演示如何创建RedisTemplate在 Spring 的 XML 配置中:
<bean id="redisTemplate"
class="org.springframework.data.redis.core.RedisTemplate">
<property name="connectionFactory" ref="redisConnectionFactory"/>
</bean>
使用 Redis 进行消息收发
如简介中所述,Redis 通过其PUBLISH,SUBSCRIBE和UNSUBSCRIBE命令。
与 JMS 和 AMQP 一样, Spring 集成提供了消息通道和适配器,用于通过 Redis 发送和接收消息。
Redis 发布/订阅频道
与 JMS 类似,在某些情况下,生产者和使用者都应该成为同一应用程序的一部分,运行在同一个进程中。 您可以通过使用一对入站和出站通道适配器来实现此目的。 但是,与 Spring Integration 的 JMS 支持一样,有一种更简单的方法可以解决此用例。 您可以创建 publish-subscribe 通道,如下例所示:
<int-redis:publish-subscribe-channel id="redisChannel" topic-name="si.test.topic"/>
一个publish-subscribe-channel的行为与正常<publish-subscribe-channel/>元素。
它可以被input-channel和output-channel属性。
区别在于,此通道由 Redis 主题名称提供支持:一个String值由topic-name属性。
但是,与 JMS 不同的是,此主题不必提前创建,甚至不必由 Redis 自动创建。
在 Redis 中,主题很简单String值,这些值的作用是 Address。
生产者和使用者可以使用相同的String值作为其主题名称。
对此通道的简单订阅意味着可以在生产终端节点和使用终端节点之间进行异步发布-订阅消息传递。
但是,与通过添加<queue/>元素中的元素<channel/>元素,则消息不会存储在内存中队列中。
相反,这些消息是通过 Redis 传递的,这让您能够依赖它对持久性和集群的支持,以及它与其他非 Java 平台的互作性。
Redis 入站通道适配器
Redis 入站通道适配器 (RedisInboundChannelAdapter)将传入的 Redis 消息调整为 Spring 消息,其方式与其他入站适配器相同。
它接收特定于平台的消息(在本例中为 Redis),并使用MessageConverter策略。
以下示例显示如何配置 Redis 入站通道适配器:
<int-redis:inbound-channel-adapter id="redisAdapter"
topics="thing1, thing2"
channel="receiveChannel"
error-channel="testErrorChannel"
message-converter="testConverter" />
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379" />
</bean>
<bean id="testConverter" class="things.something.SampleMessageConverter" />
前面的示例显示了 Redis 入站通道适配器的一个简单但完整的配置。
请注意,前面的配置依赖于熟悉的 Spring 自动发现某些 bean 的范例。
在这种情况下,redisConnectionFactory隐式注入到适配器中。
您可以使用connection-factory属性。
另请注意,前面的配置会向适配器注入自定义的MessageConverter.
该方法类似于 JMS,其中MessageConverter实例用于在 Redis 消息和 Spring 集成消息有效负载之间进行转换。
默认值为SimpleMessageConverter.
入站适配器可以订阅多个主题名称,因此topics属性。
从 3.0 版本开始,入站适配器除了现有的topics属性,现在具有topic-patterns属性。
此属性包含一组以逗号分隔的 Redis 主题模式。
有关 Redis 发布-订阅的更多信息,请参阅 Redis Pub/Sub。
入站适配器可以使用RedisSerializer反序列化 Redis 消息的正文。
这serializer属性的<int-redis:inbound-channel-adapter>可以设置为空字符串,这会导致null值RedisSerializer财产。
在这种情况下,原始byte[]Redis 消息的正文作为消息负载提供。
从 5.0 版本开始,您可以提供Executor实例添加到入站适配器中,方法是使用task-executor属性的<int-redis:inbound-channel-adapter>.
此外,接收到的 Spring 集成消息现在具有RedisHeaders.MESSAGE_SOURCE标头来指示已发布消息的来源:topic 或 pattern。
您可以将此下游用于路由逻辑。
Redis 出站通道适配器
Redis 出站通道适配器以与其他出站适配器相同的方式将传出的 Spring Integration 消息改编为 Redis 消息。
它接收 Spring 集成消息,并使用MessageConverter策略。
以下示例显示如何配置 Redis 出站通道适配器:
<int-redis:outbound-channel-adapter id="outboundAdapter"
channel="sendChannel"
topic="thing1"
message-converter="testConverter"/>
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379"/>
</bean>
<bean id="testConverter" class="things.something.SampleMessageConverter" />
该配置与 Redis 入站通道适配器类似。
适配器隐式注入了RedisConnectionFactory,它由redisConnectionFactory作为其 bean 名称。
此示例还包括可选的(和自定义的)MessageConverter(testConverterbean) 的 Bean 的
从 Spring Integration 3.0 开始,<int-redis:outbound-channel-adapter>提供了topic属性:您可以使用topic-expression属性来确定运行时消息的 Redis 主题。
这些属性是互斥的。
Redis 队列入站通道适配器
Spring Integration 3.0 引入了一个队列入站通道适配器,用于从 Redis 列表中“弹出”消息。 默认情况下,它使用 “right pop”,但你可以将其配置为使用 “left pop” 来代替。 适配器是消息驱动的。 它使用内部侦听器线程,不使用 Poller。
下面的清单显示了 的所有可用属性queue-inbound-channel-adapter:
<int-redis:queue-inbound-channel-adapter id="" (1)
channel="" (2)
auto-startup="" (3)
phase="" (4)
connection-factory="" (5)
queue="" (6)
error-channel="" (7)
serializer="" (8)
receive-timeout="" (9)
recovery-interval="" (10)
expect-message="" (11)
task-executor="" (12)
right-pop=""/> (13)
| 1 | 组件 Bean 名称。
如果您未提供channel属性、DirectChannel在应用程序上下文中创建并注册id属性作为 bean 名称。
在这种情况下,端点本身是使用 Bean 名称注册的id加.adapter.
(如果 bean 名称为thing1,则终端节点将注册为thing1.adapter.) |
| 2 | 这MessageChannel要发送到的对象Message实例。 |
| 3 | 一个SmartLifecycle属性来指定此端点是否应在应用程序上下文启动后自动启动。
它默认为true. |
| 4 | 一个SmartLifecycle属性指定此终端节点的启动阶段。
它默认为0. |
| 5 | 对RedisConnectionFactory豆。
它默认为redisConnectionFactory. |
| 6 | Redis 列表的名称,在其上执行基于队列的“pop”作以获取 Redis 消息。 |
| 7 | 这MessageChannel要发送到的对象ErrorMessage实例。
默认情况下,底层的MessagePublishingErrorHandler使用默认的errorChannel从应用程序上下文。 |
| 8 | 这RedisSerializerbean 引用。
它可以是一个空字符串,这意味着 'no serializer'。
在这种情况下,原始byte[]从入站 Redis 消息发送到channel作为Message有效载荷。
默认情况下,它是一个JdkSerializationRedisSerializer. |
| 9 | “pop”作等待队列中的 Redis 消息的超时时间(以毫秒为单位)。 默认值为 1 秒。 |
| 10 | 在重新启动侦听器任务之前,侦听器任务在 'pop'作出现异常后应休眠的时间(以毫秒为单位)。 |
| 11 | 指定此终端节点是否期望 Redis 队列中的数据包含整个Message实例。
如果此属性设置为true这serializer不能为空字符串,因为消息需要某种形式的反序列化(默认为 JDK 序列化)。
其默认值为false. |
| 12 | 对 Spring 的引用TaskExecutor(或标准 JDK 1.5+Executor) bean 的 Bean 中。
它用于底层侦听任务。
它默认为SimpleAsyncTaskExecutor. |
| 13 | 指定此端点是否应使用 “right pop” (当true) 或 “left pop” (当false) 从 Redis 列表中读取消息。
如果true,Redis List 充当FIFOqueue 与默认 Redis 队列出站通道适配器一起使用时。
将其设置为false与使用 “right push” 写入列表的软件一起使用,或实现类似堆栈的消息顺序。
其默认值为true.
从 4.3 版本开始。 |
Redis 队列出站通道适配器
Spring Integration 3.0 引入了一个队列出站通道适配器,用于从 Spring Integration 消息“推送”到 Redis 列表。
默认情况下,它使用 “left push”,但您可以将其配置为使用 “right push” 来代替。
以下清单显示了 Redis 的所有可用属性queue-outbound-channel-adapter:
<int-redis:queue-outbound-channel-adapter id="" (1)
channel="" (2)
connection-factory="" (3)
queue="" (4)
queue-expression="" (5)
serializer="" (6)
extract-payload="" (7)
left-push=""/> (8)
| 1 | 组件 Bean 名称。
如果您未提供channel属性、DirectChannel在应用程序上下文中创建并注册id属性作为 bean 名称。
在这种情况下,端点使用 Bean 名称id加.adapter.
(如果 bean 名称为thing1,则终端节点将注册为thing1.adapter.) |
| 2 | 这MessageChannel此端点从中接收Message实例。 |
| 3 | 对RedisConnectionFactory豆。
它默认为redisConnectionFactory. |
| 4 | Redis 列表的名称,在其上执行基于队列的“推送”作以发送 Redis 消息。
此属性与queue-expression. |
| 5 | 一个 SPELExpression以确定 Redis 列表的名称。
它使用传入的Message在运行时作为#root变量。
此属性与queue. |
| 6 | 一个RedisSerializerbean 引用。
它默认为JdkSerializationRedisSerializer.
但是,对于Stringpayloads 中,一个StringRedisSerializer,如果serializer未提供 reference。 |
| 7 | 指定此端点是应仅发送有效负载还是发送整个Message添加到 Redis 队列中。
它默认为true. |
| 8 | 指定此端点是否应使用 “left push”(当true) 或 “right push” (当false) 将消息写入 Redis 列表。
如果true,Redis 列表充当FIFOqueue 与默认 Redis 队列入站通道适配器一起使用时。
将其设置为false与使用 “left pop” 从列表中读取的软件一起使用,或实现类似堆栈的消息顺序。
它默认为true.
从 4.3 版本开始。 |
Redis 应用程序事件
从 Spring Integration 3.0 开始,Redis 模块提供了IntegrationEvent,而org.springframework.context.ApplicationEvent.
这RedisExceptionEvent封装 Redis作中的异常(终端节点是事件的“源”)。
例如,<int-redis:queue-inbound-channel-adapter/>在从BoundListOperations.rightPop操作。
异常可以是任何泛型org.springframework.data.redis.RedisSystemException或org.springframework.data.redis.RedisConnectionFailureException.
使用<int-event:inbound-channel-adapter/>可用于确定后台 Redis 任务的问题和执行管理作。
Redis 消息存储
如 Enterprise Integration Patterns (EIP) 一书中所述,消息存储允许您保留消息。
当考虑可靠性时,当处理具有缓冲消息功能的组件(聚合器、resequencer 等)时,这可能很有用。
在 Spring 集成中,MessageStore策略还为 Claim Check 模式提供了基础,EIP 中对此也有介绍。
Spring 集成的 Redis 模块提供了RedisMessageStore.
以下示例显示了如何将其与聚合器一起使用:
<bean id="redisMessageStore" class="o.s.i.redis.store.RedisMessageStore">
<constructor-arg ref="redisConnectionFactory"/>
</bean>
<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
message-store="redisMessageStore"/>
前面的示例是一个 Bean 配置,它需要一个RedisConnectionFactory作为构造函数参数。
默认情况下,RedisMessageStore使用 Java 序列化来序列化消息。
但是,如果您想使用不同的序列化技术(例如 JSON),则可以通过将valueSerializer属性的RedisMessageStore.
从版本 4.3.10 开始,框架为Messageinstances 和MessageHeadersinstances (实例) —MessageJacksonDeserializer和MessageHeadersJacksonSerializer分别。
它们必须配置为SimpleModule选项ObjectMapper.
此外,您应该设置enableDefaultTyping在ObjectMapper为每个序列化的复杂对象添加类型信息(如果您信任源)。
然后在反序列化期间使用该类型信息。
该框架提供了一个名为JacksonJsonUtils.messagingAwareMapper(),它已经随前面提到的所有属性和序列化程序一起提供。
此实用程序方法附带了trustedPackages参数来限制 Java 包进行反序列化以避免安全漏洞。
默认的受信任软件包:java.util,java.lang,org.springframework.messaging.support,org.springframework.integration.support,org.springframework.integration.message,org.springframework.integration.store.
要在RedisMessageStore,则必须以类似于以下示例的方式对其进行配置:
RedisMessageStore store = new RedisMessageStore(redisConnectionFactory);
ObjectMapper mapper = JacksonJsonUtils.messagingAwareMapper();
RedisSerializer<Object> serializer = new GenericJackson2JsonRedisSerializer(mapper);
store.setValueSerializer(serializer);
从版本 4.3.12 开始,RedisMessageStore支持prefix选项以允许区分同一 Redis 服务器上的 store 实例。
Redis 通道消息存储
这RedisMessageStore 前面显示的将每个组维护为单个键(组 ID)下的值。
虽然您可以使用它来支持QueueChannel为了实现持久性,会使用专门的RedisChannelMessageStore为此目的而提供(从版本 4.0 开始)。
此 store 使用LIST对于每个通道,LPUSH发送消息时,以及RPOP接收消息时。
默认情况下,此存储区还使用 JDK 序列化,但您可以修改值序列化器,如前所述。
我们建议使用此 store 支持通道,而不是使用常规的RedisMessageStore.
以下示例定义了一个 Redis 消息存储,并在具有队列的通道中使用它:
<bean id="redisMessageStore" class="o.s.i.redis.store.RedisChannelMessageStore">
<constructor-arg ref="redisConnectionFactory"/>
</bean>
<int:channel id="somePersistentQueueChannel">
<int:queue message-store="redisMessageStore"/>
<int:channel>
用于存储数据的键具有以下格式:<storeBeanName>:<channelId>(在前面的示例中,redisMessageStore:somePersistentQueueChannel).
此外,一个子类RedisChannelPriorityMessageStore。
当您将其与QueueChannel,则按 (FIFO) 优先级顺序接收消息。
它使用标准IntegrationMessageHeaderAccessor.PRIORITY标头并支持优先级值 (0 - 9).
具有其他优先级的消息(和没有优先级的消息)将按 FIFO 顺序检索,位于具有优先级的任何消息之后。
这些商店仅实现BasicMessageGroupStore并且不要实现MessageGroupStore.
它们只能用于支持某个QueueChannel. |
Redis 元数据存储
Spring Integration 3.0 引入了一个新的基于 Redis 的MetadataStore(请参阅 元数据存储) 实现。
您可以使用RedisMetadataStore要维护MetadataStore跨应用程序重启。
您可以使用这个新的MetadataStore使用适配器实现,例如:
要指示这些适配器使用新的RedisMetadataStore中,声明一个名为metadataStore.
Feed 入站通道适配器和 Feed 入站通道适配器都会自动获取并使用声明的RedisMetadataStore.
下面的示例展示了如何声明这样的 bean:
<bean name="metadataStore" class="o.s.i.redis.store.metadata.RedisMetadataStore">
<constructor-arg name="connectionFactory" ref="redisConnectionFactory"/>
</bean>
这RedisMetadataStore由RedisProperties.
与它的交互使用BoundHashOperations,这反过来又需要一个key对于整个Properties商店。
在MetadataStore这key当多个应用程序使用同一个 Redis 服务器时,在分布式环境中扮演区域的角色非常有用。
默认情况下,此key的值为MetaData.
从版本 4.0 开始,此 store 实现ConcurrentMetadataStore,使其在多个应用程序实例之间可靠地共享,其中只允许一个实例存储或修改键的值。
您不能使用RedisMetadataStore.replace()(例如,在AbstractPersistentAcceptOnceFileListFilter),由于WATCH当前不支持原子性的命令。 |
Redis Store 入站通道适配器
Redis 存储入站通道适配器是一个轮询使用者,它从 Redis 集合中读取数据并将其作为Message有效载荷。
以下示例显示如何配置 Redis store 入站通道适配器:
<int-redis:store-inbound-channel-adapter id="listAdapter"
connection-factory="redisConnectionFactory"
key="myCollection"
channel="redisChannel"
collection-type="LIST" >
<int:poller fixed-rate="2000" max-messages-per-poll="10"/>
</int-redis:store-inbound-channel-adapter>
前面的示例演示如何使用store-inbound-channel-adapter元素,为各种属性提供值,例如:
-
key或key-expression:正在使用的集合的键的名称。 -
collection-type:此适配器支持的集合类型的枚举。 支持的集合包括LIST,SET,ZSET,PROPERTIES和MAP. -
connection-factory:对o.s.data.redis.connection.RedisConnectionFactory. -
redis-template:对o.s.data.redis.core.RedisTemplate. -
所有其他入站适配器中通用的其他属性(例如 'channel')。
不能同时设置redis-template和connection-factory. |
|
默认情况下,适配器使用
这 |
因为它具有key,前面的示例相对简单且静态。
有时,您可能需要根据某些条件在运行时更改 key 的值。
为此,请使用key-expression相反,其中提供的表达式可以是任何有效的 SPEL 表达式。
此外,您可能希望对从 Redis 集合中读取的成功处理的数据执行一些后处理。
例如,您可能希望在处理完值后移动或删除该值。
你可以通过使用 Spring Integration 2.2 中添加的事务同步功能来实现这一点。
以下示例使用key-expression和事务同步:
<int-redis:store-inbound-channel-adapter id="zsetAdapterWithSingleScoreAndSynchronization"
connection-factory="redisConnectionFactory"
key-expression="'presidents'"
channel="otherRedisChannel"
auto-startup="false"
collection-type="ZSET">
<int:poller fixed-rate="1000" max-messages-per-poll="2">
<int:transactional synchronization-factory="syncFactory"/>
</int:poller>
</int-redis:store-inbound-channel-adapter>
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit expression="payload.removeByScore(18, 18)"/>
</int:transaction-synchronization-factory>
<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>
您可以使用transactional元素。
此元素可以引用真实的事务管理器(例如,如果流的某个其他部分调用 JDBC)。
如果您没有 “真实” 交易,您可以使用o.s.i.transaction.PseudoTransactionManager,它是 Spring 的PlatformTransactionManager并允许在没有实际事务时使用 Redis 适配器的事务同步功能。
| 这不会使 Redis 活动本身成为事务性活动。 它允许在成功 (提交) 之前或之后或失败 (回滚) 执行作同步。 |
一旦你的 Poller 是事务性的,你就可以设置o.s.i.transaction.TransactionSynchronizationFactory在transactional元素。TransactionSynchronizationFactory创建TransactionSynchronization.
为方便起见,我们公开了默认的基于 SPEL 的TransactionSynchronizationFactory,它允许您配置 SPEL 表达式,其执行与事务协调(同步)。
支持 before-commit、after-commit 和 after-rollback 的表达式,以及发送评估结果(如果有)的通道(每种事件一个)。
对于每个子元素,您可以指定expression和channel属性。
如果只有channel属性,则收到的消息将作为特定同步方案的一部分发送到该位置。
如果只有expression属性,并且表达式的结果是非 null 值,则会生成一条消息,其中包含有效负载的结果,并将其发送到默认通道 (NullChannel)并显示在日志中(位于DEBUG级别)。
如果您希望评估结果转到特定频道,请添加channel属性。
如果表达式的结果是 null 或 void,则不会生成任何消息。
这RedisStoreMessageSource添加store属性替换为RedisStore绑定到事务的实例IntegrationResourceHolder,可以从TransactionSynchronizationProcessor实现。
有关事务同步的更多信息,请参阅事务同步。
RedisStore 出站通道适配器
RedisStore 出站通道适配器允许您将消息有效负载写入 Redis 集合,如下例所示:
<int-redis:store-outbound-channel-adapter id="redisListAdapter"
collection-type="LIST"
channel="requestChannel"
key="myCollection" />
前面的配置:Redis 使用store-inbound-channel-adapter元素。
它为各种属性提供值,例如:
-
key或key-expression:正在使用的集合的键的名称。 -
extract-payload-elements:如果设置为true(默认值),有效负载是“多值”对象的实例(即Collection或Map),则使用 “addAll” 和 “putAll” 语义进行存储。 否则,如果设置为false,则无论其类型如何,有效负载都将存储为单个条目。 如果有效负载不是“多值”对象的实例,则忽略此属性的值,并且有效负载始终存储为单个条目。 -
collection-type:枚举Collection此适配器支持的类型。 支持的集合包括LIST,SET,ZSET,PROPERTIES和MAP. -
map-key-expression:返回要存储的条目的键名称的 SPEL 表达式。 它仅适用于collection-type是MAP或PROPERTIES并且 'extract-payload-elements' 为 false。 -
connection-factory:对o.s.data.redis.connection.RedisConnectionFactory. -
redis-template:对o.s.data.redis.core.RedisTemplate. -
所有其他入站适配器中通用的其他属性(例如 'channel')。
不能同时设置redis-template和connection-factory. |
默认情况下,适配器使用StringRedisTemplate.
这将使用StringRedisSerializer键、值、哈希键和哈希值的实例。
但是,如果extract-payload-elements设置为false一个RedisTemplate该StringRedisSerializerkeys 和 hash keys 的实例,以及JdkSerializationRedisSerializer将使用 instances s for values 和 hash values。
使用 JDK 序列化程序时,请务必了解 Java 序列化用于所有值,而不管该值是否实际上是集合。
如果需要对值的序列化进行更多控制,请考虑提供自己的RedisTemplate而不是依赖这些默认值。 |
因为它具有key和其他 attribute 中,前面的示例相对简单且静态。
有时,您可能需要在运行时根据某些条件动态更改值。
为此,请使用他们的-expression等效项 (key-expression,map-key-expression,依此类推),其中提供的表达式可以是任何有效的 SPEL 表达式。
Redis 出站命令网关
Spring 集成 4.0 引入了 Redis 命令网关,允许您使用泛型RedisConnection#execute方法。
以下清单显示了 Redis 出站网关的可用属性:
<int-redis:outbound-gateway
request-channel="" (1)
reply-channel="" (2)
requires-reply="" (3)
reply-timeout="" (4)
connection-factory="" (5)
redis-template="" (6)
arguments-serializer="" (7)
command-expression="" (8)
argument-expressions="" (9)
use-command-variable="" (10)
arguments-strategy="" /> (11)
| 1 | 这MessageChannel此端点从中接收Message实例。 |
| 2 | 这MessageChannel此端点发送回复的位置Message实例。 |
| 3 | 指定此出站网关是否必须返回非 null 值。
它默认为true.
一个ReplyRequiredException在 Redis 返回null价值。 |
| 4 | 在发送回复消息之前等待的超时 (以毫秒为单位)。 它通常应用于基于队列的有限回复通道。 |
| 5 | 对RedisConnectionFactory豆。
它默认为redisConnectionFactory.
它与 'redis-template' 属性互斥。 |
| 6 | 对RedisTemplate豆。
它与 'connection-factory' 属性互斥。 |
| 7 | 对org.springframework.data.redis.serializer.RedisSerializer.
如有必要,它用于将每个 command 参数序列化为 byte[]。 |
| 8 | 返回命令键的 SpEL 表达式。
它默认为redis_command消息标头。
它不能计算为null. |
| 9 | 以逗号分隔的 SpEL 表达式,计算为命令参数。
与arguments-strategy属性。
如果您两者都未提供 attribute,则payload用作命令参数。
参数表达式的计算结果可以为 'null' 以支持可变数量的参数。 |
| 10 | 一个boolean标志,以指定是否将评估的 Redis 命令字符串作为#cmd变量o.s.i.redis.outbound.ExpressionArgumentsStrategy什么时候argument-expressions已配置。
否则,将忽略此属性。 |
| 11 | 对o.s.i.redis.outbound.ArgumentsStrategy.
它与argument-expressions属性。
如果您两者都未提供 attribute,则payload用作命令参数。 |
您可以使用<int-redis:outbound-gateway>作为执行任何所需 Redis作的通用组件。
以下示例显示如何从 Redis 原子序数获取递增的值:
<int-redis:outbound-gateway request-channel="requestChannel"
reply-channel="replyChannel"
command-expression="'INCR'"/>
这Messagepayload 的名称应为redisCounter,可由org.springframework.data.redis.support.atomic.RedisAtomicIntegerbean 定义。
这RedisConnection#executemethod 具有泛型Object作为其返回类型。
实际结果取决于命令类型。
例如MGET返回List<byte[]>.
有关命令、其参数和结果类型的更多信息,请参阅 Redis 规范。
Redis 队列出站网关
Spring 集成引入了 Redis 队列出站网关来执行请求和回复场景。
它推动对话UUID到提供的queue,则使用UUID作为其 key 添加到 Redis 列表,并等待 key 为UUID加.reply.
每次交互使用不同的 UUID。
以下清单显示了 Redis 出站网关的可用属性:
<int-redis:queue-outbound-gateway
request-channel="" (1)
reply-channel="" (2)
requires-reply="" (3)
reply-timeout="" (4)
connection-factory="" (5)
queue="" (6)
order="" (7)
serializer="" (8)
extract-payload=""/> (9)
| 1 | 这MessageChannel此端点从中接收Message实例。 |
| 2 | 这MessageChannel此端点发送回复的位置Message实例。 |
| 3 | 指定此出站网关是否必须返回非 null 值。
该值为false默认情况下。
否则,一个ReplyRequiredException在 Redis 返回null价值。 |
| 4 | 在发送回复消息之前等待的超时 (以毫秒为单位)。 它通常应用于基于队列的有限回复通道。 |
| 5 | 对RedisConnectionFactory豆。
它默认为redisConnectionFactory.
它与 'redis-template' 属性互斥。 |
| 6 | 出站网关向其发送会话的 Redis 列表的名称UUID. |
| 7 | 注册多个网关时此出站网关的顺序。 |
| 8 | 这RedisSerializerbean 引用。
它可以是一个空字符串,这意味着 “no serializer”。
在这种情况下,原始byte[]从入站 Redis 消息发送到channel作为Message有效载荷。
默认情况下,它是一个JdkSerializationRedisSerializer. |
| 9 | 指定此终端节点是否期望 Redis 队列中的数据包含整个Message实例。
如果此属性设置为true这serializer不能为空字符串,因为消息需要某种形式的反序列化(默认为 JDK 序列化)。 |
Redis 队列入站网关
Spring Integration 4.1 引入了 Redis 队列入站网关来执行请求和回复场景。
它弹出一个对话UUID从提供的queue,则使用UUID作为其 key,并将回复推送到 Redis 列表,并使用 key 为UUID加.reply.
以下清单显示了 Redis 队列入站网关的可用属性:
<int-redis:queue-inbound-gateway
request-channel="" (1)
reply-channel="" (2)
executor="" (3)
reply-timeout="" (4)
connection-factory="" (5)
queue="" (6)
order="" (7)
serializer="" (8)
receive-timeout="" (9)
expect-message="" (10)
recovery-interval=""/> (11)
| 1 | 这MessageChannel此端点将Message从 Redis 数据创建的实例。 |
| 2 | 这MessageChannel此终端节点等待回复的位置Message实例。
可选 - 的replyChannel标头仍在使用中。 |
| 3 | 对 Spring 的引用TaskExecutor(或标准 JDKExecutor) bean 的 Bean 中。
它用于底层侦听任务。
它默认为SimpleAsyncTaskExecutor. |
| 4 | 在发送回复消息之前等待的超时 (以毫秒为单位)。 它通常应用于基于队列的有限回复通道。 |
| 5 | 对RedisConnectionFactory豆。
它默认为redisConnectionFactory.
它与 'redis-template' 属性互斥。 |
| 6 | 对话的 Redis 列表的名称UUID. |
| 7 | 注册多个网关时此入站网关的顺序。 |
| 8 | 这RedisSerializerbean 引用。
它可以是一个空字符串,这意味着 “no serializer”。
在这种情况下,原始byte[]从入站 Redis 消息发送到channel作为Message有效载荷。
它默认为JdkSerializationRedisSerializer.
(请注意,在 4.3 版本之前的版本中,它是一个StringRedisSerializer默认情况下。
要恢复该行为,请提供对StringRedisSerializer). |
| 9 | 等待接收消息的超时 (以毫秒为单位)。 它通常应用于基于队列的有限请求通道。 |
| 10 | 指定此终端节点是否期望 Redis 队列中的数据包含整个Message实例。
如果此属性设置为true这serializer不能为空字符串,因为消息需要某种形式的反序列化(默认为 JDK 序列化)。 |
| 11 | 在重新启动侦听器任务之前,侦听器任务在 “right pop”作出现异常后应休眠的时间(以毫秒为单位)。 |
Redis 流出站频道适配器
Spring 集成 5.4 引入了反应式 Redis 流出站通道适配器,用于将 Message 有效负载写入 Redis 流。
出站通道适配器使用ReactiveStreamOperations.add(…)要添加Record到溪流中。
以下示例显示了如何使用 Redis Stream Outbound Channel Adapter 的 Java 配置和 Service 类。
@Bean
@ServiceActivator(inputChannel = "messageChannel")
public ReactiveRedisStreamMessageHandler reactiveValidatorMessageHandler(
ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
ReactiveRedisStreamMessageHandler reactiveStreamMessageHandler =
new ReactiveRedisStreamMessageHandler(reactiveRedisConnectionFactory, "myStreamKey"); (1)
reactiveStreamMessageHandler.setSerializationContext(serializationContext); (2)
reactiveStreamMessageHandler.setHashMapper(hashMapper); (3)
reactiveStreamMessageHandler.setExtractPayload(true); (4)
return reactiveStreamMessageHandler;
}
| 1 | 构造ReactiveRedisStreamMessageHandler用ReactiveRedisConnectionFactory和 stream name 添加记录。
另一个构造函数变体基于 SpEL 表达式,用于根据请求消息评估流键。 |
| 2 | 设置RedisSerializationContext用于在添加到流之前序列化记录键和值。 |
| 3 | 设置HashMapper它提供 Java 类型和 Redis 哈希/映射之间的协定。 |
| 4 | 如果为 'true',则通道适配器将从请求消息中提取要添加的流记录的有效负载。
或者使用整个消息作为值。
它默认为true. |
Redis Stream 入站频道适配器
Spring 集成 5.4 引入了反应式流入站通道适配器,用于从 Redis 流中读取消息。
入站通道适配器使用StreamReceiver.receive(…)或StreamReceiver.receiveAutoAck()根据自动确认标志从 Redis 流中读取记录。
以下示例显示如何对 Redis Stream Inbound Channel Adapter 使用 Java 配置。
@Bean
public ReactiveRedisStreamMessageProducer reactiveRedisStreamProducer(
ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
ReactiveRedisStreamMessageProducer messageProducer =
new ReactiveRedisStreamMessageProducer(reactiveRedisConnectionFactory, "myStreamKey"); (1)
messageProducer.setStreamReceiverOptions( (2)
StreamReceiver.StreamReceiverOptions.builder()
.pollTimeout(Duration.ofMillis(100))
.build());
messageProducer.setAutoStartup(true); (3)
messageProducer.setAutoAck(false); (4)
messageProducer.setCreateConsumerGroup(true); (5)
messageProducer.setConsumerGroup("my-group"); (6)
messageProducer.setConsumerName("my-consumer"); (7)
messageProducer.setOutputChannel(fromRedisStreamChannel); (8)
messageProducer.setReadOffset(ReadOffset.latest()); (9)
messageProducer.extractPayload(true); (10)
return messageProducer;
}
| 1 | 构造ReactiveRedisStreamMessageProducer用ReactiveRedisConnectionFactory和 stream key 读取记录。 |
| 2 | 一个StreamReceiver.StreamReceiverOptions使用反应式基础设施使用 Redis 流。 |
| 3 | 一个SmartLifecycle属性来指定此端点是否应在应用程序上下文启动后自动启动。
它默认为true.
如果false,RedisStreamMessageProducer应手动启动messageProducer.start(). |
| 4 | 如果false,则不会自动确认收到的消息。
消息的确认将推迟到客户端使用消息。
它默认为true. |
| 5 | 如果true,将创建一个 Consumer Group。
在创建消费组时,也会创建 stream(如果尚不存在)。
Consumer Group 跟踪消息送达并区分 Consumer
它默认为false. |
| 6 | 设置 Consumer Group name。 它默认为定义的 bean 名称。 |
| 7 | 设置 Consumer name。
将消息读取为my-consumer发件人组my-group. |
| 8 | 从此终端节点向其发送消息的消息通道。 |
| 9 | 定义要读取消息的偏移量。
它默认为ReadOffset.latest(). |
| 10 | 如果为 'true',则通道适配器将从Record.
否则,整个Record用作有效负载。
它默认为true. |
如果autoAck设置为false这Record在 Redis Stream 中,Redis 驱动程序不会自动确认,而是IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACKheader 添加到消息中,以使用SimpleAcknowledgmentinstance 作为值。
目标集成流责任将其acknowledge()callback 每当基于此类记录对消息执行业务逻辑时。
即使在反序列化期间发生异常,也需要类似的逻辑,并且errorChannel已配置。
因此,目标错误处理程序必须决定 ack 或 nack 此类失败的消息。
与IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK这ReactiveRedisStreamMessageProducer此外,将这些标头填充到消息中以生成:RedisHeaders.STREAM_KEY,RedisHeaders.STREAM_MESSAGE_ID,RedisHeaders.CONSUMER_GROUP和RedisHeaders.CONSUMER.
从版本 5.5 开始,您可以配置StreamReceiver.StreamReceiverOptionsBuilder选项显式地放在ReactiveRedisStreamMessageProducer,包括新引入的onErrorResume函数,如果 Redis Stream 使用者在发生反序列化错误时应继续轮询,则这是必需的。
default 函数向 error 通道(如果提供)发送一条消息,并可能确认失败的消息,如上所述。
所有这些StreamReceiver.StreamReceiverOptionsBuilder与外部提供的StreamReceiver.StreamReceiverOptions.
Redis Lock 注册表
Spring Integration 4.0 引入了RedisLockRegistry.
某些组件(例如,aggregator 和 resequencer)使用从LockRegistry实例来确保一次只有一个线程作一个组。
这DefaultLockRegistry在单个组件中执行此功能。
现在,您可以在这些组件上配置外部锁注册表。
当您将其与共享的MessageGroupStore中,您可以使用RedisLockRegistry跨多个应用程序实例提供此功能,以便一次只有一个实例可以作组。
当本地线程释放锁时,另一个本地线程通常可以立即获取该锁。 如果锁由使用其他注册表实例的线程释放,则可能需要长达 100 毫秒的时间才能获取锁。
为避免“挂起”锁(当服务器发生故障时),此注册表中的锁将在默认 60 秒后过期,但您可以在注册表上配置此值。 锁的持有时间通常要短得多。
| 由于密钥可能会过期,因此尝试解锁过期的锁会导致引发异常。 但是,受此类锁定保护的资源可能已泄露,因此应将此类异常视为严重异常。 您应该将过期时间设置得足够大,以防止出现这种情况,但要设置得足够低,以便在服务器发生故障后,可以在合理的时间内恢复锁定。 |
从版本 5.0 开始,RedisLockRegistry实现ExpirableLockRegistry,这将删除上次获取的锁定ageago 和当前未锁定的
从版本 5.5.6 开始,RedisLockRegistry支持自动清理 redisLocks 的缓存RedisLockRegistry.locks通过RedisLockRegistry.setCacheCapacity().
有关更多信息,请参阅其 JavaDocs。
从版本 5.5.13 开始,RedisLockRegistry暴露一个setRedisLockType(RedisLockType)选项来确定应该在哪种模式下进行 Redis 锁获取:
-
RedisLockType.SPIN_LOCK- 通过周期性循环(100ms)来获取锁,检查是否可以获取锁。 违约。 -
RedisLockType.PUB_SUB_LOCK- 锁由 redis 发布-订阅获取。
发布-订阅是首选模式 - 客户端 Redis 服务器之间的网络颤动更少,性能更高 - 当订阅在其他进程中收到解锁通知时,会立即获取锁。 但是,Redis 不支持主/副本连接中的发布-订阅(例如在 AWS ElastiCache 环境中),因此选择忙旋转模式作为默认模式,以使注册表在任何环境中工作。
从版本 6.4 开始,而不是抛出IllegalStateException这RedisLockRegistry.RedisLock.unlock()方法 throwConcurrentModificationException如果锁的所有权已过期。
从版本 6.4 开始,RedisLockRegistry.setRenewalTaskScheduler()用于配置定期更新锁的计划程序。
设置后,锁将每1/3成功获取锁后的过期时间,直到解锁或移除 Redis 密钥。