跳到主要内容

Redis 支持

QWen Plus 中英对照 Redis Support

Spring Integration 2.1 引入了对 Redis 的支持:一个“开源的高级键值存储”。这种支持以基于 Redis 的 MessageStore 以及由 Redis 通过其 PUBLISH、SUBSCRIBE 和 UNSUBSCRIBE 命令支持的发布 - 订阅消息适配器的形式出现。

你需要将这个依赖添加到你的项目中:

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-redis</artifactId>
<version>6.4.2</version>
</dependency>
xml

你还需要包含 Redis 客户端依赖项,例如 Lettuce。

要下载、安装和运行 Redis,请参阅 Redis 文档

连接到 Redis

要开始与 Redis 进行交互,您首先需要连接到它。Spring Integration 使用了另一个 Spring 项目 Spring Data Redis 提供的支持,它提供了典型的 Spring 构造:ConnectionFactoryTemplate。这些抽象简化了与多个 Redis 客户端 Java API 的集成。目前,Spring Data Redis 支持 JedisLettuce

使用 RedisConnectionFactory

要连接到 Redis,你可以使用 RedisConnectionFactory 接口的一个实现。以下列表显示了接口定义:

public interface RedisConnectionFactory extends PersistenceExceptionTranslator {

/**
* Provides a suitable connection for interacting with Redis.
* @return connection for interacting with Redis.
*/
RedisConnection getConnection();
}
java

以下示例展示了如何在 Java 中创建 LettuceConnectionFactory

LettuceConnectionFactory cf = new LettuceConnectionFactory();
cf.afterPropertiesSet();
java

以下示例展示了如何在 Spring 的 XML 配置中创建 LettuceConnectionFactory

<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379" />
</bean>
xml

RedisConnectionFactory 的实现提供了一组属性,例如端口和主机,如果需要的话你可以设置。一旦你有了 RedisConnectionFactory 的实例,你可以创建一个 RedisTemplate 的实例,并用 RedisConnectionFactory 注入它。

使用 RedisTemplate

与 Spring 中的其他模板类(如 JdbcTemplateJmsTemplate)一样,RedisTemplate 是一个简化 Redis 数据访问代码的辅助类。有关 RedisTemplate 及其变体(如 StringRedisTemplate)的更多信息,请参阅 Spring Data Redis 文档

以下示例展示了如何在 Java 中创建 RedisTemplate 的实例:

RedisTemplate rt = new RedisTemplate<String, Object>();
rt.setConnectionFactory(redisConnectionFactory);
java

以下示例展示了如何在 Spring 的 XML 配置中创建 RedisTemplate 的实例:

<bean id="redisTemplate"
class="org.springframework.data.redis.core.RedisTemplate">
<property name="connectionFactory" ref="redisConnectionFactory"/>
</bean>
xml

使用 Redis 进行消息传递

简介中所述,Redis 通过其 PUBLISHSUBSCRIBEUNSUBSCRIBE 命令提供发布 - 订阅消息传递支持。与 JMS 和 AMQP 一样,Spring Integration 提供了通过 Redis 发送和接收消息的消息通道和适配器。

Redis 发布/订阅通道

类似于 JMS,存在一些场景中生产者和消费者都打算成为同一应用程序的一部分,在同一个进程中运行。你可以通过使用一对入站和出站通道适配器来实现这一点。然而,与 Spring Integration 的 JMS 支持一样,有一个更简单的方法来解决这种用例。你可以创建一个发布 - 订阅通道,如下例所示:

<int-redis:publish-subscribe-channel id="redisChannel" topic-name="si.test.topic"/>
xml

publish-subscribe-channel 的行为非常类似于来自主 Spring Integration 命名空间的普通 <publish-subscribe-channel/> 元素。它可以通过任何端点的 input-channeloutput-channel 属性进行引用。不同之处在于该通道由 Redis 主题名称支持:由 topic-name 属性指定的一个 String 值。然而,与 JMS 不同的是,这个主题不需要预先创建甚至由 Redis 自动创建。在 Redis 中,主题是充当地址角色的简单 String 值。生产者和消费者可以通过使用相同的 String 值作为他们的主题名称来进行通信。对这个通道的简单订阅意味着生产端和消费端之间可以实现异步发布 - 订阅消息传递。但是,与通过在简单的 Spring Integration <channel/> 元素内添加 <queue/> 元素创建的异步消息通道不同,消息不会存储在内存队列中。相反,这些消息会通过 Redis 传递,这使您可以依赖它对持久性和集群的支持以及与其他非 Java 平台的互操作性。

Redis 入站通道适配器

Redis 入站通道适配器 (RedisInboundChannelAdapter) 以与其他入站适配器相同的方式将传入的 Redis 消息适配为 Spring 消息。它接收特定于平台的消息(在这种情况下是 Redis 消息),并通过使用 MessageConverter 策略将其转换为 Spring 消息。以下示例展示了如何配置 Redis 入站通道适配器:

<int-redis:inbound-channel-adapter id="redisAdapter"
topics="thing1, thing2"
channel="receiveChannel"
error-channel="testErrorChannel"
message-converter="testConverter" />

<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379" />
</bean>

<bean id="testConverter" class="things.something.SampleMessageConverter" />
xml

前面的示例展示了一个简单但完整的 Redis 入站通道适配器配置。请注意,前述配置依赖于 Spring 自动发现某些 bean 的熟悉范例。在本例中,redisConnectionFactory 隐式注入到适配器中。您也可以通过使用 connection-factory 属性来显式指定它。

