Redis分布式锁的可靠性
Redis 的五种数据结构各有特色,用对了才能发挥它的优势。很多人只用到了 String 和 Hash,却不知道 List、Set、ZSet 在特定场景下更合适。本文从应用场景出发,讲什么时候用什么类型。
一、Redis分布式锁的基本问题
#
1.1 简单SETNX的问题
public boolean lock(String key, String value) { Boolean locked = redis.setnx(key, value); if (locked) { redis.expire(key, 30); } return locked; }
|
问题:
- SETNX和EXPIRE不是原子操作,中间可能崩溃
- 没有唯一标识,可能误删别人的锁
#
1.2 改进版(SET NX EX)
public boolean lock(String key, String value, int seconds) { Boolean locked = redis.opsForValue() .setIfAbsent(key, value, seconds, TimeUnit.SECONDS); return Boolean.TRUE.equals(locked); }
|
仍然存在的问题:
- 锁超时后业务还没执行完
- 主从切换时锁丢失
- 不可重入
二、锁超时问题
#
2.1 超时导致的问题
线程A获取锁,过期时间30秒 │ ├── 执行业务逻辑(需要40秒) │ ├── 30秒时,锁自动过期 │ └── 线程B获取锁 │ ├── 线程A继续执行(已没有锁) │ └── 线程A执行unlock └── 删除了线程B的锁!
|
#
2.2 解决方案一:唯一标识+Lua删除
public class SafeRedisLock { @Autowired private StringRedisTemplate redis; private static final String UNLOCK_SCRIPT = "if redis.call('get', KEYS[1]) == ARGV[1] then " + "return redis.call('del', KEYS[1]) " + "else return 0 end"; public boolean lock(String key, int seconds) { String value = UUID.randomUUID().toString(); Boolean locked = redis.opsForValue() .setIfAbsent(key, value, seconds, TimeUnit.SECONDS); if (Boolean.TRUE.equals(locked)) { LockContext.set(key, value); return true; } return false; } public void unlock(String key) { String value = LockContext.get(key); if (value != null) { redis.execute(new DefaultRedisScript<>(UNLOCK_SCRIPT, Long.class), Collections.singletonList(key), value); LockContext.remove(key); } } }
|
#
2.3 解决方案二:看门狗续期
public void businessWithLock() { RLock lock = redisson.getLock("myLock"); try { lock.lock(); doBusiness(); } finally { lock.unlock(); } }
|
#
2.4 解决方案三:业务监控+手动续期
@Service public class ManualRenewalLock { @Autowired private StringRedisTemplate redis; private ScheduledExecutorService executor = Executors.newScheduledThreadPool(10); public void lockWithRenewal(String key, String value, int expireSeconds) { Boolean locked = redis.opsForValue() .setIfAbsent(key, value, expireSeconds, TimeUnit.SECONDS); if (!Boolean.TRUE.equals(locked)) { throw new RuntimeException("获取锁失败"); } ScheduledFuture<?> future = executor.scheduleAtFixedRate(() -> { String script = "if redis.call('get', KEYS[1]) == ARGV[1] then " + "return redis.call('expire', KEYS[1], ARGV[2]) " + "else return 0 end"; redis.execute(new DefaultRedisScript<>(script, Long.class), Collections.singletonList(key), value, String.valueOf(expireSeconds)); }, expireSeconds / 3, expireSeconds / 3, TimeUnit.SECONDS); try { doBusiness(); } finally { future.cancel(false); unlock(key, value); } } }
|
三、主从切换问题
#
3.1 问题场景
场景:Redis主从架构
1. 客户端A在Master上获取锁 SET mylock value NX EX 30 2. Master还没同步到Slave就宕机
3. Slave提升为新的Master
4. 客户端B在新Master上获取锁 SET mylock value2 NX EX 30 ← 成功!
结果:客户端A和B同时持有锁!
|
#
3.2 RedLock算法
Redis作者Antirez提出的RedLock算法:
RedLock需要在多个独立的Redis实例上获取锁:
┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────┐ │Redis A │ │Redis B │ │Redis C │ │Redis D │ │Redis E │ │:6379 │ │:6380 │ │:6381 │ │:6382 │ │:6383 │ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ └────┬────┘ │ │ │ │ │ └────────────┴────────────┼────────────┴────────────┘ │ 客户端尝试在所有节点获取锁
|
算法步骤:
- 获取当前时间戳(毫秒)
- 依次在N个Redis实例上尝试获取锁
- 使用相同的key和value
- 设置超时时间(如100ms)
- 如果某个实例获取失败,立即尝试下一个
- 计算获取锁的总耗时
- 如果成功获取了大多数实例的锁(如5个中获取3个),且总耗时小于锁过期时间,则认为获取锁成功
- 如果获取失败,在所有实例上释放锁
#
3.3 Redisson实现RedLock
@Configuration public class RedLockConfig { @Bean public RedissonRedLock redLock() { Config config1 = new Config(); config1.useSingleServer().setAddress("redis://192.168.1.101:6379"); RedissonClient redisson1 = Redisson.create(config1); Config config2 = new Config(); config2.useSingleServer().setAddress("redis://192.168.1.102:6379"); RedissonClient redisson2 = Redisson.create(config2); Config config3 = new Config(); config3.useSingleServer().setAddress("redis://192.168.1.103:6379"); RedissonClient redisson3 = Redisson.create(config3); RLock lock1 = redisson1.getLock("lock"); RLock lock2 = redisson2.getLock("lock"); RLock lock3 = redisson3.getLock("lock"); return new RedissonRedLock(lock1, lock2, lock3); } }
@Service public class RedLockService { @Autowired private RedissonRedLock redLock; public void doBusiness() { try { boolean locked = redLock.tryLock(10, 30, TimeUnit.SECONDS); if (locked) { try { } finally { redLock.unlock(); } } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }
|
#
3.4 RedLock的争议
Martin Kleppmann(《 Designing Data-Intensive Applications》作者)对RedLock的批评:
- 时钟问题:如果客户端时钟偏移,可能导致锁过期判断错误
- GC暂停:JVM GC可能导致客户端”假死”,锁已过期但客户端不知道
- 网络延迟:获取锁的网络延迟可能影响判断
替代方案:
- 使用ZooKeeper或etcd(基于CP的系统)
- 使用数据库悲观锁
四、可重入性问题
#
4.1 问题
public void methodA() { lock("mylock"); methodB(); unlock("mylock"); }
public void methodB() { lock("mylock"); unlock("mylock"); }
|
#
4.2 Redisson的可重入实现
if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end;
return redis.call('pttl', KEYS[1]);
|
key设计:
mylock: { "UUID:threadId": 2, -- 重入计数 }
|
五、锁的性能优化
#
5.1 锁粒度
RLock lock = redisson.getLock("lock:order:" + orderId);
RLock lock = redisson.getLock("lock:orders");
|
#
5.2 锁等待策略
boolean locked = lock.tryLock();
boolean locked = lock.tryLock(10, TimeUnit.SECONDS);
lock.lock();
|
#
5.3 分段锁
public class SegmentLock { private static final int SEGMENT_COUNT = 16; private RLock[] locks; public SegmentLock(RedissonClient redisson) { locks = new RLock[SEGMENT_COUNT]; for (int i = 0; i < SEGMENT_COUNT; i++) { locks[i] = redisson.getLock("segment:lock:" + i); } } private RLock getLock(Object key) { int index = key.hashCode() & (SEGMENT_COUNT - 1); return locks[index]; } public void lock(Object key) { getLock(key).lock(); } public void unlock(Object key) { getLock(key).unlock(); } }
|
六、分布式锁对比
| 特性 |
Redis |
ZooKeeper |
etcd |
| 一致性 |
AP |
CP |
CP |
| 性能 |
高 |
中 |
高 |
| 可靠性 |
中(需RedLock) |
高 |
高 |
| 实现复杂度 |
低 |
中 |
中 |
| 自动释放 |
支持(过期) |
支持(session) |
支持(lease) |
| 可重入 |
需自己实现 |
支持 |
需自己实现 |
| 顺序性 |
不支持 |
支持 |
支持 |
七、最佳实践
#
7.1 选择合适的锁实现
| 场景 |
推荐方案 |
| 一般分布式锁 |
Redisson |
| 对一致性要求极高 |
ZooKeeper/etcd |
| 简单场景 |
SET NX EX |
| 防止主从问题 |
RedLock或CP系统 |
#
7.2 锁使用规范
public class LockBestPractice { public void safeLock(String lockKey, Runnable business) { RLock lock = redisson.getLock(lockKey); boolean locked = false; try { locked = lock.tryLock(10, 30, TimeUnit.SECONDS); if (!locked) { throw new LockException("获取锁失败: " + lockKey); } business.run(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new LockException("获取锁被中断", e); } finally { if (locked && lock.isHeldByCurrentThread()) { lock.unlock(); } } } }
|
#
7.3 避免死锁
- 设置过期时间:防止程序崩溃导致锁不释放
- 使用try-finally:确保解锁
- 避免嵌套锁:如果必须,统一获取顺序
- 设置等待超时:避免无限等待
八、总结
Redis分布式锁的可靠性要点:
- 原子操作:使用SET NX EX或Lua脚本
- 唯一标识:防止误删他人锁
- 自动续期:看门狗机制防止业务未完成锁过期
- 主从问题:考虑RedLock或CP系统
- 可重入性:使用Hash结构记录重入计数
- 性能优化:控制锁粒度,使用分段锁
核心认识:
- Redis分布式锁适合大多数场景
- 对一致性要求极高的场景考虑ZooKeeper/etcd
- RedLock有争议,谨慎使用
- 正确的使用方式比选择工具更重要
核心要点
String:简单的键值对,适合缓存、计数器
Hash:存储对象属性,适合用户信息、配置
List:有序列表,适合消息队列、最新列表
Set:无序去重,适合共同好友、抽奖
ZSet:有序集合,适合排行榜、积分系统
总结
选择合适的数据结构是使用 Redis 的关键。在实际项目中,根据业务需求选择合适的类型,可以提升性能和开发效率。