Redis中为什么需要分布式锁?如何实现?
1. Redis分布式锁是什么?
Redis分布式锁是在Redis基础上实现的一种分布式锁机制。它通过使用Redis的原子操作来保证在不同的进程或者不同的服务器上均能够保证互斥访问关键资源。
2. 为什么需要使用Redis分布式锁?
在分布式系统中,多个进程或者多台服务器访问同一份数据时,为了避免竞争关键资源,需要使用分布式锁机制。如果不加锁,就会出现多个进程或者多台服务器同时对同一份数据进行读写导致数据错误的情况。
2.1 高并发场景下的锁的角色
在高并发场景下,锁的作用非常明显:保证同一时刻只有一个请求能访问关键资源。同时,适当的加锁可以减少CPU资源的浪费以及I/O等操作的错误率。
2.2 锁的类型
在分布式锁的实践中,主要有两种类型的锁:悲观锁和乐观锁。
- 悲观锁:假定并发环境一定会发生,所以在访问数据之前就加锁了,保证同一时刻只有一个请求能访问关键资源。常见的悲观锁有MySQL中的排它锁和共享锁。
- 乐观锁:假定并发对系统的影响不大,所以在访问数据时不加锁,并且不会主动去检测并发的情况,而是在数据更新的时候对数据版本进行检查,如果版本相同则更新数据成功,否则会退回重试。
在实际应用中,我们可以根据不同的业务场景来选择使用不同的锁机制。
3. Redis分布式锁的实现方法
3.1 Redis单机锁的实现方法
在单机环境下,可以使用Redis的SET命令和NX参数来实现简单的锁机制。当key不存在时才进行SET操作,否则SET操作失败,返回false。
例如,下面是一个加锁的示例代码:
/**
* @desc 加锁
* @param string key 锁的键名
* @param int timeout 过期时间
* @return bool
*/
function lock($key, $timeout) {
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
// 如果key已经存在,说明锁已经被其他请求持有
$locked = $redis->set($key, true, ['nx', 'ex'=>$timeout]);
return $locked;
}
其中,$key表示锁的键名,$timeout表示锁的过期时间。
3.2 Redis分布式锁的实现方法
在分布式环境下,需要使用Redis的分布式锁机制,也就是Redlock算法。Redlock算法的基本原理是:通过在不同的Redis实例上创建相同的锁,以此来保证只有一个进程或者服务器能够持有该锁。
具体实现流程如下:
1. 生成一个随机数作为锁的value。
2. 根据key的hash值映射到不同的Redis实例上(可以根据分片算法来确定映射的Redis实例),在该实例上执行SET命令,设置过期时间为TTL,锁的value为随机数。
3. 如果执行成功,说明获取锁成功,并且记录锁的存活时间和锁的value。
4. 在其他Redis实例上也尝试执行SET命令来获取锁,如果某个Redis实例还没有获取到锁,说明锁被其他进程或者服务器持有,就需要删除已经获取的锁。
例如,下面是一个Redlock算法的示例代码:
class Redlock
{
private $retryDelay; // 重试延时
private $timeout; // 时间戳
private $clockDriftFactor = 0.01; // 转换壁延时因子
private $quorum; // 通过的实例数
private $servers = array(); // 保存连线Redis服务器的数组
private $locks = array(); // 保存锁的信息的数组
// 初始化实例
function __construct($serverList, $retryDelay=200, $timeout=2000)
{
$this->retryDelay = $retryDelay;
$this->timeout = $timeout;
foreach($serverList as $server)
{
$redis = new Redis();
$redis->connect($server[0], $server[1], 0.5);
$this->servers[] = $redis;
}
$this->quorum = min(count($serverList), (count($serverList)/2+1));
}
/**
* 加锁
* @param string $resource 键名
* @param int $ttl 过期时间
* @return mixed|null 成功返回锁的value,失败返回null
*/
public function lock($resource, $ttl)
{
$this->locks[$resource] = microtime(true);
$value = uniqid();
$ttl = ceil($ttl/1000);
$n = 0;
while(true){
$start = microtime(true);
$i = 0;
foreach($this->servers as $server){
$result = $server->set($resource, $value, ['nx', 'ex'=>$ttl]);
if($result == 'OK' || $result == true) {
$i++;
}
}
$end = microtime(true);
if($i >= $this->quorum && ($end-$start+$this->retryDelay*($n-1))<$ttl)
{
return $value;
} else {
foreach($this->servers as $server){
$server->del($resource);
}
}
if(($end-$start+$this->retryDelay*$n) >= $ttl) {
break;
}
$n++;
$this->waitRetry();
}
$this->locks[$resource] = null;
return null;
}
/**
* 解锁
* @param string $resource 键名
* @param string $value 锁的value
* @return bool
*/
public function unlock($resource, $value)
{
$n = 0;
while(true){
$i = 0;
foreach($this->servers as $server){
$result = $server->eval($this->script(), array($resource, $value));
if($result){
$i++;
}
}
if($i>=$this->quorum){
return true;
}
if(($this->retryDelay*$n) >= $this->timeout){
break;
}
$n++;
$this->waitRetry();
}
return false;
}
/**
* 重试等待
*/
private function waitRetry()
{
$delay = $this->retryDelay/1000;
usleep(rand($delay, 2*$delay)*1000000);
}
/**
* 生成并返回解锁脚本
* @return string
*/
private function script()
{
return 'if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end';
}
}
其中,$serverList表示Redis服务器列表,$retryDelay表示锁等待重试的延时时间,$timeout表示锁等待超时时间。
4. 总结
Redis分布式锁是一个非常常用的分布式锁机制,在高并发场景下可以有效保护关键资源不被并发访问。本文介绍了Redis分布式锁的实现方法,并提供了实例代码。在实际应用中,可以根据具体业务场景来选择不同的锁机制,从而保证应用的高可用性和高并发性能。