另外,请注意,前面的配置注入了一个自定义的 MessageConverter。这种方法类似于 JMS,在 JMS 中,MessageConverter 实例用于在 Redis 消息和 Spring Integration 消息负载之间进行转换。默认情况下是 SimpleMessageConverter

入站适配器可以订阅多个主题名称,因此 topics 属性中使用逗号分隔的值集。

自 3.0 版本以来,入站适配器除了现有的 topics 属性外,现在还具有 topic-patterns 属性。此属性包含一组以逗号分隔的 Redis 主题模式。有关 Redis 发布订阅的更多信息,请参阅 Redis Pub/Sub

入站适配器可以使用 RedisSerializer 来反序列化 Redis 消息的主体。<int-redis:inbound-channel-adapter>serializer 属性可以设置为空字符串,这会导致 RedisSerializer 属性的值为 null。在这种情况下,原始的 byte[] 形式的 Redis 消息体将作为消息的有效负载提供。

自 5.0 版本以来,你可以通过 <int-redis:inbound-channel-adapter>task-executor 属性为入站适配器提供一个 Executor 实例。此外,接收到的 Spring Integration 消息现在具有 RedisHeaders.MESSAGE_SOURCE 头,用于指示发布消息的来源:主题或模式。你可以在下游使用此信息进行路由逻辑处理。

Redis 输出通道适配器

Redis outbound 通道适配器以与其他 outbound 适配器相同的方式,将传出的 Spring Integration 消息适应为 Redis 消息。它接收 Spring Integration 消息,并通过使用 MessageConverter 策略将其转换为平台特定的消息(在这种情况下是 Redis)。以下示例展示了如何配置 Redis outbound 通道适配器:

<int-redis:outbound-channel-adapter id="outboundAdapter"
channel="sendChannel"
topic="thing1"
message-converter="testConverter"/>

<bean id="redisConnectionFactory"
class="o.s.data.redis.connection.lettuce.LettuceConnectionFactory">
<property name="port" value="7379"/>
</bean>

<bean id="testConverter" class="things.something.SampleMessageConverter" />
xml

配置类似于 Redis 入站通道适配器。适配器隐式注入了一个 RedisConnectionFactory,它以 redisConnectionFactory 作为其 bean 名称定义。此示例还包括可选的(和自定义的)MessageConverter(即 testConverter bean)。

自从 Spring Integration 3.0,<int-redis:outbound-channel-adapter> 提供了 topic 属性的替代方案:您可以使用 topic-expression 属性来在运行时确定消息的 Redis 主题。这些属性是互斥的。

Redis 队列 inbound 通道适配器

Spring Integration 3.0 引入了一个队列传入通道适配器,用于从 Redis 列表中“弹出”消息。默认情况下,它使用“右弹出”,但你可以配置它使用“左弹出”。该适配器是消息驱动的。它使用内部监听线程,不使用轮询器。

以下列表显示了 queue-inbound-channel-adapter 的所有可用属性:

<int-redis:queue-inbound-channel-adapter id=""  // <1>
channel="" // <2>
auto-startup="" // <3>
phase="" // <4>
connection-factory="" // <5>
queue="" // <6>
error-channel="" // <7>
serializer="" // <8>
receive-timeout="" // <9>
recovery-interval="" // <10>
expect-message="" // <11>
task-executor="" // <12>
right-pop=""/> // <13>
xml
  • 组件的 bean 名称。如果你不提供 channel 属性,将创建一个 DirectChannel 并在应用程序上下文中以这个 id 属性作为 bean 名称进行注册。在这种情况下,端点本身会以 id.adapter 的形式注册。(如果 bean 名称为 thing1,则端点注册为 thing1.adapter。)

  • 从该端点发送 Message 实例的 MessageChannel

  • 一个 SmartLifecycle 属性,用于指定该端点是否应在应用程序上下文启动后自动启动。默认值为 true

  • 一个 SmartLifecycle 属性,用于指定此端点启动的阶段。默认值为 0

  • RedisConnectionFactory bean 的引用。默认为 redisConnectionFactory

  • 执行基于队列的 'pop' 操作以获取 Redis 消息的 Redis 列表名称。

  • 接收来自端点监听任务的异常时发送 ErrorMessage 实例的 MessageChannel。默认情况下,底层 MessagePublishingErrorHandler 使用应用程序上下文中的默认 errorChannel

  • RedisSerializer bean 引用。它可以是一个空字符串,这意味着“没有序列化器”。在这种情况下,来自入站 Redis 消息的原始 byte[] 作为 Message 负载发送到 channel。默认是 JdkSerializationRedisSerializer

  • 'pop' 操作等待从队列中获取 Redis 消息的超时时间(以毫秒为单位)。默认是 1 秒。

  • 在 'pop' 操作出现异常后,监听任务应暂停的时间(以毫秒为单位),然后再重新启动监听任务。

  • 指定此端点是否期望 Redis 队列中的数据包含完整的 Message 实例。如果此属性设置为 true,则 serializer 不能为空字符串,因为消息需要某种形式的反序列化(默认为 JDK 序列化)。其默认值为 false

  • 对 Spring TaskExecutor(或标准 JDK 1.5+ Executor)bean 的引用。它用于底层监听任务。默认是一个 SimpleAsyncTaskExecutor

  • 指定此端点应该使用“右弹出”(当 true 时)还是“左弹出”(当 false 时)从 Redis 列表读取消息。如果为 true,则在与默认 Redis 队列出站通道适配器一起使用时,Redis 列表的行为像一个 FIFO 队列。将其设置为 false 以与使用“右推”写入列表的软件一起使用,或实现类似于堆栈的消息顺序。默认值为 true。自 4.3 版起。

