Redis分布式锁-这一篇全了解(Redission实现分布式锁完美方案)
# 什么是分布式锁
分布式锁,即分布式系统中的锁,分布式锁是控制分布式系统有序的对共享资源进行操作,在单体应用中我们通过锁实现共享资源访问,而分布式锁,就是解决了分布式系统中控制共享资源访问的问题。
可能初学的小伙伴就会有疑问,Java多线程中的公平锁、非公平锁、自旋锁、可重入锁、读写锁、互斥锁这些都还没闹明白呢?怎么又出来一个分布式锁?
其实,可以这么理解:Java的原生锁是解决多线程下对于共享资源的操作,而分布式锁则是多进程下对于共享资源的操作。分布式系统中竞争共享资源的最小粒度从线程升级成了进程。
分布式锁经被应用到各种高并发的场景下,典场景案例包括:秒杀、车票、订单、退款、库存等场景。
# 为什么要使用分布式锁
在分布式系统中,常常需要协调他们的动作。如果不同的系统或是同一个系统的不同主机之间共享了一个或一组资源,那么访问这些资源的时候,往往需要互斥来防止彼此干扰来保证一致性,这个时候,便需要使用到分布式锁。
目前几乎很多大型网站及应用都是分布式部署的,如何保证分布式场景中的数据一致性问题一直是一个比较重要的话题。在某些场景下,为了保证数据的完整性和一致性,我们需要保证一个方法在同一时间内只能被同一个线程执行,这就需要使用分布式锁。
# 常见分布式锁方案对比
分类 | 方案 | 实现原理 | 优点 | 缺点 |
---|---|---|---|---|
基于数据库 | 基于mysql 表唯一索引 | 1.表增加唯一索引 2.加锁:执行insert语句,若报错,则表明加锁失败 3.解锁:执行delete语句 | 完全利用DB现有能力,实现简单 | 1.锁无超时自动失效机制,有死锁风险 2.不支持锁重入,不支持阻塞等待 3.操作数据库开销大,性能不高 |
基于MongoDB findAndModify原子操作 | 1.加锁:执行findAndModify原子命令查找document,若不存在则新增 2.解锁:删除document | 实现也很容易,较基于MySQL唯一索引的方案,性能要好很多 | 1.大部分公司数据库用MySQL,可能缺乏相应的MongoDB运维、开发人员 2.锁无超时自动失效机制 | |
基于分布式协调系统 | 基于ZooKeeper | 1.加锁:在/lock目录下创建临时有序节点,判断创建的节点序号是否最小。若是,则表示获取到锁;否,则则watch /lock目录下序号比自身小的前一个节点 2.解锁:删除节点 | 1.由zk保障系统高可用 2.Curator框架已原生支持系列分布式锁命令,使用简单 | 需单独维护一套zk集群,维保成本高 |
基于缓存 | 基于redis命令 | 1. 加锁:执行setnx,若成功再执行expire添加过期时间 2. 解锁:执行delete命令 | 实现简单,相比数据库和分布式系统的实现,该方案最轻,性能最好 | 1.setnx和expire分2步执行,非原子操作;若setnx执行成功,但expire执行失败,就可能出现死锁 2.delete命令存在误删除非当前线程持有的锁的可能 3.不支持阻塞等待、不可重入 |
基于redis Lua脚本能力 | 1. 加锁:执行SET lock_name random_value EX seconds NX 命令 2. 解锁:执行Lua脚本,释放锁时验证random_value -- ARGV[1]为random_value, KEYS[1]为lock_nameif redis.call("get", KEYS[1]) == ARGV[1] then return redis.call("del",KEYS[1])else return 0end | 同上;实现逻辑上也更严谨,除了单点问题,生产环境采用用这种方案,问题也不大。 | 不支持锁重入,不支持阻塞等待 |
# 分布式锁需满足四个条件
为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件:
- 互斥性。在任意时刻,只有一个客户端能持有锁。
- 不会发生死锁。即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。
- 解铃还须系铃人。加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了,即不能误解锁。
- 具有容错性。只要大多数Redis节点正常运行,客户端就能够获取和释放锁。
# Redisson分布式锁的实现
Redisson保持了简单易用、支持锁重入、支持阻塞等待、Lua脚本原子操作。
Redisson 支持单点模式、主从模式、哨兵模式、集群模式,这里以单点模式为例:
// 1.构造redisson实现分布式锁必要的Config
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:5379").setPassword("123456").setDatabase(0);
// 2.构造RedissonClient
RedissonClient redissonClient = Redisson.create(config);
// 3.获取锁对象实例(无法保证是按线程的顺序获取到)
RLock rLock = redissonClient.getLock(lockKey);
try {
/**
* 4.尝试获取锁
* waitTimeout 尝试获取锁的最大等待时间,超过这个值,则认为获取锁失败
* leaseTime 锁的持有时间,超过这个时间锁会自动失效(值应设置为大于业务处理的时间,确保在锁有效期内业务能处理完)
*/
boolean res = rLock.tryLock((long)waitTimeout, (long)leaseTime, TimeUnit.SECONDS);
if (res) {
//成功获得锁,在这里处理业务
}
} catch (Exception e) {
throw new RuntimeException("aquire lock fail");
}finally{
//无论如何, 最后都要解锁
rLock.unlock();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
# 加锁&解锁Lua脚本
# 加锁Lua脚本
- 脚本入参
参数 | 示例值 | 含义 |
---|---|---|
KEY个数 | 1 | KEY个数 |
KEYS[1] | my_first_lock_name | 锁名 |
ARGV[1] | 60000 | 持有锁的有效时间:毫秒 |
ARGV[2] | 58c62432-bb74-4d14-8a00-9908cc8b828f:1 | 唯一标识:获取锁时set的唯一值,实现上为redisson客户端ID(UUID)+线程ID |
脚本内容
-- 若锁不存在:则新增锁,并设置锁重入计数为1、设置锁过期时间 if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; -- 若锁存在,且唯一标识也匹配:则表明当前加锁请求为锁重入请求,故锁重入计数+1,并再次设置锁过期时间 if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; -- 若锁存在,但唯一标识不匹配:表明锁是被其他线程占用,当前线程无权解他人的锁,直接返回锁剩余过期时间 return redis.call('pttl', KEYS[1]);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 解锁Lua脚本
- 脚本入参
参数 | 示例值 | 含义 |
---|---|---|
KEY个数 | 2 | KEY个数 |
KEYS[1] | my_first_lock_name | 锁名 |
KEYS[2] | redisson_lock__channel:{my_first_lock_name} | 解锁消息PubSub频道 |
ARGV[1] | 0 | redisson定义0表示解锁消息 |
ARGV[2] | 30000 | 设置锁的过期时间;默认值30秒 |
ARGV[3] | 58c62432-bb74-4d14-8a00-9908cc8b828f:1 | 唯一标识;同加锁流程 |
- 脚本内容
-- 若锁不存在:则直接广播解锁消息,并返回1
if (redis.call('exists', KEYS[1]) == 0) then
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
-- 若锁存在,但唯一标识不匹配:则表明锁被其他线程占用,当前线程不允许解锁其他线程持有的锁
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
return nil;
end;
-- 若锁存在,且唯一标识匹配:则先将锁重入计数减1
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
if (counter > 0) then
-- 锁重入计数减1后还大于0:表明当前线程持有的锁还有重入,不能进行锁删除操作,但可以友好地帮忙设置下过期时期
redis.call('pexpire', KEYS[1], ARGV[2]);
return 0;
else
-- 锁重入计数已为0:间接表明锁已释放了。直接删除掉锁,并广播解锁消息,去唤醒那些争抢过锁但还处于阻塞中的线程
redis.call('del', KEYS[1]);
redis.call('publish', KEYS[2], ARGV[1]);
return 1;
end;
return nil;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 加锁&解锁流程串起来
概括下整个流程
1、线程A和线程B两个线程同时争抢锁。线程A很幸运,最先抢到了锁。线程B在获取锁失败后,并未放弃希望,而是主动订阅了解锁消息,然后再尝试获取锁,顺便看看没有抢到的这把锁还有多久就过期,线程B就按需阻塞等锁释放。
2、线程A拿着锁干完了活,自觉释放了持有的锁,于此同时广播了解锁消息,通知其他抢锁的线程再来枪;
3、解锁消息的监听者LockPubSub收到消息后,释放自己持有的信号量;线程B就瞬间从阻塞中被唤醒了,接着再抢锁,这次终于抢到锁了!后面再按部就班,干完活,解锁
# Redisson使用
上面讲了原理,接下来我们看下怎么使用Redisson。
引入依赖
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.16.3</version>
</dependency>
2
3
4
5
# 单机环境
Config config = new Config();
config.useSingleServer()
.setAddress("redis://127.0.0.1:6379")
.setPassword("123456")
.setDatabase(0);
RedissonClient redissonClient = Redisson.create(config);
//获取锁对象实例
final String lockKey = "abc";
RLock rLock = redissonClient.getLock(lockKey);
try {
//尝试5秒内获取锁,如果获取到了,最长60秒自动释放
boolean res = rLock.tryLock(5L, 60L, TimeUnit.SECONDS);
if (res) {
//成功获得锁,在这里处理业务
System.out.println("获取锁成功");
}
} catch (Exception e) {
System.out.println("获取锁失败,失败原因:" + e.getMessage());
} finally {
//无论如何, 最后都要解锁
rLock.unlock();
}
//关闭客户端
redissonClient.shutdown();
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# 集群模式
Config config = new Config();
config.useClusterServers()
.setScanInterval(2000) // 集群状态扫描间隔时间,单位是毫秒
//可以用"rediss://"来启用SSL连接
.addNodeAddress("redis://127.0.0.1:7000", "redis://127.0.0.1:7001")
.addNodeAddress("redis://127.0.0.1:7002");
RedissonClient redisson = Redisson.create(config);
2
3
4
5
6
7
8
# 哨兵模式
Config config = new Config();
config.useSentinelServers()
.setMasterName("mymaster")
//可以用"rediss://"来启用SSL连接
.addSentinelAddress("127.0.0.1:26389", "127.0.0.1:26379")
.addSentinelAddress("127.0.0.1:26319");
RedissonClient redisson = Redisson.create(config);
2
3
4
5
6
7
8