如何开发一个支持海量分布式锁的应用库


分布式锁是一种用于控制分布式系统中资源访问的同步机制,确保在任意时刻只有一个客户端能够获取到锁,并对共享资源进行操作。
作用
  1. 保证数据一致性:在多个节点并发执行的情况下,分布式锁可以防止同时修改同一份数据,从而避免数据不一致的问题。
  2. 协调任务执行:确保特定的任务不会被重复执行,特别是在需要幂等性(idempotent)保证的时候。
应用场景例如
  • 库存扣减:在电商系统中,当用户下单时需要扣减库存,为了避免超卖现象,必须确保每次扣减操作都是原子性的。
  • 定时任务调度:在分布式环境中,确保同一个定时任务只在一个节点上运行,防止重复执行。
  • 缓存更新:当多个服务实例试图更新同一个缓存项时,使用分布式锁可以确保更新过程的线程安全。
  • 秒杀活动:对于高并发的抢购活动,如秒杀,使用分布式锁来控制对有限商品资源的访问是至关重要的。
  • 文件上传:在分布式文件系统中,确保同一文件不会被多次上传或覆盖。
常见实现方式
  • 基于数据库:可以使用数据库的唯一索引来实现简单的分布式锁,也可以通过for update等机制来实现分布式锁。例如,在尝试获取锁时插入一条记录,如果插入成功则表示获取到锁;如果违反了唯一索引约束,则说明锁已经被其他客户端持有。这种方法简单直接,但性能可能不如其他专门设计的解决方案,并且需要处理死锁和锁的自动释放等问题。
  • 基于Redis:Redis是一个内存中的键值存储系统,它提供了原子性的SETNX(Set if Not Exists)命令来设置一个键,只有当该键不存在时才会成功。结合EXPIRE或PEXPIRE命令,可以为锁设置一个过期时间,防止死锁的发生。
  • 基于Zookeeper:Zookeeper支持临时顺序节点,这使得它可以实现复杂的分布式锁逻辑,如公平锁、重入锁以及读写锁。客户端创建一个临时顺序节点作为锁对象,然后检查自己创建的节点是否是最小编号的节点,以此判断是否获得锁。
  • 基于Etcd:Etcd是一个高可用的分布式键值存储系统,它也能够提供分布式锁功能。与Zookeeper类似,etcd使用临时键和租约机制来实现锁。
  • 基于Consul:同样可以用来实现分布式锁。Consul利用KV存储和会话机制,可以方便地构建出分布式锁的应用。


本文将利用raftx,用简单的方法,编写一个分布式锁的应用库,它的特点是:
  • 使用方式简单并且可用性强
  • 支持海量创建分布式锁,可以同时创建几十万甚至上百万个分布式锁
  • 占用极少量的系统资源
  • 无自旋阻塞策略,不占用CPU资源
  • 抢占式获取锁
  • 支持TTL(time to live), 防止集群节点宕机造成死锁


raftx的分布式易失性数据扩展模块实现分布式锁 有比常见分布式锁的实现较为明显的特点
  1. 高效,它基于内存。获取与释放分布式锁过程更快
  2. 可以创建海量分布式锁。如果系统需要创建海量分布式锁,比如售票系统,电商秒杀活动等, 对于Zookeeper,Etcd,redis等,在创建海量分布式锁时,可能面临大量日志与大量触发机制,导致系统负载过大的问题。而raftx不会有这个问题。可以通过以下的Lockx的实现过程,详细了解。

什么是Raftx

raftx 是一种对经典 Raft 协议的扩展,结合了 Multi-Paxos、ZAB(Zookeeper Atomic Broadcast)和 Raft 协议的优势。RaftX 具备快速选举、并发提案、数据同步、数据回滚以及易失性数据同步等特性,适用于高并发和大规模分布式系统场景。

raftx wiki


Lockx 分布式锁应用库,支持创建海量分布式锁

Lockx是依赖raftx实现的一个分布式锁应用库,实现方式简单,代码量少,100行左右代码,但是它的功能却十分强大,主要表现在:

  • 高效性与及时性
  • 资源占用极少
  • 支持海量创建分布式锁
  • API使用简单方便


Lockx 支持一次性创建成千上万,甚至数十万或数百万个分布式锁,它的实现机制保证了它不会大量占用CPU资源和内存资源;它的锁动作变更触发机制针对的是锁资源,而非分布式对象锁本身,也就是说,即使节点中有100万个锁竞争一个锁资源,每次也只会触发一次锁的释放与竞争的指令;比如锁资源"lockmux",那么在分布式系统中,当资源 “lockmux”被释放时,它将触发节点中的 “lockmux”绑定事件一次,并让等待的资源随机发送一条竞争锁的指令竞争该资源锁,而不是触发100万个等待中的锁对象竞争事件。



Lockx 实现方式

lockx主要依赖raftx的易失性数据API实现,它的特点是高效,强一致性,并且可以绑定键值的增删改的触发事件;利用这些特性,可以轻松实现分布式锁的逻辑。

	m.raft.MemWatch([]byte(lockstr), func(key, value []byte, watchType raft.WatchType) {
		//获取锁成功与否
		if watchType == raft.ADD {
			if mb, ok := m.mp.Get(util.BytesToInt64(value)); ok {
				m.del(string(key), util.BytesToInt64(value))
				close(mb.ctx)
			}
		}
		//锁释放,阻塞代码再次重新获取分布式锁
		if watchType == raft.DELETE {
			m.mux.Lock()
			defer m.mux.Unlock()
			if ids, b := m.rmap[string(key)]; b {
				for k := range ids {
					m.raft.MemCommand(key, util.Int64ToBytes(k), timeout, raft.MEM_PUT)
					break
				}
			}
		}
		//TryLock获取锁失败触发
		if watchType == raft.UPDATE {
			if mb, ok := m.mp.Get(util.BytesToInt64(value)); ok {
				if mb.isTry {
					m.del(string(key), util.BytesToInt64(value))
					mb.ctx <- true
					close(mb.ctx)
				}
			}
		}
	}, false, raft.ADD, raft.DELETE, raft.UPDATE)