important

task-executor 必须配置为使用多于一个线程进行处理;否则当 RedisQueueMessageDrivenEndpoint 尝试在错误后重新启动监听任务时,可能会出现死锁。可以使用 errorChannel 来处理这些错误,以避免重启,但最好是不要让应用程序暴露在可能的死锁情况下。有关 TaskExecutor 实现,请参阅 Spring Framework 参考手册

Redis 队列输出通道适配器

Spring Integration 3.0 引入了一个队列出站通道适配器,用于从 Spring Integration 消息“推送”到 Redis 列表。默认情况下,它使用“左推送”,但您可以配置它使用“右推送”。以下列表显示了 Redis queue-outbound-channel-adapter 的所有可用属性:

<int-redis:queue-outbound-channel-adapter id=""  // <1>
channel="" // <2>
connection-factory="" // <3>
queue="" // <4>
queue-expression="" // <5>
serializer="" // <6>
extract-payload="" // <7>
left-push=""/> // <8>
xml
  • 组件的bean名称。如果您不提供 channel 属性,则会创建一个 DirectChannel 并在应用程序上下文中以该 id 属性作为bean名称进行注册。在这种情况下,端点将以 id.adapter 作为bean名称进行注册。(如果bean名称为 thing1,则端点将注册为 thing1.adapter。)

  • 此端点接收 Message 实例的 MessageChannel

  • RedisConnectionFactory bean 的引用。默认为 redisConnectionFactory

  • 执行基于队列的 'push' 操作以发送 Redis 消息的 Redis 列表名称。此属性与 queue-expression 互斥。

  • 用于确定 Redis 列表名称的 SpEL Expression。它使用运行时传入的 Message 作为 #root 变量。此属性与 queue 互斥。

  • RedisSerializer bean 的引用。默认为 JdkSerializationRedisSerializer。但是,对于 String 类型的有效负载,如果未提供 serializer 引用,则使用 StringRedisSerializer

  • 指定此端点是否应仅发送有效负载或整个 Message 到 Redis 队列。默认为 true

  • 指定此端点是否应使用“左推”(当值为 true 时)或“右推”(当值为 false 时)将消息写入 Redis 列表。如果为 true,则 Redis 列表在与默认 Redis 队列入站通道适配器一起使用时充当 FIFO 队列。将其设置为 false 以与从列表中读取的软件一起使用“左弹出”,或实现类似栈的消息顺序。默认为 true。自 4.3 版起。

Redis 应用事件

自从 Spring Integration 3.0,Redis 模块提供了一个 IntegrationEvent 的实现,而 IntegrationEvent 又是 org.springframework.context.ApplicationEvent 的一种。RedisExceptionEvent 封装了来自 Redis 操作的异常(端点是事件的“来源”)。例如,<int-redis:queue-inbound-channel-adapter/> 在捕获 BoundListOperations.rightPop 操作的异常后会发出这些事件。异常可能是任何通用的 org.springframework.data.redis.RedisSystemExceptionorg.springframework.data.redis.RedisConnectionFailureException。使用 <int-event:inbound-channel-adapter/> 处理这些事件对于确定后台 Redis 任务的问题并采取管理措施非常有用。

Redis 消息存储

企业集成模式 (EIP) 一书所述,消息存储 让你可以持久化消息。在处理具有缓冲消息能力的组件(聚合器、重排序器等)时,当可靠性是一个问题时,这可能会很有用。在 Spring Integration 中,MessageStore 策略还为 声明检查 模式提供了基础,该模式也在 EIP 中有描述。

Spring Integration 的 Redis 模块提供了 RedisMessageStore。以下示例展示了如何与聚合器一起使用它:

<bean id="redisMessageStore" class="o.s.i.redis.store.RedisMessageStore">
<constructor-arg ref="redisConnectionFactory"/>
</bean>

<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
message-store="redisMessageStore"/>
xml

前面的例子是一个 Bean 配置,它期望一个 RedisConnectionFactory 作为构造函数参数。

默认情况下,RedisMessageStore 使用 Java 序列化来序列化消息。但是,如果你想使用不同的序列化技术(例如 JSON),你可以通过设置 RedisMessageStorevalueSerializer 属性来提供自己的序列化程序。

从 4.3.10 版本开始,框架提供了 Message 实例和 MessageHeaders 实例的 Jackson 序列化器和反序列化器实现——分别是 MessageJacksonDeserializerMessageHeadersJacksonSerializer。它们必须使用 ObjectMapperSimpleModule 选项进行配置。此外,您应该在 ObjectMapper 上设置 enableDefaultTyping 以添加每个序列化的复杂对象的类型信息(如果您信任源)。然后在反序列化期间使用该类型信息。框架提供了一个名为 JacksonJsonUtils.messagingAwareMapper() 的实用方法,已经预配置了所有上述属性和序列化器。此实用方法带有 trustedPackages 参数,以限制 Java 反序列化包以避免安全漏洞。默认的信任包有:java.utiljava.langorg.springframework.messaging.supportorg.springframework.integration.supportorg.springframework.integration.messageorg.springframework.integration.store。要在 RedisMessageStore 中管理 JSON 序列化,您必须按照以下类似示例的方式进行配置:

RedisMessageStore store = new RedisMessageStore(redisConnectionFactory);
ObjectMapper mapper = JacksonJsonUtils.messagingAwareMapper();
RedisSerializer<Object> serializer = new GenericJackson2JsonRedisSerializer(mapper);
store.setValueSerializer(serializer);
java

从 4.3.12 版本开始,RedisMessageStore 支持 prefix 选项,以便在同一 Redis 服务器上区分存储实例。

Redis 通道消息存储

RedisMessageStore 之前展示的 将每个组作为一个单一键(组 ID)下的一个值来维护。虽然你可以用它来为 QueueChannel 提供持久化支持,但从 4.0 版本开始提供了一个专门的 RedisChannelMessageStore 来实现这个目的。这个存储使用每个通道一个 LIST,在发送消息时使用 LPUSH,在接收消息时使用 RPOP。默认情况下,此存储也使用 JDK 序列化,但你可以修改值序列化器,如 之前所述

我们建议使用这种存储支持的通道,而不是使用通用的 RedisMessageStore。以下示例定义了一个 Redis 消息存储,并在带有队列的通道中使用它:

<bean id="redisMessageStore" class="o.s.i.redis.store.RedisChannelMessageStore">
<constructor-arg ref="redisConnectionFactory"/>
</bean>

<int:channel id="somePersistentQueueChannel">
<int:queue message-store="redisMessageStore"/>
<int:channel>
xml

用于存储数据的键的形式为:<storeBeanName>:<channelId>(在前面的例子中,redisMessageStore:somePersistentQueueChannel)。

此外,还提供了一个子类 RedisChannelPriorityMessageStore。当你将它与 QueueChannel 一起使用时,消息会按照(FIFO)优先级顺序接收。它使用标准的 IntegrationMessageHeaderAccessor.PRIORITY 头,并支持优先级值(0 - 9)。具有其他优先级的消息(以及没有指定优先级的消息)会在任何具有优先级的消息之后按 FIFO 顺序获取。

important

这些存储仅实现 BasicMessageGroupStore,并未实现 MessageGroupStore。它们只能用于支持 QueueChannel 等情况。

Redis 元数据存储

Spring Integration 3.0 引入了一个新的基于 Redis 的 MetadataStore (参见 Metadata Store)实现。你可以使用 RedisMetadataStore 在应用程序重启之间保持 MetadataStore 的状态。你可以将这种新的 MetadataStore 实现与适配器(例如:)一起使用。

要指示这些适配器使用新的 RedisMetadataStore,声明一个名为 metadataStore 的 Spring bean 。Feed 入站通道适配器和 feed 入站通道适配器都会自动拾取并使用声明的 RedisMetadataStore。以下示例展示了如何声明此类 bean:

<bean name="metadataStore" class="o.s.i.redis.store.metadata.RedisMetadataStore">
<constructor-arg name="connectionFactory" ref="redisConnectionFactory"/>
</bean>
xml

RedisMetadataStoreRedisProperties 支持。与之交互使用 BoundHashOperations,这反过来又需要一个 key 用于整个 Properties 存储。在 MetadataStore 的情况下,这个 key 起到区域的作用,在分布式环境中非常有用,当多个应用程序使用同一台 Redis 服务器时。默认情况下,这个 key 的值为 MetaData

从 4.0 版开始,此存储实现了 ConcurrentMetadataStore,允许它在多个应用程序实例之间可靠地共享,其中只允许一个实例存储或修改键的值。

important

你不能在 Redis 集群中使用 RedisMetadataStore.replace()(例如,在 AbstractPersistentAcceptOnceFileListFilter 中),因为目前不支持用于原子性的 WATCH 命令。

Redis 存储 inbound 通道适配器

Redis 存储 inbound 通道适配器是一个轮询消费者,它从 Redis 集合中读取数据并将其作为 Message 负载发送。以下示例展示了如何配置 Redis 存储 inbound 通道适配器:

<int-redis:store-inbound-channel-adapter id="listAdapter"
connection-factory="redisConnectionFactory"
key="myCollection"
channel="redisChannel"
collection-type="LIST" >
<int:poller fixed-rate="2000" max-messages-per-poll="10"/>
</int-redis:store-inbound-channel-adapter>
xml

前面的例子展示了如何使用 store-inbound-channel-adapter 元素来配置 Redis 存储入站通道适配器,为各种属性提供值,例如:

  • keykey-expression: 正在使用的集合的键名称。

  • collection-type: 该适配器支持的集合类型的枚举。支持的集合有 LISTSETZSETPROPERTIESMAP

  • connection-factory: 引用 o.s.data.redis.connection.RedisConnectionFactory 的一个实例。

  • redis-template: 引用 o.s.data.redis.core.RedisTemplate 的一个实例。

  • 其他在所有其他入站适配器中共有的属性(例如 'channel')。

备注

你不能同时设置 redis-templateconnection-factory

important

默认情况下,适配器使用 StringRedisTemplate。这会为键、值、哈希键和哈希值使用 StringRedisSerializer 实例。如果您的 Redis 存储包含使用其他技术序列化的对象,则必须提供配置了适当序列化程序的 RedisTemplate。例如,如果存储是通过设置了 extract-payload-elementsfalse 的 Redis 存储输出适配器写入的,则必须提供如下配置的 RedisTemplate

