跳到主要内容

分布式锁

QWen Plus 中英对照 Distributed Locks

在许多情况下,针对某些上下文(甚至单个消息)的操作必须以独占方式进行。一个例子是聚合组件,我们必须检查当前消息的消息组状态,以确定我们是否可以释放该组或只是添加该消息以供将来考虑。为此,Java 提供了带有 java.util.concurrent.locks.Lock 实现的 API。然而,当应用程序是分布式的和/或在集群中运行时,问题变得更加复杂。在这种情况下,锁定具有挑战性,需要一些共享状态及其特定方法来实现独占性要求。

Spring Integration 提供了一个 LockRegistry 抽象,它有一个基于 ReentrantLock API 的内存中 DefaultLockRegistry 实现。LockRegistryobtain(Object) 方法需要一个特定上下文的 lock key。例如,聚合器使用 correlationKey 来锁定其组周围的操作。这样不同的锁可以并发使用。此 obtain(Object) 方法返回一个 java.util.concurrent.locks.Lock 实例(取决于 LockRegistry 实现),因此其余逻辑与标准 Java 并发算法相同。

从 6.2 版本开始,LockRegistry 提供了一个 executeLocked() API(此接口中的 default 方法)来在锁定时执行某些任务。此 API 的行为类似于广为人知的 JdbcTemplateJmsTemplateRestTemplate。以下示例演示了此 API 的用法:

LockRegistry registry = new DefaultLockRegistry();
...
registry.executeLocked("someLockKey", () -> someExclusiveResourceCall());
java

该方法重新抛出任务调用中的异常,如果 Lock 被中断,则抛出 InterruptedException。此外,带有 Duration 的变体在 lock.tryLock() 返回 false 时抛出 java.util.concurrent.TimeoutException

Spring Integration 提供了这些 LockRegistry 实现用于分布式锁:

Spring Integration AWS 扩展也实现了 DynamoDbLockRegistry