Redis分布式锁Redisson

Redis分布式锁Redisson

Redis 的五种数据结构各有特色,用对了才能发挥它的优势。很多人只用到了 String 和 Hash,却不知道 List、Set、ZSet 在特定场景下更合适。本文从应用场景出发,讲什么时候用什么类型。

一、Redisson简介

#

1.1 什么是Redisson

Redisson是一个基于Redis的Java驻内存数据网格(In-Memory Data Grid),提供了:

  • 分布式锁(Lock)
  • 分布式集合(Map, Set, List)
  • 分布式对象(Object, Bucket, AtomicLong)
  • 分布式服务(Remote Service, Live Object Service)
  • 分布式并发工具(Semaphore, CountDownLatch)

#

1.2 引入依赖

<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.23.5</version>
</dependency>

#

1.3 配置连接

# application.yml
spring:
redis:
redisson:
config: |
singleServerConfig:
address: "redis://localhost:6379"
password: null
database: 0
connectionMinimumIdleSize: 10
connectionPoolSize: 64
idleConnectionTimeout: 10000
connectTimeout: 10000
timeout: 3000
retryAttempts: 3
retryInterval: 1500
// Java配置方式
@Configuration
public class RedissonConfig {

@Bean
public RedissonClient redissonClient() {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://localhost:6379")
.setConnectionPoolSize(64)
.setConnectionMinimumIdleSize(10);

return Redisson.create(config);
}
}

二、可重入锁(Reentrant Lock)

#

2.1 基本使用