<bean id="redisTemplate" class="org.springframework.data.redis.core.RedisTemplate">
<property name="connectionFactory" ref="redisConnectionFactory"/>
<property name="keySerializer">
<bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
</property>
<property name="hashKeySerializer">
<bean class="org.springframework.data.redis.serializer.StringRedisSerializer"/>
</property>
</bean>
xml

RedisTemplate 对于键和哈希键使用 String 序列化程序,并且对于值和哈希值使用默认的 JDK 序列化程序。

因为 key 有一个字面值,前面的例子相对简单和静态。有时,您可能需要根据某些条件在运行时更改键的值。为此,请使用 key-expression,其中提供的表达式可以是任何有效的 SpEL 表达式。

此外,您可能希望对从 Redis 集合读取的成功处理的数据进行一些后处理。例如,您可能希望在处理完值之后移动或删除该值。您可以通过使用 Spring Integration 2.2 添加的事务同步功能来实现。以下示例使用 key-expression 和事务同步:

<int-redis:store-inbound-channel-adapter id="zsetAdapterWithSingleScoreAndSynchronization"
connection-factory="redisConnectionFactory"
key-expression="'presidents'"
channel="otherRedisChannel"
auto-startup="false"
collection-type="ZSET">
<int:poller fixed-rate="1000" max-messages-per-poll="2">
<int:transactional synchronization-factory="syncFactory"/>
</int:poller>
</int-redis:store-inbound-channel-adapter>

<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit expression="payload.removeByScore(18, 18)"/>
</int:transaction-synchronization-factory>

<bean id="transactionManager" class="o.s.i.transaction.PseudoTransactionManager"/>
xml

你可以通过使用 transactional 元素来声明你的轮询器是事务性的。这个元素可以引用一个真实的事务管理器(例如,如果你的流程的其他部分调用了 JDBC)。如果你没有“真实”的事务,你可以使用 o.s.i.transaction.PseudoTransactionManager,它是 Spring 的 PlatformTransactionManager 的一种实现,在没有实际事务的情况下,它能够启用 Redis 适配器的事务同步功能。

important

这并不会使 Redis 操作本身具有事务性。它允许在成功(提交)之前或之后、或失败(回滚)之后进行动作的同步。

一旦你的轮询器是事务性的,你可以在 transactional 元素上设置 o.s.i.transaction.TransactionSynchronizationFactory 的一个实例。TransactionSynchronizationFactory 创建 TransactionSynchronization 的一个实例。为了方便起见,我们已经暴露了一个默认的基于 SpEL 的 TransactionSynchronizationFactory,它允许你配置 SpEL 表达式,其执行与事务协调(同步)。支持 before-commit、after-commit 和 after-rollback 事件的表达式,以及每个事件类型的通道,在这些通道中发送评估结果(如果有)。对于每个子元素,你可以指定 expressionchannel 属性。如果只存在 channel 属性,则接收的消息将作为特定同步场景的一部分发送到那里。如果只存在 expression 属性且表达式的结果是非空值,则会生成一条以结果为有效负载的消息,并将其发送到默认通道 (NullChannel),并在日志中显示(在 DEBUG 级别)。如果你想让评估结果发送到特定的通道,请添加一个 channel 属性。如果表达式的结果是 null 或 void,则不会生成消息。

RedisStoreMessageSource 添加了一个 store 属性,该属性将一个 RedisStore 实例绑定到事务 IntegrationResourceHolder,可以从 TransactionSynchronizationProcessor 实现中访问该实例。

有关事务同步的更多信息,请参阅事务同步

RedisStore 外发通道适配器

RedisStore outbound 通道适配器允许您将消息有效负载写入 Redis 集合,如下例所示:

<int-redis:store-outbound-channel-adapter id="redisListAdapter"
collection-type="LIST"
channel="requestChannel"
key="myCollection" />
xml

上述配置使用 store-inbound-channel-adapter 元素配置了一个 Redis 存储 outbound 通道适配器。它为各种属性提供了值,例如:

  • keykey-expression:正在使用的集合的键名称。

  • extract-payload-elements:如果设置为 true(默认值)且有效负载是“多值”对象(即 CollectionMap)的实例,则使用“addAll”和“putAll”语义进行存储。否则,如果设置为 false,则无论有效负载的类型如何,都会将其作为单个条目存储。如果有效负载不是“多值”对象的实例,则忽略此属性的值,并且始终将有效负载作为单个条目存储。

  • collection-type:此适配器支持的 Collection 类型的枚举。支持的集合有 LISTSETZSETPROPERTIESMAP

  • map-key-expression:返回要存储的条目的键名称的 SpEL 表达式。仅当 collection-typeMAPPROPERTIES 且 'extract-payload-elements' 为 false 时适用。

  • connection-factory:对 o.s.data.redis.connection.RedisConnectionFactory 实例的引用。

  • redis-template:对 o.s.data.redis.core.RedisTemplate 实例的引用。

  • 其他在所有其他入站适配器中通用的属性(例如 'channel')。

备注

你不能同时设置 redis-templateconnection-factory

important

默认情况下,适配器使用 StringRedisTemplate。这会为键、值、哈希键和哈希值使用 StringRedisSerializer 实例。但是,如果将 extract-payload-elements 设置为 false,则会使用一个具有 StringRedisSerializer 实例的 RedisTemplate 用于键和哈希键,以及具有 JdkSerializationRedisSerializer 实例的 RedisTemplate 用于值和哈希值。使用 JDK 序列化程序时,重要的是要理解 Java 序列化会用于所有值,无论该值实际上是否为集合。如果您需要对值的序列化有更多的控制,请考虑提供自己的 RedisTemplate 而不是依赖这些默认设置。

