Redis 支持
Spring Integration 2.1 引入了对 Redis 的支持:这是一个“开源的高级键值存储”。该支持以基于 Redis 的 MessageStore 以及发布-订阅消息适配器的形式提供,这些适配器通过 Redis 的 PUBLISH、SUBSCRIBE 和 UNSUBSCRIBE 命令实现。
此依赖项为项目所需:
- Maven
- Gradle
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-redis</artifactId>
<version>7.0.2</version>
</dependency>
compile "org.springframework.integration:spring-integration-redis:7.0.2"
必须包含 Redis 客户端依赖,例如 Lettuce。
要下载、安装和运行 Redis,请参阅 Redis 文档。
连接到 Redis
要开始与 Redis 交互,首先必须建立连接。Spring Integration 使用了另一个 Spring 项目 Spring Data Redis 提供的支持,该项目提供了典型的 Spring 构造:ConnectionFactory 和 Template。这些抽象简化了与多种 Redis 客户端 Java API 的集成。目前,Spring Data Redis 支持 Jedis 和 Lettuce。
使用 RedisConnectionFactory
Spring Data Redis 中的 RedisConnectionFactory 是用于管理 Redis 连接的高级抽象。以下清单展示了其接口定义:
public interface RedisConnectionFactory extends PersistenceExceptionTranslator {
/**
* Provides a suitable connection for interacting with Redis.
* @return connection for interacting with Redis.
*/
RedisConnection getConnection();
}
以下示例展示了如何在 Java 中创建 LettuceConnectionFactory:
LettuceConnectionFactory cf = new LettuceConnectionFactory();
cf.afterPropertiesSet();
以下示例展示了如何在 Spring 的 XML 配置中创建 LettuceConnectionFactory:
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379" />
</bean>
RedisConnectionFactory 的实现提供了一组属性,例如端口和主机。一旦 RedisConnectionFactory 的实例存在,就可以创建 RedisTemplate。
使用 RedisTemplate
与其他Spring模板类(如 JdbcTemplate 和 JmsTemplate)一样,RedisTemplate 是一个辅助类,用于简化Redis数据访问代码。有关 RedisTemplate 及其变体(如 StringRedisTemplate)的更多信息,请参阅 Spring Data Redis 文档。
以下示例展示了如何在 Java 中创建 RedisTemplate 实例:
RedisTemplate rt = new RedisTemplate<String, Object>();
rt.setConnectionFactory(redisConnectionFactory);
以下示例展示了如何在 Spring 的 XML 配置中创建 RedisTemplate 实例:
<bean id="redisTemplate"
class="org.springframework.data.redis.core.RedisTemplate">
<property name="connectionFactory" ref="redisConnectionFactory"/>
</bean>
使用 Redis 进行消息传递
如引言所述,Redis通过其PUBLISH、SUBSCRIBE和UNSUBSCRIBE命令提供发布-订阅消息传递支持。与 JMS 和 AMQP 类似,Spring Integration 提供了消息通道和适配器,用于通过 Redis 发送和接收消息。
Redis 发布/订阅频道
与JMS类似,在某些情况下,生产者和消费者可能属于同一应用程序,并在同一进程中运行。这可以通过一对入站和出站通道适配器来实现。然而,与Spring Integration的JMS支持一样,有一种更简单的方法来处理这种用例。相反,可以使用发布-订阅通道,如下例所示:
<int-redis:publish-subscribe-channel id="redisChannel" topic-name="si.test.topic"/>
publish-subscribe-channel 的行为与主 Spring Integration 命名空间中的常规 <publish-subscribe-channel/> 元素非常相似。它可以被任何端点的 input-channel 和 output-channel 属性引用。不同之处在于,该通道由 Redis 主题名称支持:即由 topic-name 属性指定的 String 值。然而,与 JMS 不同,此主题无需提前创建,甚至无需由 Redis 自动创建。在 Redis 中,主题是简单的 String 值,扮演地址的角色。生产者和消费者可以通过使用相同的 String 值作为其主题名称进行通信。对此通道的简单订阅意味着生产端点和消费端点之间可以实现异步发布-订阅消息传递。然而,与通过在简单的 Spring Integration <channel/> 元素内添加 <queue/> 元素创建的异步消息通道不同,消息不会存储在内存队列中。相反,这些消息通过 Redis 传递,这使得我们可以依赖其对持久化和集群的支持,以及与其他非 Java 平台的互操作性。
Redis 入站通道适配器
Redis入站通道适配器(RedisInboundChannelAdapter)与其他入站适配器一样,将传入的Redis消息适配为Spring消息。它接收平台特定的消息(此处为Redis消息),并通过使用MessageConverter策略将其转换为Spring消息。以下示例展示了如何配置Redis入站通道适配器:
<int-redis:inbound-channel-adapter id="redisAdapter"
topics="thing1, thing2"
channel="receiveChannel"
error-channel="testErrorChannel"
message-converter="testConverter" />
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379" />
</bean>
<bean id="testConverter" class="things.something.SampleMessageConverter" />
前面的示例展示了一个简单但完整的Redis入站通道适配器配置。请注意,该配置依赖于Spring框架中自动发现特定bean的常见模式。在这种情况下,redisConnectionFactory 会被隐式注入到适配器中。或者,也可以通过 connection-factory 属性注入自定义的 RedisConnectionFactory。
此外,请注意,上述配置通过自定义的 MessageConverter 注入了适配器。这种方法类似于 JMS,其中 MessageConverter 实例用于在 Redis 消息和 Spring Integration 消息负载之间进行转换。默认使用的是 SimpleMessageConverter。
入站适配器可以订阅多个主题名称,因此topics属性中包含以逗号分隔的值集合。
自 3.0 版本起,入站适配器除了现有的 topics 属性外,现在新增了 topic-patterns 属性。该属性包含一组以逗号分隔的 Redis 主题模式。有关 Redis 发布/订阅的更多信息,请参阅 Redis Pub/Sub。
入站适配器可以使用 RedisSerializer 来反序列化 Redis 消息体。通过将 <int-redis:inbound-channel-adapter> 的 serializer 属性设置为空字符串,可使 RedisSerializer 属性值为 null。此时,Redis 消息的原始 byte[] 消息体会直接作为消息负载提供。
自 5.0 版本起,可以通过使用 <int-redis:inbound-channel-adapter> 的 task-executor 属性,将 Executor 实例注入到入站适配器中。此外,接收到的 Spring Integration 消息现在包含 RedisHeaders.MESSAGE_SOURCE 头,用于指示发布消息的来源:主题或模式。这可以在下游用于路由逻辑。
Redis 出站通道适配器
Redis出站通道适配器与其他出站适配器一样,将出站的Spring Integration消息适配为Redis消息。它接收Spring Integration消息,并通过使用MessageConverter策略将其转换为特定平台的消息(此处为Redis)。以下示例展示了如何配置Redis出站通道适配器:
<int-redis:outbound-channel-adapter id="outboundAdapter"
channel="sendChannel"
topic="thing1"
message-converter="testConverter"/>
<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379"/>
</bean>
<bean id="testConverter" class="things.something.SampleMessageConverter" />
该配置与Redis入站通道适配器类似。适配器隐式注入了名为redisConnectionFactory的RedisConnectionFactory Bean。此示例还包含了可选的(且自定义的)MessageConverter(即testConverter Bean)。
自 Spring Integration 3.0 起,<int-redis:outbound-channel-adapter> 提供了 topic 属性的替代方案:新增了 topic-expression 属性,用于在运行时确定消息的 Redis 主题。这两个属性是互斥的。
Redis 队列入站通道适配器
Spring Integration 3.0 引入了一个队列入站通道适配器,用于从 Redis 列表中“弹出”消息。默认情况下,它使用“右弹出”,但也可以配置为使用“左弹出”。该适配器是消息驱动的。它使用内部监听器线程,而不使用轮询器。
以下列表展示了 queue-inbound-channel-adapter 的所有可用属性:
<int-redis:queue-inbound-channel-adapter id="" // <1>
channel="" // <2>
auto-startup="" // <3>
phase="" // <4>
connection-factory="" // <5>
queue="" // <6>
error-channel="" // <7>
serializer="" // <8>
receive-timeout="" // <9>
recovery-interval="" // <10>
expect-message="" // <11>
task-executor="" // <12>
right-pop=""/> // <13>
组件 Bean 名称。如果未提供
channel属性,则会创建一个DirectChannel并在应用上下文中注册,此id属性将作为 Bean 名称。此时,端点本身会以id加上.adapter作为 Bean 名称进行注册(例如,若 Bean 名称为thing1,则端点将注册为thing1.adapter)。用于从此端点发送
Message实例的MessageChannel。SmartLifecycle属性,用于指定此端点是否应在应用上下文启动后自动启动。默认值为true。SmartLifecycle属性,用于指定此端点的启动阶段。默认值为0。对
RedisConnectionFactoryBean 的引用。默认值为redisConnectionFactory。执行基于队列的 'pop' 操作以获取 Redis 消息的 Redis 列表名称。
当从端点的监听任务接收到异常时,用于发送
ErrorMessage实例的MessageChannel。默认情况下,底层的MessagePublishingErrorHandler会使用应用上下文中的默认errorChannel。RedisSerializerBean 引用。它可以是一个空字符串,表示“不使用序列化器”。在这种情况下,来自入站 Redis 消息的原始byte[]将作为Message负载发送到channel。默认情况下,它使用JdkSerializationRedisSerializer。'pop' 操作等待从队列获取 Redis 消息的超时时间(毫秒)。默认值为 1 秒。
监听任务在 'pop' 操作发生异常后,重新启动前应休眠的时间(毫秒)。
指定此端点是否期望 Redis 队列中的数据包含完整的
Message实例。如果此属性设置为true,则serializer不能为空字符串,因为消息需要某种形式的反序列化(默认为 JDK 序列化)。其默认值为false。对 Spring
TaskExecutor(或标准 JDK 1.5+Executor)Bean 的引用。它用于底层的监听任务。默认使用SimpleAsyncTaskExecutor。指定此端点应使用“右弹出”(当为
true时)还是“左弹出”(当为false时)来从 Redis 列表读取消息。如果为true,当与默认的 Redis 队列出站通道适配器一起使用时,Redis 列表充当FIFO队列。将其设置为false可与使用“右推入”方式写入列表的软件配合使用,或实现类似堆栈的消息顺序。其默认值为true。自版本 4.3 起可用。
task-executor 必须配置为多线程处理;否则当 RedisQueueMessageDrivenEndpoint 在错误后尝试重新启动监听器任务时,可能会出现死锁。可以使用 errorChannel 来处理这些错误以避免重启,但最好避免应用程序暴露在可能的死锁情况下。有关可能的 TaskExecutor 实现,请参阅 Spring Framework 参考手册。
Redis 队列出站通道适配器
Spring Integration 3.0 引入了一个队列出站通道适配器,用于从 Spring Integration 消息“推送”到 Redis 列表。默认情况下,它使用“左推”,但也可以配置为“右推”。以下列表展示了 Redis queue-outbound-channel-adapter 的所有可用属性:
<int-redis:queue-outbound-channel-adapter id="" // <1>
channel="" // <2>
connection-factory="" // <3>
queue="" // <4>
queue-expression="" // <5>
serializer="" // <6>
extract-payload="" // <7>
left-push=""/> // <8>
组件 Bean 名称。若未提供
channel属性,将创建DirectChannel并在应用上下文中以该id属性作为 Bean 名称进行注册。此时,端点将以id加上.adapter作为 Bean 名称注册(例如 Bean 名称为thing1时,端点将注册为thing1.adapter)。此端点接收
Message实例的MessageChannel。对
RedisConnectionFactoryBean 的引用。默认值为redisConnectionFactory。执行基于队列的“推送”操作以发送 Redis 消息的 Redis 列表名称。此属性与
queue-expression互斥。用于确定 Redis 列表名称的 SpEL
Expression。运行时将传入的Message作为#root变量使用。此属性与queue互斥。RedisSerializerBean 引用。默认使用JdkSerializationRedisSerializer。但对于String类型负载,若未提供serializer引用,则会使用StringRedisSerializer。指定此端点应仅发送负载还是完整
Message到 Redis 队列。默认值为true。指定此端点应使用“左推送”(
true时)还是“右推送”(false时)将消息写入 Redis 列表。若为true,当与默认 Redis 队列入站通道适配器配合使用时,Redis 列表将作为FIFO队列运作。设置为false可与通过“左弹出”读取列表的软件配合使用,或实现类似堆栈的消息顺序。默认值为true。自 4.3 版本起支持。
Redis 应用事件
自 Spring Integration 3.0 起,Redis 模块提供了 IntegrationEvent 的实现,而该实现本身是 org.springframework.context.ApplicationEvent 的子类。RedisExceptionEvent 封装了来自 Redis 操作的异常(其中端点作为事件的“源”)。例如,<int-redis:queue-inbound-channel-adapter/> 在捕获来自 BoundListOperations.rightPop 操作的异常后会发出这些事件。该异常可以是任何通用的 org.springframework.data.redis.RedisSystemException 或 org.springframework.data.redis.RedisConnectionFailureException。通过 <int-event:inbound-channel-adapter/> 处理这些事件,有助于识别后台 Redis 任务的问题并采取管理措施。
Redis 消息存储
Spring Integration 的 Redis 模块提供了 RedisMessageStore。以下示例展示了如何将其与聚合器一起使用:
<bean id="redisMessageStore" class="o.s.i.redis.store.RedisMessageStore">
<constructor-arg ref="redisConnectionFactory"/>
</bean>
<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
message-store="redisMessageStore"/>
前面的例子是一个bean配置,它期望一个 RedisConnectionFactory 作为构造函数的参数。
默认情况下,RedisMessageStore 使用 Java 序列化来序列化消息。然而,如果需要不同的序列化技术(例如 JSON),可以将自定义序列化器设置到 RedisMessageStore 的 valueSerializer 属性中。
该框架为 Message 实例和 MessageHeaders 实例提供了 Jackson 序列化器和反序列化器实现——分别是 MessageJsonDeserializer 和 MessageHeadersJsonSerializer。它们必须通过 ObjectMapper 的 SimpleModule 选项进行配置。此外,应在 ObjectMapper 上设置 enableDefaultTyping,以便为每个序列化的复杂对象添加类型信息。该 type 信息随后将在反序列化过程中使用。该框架提供了一个名为 JacksonMessagingUtils.messagingAwareMapper() 的实用方法,该方法已预先配置了所有前述属性和序列化器。此实用方法附带 trustedPackages 参数,用于限制反序列化的 Java 包,以避免安全漏洞。默认受信任的包包括:java.util、java.lang、org.springframework.messaging.support、org.springframework.integration.support、org.springframework.integration.message、org.springframework.integration.store。要在 RedisMessageStore 中管理 JSON 序列化,必须应用如下配置:
RedisMessageStore store = new RedisMessageStore(redisConnectionFactory);
ObjectMapper mapper = JacksonMessagingUtils.messagingAwareMapper();
RedisSerializer<Object> serializer = new GenericJackson3JsonRedisSerializer(mapper);
store.setValueSerializer(serializer);
从 4.3.12 版本开始,RedisMessageStore 支持 prefix 选项,以便在同一 Redis 服务器上区分不同的存储实例。
Redis 通道消息存储
之前展示的 RedisMessageStore 将每个组作为单个键(组ID)下的值进行维护。虽然 QueueChannel 可用于持久化,但为此目的提供了专门的 RedisChannelMessageStore(自版本 4.0 起)。该存储为每个通道使用一个 LIST,发送消息时使用 LPUSH,接收消息时使用 RPOP。默认情况下,此存储也使用 JDK 序列化,但可以修改其值序列化器,如前文所述。
建议使用支持存储的通道,而不是使用通用的 RedisMessageStore。以下示例定义了一个 Redis 消息存储,并在一个带有队列的通道中使用它:
<bean id="redisMessageStore" class="o.s.i.redis.store.RedisChannelMessageStore">
<constructor-arg ref="redisConnectionFactory"/>
</bean>
<int:channel id="somePersistentQueueChannel">
<int:queue message-store="redisMessageStore"/>
<int:channel>
用于存储数据的键具有以下形式:<storeBean名称>:<channelId>(在前面的示例中,即 redisMessageStore:somePersistentQueueChannel)。
此外,还提供了一个子类 RedisChannelPriorityMessageStore。当它与 QueueChannel 结合使用时,消息将按照(先进先出)优先级顺序被接收。它使用标准的 IntegrationMessageHeaderAccessor.PRIORITY 头部,并支持优先级值(0 - 9)。具有其他优先级的消息(以及没有优先级的消息)将在所有具有优先级的消息之后,按照先进先出的顺序被检索。
这些存储仅实现了 BasicMessageGroupStore,而未实现 MessageGroupStore。它们仅适用于诸如为 QueueChannel 提供支持等场景。
Redis 元数据存储
Spring Integration 3.0 引入了一种新的基于 Redis 的 MetadataStore(参见 元数据存储)实现。RedisMetadataStore 可用于在应用程序重启之间维护 MetadataStore 的状态。这种 MetadataStore 实现可以与以下适配器一起使用:
要指示这些适配器使用新的 RedisMetadataStore,需要声明一个名为 metadataStore 的 Spring Bean。Feed 入站通道适配器和 feed 入站通道适配器都会自动获取并使用已声明的 RedisMetadataStore。以下示例展示了如何声明这样的 Bean:
<bean name="metadataStore" class="o.s.i.redis.store.metadata.RedisMetadataStore">
<constructor-arg name="connectionFactory" ref="redisConnectionFactory"/>
</bean>
RedisMetadataStore 基于 RedisProperties 实现。与它的交互使用 BoundHashOperations,而后者需要为整个 Properties 存储提供一个 key。对于 MetadataStore 而言,这个 key 扮演着区域(region)的角色,这在分布式环境中非常有用,尤其是当多个应用程序使用同一个 Redis 服务器时。默认情况下,这个 key 的值为 MetaData。
从 4.0 版本开始,该存储实现了 ConcurrentMetadataStore,使其能够在多个应用实例之间可靠地共享,其中仅允许一个实例存储或修改某个键的值。
RedisMetadataStore.replace() 方法(例如在 AbstractPersistentAcceptOnceFileListFilter 中)无法在 Redis 集群中使用,因为目前不支持用于保证原子性的 WATCH 命令。
Redis 存储入站通道适配器
Redis 存储入站通道适配器是一个轮询消费者,它从 Redis 集合中读取数据,并将其作为 Message 负载发送。以下示例展示了如何配置 Redis 存储入站通道适配器:
<int-redis:store-inbound-channel-adapter id="listAdapter"
connection-factory="redisConnectionFactory"
key="myCollection"
channel="redisChannel"
collection-type="LIST" >
<int:poller fixed-rate="2000" max-messages-per-poll="10"/>
</int-redis:store-inbound-channel-adapter>
前面的示例展示了如何使用 store-inbound-channel-adapter 元素配置 Redis 存储入站通道适配器,并为各种属性提供值,例如:
-
key或key-expression:所使用的集合的键名。 -
collection-type:此适配器支持的集合类型的枚举。支持的集合类型包括LIST、SET、ZSET、PROPERTIES和MAP。 -
connection-factory:对o.s.data.redis.connection.RedisConnectionFactory实例的引用。 -
redis-template:对o.s.data.redis.core.RedisTemplate实例的引用。 -
所有入站适配器共有的其他属性(例如
channel)。
redis-template 和 connection-factory 是互斥的。
默认情况下,适配器使用 StringRedisTemplate。该模板对键、值、哈希键和哈希值使用 StringRedisSerializer 实例。如果 Redis 存储中包含使用其他技术序列化的对象,则必须为 RedisTemplate 配置适当的序列化器。例如,如果存储是通过 Redis 存储出站适配器写入的,且该适配器的 extract-payload-elements 设置为 false,则应按如下方式配置 RedisTemplate:
<bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
<property name="connectionFactory" ref="redisConnectionFactory"/>
<property name="keySerializer">
<bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
</property>
<property name="hashKeySerializer">
<bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
</property>
</bean>
该 RedisTemplate 对键和哈希键使用 String 序列化器,对值和哈希值使用默认的 JDK 序列化器。
由于前面的示例中 key 具有字面值,因此相对简单且静态。有时,key 的值必须根据某些条件在运行时更改。为此,请使用 key-expression 代替,其中提供的表达式可以是任何有效的 SpEL 表达式。
此外,可以对从Redis集合中成功读取并处理的数据进行一些后处理。例如,在处理完某个值后,可以将其移动或删除。事务同步功能可用于实现此类逻辑。以下示例使用了key-expression和事务同步:
<int-redis:store-inbound-channel-adapter id="zsetAdapterWithSingleScoreAndSynchronization"
connection-factory="redisConnectionFactory"
key-expression="'presidents'"
channel="otherRedisChannel"
auto-startup="false"
collection-type="ZSET">
<int:poller fixed-rate="1000" max-messages-per-poll="2">
<int:transactional synchronization-factory="syncFactory"/>
</int:poller>
</int-redis:store-inbound-channel-adapter>
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit expression="payload.removeByScore(18, 18)"/>
</int:transaction-synchronization-factory>
<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>
轮询器可以通过使用 transactional 元素实现事务性。该元素可以引用真实的事务管理器,例如,当流程的其他部分调用 JDBC 时。如果没有“真实”事务,可以使用 o.s.i.transaction.PseudoTransactionManager 替代,它是 Spring PlatformTransactionManager 的实现,在没有实际事务时,能够启用 Redis 适配器的事务同步功能。
这并不会使 Redis 活动本身具有事务性。它允许在成功(提交)之前或之后,或在失败(回滚)之后执行操作的同步。
一旦轮询器具备事务性,便可在 transactional 元素上添加 o.s.i.transaction.TransactionSynchronizationFactory 的实例。TransactionSynchronizationFactory 会创建 TransactionSynchronization 的实例。为方便起见,系统提供了一个基于 SpEL 的默认 TransactionSynchronizationFactory,允许配置 SpEL 表达式,其执行过程将与事务协调(同步)。该工厂支持配置提交前、提交后和回滚后的表达式,并为每种事件类型分别提供对应的通道,用于发送表达式求值结果(如果存在)。每个子元素均可指定 expression 和 channel 属性。若仅设置 channel 属性,接收到的消息将作为特定同步场景的一部分发送至该通道。若仅设置 expression 属性且表达式结果为非空值,则会生成以该结果为负载的消息并发送至默认通道(NullChannel),同时会在日志中显示(DEBUG 级别)。若表达式结果为 null 或 void,则不会生成消息。
RedisStoreMessageSource 会添加一个带有 RedisStore 实例的 store 属性,该实例绑定到事务 IntegrationResourceHolder,可以从 TransactionSynchronizationProcessor 实现中访问。
有关事务同步的更多信息,请参阅事务同步。
RedisStore 出站通道适配器
RedisStore 出站通道适配器允许将消息负载写入 Redis 集合,如下例所示:
<int-redis:store-outbound-channel-adapter id="redisListAdapter"
collection-type="LIST"
channel="requestChannel"
key="myCollection" />
上述配置通过使用 store-inbound-channel-adapter 元素创建了一个 Redis 出站通道适配器。它为各种属性提供了值,例如:
-
key或key-expression:所使用的集合的键名。 -
extract-payload-elements:如果设置为true(默认值)并且有效负载是“多值”对象(即Collection或Map)的实例,则使用“addAll”和“putAll”语义进行存储。否则,如果设置为false,则无论其类型如何,有效负载都将作为单个条目存储。如果有效负载不是“多值”对象的实例,则忽略此属性的值,并且有效负载始终作为单个条目存储。 -
collection-type:此适配器支持的Collection类型的枚举。支持的集合类型有LIST、SET、ZSET、PROPERTIES和MAP。 -
map-key-expression:返回要存储的条目的键名的 SpEL 表达式。仅当collection-type为MAP或PROPERTIES且extract-payload-elements为 false 时适用。 -
connection-factory:对o.s.data.redis.connection.RedisConnectionFactory实例的引用。 -
redis-template:对o.s.data.redis.core.RedisTemplate实例的引用。 -
所有入站适配器共有的其他属性(例如
channel)。
redis-template 和 connection-factory 是互斥的。
默认情况下,适配器使用 StringRedisTemplate。它使用 StringRedisSerializer 实例来处理键、值、哈希键和哈希值。然而,如果 extract-payload-elements 设置为 false,则会使用一个 RedisTemplate,该模板对键和哈希键使用 StringRedisSerializer 实例,对值和哈希值使用 JdkSerializationRedisSerializer 实例。使用 JDK 序列化器时,重要的是要理解所有值都使用 Java 序列化,无论该值是否实际上是集合。如果需要更精细地控制值的序列化,可以提供自定义的 RedisTemplate,而不是依赖这些默认设置。
由于前面的示例中 key 和其他属性使用了字面值,因此相对简单且静态。有时,这些值可能会在运行时根据某些条件动态更改。为此,提供了它们的 -expression 等价形式(如 key-expression、map-key-expression 等),其中表达式可以是任何有效的 SpEL 表达式。
Redis 出站命令网关
Spring Integration 4.0 引入了 Redis 命令网关,允许通过通用的 RedisConnection#execute 方法执行任何标准 Redis 命令。以下列表展示了 Redis 出站网关的可用属性:
<int-redis:outbound-gateway
request-channel="" // <1>
reply-channel="" // <2>
requires-reply="" // <3>
reply-timeout="" // <4>
connection-factory="" // <5>
redis-template="" // <6>
arguments-serializer="" // <7>
command-expression="" // <8>
argument-expressions="" // <9>
use-command-variable="" // <10>
arguments-strategy="" /> // <11>
此端点接收
Message实例的MessageChannel。此端点发送回复
Message实例的MessageChannel。指定此出站网关是否必须返回非空值。默认值为
true。当 Redis 返回null值时,将抛出ReplyRequiredException。等待回复消息发送的超时时间(以毫秒为单位)。通常应用于基于队列的有限回复通道。
对
RedisConnectionFactorybean 的引用。默认值为redisConnectionFactory。与redis-template属性互斥。对
RedisTemplatebean 的引用。与connection-factory属性互斥。对
org.springframework.data.redis.serializer.RedisSerializer实例的引用。用于在必要时将每个命令参数序列化为byte[]。返回命令键的 SpEL 表达式。默认值为
redis_command消息头。其求值结果不得为null。以逗号分隔的 SpEL 表达式,将作为命令参数进行求值。与
arguments-strategy属性互斥。如果未提供任何属性,则使用payload作为命令参数。参数表达式可以求值为null,以支持可变数量的参数。一个
boolean标志,用于指定当配置了argument-expressions时,求值后的 Redis 命令字符串是否在o.s.i.redis.outbound.ExpressionArgumentsStrategy的表达式求值上下文中作为#cmd变量可用。否则,此属性将被忽略。对
o.s.i.redis.outbound.ArgumentsStrategy实例的引用。与argument-expressions属性互斥。如果未提供任何属性,则使用payload作为命令参数。
<int-redis:outbound-gateway> 可用作通用组件来执行任何所需的 Redis 操作。以下示例展示了如何从 Redis 原子数中获取递增后的值:
<int-redis:outbound-gateway request-channel="requestChannel"
reply-channel="replyChannel"
command-expression="'INCR'"/>
Message 负载的名称应为 redisCounter,该名称可由 org.springframework.data.redis.support.atomic.RedisAtomicInteger bean 定义提供。
RedisConnection#execute 方法的返回类型为泛型 Object。实际结果取决于命令类型。例如,MGET 命令返回 List<byte[]>。有关命令、其参数和结果类型的更多信息,请参阅 Redis 规范。
Redis 队列出站网关
Spring Integration 引入了 Redis 队列出站网关,用于执行请求-应答场景。它会将一个会话 UUID 推送到指定的 queue,同时将该值以该 UUID 作为键推送到 Redis 列表中,并等待来自键为 UUID 加上 .reply 的 Redis 列表的应答。每次交互使用不同的 UUID。以下清单展示了 Redis 出站网关的可用属性:
<int-redis:queue-outbound-gateway
request-channel="" // <1>
reply-channel="" // <2>
requires-reply="" // <3>
reply-timeout="" // <4>
connection-factory="" // <5>
queue="" // <6>
order="" // <7>
serializer="" // <8>
extract-payload=""/> // <9>
此端点接收
Message实例的MessageChannel。此端点发送回复
Message实例的MessageChannel。指定此出站网关是否必须返回非空值。默认值为
false。否则,当 Redis 返回null值时,将抛出ReplyRequiredException。等待回复消息发送的超时时间(以毫秒为单位)。通常应用于基于队列的有限回复通道。
对
RedisConnectionFactoryBean 的引用。默认为redisConnectionFactory。与 'redis-template' 属性互斥。出站网关发送会话
UUID的 Redis 列表名称。当注册多个网关时,此出站网关的顺序。
RedisSerializerBean 引用。可以是空字符串,表示“无序列化器”。在这种情况下,来自入站 Redis 消息的原始byte[]将作为Message负载发送到channel。默认情况下,它是JdkSerializationRedisSerializer。指定此端点是否期望来自 Redis 队列的数据包含完整的
Message实例。如果此属性设置为true,则serializer不能是空字符串,因为消息需要某种形式的反序列化(默认情况下为 JDK 序列化)。
Redis 队列入站网关
Spring Integration 4.1 引入了 Redis 队列入站网关,用于执行请求和回复场景。它从提供的 queue 中弹出一个会话 UUID,从 Redis 列表中弹出以该 UUID 为键的值,并将回复推送到 Redis 列表中,其键为 UUID 加上 .reply。以下列表展示了 Redis 队列入站网关的可用属性:
<int-redis:queue-inbound-gateway
request-channel="" // <1>
reply-channel="" // <2>
executor="" // <3>
reply-timeout="" // <4>
connection-factory="" // <5>
queue="" // <6>
order="" // <7>
serializer="" // <8>
receive-timeout="" // <9>
expect-message="" // <10>
recovery-interval=""/> // <11>
此端点发送从 Redis 数据创建的
Message实例的MessageChannel。此端点等待回复
Message实例的MessageChannel。可选 -replyChannel头部仍在使用中。对 Spring
TaskExecutor(或标准 JDKExecutor)bean 的引用。它用于底层的监听任务。默认使用SimpleAsyncTaskExecutor。等待回复消息发送的超时时间(以毫秒为单位)。通常应用于基于队列的有限回复通道。
对
RedisConnectionFactorybean 的引用。默认为redisConnectionFactory。与redis-template属性互斥。会话
UUID的 Redis 列表名称。当注册多个网关时,此入站网关的顺序。
RedisSerializerbean 引用。可以是一个空字符串,表示“无序列化器”。在这种情况下,来自入站 Redis 消息的原始byte[]将作为Message负载发送到channel。默认为JdkSerializationRedisSerializer。(注意,在 4.3 版本之前,默认是StringRedisSerializer。要恢复该行为,请提供对StringRedisSerializer的引用)。等待获取接收到的消息的超时时间(以毫秒为单位)。通常应用于基于队列的有限请求通道。
指定此端点是否期望来自 Redis 队列的数据包含完整的
Message实例。如果此属性设置为true,则serializer不能是空字符串,因为消息需要某种形式的反序列化(默认是 JDK 序列化)。监听任务在“右弹出”操作发生异常后,在重新启动监听任务之前应休眠的时间(以毫秒为单位)。
task-executor 必须配置为多线程处理;否则当 RedisQueueMessageDrivenEndpoint 在发生错误后尝试重新启动监听器任务时,可能会出现死锁。可以使用 errorChannel 来处理这些错误以避免重启,但最好避免让应用程序暴露在可能的死锁情况下。有关可能的 TaskExecutor 实现,请参阅 Spring Framework 参考手册。
Redis Stream 出站通道适配器
Spring Integration 5.4 引入了 Reactive Redis Stream 出站通道适配器,用于将消息负载写入 Redis 流。出站通道适配器使用 ReactiveStreamOperations.add(…) 将 Record 添加到流中。以下示例展示了如何为 Redis Stream 出站通道适配器使用 Java 配置和服务类。
@Bean
@ServiceActivator(inputChannel = "messageChannel")
public ReactiveRedisStreamMessageHandler reactiveValidatorMessageHandler(
ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
ReactiveRedisStreamMessageHandler reactiveStreamMessageHandler =
new ReactiveRedisStreamMessageHandler(reactiveRedisConnectionFactory, "myStreamKey"); 1
reactiveStreamMessageHandler.setSerializationContext(serializationContext); 2
reactiveStreamMessageHandler.setHashMapper(hashMapper); 3
reactiveStreamMessageHandler.setExtractPayload(true); 4
return reactiveStreamMessageHandler;
}
使用
ReactiveRedisConnectionFactory和流名称构建ReactiveRedisStreamMessageHandler实例以添加记录。另一种构造函数变体基于 SpEL 表达式,用于根据请求消息评估流键。设置
RedisSerializationContext,用于在添加到流之前序列化记录键和值。设置
HashMapper,它提供 Java 类型与 Redis 哈希/映射之间的契约。如果为
true,通道适配器将从请求消息中提取有效负载以添加流记录。否则将整个消息用作值。默认值为true。
从版本 6.5 开始,ReactiveRedisStreamMessageHandler 提供了一个 setAddOptionsFunction(Function<Message<?>, RedisStreamCommands.XAddOptions> addOptionsFunction) 方法,用于基于请求消息构建 RedisStreamCommands.XAddOptions,以供内部 ReactiveStreamOperations.add(Record<K, ?> record, XAddOptions xAddOptions) 调用使用。
Redis Stream 入站通道适配器
Spring Integration 5.4 引入了用于从 Redis Stream 读取消息的 Reactive Stream 入站通道适配器。入站通道适配器根据自动确认标志,使用 StreamReceiver.receive(…) 或 StreamReceiver.receiveAutoAck() 从 Redis 流中读取记录。以下示例展示了如何为 Redis Stream 入站通道适配器使用 Java 配置。
@Bean
public ReactiveRedisStreamMessageProducer reactiveRedisStreamProducer(
ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
ReactiveRedisStreamMessageProducer messageProducer =
new ReactiveRedisStreamMessageProducer(reactiveRedisConnectionFactory, "myStreamKey"); 1
messageProducer.setStreamReceiverOptions( 2
StreamReceiver.StreamReceiverOptions.builder()
.pollTimeout(Duration.ofMillis(100))
.build());
messageProducer.setAutoStartup(true); 3
messageProducer.setAutoAck(false); 4
messageProducer.setCreateConsumerGroup(true); 5
messageProducer.setConsumerGroup("my-group"); 6
messageProducer.setConsumerName("my-consumer"); 7
messageProducer.setOutputChannel(fromRedisStreamChannel); 8
messageProducer.setReadOffset(ReadOffset.latest()); 9
messageProducer.extractPayload(true); 10
return messageProducer;
}
使用
ReactiveRedisConnectionFactory和流键构造ReactiveRedisStreamMessageProducer实例以读取记录。使用反应式基础设施消费 Redis 流的
StreamReceiver.StreamReceiverOptions。SmartLifecycle属性,用于指定此端点是否应在应用程序上下文启动后自动启动。默认为true。如果为false,则需要手动启动RedisStreamMessageProducer,即messageProducer.start()。如果为
false,则接收到的消息不会自动确认。消息的确认将延迟到客户端消费消息时进行。默认为true。如果为
true,将创建一个消费者组。在创建消费者组时,如果流尚不存在,也会创建流。消费者组跟踪消息传递并区分消费者。默认为false。设置消费者组名称。默认为定义的 bean 名称。
设置消费者名称。从
my-group组中以my-consumer身份读取消息。此端点向其发送消息的消息通道。
定义读取消息的偏移量。默认为
ReadOffset.latest()。如果为
true,通道适配器将从Record中提取有效负载值。否则,整个Record将用作有效负载。默认为true。
若 autoAck 设置为 false,Redis Stream 中的 Record 将不会由 Redis 驱动程序自动确认,而是会在待生成的消息中添加一个 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 头部,其值为一个 SimpleAcknowledgment 实例。当基于此类记录的消息业务逻辑处理完成后,目标集成流有责任调用其 acknowledge() 回调。即使在反序列化过程中发生异常且配置了 errorChannel 时,也需要类似的逻辑。因此,目标错误处理程序必须决定是确认(ack)还是拒绝确认(nack)此类失败的消息。除了 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 外,ReactiveRedisStreamMessageProducer 还会将以下头部填充到待生成的消息中:RedisHeaders.STREAM_KEY、RedisHeaders.STREAM_MESSAGE_ID、RedisHeaders.CONSUMER_GROUP 和 RedisHeaders.CONSUMER。
从版本 5.5 开始,StreamReceiver.StreamReceiverOptionsBuilder 选项可以在 ReactiveRedisStreamMessageProducer 上显式配置,包括新引入的 onErrorResume 函数。当 Redis Stream 消费者在发生反序列化错误时需要继续轮询时,此函数是必需的。默认函数会向错误通道(如果已提供)发送一条消息,并可能对失败的消息进行确认,如上所述。所有这些 StreamReceiver.StreamReceiverOptionsBuilder 都与外部提供的 StreamReceiver.StreamReceiverOptions 互斥。
Redis 锁注册表
Spring Integration 4.0 引入了 RedisLockRegistry。某些组件(例如聚合器和重排器)使用从 LockRegistry 实例获取的锁,以确保同一时间只有一个线程操作一个组。DefaultLockRegistry 在单个组件内执行此功能。可以在这些组件上配置外部锁注册表。当与共享的 MessageGroupStore 一起使用时,可以设置 RedisLockRegistry 以在多个应用程序实例之间提供此功能,从而确保同一时间只有一个实例可以操作该组。
当锁被本地线程释放时,另一个本地线程通常可以立即获取该锁。如果锁是由使用不同注册表实例的线程释放的,则获取锁可能需要长达100毫秒的时间。
为避免“挂起”锁(当服务器发生故障时),此注册表中的锁会在默认 60 秒后过期,但可以在注册表上进行配置。通常,锁的持有时间要短得多。
由于密钥可能过期,尝试解锁一个已过期的锁会导致抛出异常。然而,受此锁保护的资源可能已被破坏,因此此类异常应被视为严重问题。应将过期时间设置得足够长以防止这种情况发生,但同时也要设置得足够短,以便在服务器故障后能在合理时间内恢复锁。
从 5.0 版本开始,RedisLockRegistry 实现了 ExpirableLockRegistry,它会移除最后一次获取时间超过 age 且当前未被锁定的锁。
从版本 5.5.6 开始,RedisLockRegistry 支持通过 RedisLockRegistry.setCacheCapacity() 自动清理 RedisLockRegistry.locks 中 redisLocks 的缓存。更多信息请参阅其 JavaDocs。
从版本 5.5.13 开始,RedisLockRegistry 提供了 setRedisLockType(RedisLockType) 选项,用于确定 Redis 锁获取应以何种模式进行:
-
RedisLockType.SPIN_LOCK- 通过周期性循环(100ms)检查锁是否可获取来获得锁。默认方式。 -
RedisLockType.PUB_SUB_LOCK- 通过 Redis 发布-订阅机制来获得锁。
发布-订阅模式是首选方式——客户端与Redis服务器之间的网络通信较少,性能更高——当订阅收到其他进程解锁通知时,锁会立即被获取。然而,Redis在主/从连接中不支持发布-订阅功能(例如在 AWS ElastiCache 环境中),因此默认选择忙等待模式,以确保注册表在任何环境中都能正常工作。
从版本 6.4 开始,如果锁的所有权已过期,RedisLockRegistry.RedisLock.unlock() 方法将抛出 ConcurrentModificationException,而不是 IllegalStateException。
从版本 6.4 开始,新增了 RedisLockRegistry.setRenewalTaskScheduler() 方法,用于配置锁定期续约的调度器。当设置此调度器后,锁在成功获取后,将每隔过期时间的 1/3 自动续约,直到解锁或 Redis 键被移除。
自 7.0 版本起,RedisLock 实现了 DistributedLock 接口,以支持为锁状态数据自定义生存时间(TTL)的功能。现在可以使用 lock(Duration ttl) 或 tryLock(long time, TimeUnit unit, Duration ttl) 方法获取 RedisLock,并指定生存时间(TTL)值。RedisLockRegistry 现在提供了新的 renewLock(Object lockKey, Duration ttl) 方法,允许使用自定义的生存时间值来续期锁。
AWS ElastiCache 对集群模式下 Valkey 的支持
自版本 6.4.9/6.5.4/7.0.0 起,RedisLockRegistry 开始支持集群模式下的 AWS Elasticache for Valkey。在此版本的 valkey(一个 Redis 的替代品)中,所有 PubSub 操作(PUBLISH、SUBSCRIBE 等)在内部都使用其分片变体(SPUBLISH、SSUBSCRIBE 等)。如果出现以下形式的错误:
Caused by: io.lettuce.core.RedisCommandExecutionException: ERR Script attempted to access keys that do not hash to the same slot script: b2dedc0ab01c17f9f20e3e6ddb62dcb6afbed0bd, on @user_script:3.
在 RedisLockRegistry 的 unlock 步骤中,必须提供一个包含哈希标签 {…} 的锁键,以确保 unlock 脚本中的所有操作都被哈希到同一个集群槽/分片,例如:
RedisLockRegistry lockRegistry = new RedisLockRegistry("my-lock-key{choose_your_tag}");
lockRegistry.lock();
# critical section
lockRegistry.unlock();