分布式锁是一种用于控制分布式系统中资源访问的同步机制,确保在任意时刻只有一个客户端能够获取到锁,并对共享资源进行操作。
raftx 是一种对经典 Raft 协议的扩展,结合了 Multi-Paxos、ZAB(Zookeeper Atomic Broadcast)和 Raft 协议的优势。RaftX 具备快速选举、并发提案、数据同步、数据回滚以及易失性数据同步等特性,适用于高并发和大规模分布式系统场景。
Lockx是依赖raftx实现的一个分布式锁应用库,实现方式简单,代码量少,100行左右代码,但是它的功能却十分强大,主要表现在:
Lockx 支持一次性创建成千上万,甚至数十万或数百万个分布式锁,它的实现机制保证了它不会大量占用CPU资源和内存资源;它的锁动作变更触发机制针对的是锁资源,而非分布式对象锁本身,也就是说,即使节点中有100万个锁竞争一个锁资源,每次也只会触发一次锁的释放与竞争的指令;比如锁资源"lockmux",那么在分布式系统中,当资源 “lockmux”被释放时,它将触发节点中的 “lockmux”绑定事件一次,并让等待的资源随机发送一条竞争锁的指令竞争该资源锁,而不是触发100万个等待中的锁对象竞争事件。
lockx主要依赖raftx的易失性数据API实现,它的特点是高效,强一致性,并且可以绑定键值的增删改的触发事件;利用这些特性,可以轻松实现分布式锁的逻辑。
m.raft.MemWatch([]byte(lockstr), func(key, value []byte, watchType raft.WatchType) {
//获取锁成功与否
if watchType == raft.ADD {
if mb, ok := m.mp.Get(util.BytesToInt64(value)); ok {
m.del(string(key), util.BytesToInt64(value))
close(mb.ctx)
}
}
//锁释放,阻塞代码再次重新获取分布式锁
if watchType == raft.DELETE {
m.mux.Lock()
defer m.mux.Unlock()
if ids, b := m.rmap[string(key)]; b {
for k := range ids {
m.raft.MemCommand(key, util.Int64ToBytes(k), timeout, raft.MEM_PUT)
break
}
}
}
//TryLock获取锁失败触发
if watchType == raft.UPDATE {
if mb, ok := m.mp.Get(util.BytesToInt64(value)); ok {
if mb.isTry {
m.del(string(key), util.BytesToInt64(value))
mb.ctx <- true
close(mb.ctx)
}
}
}
}, false, raft.ADD, raft.DELETE, raft.UPDATE)
Lockx 的使用非常简单,并且它可以支持大量创建分布式锁,它一共有3个方法
Lock(string,int)
获取指定资源的分布式锁并设置过期时间,阻塞TryLock(string,int)bool
获取指定资源的分布式锁并设置过期时间,若获取不到返回false,不阻塞UnLock(string)
释放指定资源的分布式锁以下模拟3个集群节点
//节点1,创建分布式锁管理器 mutex1
mutex1 = NewMutex(":20001", []string{"127.0.0.1:20001", "127.0.0.1:20002", "127.0.0.1:20003"})
//节点2,创建分布式锁管理器 mutex2
mutex2 = NewMutex(":20002", []string{"127.0.0.1:20001", "127.0.0.1:20002", "127.0.0.1:20003"})
//节点3,创建分布式锁管理器 mutex3
mutex3 = NewMutex(":20003", []string{"127.0.0.1:20001", "127.0.0.1:20002", "127.0.0.1:20003"})
这样就完成了分布式锁管理器的创建,并可以直接获取各个自定义资源的分布式锁,这里的资源指的是字符串,比如 “test”
示例
//节点1
func lock1(i int) {
logger.Debugf("mutex1 lock%d lock.....", i)
mutex1.Lock("test", 10)
logger.Debugf("mutex1 lock%d get lock successful", i)
time.Sleep(2 * time.Second)
mutex1.Unlock("test")
logger.Debugf("mutex1 lock%d unlock", i)
}
//节点2
func lock2(i int) {
logger.Debugf("mutex2 lock%d lock.....", i)
mutex2.Lock("test", 10)
logger.Debugf("mutex2 lock%d get lock successful", i)
}
//节点3
func lock3(i int) {
logger.Debugf("mutex3 lock%d lock.....", i)
mutex3.Lock("test", 10)
logger.Debugf("mutex3 lock%d get lock successful", i)
time.Sleep(2 * time.Second)
mutex3.Unlock("test")
logger.Debugf("mutex3 lock%d unlock", i)
}
测试调用
func Test_lock(t *testing.T) {
go lock1(1)
go lock2(2)
go lock3(3)
select {}
}
可以看到:
2024/12/31 22:34:35
三个节点的同时抢占分布式锁2024/12/31 22:34:45
mutex2持有的分布式锁被服务自动释放,同时mutex1节点获取到分布式锁2024/12/31 22:34:47
mutex1在2秒后显式调用UnLock释放锁,同时mutex3节点获取到分布式锁2024/12/31 22:34:49
mutex3在2秒后显式调用UnLock释放锁func Test_multi_lock(t *testing.T) {
for i := 1; i < 1<<15; i++ { //mutex1节点创建32768个并发任务
go lock1(i)
}
for i := 1; i < 1<<15; i++ { //mutex2节点创建32768个并发任务
go lock2(i)
}
for i := 1; i < 1<<15; i++ { //mutex3节点创建32768个并发任务
go lock3(i)
}
select {}
}
可以看到,每2秒有一个对象获取到分布式锁,按顺序依次执行获取分布式锁与解锁。
(注意:mutex2增加了2秒后释放锁,否则mutex2节点获取锁后,将等待10秒后有raftx集群释放锁)
可以直接将其当成第三方分布式锁库在工程中使用
程序中调用示例
import "github.com/donnie4w/lockx"
//创建分布式锁管理器 mutex1
var mutex1 = lockx.NewMutex(":20001", []string{"127.0.0.1:20001", "127.0.0.1:20002", "127.0.0.1:20003"})
Lockx
利用了 raftx
的高效特性和易失性数据存储能力,提供了一种简洁而强大的分布式锁解决方案。它不仅适合常规的分布式锁需求,还能够在高并发环境下保持性能优势,确保系统的稳定性和可靠性。
如果你考虑在项目中引入这样的分布式锁库,可以参考上述信息进行评估和集成。此外,也可以根据自己的具体需求调整 Lockx
的实现,例如实现更复杂的锁行为(如公平锁等)。