因为 key 和其他属性有字面值,前面的例子相对简单和静态。有时,您可能需要根据某些条件在运行时动态更改这些值。为此,请使用它们的 -expression 等价物(key-expressionmap-key-expression 等),提供的表达式可以是任何有效的 SpEL 表达式。

Redis 外发命令网关

Spring Integration 4.0 引入了 Redis 命令网关,允许你通过使用通用的 RedisConnection#execute 方法执行任何标准的 Redis 命令。以下列表显示了 Redis 外向网关可用的属性:

<int-redis:outbound-gateway
request-channel="" // <1>
reply-channel="" // <2>
requires-reply="" // <3>
reply-timeout="" // <4>
connection-factory="" // <5>
redis-template="" // <6>
arguments-serializer="" // <7>
command-expression="" // <8>
argument-expressions="" // <9>
use-command-variable="" // <10>
arguments-strategy="" /> // <11>
xml
  • 该端点接收 Message 实例的 MessageChannel

  • 该端点发送回复 Message 实例的 MessageChannel

  • 指定此出站网关是否必须返回非空值。默认为 true。当 Redis 返回 null 值时,会抛出 ReplyRequiredException

  • 等待回复消息发送的时间(以毫秒为单位)。通常应用于基于队列的有限回复通道。

  • 引用一个 RedisConnectionFactory bean。默认为 redisConnectionFactory。与 'redis-template' 属性互斥。

  • 引用一个 RedisTemplate bean。与 'connection-factory' 属性互斥。

  • 引用 org.springframework.data.redis.serializer.RedisSerializer 的一个实例。如果需要,它用于将每个命令参数序列化为 byte[]。

  • 返回命令键的 SpEL 表达式。默认为 redis_command 消息头。不得计算为 null

  • 作为命令参数评估的逗号分隔 SpEL 表达式。与 arguments-strategy 属性互斥。如果您未提供任一属性,则使用 payload 作为命令参数。参数表达式可以计算为 'null' 以支持可变数量的参数。

  • 一个 boolean 标志,用于指定在 o.s.i.redis.outbound.ExpressionArgumentsStrategy 中进行表达式求值上下文时,是否将评估的 Redis 命令字符串作为 #cmd 变量提供。当配置了 argument-expressions 时,否则忽略此属性。

  • 引用 o.s.i.redis.outbound.ArgumentsStrategy 的一个实例。与 argument-expressions 属性互斥。如果您未提供任一属性,则使用 payload 作为命令参数。

你可以使用 <int-redis:outbound-gateway> 作为通用组件来执行任何所需的 Redis 操作。以下示例展示了如何从 Redis 原子数字获取递增的值:

<int-redis:outbound-gateway request-channel="requestChannel"
reply-channel="replyChannel"
command-expression="'INCR'"/>
xml

Message 负载应该有一个名为 redisCounter 的名称,该名称可能由 org.springframework.data.redis.support.atomic.RedisAtomicInteger bean 定义提供。

RedisConnection#execute 方法的返回类型是一个通用的 Object。实际结果取决于命令类型。例如,MGET 返回一个 List<byte[]>。有关命令、其参数和结果类型的更多信息,请参阅 Redis 规范

Redis 队列 outbound 网关

Spring Integration 引入了 Redis 队列 outbound 网关来执行请求和回复场景。它将对话的 UUID 推送到提供的 queue,将该 UUID 作为键的值推送到 Redis 列表,并等待来自键为 UUID.reply 的 Redis 列表的回复。每次交互都使用不同的 UUID。以下列表显示了 Redis outbound 网关的可用属性:

<int-redis:queue-outbound-gateway
request-channel="" // <1>
reply-channel="" // <2>
requires-reply="" // <3>
reply-timeout="" // <4>
connection-factory="" // <5>
queue="" // <6>
order="" // <7>
serializer="" // <8>
extract-payload=""/> // <9>
xml
  • 该端点从中接收 Message 实例的 MessageChannel

  • 该端点发送回复 Message 实例的 MessageChannel

  • 指定此出站网关是否必须返回非空值。此值默认为 false。否则,当 Redis 返回 null 值时,将抛出 ReplyRequiredException

  • 等待回复消息发送的超时时间(以毫秒为单位)。它通常应用于基于队列的有限回复通道。

  • RedisConnectionFactory bean 的引用。默认为 redisConnectionFactory。它与 'redis-template' 属性互斥。

  • 出站网关发送对话 UUID 的 Redis 列表名称。

  • 当注册多个网关时,此出站网关的顺序。

  • RedisSerializer bean 引用。它可以是一个空字符串,这意味着“没有序列化器”。在这种情况下,来自入站 Redis 消息的原始 byte[] 将作为 Message 负载发送到 channel。默认情况下,它是 JdkSerializationRedisSerializer

  • 指定此端点是否期望从 Redis 队列接收到完整的 Message 实例数据。如果此属性设置为 true,则 serializer 不能是空字符串,因为消息需要某种形式的反序列化(默认为 JDK 序列化)。

Redis 队列 inbound 网关

