Redis分布式锁Redisson
Redis 的五种数据结构各有特色,用对了才能发挥它的优势。很多人只用到了 String 和 Hash,却不知道 List、Set、ZSet 在特定场景下更合适。本文从应用场景出发,讲什么时候用什么类型。
一、Redisson简介
#
1.1 什么是Redisson
Redisson是一个基于Redis的Java驻内存数据网格(In-Memory Data Grid),提供了:
- 分布式锁(Lock)
- 分布式集合(Map, Set, List)
- 分布式对象(Object, Bucket, AtomicLong)
- 分布式服务(Remote Service, Live Object Service)
- 分布式并发工具(Semaphore, CountDownLatch)
#
1.2 引入依赖
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.23.5</version> </dependency>
|
#
1.3 配置连接
spring: redis: redisson: config: | singleServerConfig: address: "redis://localhost:6379" password: null database: 0 connectionMinimumIdleSize: 10 connectionPoolSize: 64 idleConnectionTimeout: 10000 connectTimeout: 10000 timeout: 3000 retryAttempts: 3 retryInterval: 1500
|
@Configuration public class RedissonConfig { @Bean public RedissonClient redissonClient() { Config config = new Config(); config.useSingleServer() .setAddress("redis://localhost:6379") .setConnectionPoolSize(64) .setConnectionMinimumIdleSize(10); return Redisson.create(config); } }
|
二、可重入锁(Reentrant Lock)
#
2.1 基本使用
@Service public class OrderService { @Autowired private RedissonClient redisson; public void createOrder(Long userId) { RLock lock = redisson.getLock("lock:order:" + userId); try { boolean locked = lock.tryLock(10, 30, TimeUnit.SECONDS); if (locked) { try { doCreateOrder(userId); } finally { lock.unlock(); } } else { throw new RuntimeException("获取锁失败"); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException("获取锁被中断"); } } }
|
#
2.2 看门狗自动续期
public void createOrderWithWatchdog(Long userId) { RLock lock = redisson.getLock("lock:order:" + userId); try { boolean locked = lock.tryLock(10, TimeUnit.SECONDS); if (locked) { try { doLongBusiness(userId); } finally { lock.unlock(); } } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }
|
看门狗机制:
- 不指定leaseTime时启用
- 锁默认30秒过期
- 获取锁成功后,启动定时任务每10秒续期到30秒
- 业务完成解锁后,取消定时任务
#
2.3 可重入性
@Service public class BusinessService { @Autowired private RedissonClient redisson; public void methodA() { RLock lock = redisson.getLock("lock:business"); try { lock.lock(); methodB(); } finally { lock.unlock(); } } public void methodB() { RLock lock = redisson.getLock("lock:business"); try { lock.lock(); } finally { lock.unlock(); } } }
|
三、公平锁(Fair Lock)
#
3.1 公平锁 vs 非公平锁
RLock nonFairLock = redisson.getLock("lock:order");
RLock fairLock = redisson.getFairLock("lock:order:fair");
|
公平锁实现原理:
获取锁时: 1. 检查是否有等待队列 2. 如果有,加入队列尾部等待 3. 释放锁时,通知队列头部的线程
非公平锁: 1. 直接尝试获取锁 2. 获取失败才进入队列
|
#
3.2 公平锁使用
@Service public class FairLockService { @Autowired private RedissonClient redisson; public void processWithFairLock(Long taskId) { RLock fairLock = redisson.getFairLock("lock:task:" + taskId); try { fairLock.lock(); try { processTask(taskId); } finally { fairLock.unlock(); } } catch (Exception e) { throw new RuntimeException("处理失败", e); } } }
|
四、读写锁(ReadWrite Lock)
#
4.1 读写锁特点
RReadWriteLock rwLock = redisson.getReadWriteLock("lock:cache:data"); RLock readLock = rwLock.readLock(); RLock writeLock = rwLock.writeLock();
|
规则:
- 读锁:多个线程可同时获取
- 写锁:独占,其他读/写都阻塞
- 写锁优先:有写锁等待时,新的读锁阻塞
#
4.2 读写锁使用
@Service public class CacheService { @Autowired private RedissonClient redisson; @Autowired private DataMapper dataMapper; private RReadWriteLock rwLock = redisson.getReadWriteLock("lock:data"); public Data getData(Long id) { RLock readLock = rwLock.readLock(); try { readLock.lock(); Data data = getFromCache(id); if (data != null) { return data; } data = dataMapper.findById(id); if (data != null) { putToCache(id, data); } return data; } finally { readLock.unlock(); } } public void updateData(Data data) { RLock writeLock = rwLock.writeLock(); try { writeLock.lock(); dataMapper.update(data); putToCache(data.getId(), data); } finally { writeLock.unlock(); } } }
|
五、信号量(Semaphore)
#
5.1 基本使用
@Service public class SemaphoreService { @Autowired private RedissonClient redisson; public void limitedAccess() { RSemaphore semaphore = redisson.getSemaphore("semaphore:api"); try { semaphore.acquire(); try { callExternalAPI(); } finally { semaphore.release(); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }
|
#
5.2 限流场景
@Service public class RateLimiterService { @Autowired private RedissonClient redisson; public void apiCall(String clientId) { RSemaphore semaphore = redisson.getSemaphore("rate:limit:" + clientId); semaphore.trySetPermits(5); boolean acquired = semaphore.tryAcquire(); if (!acquired) { throw new RateLimitException("请求过于频繁"); } try { doApiCall(); } finally { ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); executor.schedule(() -> semaphore.release(), 1, TimeUnit.SECONDS); } } }
|
六、CountDownLatch
#
6.1 基本使用
@Service public class ParallelService { @Autowired private RedissonClient redisson; public void parallelProcess(List<Task> tasks) { RCountDownLatch latch = redisson.getCountDownLatch("latch:tasks"); latch.trySetCount(tasks.size()); for (Task task : tasks) { new Thread(() -> { try { processTask(task); } finally { latch.countDown(); } }).start(); } try { latch.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } System.out.println("所有任务完成"); } }
|
七、RedissonLock原理
#
7.1 加锁Lua脚本
if (redis.call('exists', KEYS[1]) == 0) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end;
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end;
return redis.call('pttl', KEYS[1]);
|
参数说明:
- KEYS[1]: 锁名
- ARGV[1]: 过期时间(毫秒)
- ARGV[2]: 线程标识(UUID:threadId)
逻辑:
- 锁不存在:创建hash,设置值为1,设置过期时间
- 锁存在且是当前线程:重入计数+1,续期
- 锁存在且不是当前线程:返回剩余过期时间
#
7.2 解锁Lua脚本
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil; end;
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end;
|
逻辑:
- 不是自己的锁:返回nil
- 重入计数-1,如果还大于0:续期
- 重入计数为0:删除锁,发布解锁消息
#
7.3 看门狗续期
private void scheduleExpirationRenewal(long threadId) { Timeout task = commandExecutor.getConnectionManager().newTimeout( timeout -> { String script = "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return 1; " + "end; " + "return 0;"; }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS ); }
|
八、最佳实践
#
8.1 锁粒度控制
RLock lock = redisson.getLock("lock:order:" + orderId);
RLock lock = redisson.getLock("lock:all:orders");
|
#
8.2 锁超时设置
lock.tryLock(10, 30, TimeUnit.SECONDS);
lock.tryLock(10, TimeUnit.SECONDS);
|
#
8.3 异常处理
public void safeLockOperation(String lockKey, Runnable operation) { RLock lock = redisson.getLock(lockKey); boolean locked = false; try { locked = lock.tryLock(10, 30, TimeUnit.SECONDS); if (!locked) { throw new LockException("获取锁失败: " + lockKey); } operation.run(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new LockException("获取锁被中断", e); } finally { if (locked && lock.isHeldByCurrentThread()) { lock.unlock(); } } }
|
九、常见问题
#
9.1 锁不释放
原因:
解决:
- 使用try-finally确保解锁
- 设置合理的过期时间
#
9.2 锁续期问题
现象:业务执行时间长,锁过期了
解决:
#
9.3 锁的可见性
问题:一个服务获取的锁,另一个服务能看到吗?
回答:可以,Redisson使用Redis实现分布式锁,所有连接同一个Redis的服务都能看到锁状态。
十、总结
| 锁类型 |
特点 |
适用场景 |
| 可重入锁 |
同线程可多次获取 |
一般分布式锁场景 |
| 公平锁 |
按请求顺序获取 |
需要公平性的场景 |
| 读写锁 |
读共享写独占 |
读多写少 |
| 信号量 |
限制并发数 |
限流 |
| CountDownLatch |
等待多个任务 |
并行任务同步 |
Redisson分布式锁的核心优势:
- 看门狗续期:自动防止锁过期
- 可重入:同线程多次获取不会死锁
- 阻塞等待:获取锁失败可以等待
- 公平性可选:支持公平和非公平锁
- 丰富的并发工具:Semaphore、CountDownLatch等
核心要点
String:简单的键值对,适合缓存、计数器
Hash:存储对象属性,适合用户信息、配置
List:有序列表,适合消息队列、最新列表
Set:无序去重,适合共同好友、抽奖
ZSet:有序集合,适合排行榜、积分系统
总结
选择合适的数据结构是使用 Redis 的关键。在实际项目中,根据业务需求选择合适的类型,可以提升性能和开发效率。