@Service
public class OrderService {

@Autowired
private RedissonClient redisson;

public void createOrder(Long userId) {
RLock lock = redisson.getLock("lock:order:" + userId);

try {
// 尝试获取锁,最多等待10秒,锁30秒后自动释放
boolean locked = lock.tryLock(10, 30, TimeUnit.SECONDS);

if (locked) {
try {
// 执行业务逻辑
doCreateOrder(userId);
} finally {
lock.unlock();
}
} else {
throw new RuntimeException("获取锁失败");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("获取锁被中断");
}
}
}

#

2.2 看门狗自动续期

// 不指定leaseTime时,启用看门狗自动续期
public void createOrderWithWatchdog(Long userId) {
RLock lock = redisson.getLock("lock:order:" + userId);

try {
// 只指定waitTime,不指定leaseTime
// 获取锁后,看门狗会自动续期(默认30秒过期,每10秒续期)
boolean locked = lock.tryLock(10, TimeUnit.SECONDS);

if (locked) {
try {
// 业务逻辑执行时间可能很长
// 看门狗会自动续期,避免业务未完成锁就过期
doLongBusiness(userId);
} finally {
lock.unlock();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

看门狗机制

  • 不指定leaseTime时启用
  • 锁默认30秒过期
  • 获取锁成功后,启动定时任务每10秒续期到30秒
  • 业务完成解锁后,取消定时任务

#

2.3 可重入性

@Service
public class BusinessService {

@Autowired
private RedissonClient redisson;

public void methodA() {
RLock lock = redisson.getLock("lock:business");
try {
lock.lock();

// 同一线程可以再次获取锁(可重入)
methodB();

} finally {
lock.unlock();
}
}

public void methodB() {
RLock lock = redisson.getLock("lock:business");
try {
lock.lock(); // 可重入,不会死锁

// 执行业务

} finally {
lock.unlock(); // 需要unlock两次
}
}
}

三、公平锁(Fair Lock)

#

3.1 公平锁 vs 非公平锁

// 非公平锁(默认):抢锁模式,先到的线程不一定先获得锁
RLock nonFairLock = redisson.getLock("lock:order");

// 公平锁:按请求锁的顺序获取
RLock fairLock = redisson.getFairLock("lock:order:fair");

公平锁实现原理

获取锁时:
1. 检查是否有等待队列
2. 如果有,加入队列尾部等待
3. 释放锁时,通知队列头部的线程

非公平锁:
1. 直接尝试获取锁
2. 获取失败才进入队列

#

3.2 公平锁使用

@Service
public class FairLockService {

@Autowired
private RedissonClient redisson;

public void processWithFairLock(Long taskId) {
RLock fairLock = redisson.getFairLock("lock:task:" + taskId);

try {
fairLock.lock();

try {
// 按请求顺序处理任务
processTask(taskId);
} finally {
fairLock.unlock();
}
} catch (Exception e) {
throw new RuntimeException("处理失败", e);
}
}
}

四、读写锁(ReadWrite Lock)

#

4.1 读写锁特点

RReadWriteLock rwLock = redisson.getReadWriteLock("lock:cache:data");
RLock readLock = rwLock.readLock();
RLock writeLock = rwLock.writeLock();

规则

  • 读锁:多个线程可同时获取
  • 写锁:独占,其他读/写都阻塞
  • 写锁优先:有写锁等待时,新的读锁阻塞

#

4.2 读写锁使用

@Service
public class CacheService {

@Autowired
private RedissonClient redisson;
@Autowired
private DataMapper dataMapper;

private RReadWriteLock rwLock = redisson.getReadWriteLock("lock:data");

// 读操作
public Data getData(Long id) {
RLock readLock = rwLock.readLock();
try {
readLock.lock();

// 从缓存获取
Data data = getFromCache(id);
if (data != null) {
return data;
}

// 从数据库获取
data = dataMapper.findById(id);
if (data != null) {
putToCache(id, data);
}
return data;
} finally {
readLock.unlock();
}
}

// 写操作
public void updateData(Data data) {
RLock writeLock = rwLock.writeLock();
try {
writeLock.lock();

// 更新数据库
dataMapper.update(data);

// 更新缓存
putToCache(data.getId(), data);
} finally {
writeLock.unlock();
}
}
}

五、信号量(Semaphore)

#

5.1 基本使用

@Service
public class SemaphoreService {

@Autowired
private RedissonClient redisson;

// 限制并发数为10
public void limitedAccess() {
RSemaphore semaphore = redisson.getSemaphore("semaphore:api");

// 初始化信号量(只需执行一次)
// semaphore.trySetPermits(10);

try {
// 获取许可
semaphore.acquire();

try {
// 执行受限操作
callExternalAPI();
} finally {
// 释放许可
semaphore.release();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}

#

5.2 限流场景

@Service
public class RateLimiterService {

@Autowired
private RedissonClient redisson;

public void apiCall(String clientId) {
RSemaphore semaphore = redisson.getSemaphore("rate:limit:" + clientId);

// 每秒最多5个请求
semaphore.trySetPermits(5);

boolean acquired = semaphore.tryAcquire();
if (!acquired) {
throw new RateLimitException("请求过于频繁");
}

try {
// 执行API调用
doApiCall();
} finally {
// 延迟释放(实现时间窗口)
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
executor.schedule(() -> semaphore.release(), 1, TimeUnit.SECONDS);
}
}
}

六、CountDownLatch

#

6.1 基本使用

@Service
public class ParallelService {

@Autowired
private RedissonClient redisson;

public void parallelProcess(List<Task> tasks) {
RCountDownLatch latch = redisson.getCountDownLatch("latch:tasks");
latch.trySetCount(tasks.size());

for (Task task : tasks) {
new Thread(() -> {
try {
processTask(task);
} finally {
latch.countDown();
}
}).start();
}

try {
// 等待所有任务完成
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

System.out.println("所有任务完成");
}
}

七、RedissonLock原理

#

7.1 加锁Lua脚本

-- 加锁
if (redis.call('exists', KEYS[1]) == 0) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;

if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
redis.call('hincrby', KEYS[1], ARGV[2], 1);
redis.call('pexpire', KEYS[1], ARGV[1]);
return nil;
end;

return redis.call('pttl', KEYS[1]);

参数说明

  • KEYS[1]: 锁名
  • ARGV[1]: 过期时间(毫秒)
  • ARGV[2]: 线程标识(UUID:threadId)

逻辑

  1. 锁不存在:创建hash,设置值为1,设置过期时间
  2. 锁存在且是当前线程:重入计数+1,续期
  3. 锁存在且不是当前线程:返回剩余过期时间

#

7.2 解锁Lua脚本

-- 解锁
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;
end;

local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);

if (counter > 0) then
redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
else
redis.call('del', KEYS[1]);
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;

逻辑

  1. 不是自己的锁:返回nil
  2. 重入计数-1,如果还大于0:续期
  3. 重入计数为0:删除锁,发布解锁消息

#

7.3 看门狗续期

// 获取锁后启动定时任务
private void scheduleExpirationRenewal(long threadId) {
Timeout task = commandExecutor.getConnectionManager().newTimeout(
timeout -> {
// 续期Lua脚本
String script =
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;";

// 执行续期
// 如果续期成功,递归调用自己,继续定时续期
// 如果锁已不存在,停止续期
},
internalLockLeaseTime / 3, // 1/3过期时间后续期
TimeUnit.MILLISECONDS
);
}

八、最佳实践

#

8.1 锁粒度控制

// 好的做法:锁粒度尽量小
RLock lock = redisson.getLock("lock:order:" + orderId);

// 不好的做法:锁粒度过大
RLock lock = redisson.getLock("lock:all:orders");

#

8.2 锁超时设置

// 根据业务时间设置合理的超时时间
// 太短:业务未完成锁就释放
// 太长:故障时恢复时间长

// 一般业务:30-60秒
lock.tryLock(10, 30, TimeUnit.SECONDS);

// 复杂业务:使用看门狗自动续期
lock.tryLock(10, TimeUnit.SECONDS);

#

8.3 异常处理

public void safeLockOperation(String lockKey, Runnable operation) {
RLock lock = redisson.getLock(lockKey);
boolean locked = false;

try {
locked = lock.tryLock(10, 30, TimeUnit.SECONDS);
if (!locked) {
throw new LockException("获取锁失败: " + lockKey);
}

operation.run();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new LockException("获取锁被中断", e);
} finally {
if (locked && lock.isHeldByCurrentThread()) {
lock.unlock();
}
}
}

九、常见问题

#

9.1 锁不释放

原因

  • 业务异常导致未执行unlock
  • 进程崩溃

解决

  • 使用try-finally确保解锁
  • 设置合理的过期时间

#

9.2 锁续期问题

现象:业务执行时间长,锁过期了

解决

  • 使用看门狗自动续期
  • 或手动设置足够长的过期时间

#

9.3 锁的可见性

问题:一个服务获取的锁,另一个服务能看到吗?

回答:可以,Redisson使用Redis实现分布式锁,所有连接同一个Redis的服务都能看到锁状态。

十、总结

锁类型 特点 适用场景
可重入锁 同线程可多次获取 一般分布式锁场景
公平锁 按请求顺序获取 需要公平性的场景
读写锁 读共享写独占 读多写少
信号量 限制并发数 限流
CountDownLatch 等待多个任务 并行任务同步

Redisson分布式锁的核心优势:

  1. 看门狗续期:自动防止锁过期
  2. 可重入:同线程多次获取不会死锁
  3. 阻塞等待:获取锁失败可以等待
  4. 公平性可选:支持公平和非公平锁
  5. 丰富的并发工具:Semaphore、CountDownLatch等

核心要点

  1. String:简单的键值对,适合缓存、计数器

  2. Hash:存储对象属性,适合用户信息、配置

  3. List:有序列表,适合消息队列、最新列表

  4. Set:无序去重,适合共同好友、抽奖

  5. ZSet:有序集合,适合排行榜、积分系统

总结

选择合适的数据结构是使用 Redis 的关键。在实际项目中,根据业务需求选择合适的类型,可以提升性能和开发效率。


   转载规则


《Redis分布式锁Redisson》 小乐 采用 知识共享署名 4.0 国际许可协议 进行许可。
  目录