Spring Integration 4.1 引入了 Redis 队列入站网关,以执行请求和回复场景。它从提供的 queue 弹出一个会话 UUID,从 Redis 列表中弹出该 UUID 作为键的值,并将回复推送到 Redis 列表,其键为 UUID 加上 .reply。以下列表显示了 Redis 队列入站网关的可用属性:

<int-redis:queue-inbound-gateway
request-channel="" // <1>
reply-channel="" // <2>
executor="" // <3>
reply-timeout="" // <4>
connection-factory="" // <5>
queue="" // <6>
order="" // <7>
serializer="" // <8>
receive-timeout="" // <9>
expect-message="" // <10>
recovery-interval=""/> // <11>
xml
  • 该端点发送从 Redis 数据创建的 Message 实例的 MessageChannel

  • 此端点等待回复 Message 实例的 MessageChannel。可选 - replyChannel 头仍然在使用。

  • 对 Spring TaskExecutor(或标准 JDK Executor)bean 的引用。它用于底层监听任务。默认为 SimpleAsyncTaskExecutor

  • 等待回复消息发送的时间(以毫秒为单位)。通常应用于基于队列的有限回复通道。

  • RedisConnectionFactory bean 的引用。默认为 redisConnectionFactory。与 'redis-template' 属性互斥。

  • 会话 UUID 的 Redis 列表名称。

  • 当注册多个网关时,此入站网关的顺序。

  • RedisSerializer bean 引用。它可以是一个空字符串,这意味着“没有序列化”。在这种情况下,来自入站 Redis 消息的原始 byte[] 作为 Message 载荷发送到 channel。默认为 JdkSerializationRedisSerializer。(请注意,在 4.3 版之前的版本中,默认为 StringRedisSerializer。要恢复该行为,请提供对 StringRedisSerializer 的引用)。

  • 等待接收消息获取的时间(以毫秒为单位)。通常应用于基于队列的有限请求通道。

  • 指定此端点是否期望 Redis 队列中的数据包含完整的 Message 实例。如果将此属性设置为 true,则 serializer 不能是空字符串,因为消息需要某种形式的反序列化(默认为 JDK 序列化)。

  • 在“右弹出”操作出现异常后,监听任务重新启动前应休眠的时间(以毫秒为单位)。

important

task-executor 必须配置为使用多于一个线程进行处理;否则当 RedisQueueMessageDrivenEndpoint 在错误后尝试重新启动监听任务时,可能会发生死锁。可以使用 errorChannel 来处理这些错误,以避免重启,但最好还是不要让应用程序暴露在可能的死锁情况下。有关 TaskExecutor 实现,请参阅 Spring Framework 参考手册

Redis Stream 外发通道适配器

Spring Integration 5.4 引入了响应式 Redis Stream 输出通道适配器,用于将消息有效负载写入 Redis 流。输出通道适配器使用 ReactiveStreamOperations.add(…​)Record 添加到流中。以下示例展示了如何使用 Java 配置和服务类来配置 Redis Stream 输出通道适配器。

@Bean
@ServiceActivator(inputChannel = "messageChannel")
public ReactiveRedisStreamMessageHandler reactiveValidatorMessageHandler(
ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
ReactiveRedisStreamMessageHandler reactiveStreamMessageHandler =
new ReactiveRedisStreamMessageHandler(reactiveRedisConnectionFactory, "myStreamKey"); 1
reactiveStreamMessageHandler.setSerializationContext(serializationContext); 2
reactiveStreamMessageHandler.setHashMapper(hashMapper); 3
reactiveStreamMessageHandler.setExtractPayload(true); 4
return reactiveStreamMessageHandler;
}
java
  • 使用 ReactiveRedisConnectionFactory 和流名称来构建 ReactiveRedisStreamMessageHandler 的实例以添加记录。另一个构造函数变体是基于 SpEL 表达式来针对请求消息评估流键。

  • 设置用于在将记录键和值添加到流之前进行序列化的 RedisSerializationContext

  • 设置 HashMapper,它提供了 Java 类型和 Redis 哈希/映射之间的契约。

  • 如果为 'true',通道适配器将从请求消息中提取有效负载以作为要添加的流记录。或者使用整个消息作为值。默认值为 true

Redis Stream 输入通道适配器

Spring Integration 5.4 引入了用于从 Redis 流读取消息的反应式流 inbound 通道适配器。inbound 通道适配器根据自动确认标志使用 StreamReceiver.receive(…​)StreamReceiver.receiveAutoAck() 从 Redis 流读取记录。以下示例展示了如何使用 Java 配置来设置 Redis 流 Inbound 通道适配器。

