1. 什么是分布式锁机制
分布式锁机制是一种用于分布式系统中的并发控制的技术,用于确保在分布式环境下同时只有一个客户端能够访问共享资源。在分布式系统中,多个进程可能同时访问某个共享资源,如果不加控制,会出现多个进程并发修改同一个数据的情况,导致数据的不一致性。
分布式锁机制是通过在分布式系统中使用锁来实现对共享资源的并发控制,确保同一时刻只有一个客户端能够访问共享资源,从而保证数据的一致性。
2. Redis实现分布式锁机制
Redis是一个开源的内存数据存储系统,提供了一个键值对存储的数据库,支持多种数据结构,如字符串、哈希表、列表、集合、有序集合等。Redis具有高性能、高可扩展性、高可用性等优点,是实现分布式锁机制的一个理想选择。
Redis实现分布式锁机制的基本思路是利用Redis的原子性操作,将一个键值对作为锁,通过一系列操作来实现锁的获取和释放。
2.1 获取锁
获取锁的基本流程如下:
生成一个唯一的标识符,作为锁的持有者。
通过Redis的SETNX命令设置一个键值对,其中键为锁的名称,值为标识符。
检查SETNX命令的返回值,如果返回值为1,则表示锁获取成功,否则表示锁已被其他客户端持有,不能再次获取锁。
根据需要设置锁的超时时间,以避免锁被持有期间系统故障导致死锁。
以下是具体的代码实现:
// Redis配置
const Redis = require('ioredis');
const redis = new Redis({
host: 'localhost',
port: 6379
});
// 获取锁
async function acquireLock(lockName, acquireTimeout, lockTimeout, lockValue) {
const endTime = Date.now() + acquireTimeout;
while (Date.now() < endTime) {
// 生成一个唯一的标识符
const identifier = Math.random().toString(36).substr(2, 12);
// 尝试获取锁
const result = await redis.set(lockName, identifier, 'NX', 'PX', lockTimeout);
if (result === 'OK') {
return identifier;
}
// 等待一段时间再尝试获取锁
await new Promise(resolve => setTimeout(resolve, 10));
}
return false;
}
上述代码中,acquireLock函数用于获取锁,其传入的参数包括:
lockName:锁的名称。
acquireTimeout:获取锁的超时时间。
lockTimeout:锁的过期时间。
lockValue:锁的值,即标识符。
acquireLock函数的执行流程如下:
生成一个唯一的标识符。
尝试使用Redis的SET命令设置键值对,其中键为锁的名称,值为标识符,并设置NX参数,表示只有在键不存在时才能设置成功。
如果SET命令返回值为OK,表示锁获取成功,返回标识符。
如果SET命令返回值为null,表示锁已被其他客户端持有,等待一段时间后再尝试获取锁。
如果超时时间到达,仍未获取到锁,则返回false。
2.2 释放锁
释放锁的基本流程如下:
检查锁的持有者是否与标识符相同,如果相同则可以释放锁,否则表示该客户端未持有该锁,不能释放。
通过Redis的DEL命令删除锁。
以下是具体的代码实现:
// 释放锁
async function releaseLock(lockName, lockValue) {
const result = await redis.get(lockName);
if (result === lockValue) {
await redis.del(lockName);
return true;
}
return false;
}
上述代码中,releaseLock函数用于释放锁,其传入的参数包括:
lockName:锁的名称。
lockValue:锁的值,即唯一标识符。
releaseLock函数的执行流程如下:
通过Redis的GET命令获取锁的值。
如果锁的值与标识符相同,即表示该客户端持有该锁,通过Redis的DEL命令删除锁,返回true。
如果锁的值与标识符不同,即表示该客户端未持有该锁,返回false。
3. JavaScript实现分布式锁机制
JavaScript实现分布式锁机制的基本思路是通过使用MySQL等数据库的事务特性或者ZooKeeper等分布式协调框架实现锁功能。
3.1 MySQL实现分布式锁机制
MySQL可以通过其事务特性实现分布式锁机制,具体流程如下:
通过SELECT...FOR UPDATE语句锁定需要操作的行。
根据需要对行进行操作。
通过COMMIT或ROLLBACK语句释放锁。
以下是具体的代码实现:
// MySQL配置
const mysql = require('mysql');
const connection = mysql.createConnection({
host: 'localhost',
user: 'root',
password: 'password',
database: 'test'
});
// 获取锁
async function acquireLock(lockName, acquireTimeout) {
const conn = await getConnection();
const endTime = Date.now() + acquireTimeout;
while (Date.now() < endTime) {
try {
await conn.beginTransaction();
const result = await conn.query('SELECT * FROM locks WHERE name = ? FOR UPDATE', [lockName]);
if (result.length === 0) {
await conn.query('INSERT INTO locks SET name = ?', [lockName]);
await conn.commit();
return true;
}
await conn.commit();
} catch (err) {
if (err.code === 'ER_LOCK_DEADLOCK' || err.code === 'ER_LOCK_WAIT_TIMEOUT') {
// 忽略死锁错误
} else {
await conn.rollback();
throw err;
}
}
// 等待一段时间再尝试获取锁
await new Promise(resolve => setTimeout(resolve, 10));
}
await conn.rollback();
return false;
}
// 释放锁
async function releaseLock(lockName) {
const conn = await getConnection();
await conn.query('DELETE FROM locks WHERE name = ?', [lockName]);
await conn.commit();
return true;
}
// 获取MySQL连接
function getConnection() {
return new Promise((resolve, reject) => {
connection.getConnection((err, conn) => {
if (err) {
reject(err);
} else {
resolve(conn);
}
});
});
}
上述代码中,acquireLock函数用于获取锁,其传入的参数包括:
lockName:锁的名称。
acquireTimeout:获取锁的超时时间。
acquireLock函数的执行流程如下:
获取MySQL连接。
设定获取锁的截止时间。
循环执行以下步骤:
开启MySQL事务。
通过SELECT...FOR UPDATE语句查询锁表,如果查询结果为空,则表示锁未被持有,通过INSERT语句插入一条记录来获取锁,提交MySQL事务,返回true。
如果查询结果不为空,表示锁已被持有,提交MySQL事务。
处理MySQL死锁和等待超时错误,等待一段时间后重新尝试获取锁。
如果超时时间到达,仍未获取到锁,则回滚MySQL事务,返回false。
releaseLock函数用于释放锁,其传入的参数为锁的名称。
releaseLock函数的执行流程如下:
获取MySQL连接。
通过DELETE语句删除锁表中对应的记录。
提交MySQL事务,返回true。
3.2 ZooKeeper实现分布式锁机制
ZooKeeper是一个分布式协调框架,可以实现分布式系统中的各种锁功能,包括共享锁、排他锁、顺序锁等。
以下是具体的代码实现:
// ZooKeeper配置
const zookeeper = require('node-zookeeper-client');
const zookeeperClient = zookeeper.createClient('localhost:2181');
// 获取锁
async function acquireLock(lockName, acquireTimeout) {
await zookeeperClient.connect();
while (true) {
try {
const path = await zookeeperClient.create(lockName + '/', null, zookeeper.CreateMode.EPHEMERAL_SEQUENTIAL);
const children = await zookeeperClient.getChildren(lockName, false);
children.sort();
const sequence = path.substring(path.lastIndexOf('/') + 1);
if (sequence === children[0]) {
return true;
}
const index = children.indexOf(sequence);
const predecessor = lockName + '/' + children[index - 1];
await new Promise((resolve, reject) => {
const watcher = async event => {
if (event.type === zookeeper.Event.TYPE_CHILD) {
const children = await zookeeperClient.getChildren(lockName, false);
children.sort();
const index = children.indexOf(sequence);
if (index === 0) {
zookeeperClient.removeListener(watcher);
resolve();
} else {
const predecessor = lockName + '/' + children[index - 1];
zookeeperClient.exists(predecessor, watcher);
}
}
};
zookeeperClient.exists(predecessor, watcher, (err, stat) => {
if (err) {
reject(err);
}
});
});
} catch (err) {
if (err.code === zookeeper.Exception.NODE_EXISTS) {
// 等待一段时间再尝试获取锁
await new Promise(resolve => setTimeout(resolve, 10));
} else {
throw err;
}
}
if (Date.now() > endTime) {
return false;
}
}
}
// 释放锁
async function releaseLock(lockName) {
await zookeeperClient.remove(lockName);
await zookeeperClient.close();
return true;
}
上述代码中,acquireLock函数用于获取锁,其传入的参数包括:
lockName:锁的名称。
acquireTimeout:获取锁的超时时间。
acquireLock函数的执行流程如下:
连接ZooKeeper。
循环执行以下步骤:
创建临时顺序节点。
获取锁节点的子节点列表。
将子节点列表按照序列号排序。
获取自己的节点序列号。
如果自己的节点为子节点列表中序列号最小的节点,则获取锁成功,返回true。
如果自己的节点不是子节点列表中序列号最小的节点,则设置监视器,等待前一个节点被删除后再重新尝试获取锁。
如果超时时间到达,仍未获取到锁,则返回false。
releaseLock函数用于释放锁,其传入的参数为锁的名称。
releaseLock函数的执行流程如下:
删除锁节点,断开ZooKeeper连接。
返回true。