这是lockx实现的核心代码,主要通过监听raftx易失性数据主键的增删改事件来实现资源锁的锁定与释放
  • raft.ADD  这是资源锁新增的触发事件,通过它判断哪个对象获取到分布式锁,同时关闭相应阻塞的通道,让获取锁的程序继续执行。
  • raft.DELETE  这是资源锁删除的触发事件,同时它将再次发送获取资源锁的指令,抢占资源锁
  • raft.UPDATE 这是资源锁更新的触发事件,它表示资源锁获取失败,用于TryLock,同时关闭相应阻塞的通道并返回false


Lockx 使用方式

Lockx 的使用非常简单,并且它可以支持大量创建分布式锁,它一共有3个方法

  • Lock(string,int)  获取指定资源的分布式锁并设置过期时间,阻塞
  • TryLock(string,int)bool  获取指定资源的分布式锁并设置过期时间,若获取不到返回false,不阻塞
  • UnLock(string) 释放指定资源的分布式锁


以下模拟3个集群节点

//节点1,创建分布式锁管理器 mutex1
mutex1 = NewMutex(":20001", []string{"127.0.0.1:20001", "127.0.0.1:20002", "127.0.0.1:20003"})

//节点2,创建分布式锁管理器 mutex2
mutex2 = NewMutex(":20002", []string{"127.0.0.1:20001", "127.0.0.1:20002", "127.0.0.1:20003"})

//节点3,创建分布式锁管理器 mutex3
mutex3 = NewMutex(":20003", []string{"127.0.0.1:20001", "127.0.0.1:20002", "127.0.0.1:20003"})
  • 第一个参数 raftx服务地址
  • 第二个参数是所有集群节点都相同的,为所有节点的访问地址 []string{"127.0.0.1:20001", "127.0.0.1:20002", "127.0.0.1:20003"}

这样就完成了分布式锁管理器的创建,并可以直接获取各个自定义资源的分布式锁,这里的资源指的是字符串,比如 “test”


示例

//节点1
func lock1(i int) {
	logger.Debugf("mutex1 lock%d lock.....", i)
	mutex1.Lock("test", 10)
	logger.Debugf("mutex1 lock%d get lock successful", i)
	time.Sleep(2 * time.Second)
	mutex1.Unlock("test")
	logger.Debugf("mutex1 lock%d unlock", i)
}

//节点2
func lock2(i int) {
	logger.Debugf("mutex2 lock%d lock.....", i)
	mutex2.Lock("test", 10)
	logger.Debugf("mutex2 lock%d get lock successful", i)
}

//节点3
func lock3(i int) {
	logger.Debugf("mutex3 lock%d lock.....", i)
	mutex3.Lock("test", 10)
	logger.Debugf("mutex3 lock%d get lock successful", i)
	time.Sleep(2 * time.Second)
	mutex3.Unlock("test")
	logger.Debugf("mutex3 lock%d unlock", i)
}

测试调用

func Test_lock(t *testing.T) {
	go lock1(1)
	go lock2(2)
	go lock3(3)
	select {}
}


执行结果:

可以看到:

  • 2024/12/31 22:34:35 三个节点的同时抢占分布式锁
  • 节点mutex2获取到了锁,由于mutex2没有主动释放锁,mutex2.Lock("test", 10) 这里表示10秒后 ,集群自动释放锁
  • 2024/12/31 22:34:45 mutex2持有的分布式锁被服务自动释放,同时mutex1节点获取到分布式锁
  • 2024/12/31 22:34:47 mutex1在2秒后显式调用UnLock释放锁,同时mutex3节点获取到分布式锁
  • 2024/12/31 22:34:49 mutex3在2秒后显式调用UnLock释放锁

Lockx 可以海量创建分布式锁,如:

func Test_multi_lock(t *testing.T) {
	for i := 1; i < 1<<15; i++ {  //mutex1节点创建32768个并发任务
		go lock1(i)
	}
	for i := 1; i < 1<<15; i++ { //mutex2节点创建32768个并发任务
		go lock2(i)
	}
	for i := 1; i < 1<<15; i++ { //mutex3节点创建32768个并发任务
		go lock3(i)
	}
	select {}
}
  • 每个节点同时并发创建32768个分布式锁对象

执行结果:

可以看到,每2秒有一个对象获取到分布式锁,按顺序依次执行获取分布式锁与解锁。

(注意:mutex2增加了2秒后释放锁,否则mutex2节点获取锁后,将等待10秒后有raftx集群释放锁)



Lockx 的源码地址

可以直接将其当成第三方分布式锁库在工程中使用

程序中调用示例

import "github.com/donnie4w/lockx"

//创建分布式锁管理器 mutex1
var mutex1 = lockx.NewMutex(":20001", []string{"127.0.0.1:20001", "127.0.0.1:20002", "127.0.0.1:20003"})



结论

Lockx 利用了 raftx 的高效特性和易失性数据存储能力,提供了一种简洁而强大的分布式锁解决方案。它不仅适合常规的分布式锁需求,还能够在高并发环境下保持性能优势,确保系统的稳定性和可靠性。

如果你考虑在项目中引入这样的分布式锁库,可以参考上述信息进行评估和集成。此外,也可以根据自己的具体需求调整 Lockx 的实现,例如实现更复杂的锁行为(如公平锁等)。