@Bean
public ReactiveRedisStreamMessageProducer reactiveRedisStreamProducer(
ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
ReactiveRedisStreamMessageProducer messageProducer =
new ReactiveRedisStreamMessageProducer(reactiveRedisConnectionFactory, "myStreamKey"); 1
messageProducer.setStreamReceiverOptions( 2
StreamReceiver.StreamReceiverOptions.builder()
.pollTimeout(Duration.ofMillis(100))
.build());
messageProducer.setAutoStartup(true); 3
messageProducer.setAutoAck(false); 4
messageProducer.setCreateConsumerGroup(true); 5
messageProducer.setConsumerGroup("my-group"); 6
messageProducer.setConsumerName("my-consumer"); 7
messageProducer.setOutputChannel(fromRedisStreamChannel); 8
messageProducer.setReadOffset(ReadOffset.latest()); 9
messageProducer.extractPayload(true); 10
return messageProducer;
}
java
  • 使用 ReactiveRedisConnectionFactory 和流键来构造 ReactiveRedisStreamMessageProducer 的实例以读取记录。

  • 一个 StreamReceiver.StreamReceiverOptions,用于通过响应式基础设施消费 Redis 流。

  • 一个 SmartLifecycle 属性,用于指定此端点是否应在应用程序上下文启动后自动启动。默认值为 true。如果为 false,则应手动启动 RedisStreamMessageProducermessageProducer.start()

  • 如果为 false,接收到的消息不会自动确认。消息的确认将推迟到消费消息的客户端进行。默认值为 true

  • 如果为 true,将创建一个消费者组。在创建消费者组时,如果流还不存在,也会创建流。消费者组跟踪消息传递并区分不同的消费者。默认值为 false

  • 设置消费者组名称。默认值为定义的 bean 名称。

  • 设置消费者名称。从组 my-group 中以 my-consumer 的身份读取消息。

  • 从此端点发送消息的消息通道。

  • 定义读取消息的偏移量。默认值为 ReadOffset.latest()

  • 如果为 'true',通道适配器将从 Record 中提取有效负载值。否则,整个 Record 将作为有效负载使用。默认值为 true

如果 autoAck 设置为 false,Redis Stream 中的 Record 不会由 Redis 驱动程序自动确认,而是会在消息中添加一个 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 头,其值为一个 SimpleAcknowledgment 实例。目标集成流有责任在根据该记录处理完消息的业务逻辑后调用它的 acknowledge() 回调。即使在反序列化过程中发生异常且配置了 errorChannel 时,也需要类似的逻辑。因此,目标错误处理程序必须决定对失败的消息进行确认(ack)或拒绝确认(nack)。除了 IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK 之外,ReactiveRedisStreamMessageProducer 还会将这些头填充到要生成的消息中:RedisHeaders.STREAM_KEYRedisHeaders.STREAM_MESSAGE_IDRedisHeaders.CONSUMER_GROUPRedisHeaders.CONSUMER

从 5.5 版本开始,您可以在 ReactiveRedisStreamMessageProducer 上显式配置 StreamReceiver.StreamReceiverOptionsBuilder 选项,包括新引入的 onErrorResume 函数,如果 Redis Stream 消费者在反序列化错误发生时应继续轮询,则需要此函数。默认函数会向错误通道(如果提供)发送消息,并可能确认失败的消息,如上所述。所有这些 StreamReceiver.StreamReceiverOptionsBuilder 与外部提供的 StreamReceiver.StreamReceiverOptions 是互斥的。

Redis 锁注册表

Spring Integration 4.0 引入了 RedisLockRegistry。某些组件(例如,聚合器和重排序器)使用从 LockRegistry 实例获取的锁来确保一次只有一个线程操作一个组。DefaultLockRegistry 在单个组件内执行此功能。您现在可以配置这些组件上的外部锁注册表。当您将其与共享的 MessageGroupStore 一起使用时,可以使用 RedisLockRegistry 跨多个应用程序实例提供此功能,从而确保一次只有一个实例可以操作该组。

当一个锁由本地线程释放时,另一个本地线程通常可以立即获得该锁。如果一个锁是由使用不同注册表实例的线程释放的,则可能需要最多 100 毫秒才能获得该锁。

为了避免“挂起”的锁(当服务器故障时),此注册表中的锁在默认 60 秒后过期,但您可以在注册表上配置此值。锁通常持有的时间要短得多。

important

由于键可以过期,尝试解锁已过期的锁会导致抛出异常。然而,此类锁保护的资源可能已被破坏,因此这些异常应被视为严重的。您应该将过期时间设置为足够大的值以防止这种情况发生,但也要设置得足够低,以便在服务器故障后可以在合理的时间内恢复锁。

从 5.0 版本开始,RedisLockRegistry 实现了 ExpirableLockRegistry,这会移除超过 age 时间之前获取且当前未被锁定的锁。

从 5.5.6 版本开始,RedisLockRegistry 支持通过 RedisLockRegistry.setCacheCapacity() 自动清理 RedisLockRegistry.locks 中的 redisLocks 缓存。更多信息请参阅其 JavaDocs。

从版本 5.5.13 开始,RedisLockRegistry 暴露了一个 setRedisLockType(RedisLockType) 选项,用于确定 Redis 锁获取应该发生在哪种模式下:

  • RedisLockType.SPIN_LOCK - 通过周期性循环(100 毫秒)检查是否可以获取锁来获得锁。默认。

  • RedisLockType.PUB_SUB_LOCK - 通过 Redis 发布订阅机制获取锁。

发布-订阅模式是优选模式——客户端与 Redis 服务器之间的网络通信较少,性能更高——当一个进程中的订阅收到解锁通知时,锁可以立即被获取。然而,Redis 不支持在 Master/Replica 连接中使用发布-订阅(例如在 AWS ElastiCache 环境中),因此选择忙等模式作为默认模式,以确保注册表能够在任何环境中正常工作。

从 6.4 版本开始,RedisLockRegistry.RedisLock.unlock() 方法不再抛出 IllegalStateException,如果锁的持有权已过期,则会抛出 ConcurrentModificationException

从 6.4 版本开始,添加了 RedisLockRegistry.setRenewalTaskScheduler() 用于配置锁的定期续期调度器。当设置了该调度器后,锁将在成功获取后每 1/3 的过期时间自动续期一次,直到解锁或 Redis 键被删除。