Redis分布式锁的可靠性

Redis分布式锁的可靠性

Redis 的五种数据结构各有特色,用对了才能发挥它的优势。很多人只用到了 String 和 Hash,却不知道 List、Set、ZSet 在特定场景下更合适。本文从应用场景出发,讲什么时候用什么类型。

一、Redis分布式锁的基本问题

#

1.1 简单SETNX的问题

// 最简单的分布式锁(有问题)
public boolean lock(String key, String value) {
// SET if Not eXists
Boolean locked = redis.setnx(key, value);
if (locked) {
// 问题1:如果这里程序崩溃,锁永远不会释放
redis.expire(key, 30);
}
return locked;
}

问题

  1. SETNX和EXPIRE不是原子操作,中间可能崩溃
  2. 没有唯一标识,可能误删别人的锁

#

1.2 改进版(SET NX EX)

// Redis 2.6.12+ 支持原子操作
public boolean lock(String key, String value, int seconds) {
// SET key value NX EX seconds
// NX: 只有key不存在时才设置
// EX: 设置过期时间
Boolean locked = redis.opsForValue()
.setIfAbsent(key, value, seconds, TimeUnit.SECONDS);
return Boolean.TRUE.equals(locked);
}

仍然存在的问题

  1. 锁超时后业务还没执行完
  2. 主从切换时锁丢失
  3. 不可重入

二、锁超时问题

#

2.1 超时导致的问题

线程A获取锁,过期时间30秒

├── 执行业务逻辑(需要40秒)

├── 30秒时,锁自动过期
│ └── 线程B获取锁

├── 线程A继续执行(已没有锁)

└── 线程A执行unlock
└── 删除了线程B的锁!

#

2.2 解决方案一:唯一标识+Lua删除

public class SafeRedisLock {

@Autowired
private StringRedisTemplate redis;

private static final String UNLOCK_SCRIPT =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
"return redis.call('del', KEYS[1]) " +
"else return 0 end";

public boolean lock(String key, int seconds) {
String value = UUID.randomUUID().toString();
Boolean locked = redis.opsForValue()
.setIfAbsent(key, value, seconds, TimeUnit.SECONDS);
if (Boolean.TRUE.equals(locked)) {
// 将value保存到ThreadLocal
LockContext.set(key, value);
return true;
}
return false;
}

public void unlock(String key) {
String value = LockContext.get(key);
if (value != null) {
redis.execute(new DefaultRedisScript<>(UNLOCK_SCRIPT, Long.class),
Collections.singletonList(key), value);
LockContext.remove(key);
}
}
}

#

2.3 解决方案二:看门狗续期

// Redisson的看门狗机制
public void businessWithLock() {
RLock lock = redisson.getLock("myLock");
try {
// 不指定leaseTime,启用看门狗
lock.lock();

// 业务逻辑执行很长时间
doBusiness();

} finally {
lock.unlock();
}
}

// 看门狗原理:
// 1. 获取锁后,启动定时任务
// 2. 每隔1/3过期时间续期
// 3. 业务完成解锁后,取消定时任务

#

2.4 解决方案三:业务监控+手动续期

@Service
public class ManualRenewalLock {

@Autowired
private StringRedisTemplate redis;
private ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);

public void lockWithRenewal(String key, String value, int expireSeconds) {
// 获取锁
Boolean locked = redis.opsForValue()
.setIfAbsent(key, value, expireSeconds, TimeUnit.SECONDS);

if (!Boolean.TRUE.equals(locked)) {
throw new RuntimeException("获取锁失败");
}

// 启动续期任务
ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> {
String script =
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
"return redis.call('expire', KEYS[1], ARGV[2]) " +
"else return 0 end";

redis.execute(new DefaultRedisScript<>(script, Long.class),
Collections.singletonList(key), value, String.valueOf(expireSeconds));
}, expireSeconds / 3, expireSeconds / 3, TimeUnit.SECONDS);

try {
doBusiness();
} finally {
// 取消续期
future.cancel(false);
// 释放锁
unlock(key, value);
}
}
}

三、主从切换问题

#

3.1 问题场景

场景:Redis主从架构

1. 客户端A在Master上获取锁
SET mylock value NX EX 30

2. Master还没同步到Slave就宕机

3. Slave提升为新的Master

4. 客户端B在新Master上获取锁
SET mylock value2 NX EX 30 ← 成功!

结果:客户端A和B同时持有锁!

#

3.2 RedLock算法

Redis作者Antirez提出的RedLock算法:

RedLock需要在多个独立的Redis实例上获取锁:

┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐
│Redis A │ │Redis B │ │Redis C │ │Redis D │ │Redis E │
│:6379 │ │:6380 │ │:6381 │ │:6382 │ │:6383 │
└────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘
│ │ │ │ │
└────────────┴────────────┼────────────┴────────────┘

客户端尝试在所有节点获取锁

算法步骤

  1. 获取当前时间戳(毫秒)
  2. 依次在N个Redis实例上尝试获取锁
    • 使用相同的key和value
    • 设置超时时间(如100ms)
    • 如果某个实例获取失败,立即尝试下一个
  3. 计算获取锁的总耗时
  4. 如果成功获取了大多数实例的锁(如5个中获取3个),且总耗时小于锁过期时间,则认为获取锁成功
  5. 如果获取失败,在所有实例上释放锁

#

3.3 Redisson实现RedLock

@Configuration
public class RedLockConfig {

@Bean
public RedissonRedLock redLock() {
Config config1 = new Config();
config1.useSingleServer().setAddress("redis://192.168.1.101:6379");
RedissonClient redisson1 = Redisson.create(config1);

Config config2 = new Config();
config2.useSingleServer().setAddress("redis://192.168.1.102:6379");
RedissonClient redisson2 = Redisson.create(config2);

Config config3 = new Config();
config3.useSingleServer().setAddress("redis://192.168.1.103:6379");
RedissonClient redisson3 = Redisson.create(config3);

RLock lock1 = redisson1.getLock("lock");
RLock lock2 = redisson2.getLock("lock");
RLock lock3 = redisson3.getLock("lock");

return new RedissonRedLock(lock1, lock2, lock3);
}
}

@Service
public class RedLockService {

@Autowired
private RedissonRedLock redLock;

public void doBusiness() {
try {
boolean locked = redLock.tryLock(10, 30, TimeUnit.SECONDS);
if (locked) {
try {
// 执行业务
} finally {
redLock.unlock();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

#

3.4 RedLock的争议

Martin Kleppmann(《 Designing Data-Intensive Applications》作者)对RedLock的批评:

  1. 时钟问题:如果客户端时钟偏移,可能导致锁过期判断错误
  2. GC暂停:JVM GC可能导致客户端”假死”,锁已过期但客户端不知道
  3. 网络延迟:获取锁的网络延迟可能影响判断

替代方案

  • 使用ZooKeeper或etcd(基于CP的系统)
  • 使用数据库悲观锁

四、可重入性问题

#

4.1 问题

// 简单的SETNX锁不可重入
public void methodA() {
lock("mylock"); // 获取锁成功
methodB();
unlock("mylock");
}

public void methodB() {
lock("mylock"); // 获取锁失败(已经被methodA获取)
// ...
unlock("mylock");
}

#

4.2 Redisson的可重入实现

-- 加锁(可重入)
if (redis.call('exists', KEYS[1]) == 0) then
-- 锁不存在,创建hash
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;

if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
-- 锁存在且是当前线程,重入计数+1
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;

return redis.call('pttl', KEYS[1]);

key设计

mylock: {
"UUID:threadId": 2, -- 重入计数
}

五、锁的性能优化

#

5.1 锁粒度

// 好的做法:细粒度锁
RLock lock = redisson.getLock("lock:order:" + orderId);

// 不好的做法:粗粒度锁
RLock lock = redisson.getLock("lock:orders");

#

5.2 锁等待策略

// 立即返回
boolean locked = lock.tryLock();

// 等待一段时间
boolean locked = lock.tryLock(10, TimeUnit.SECONDS);

// 阻塞等待(默认)
lock.lock();

#

5.3 分段锁

// 将大锁拆分为多个小锁
public class SegmentLock {

private static final int SEGMENT_COUNT = 16;
private RLock[] locks;

public SegmentLock(RedissonClient redisson) {
locks = new RLock[SEGMENT_COUNT];
for (int i = 0; i < SEGMENT_COUNT; i++) {
locks[i] = redisson.getLock("segment:lock:" + i);
}
}

private RLock getLock(Object key) {
int index = key.hashCode() & (SEGMENT_COUNT - 1);
return locks[index];
}

public void lock(Object key) {
getLock(key).lock();
}

public void unlock(Object key) {
getLock(key).unlock();
}
}

六、分布式锁对比

特性 Redis ZooKeeper etcd
一致性 AP CP CP
性能
可靠性 中(需RedLock)
实现复杂度
自动释放 支持(过期) 支持(session) 支持(lease)
可重入 需自己实现 支持 需自己实现
顺序性 不支持 支持 支持

七、最佳实践

#

7.1 选择合适的锁实现

场景 推荐方案
一般分布式锁 Redisson
对一致性要求极高 ZooKeeper/etcd
简单场景 SET NX EX
防止主从问题 RedLock或CP系统

#

7.2 锁使用规范

public class LockBestPractice {

public void safeLock(String lockKey, Runnable business) {
RLock lock = redisson.getLock(lockKey);
boolean locked = false;

try {
// 1. 尝试获取锁,设置超时
locked = lock.tryLock(10, 30, TimeUnit.SECONDS);
if (!locked) {
throw new LockException("获取锁失败: " + lockKey);
}

// 2. 执行业务
business.run();

} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new LockException("获取锁被中断", e);
} finally {
// 3. 确保释放锁
if (locked && lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}
}

#

7.3 避免死锁

  1. 设置过期时间:防止程序崩溃导致锁不释放
  2. 使用try-finally:确保解锁
  3. 避免嵌套锁:如果必须,统一获取顺序
  4. 设置等待超时:避免无限等待

八、总结

Redis分布式锁的可靠性要点:

  1. 原子操作:使用SET NX EX或Lua脚本
  2. 唯一标识:防止误删他人锁
  3. 自动续期:看门狗机制防止业务未完成锁过期
  4. 主从问题:考虑RedLock或CP系统
  5. 可重入性:使用Hash结构记录重入计数
  6. 性能优化:控制锁粒度,使用分段锁

核心认识:

  • Redis分布式锁适合大多数场景
  • 对一致性要求极高的场景考虑ZooKeeper/etcd
  • RedLock有争议,谨慎使用
  • 正确的使用方式比选择工具更重要

核心要点

  1. String:简单的键值对,适合缓存、计数器

  2. Hash:存储对象属性,适合用户信息、配置

  3. List:有序列表,适合消息队列、最新列表

  4. Set:无序去重,适合共同好友、抽奖

  5. ZSet:有序集合,适合排行榜、积分系统

总结

选择合适的数据结构是使用 Redis 的关键。在实际项目中,根据业务需求选择合适的类型,可以提升性能和开发效率。


   转载规则


《Redis分布式锁的可靠性》 小乐 采用 知识共享署名 4.0 国际许可协议 进行许可。
  目录