开发一个分布式日志系统是一个复杂的任务,涉及多个方面的设计和技术决策。以下是一些关键步骤和考虑因素
本文将基于这些需求,利用 raftx 协议,为分布式日志系统提供一个基础性框架,使用raftx作为一致性协议的实现来确保跨节点的日志数据一致性和顺序性。
核心代码实现
//注册Debug的事件监听
cl.raft.MemWatch([]byte{0}, func(key, value []byte, watchType raft.WatchType) {
cl.log.Debug(string(value))
}, true, raft.ADD, raft.UPDATE)
import (
"github.com/donnie4w/go-logger/logger"
"github.com/donnie4w/raftx"
"github.com/donnie4w/raftx/raft"
)
type Logx struct {
log *logger.Logging
raftx raftx.Raftx
}
func NewLogx(filePath string, listen string, peers []string) (r *Logx) {
log := logger.NewLogger().SetOption(&logger.Option{FileOption: &logger.FileSizeMode{Filename: filePath, Maxsize: 1 << 30}, Format: logger.FORMAT_DATE | logger.FORMAT_SHORTFILENAME})
rx := raftx.NewRaftx(&raft.Config{ListenAddr: listen, PeerAddr: peers})
go rx.Open() //启动raftx服务
r = &Logx{log: log, raftx: rx}
go func() {
rx.WaitRun() //等待raftx集群可正常服务
r.init()
}()
return
}
func (lx *Logx) init() {
//注册Debug的事件监听
lx.raftx.MemWatch([]byte{0}, func(key, value []byte, watchType raft.WatchType) {
lx.log.Debug(string(value))
}, true, raft.ADD, raft.UPDATE)
//注册Info的事件监听
lx.raftx.MemWatch([]byte{1}, func(key, value []byte, watchType raft.WatchType) {
lx.log.Info(string(value))
}, true, raft.ADD, raft.UPDATE)
//注册Warn的事件监听
lx.raftx.MemWatch([]byte{2}, func(key, value []byte, watchType raft.WatchType) {
lx.log.Warn(string(value))
}, true, raft.ADD, raft.UPDATE)
//注册Error的事件监听
lx.raftx.MemWatch([]byte{3}, func(key, value []byte, watchType raft.WatchType) {
lx.log.Error(string(value))
}, true, raft.ADD, raft.UPDATE)
}
func (lx *Logx) Debug(value []byte) error {
return lx.raftx.MemCommand([]byte{0}, value, 0, raft.MEM_PUT)
}
func (lx *Logx) Info(value []byte) error {
return lx.raftx.MemCommand([]byte{1}, value, 0, raft.MEM_PUT)
}
func (lx *Logx) Warn(value []byte) error {
return lx.raftx.MemCommand([]byte{2}, value, 0, raft.MEM_PUT)
}
func (lx *Logx) Error(value []byte) error {
return lx.raftx.MemCommand([]byte{3}, value, 0, raft.MEM_PUT)
}
测试代码:以下模拟3个集群节点:
var c1 *Logx
var c2 *Logx
var c3 *Logx
func newlog1() *Logx {
return NewLogx("log1.log", ":20001", []string{"127.0.0.1:20001", "127.0.0.1:20002", "127.0.0.1:20003"})
}
func newlog2() *Logx {
return NewLogx("log2.log", ":20002", []string{"127.0.0.1:20001", "127.0.0.1:20002", "127.0.0.1:20003"})
}
func newlog3() *Logx {
return NewLogx("log3.log", ":20003", []string{"127.0.0.1:20001", "127.0.0.1:20002", "127.0.0.1:20003"})
}
func Benchmark_logx(b *testing.B) {
for i := 0; i < b.N; i++ {
c1.Debug([]byte("hello--------------->" + strconv.Itoa(i))) //模拟c1节点打印日志
c2.Info([]byte("world--------------->" + strconv.Itoa(i))) //模拟c2节点打印日志
c3.Warn([]byte("hello raftx--------------->" + strconv.Itoa(i))) //模拟c3节点打印日志
}
}
func Test_Parallel(t *testing.T) {
for i := range 1 << 17 { //这里将模拟每个节点 13万并发写日志数据
go func() {
e1 := c1.Debug([]byte("hello--------------->" + strconv.Itoa(i))) //模拟c1节点打印日志
e2 := c2.Info([]byte("world--------------->" + strconv.Itoa(i))) //模拟c2节点打印日志
e3 := c3.Warn([]byte("hello raftx--------------->" + strconv.Itoa(i))) //模拟c3节点打印日志
if e1 != nil || e2 != nil || e3 != nil {
t.Log(e1, e2, e3)
}
}()
}
time.Sleep(30 * time.Second) //根据实际环境设置等待时间,确保所有节点都执行结束
TestFileSync(t) //检查各个节点生成的日志文件是否相同
}
注意:
对比日志文件数据是否相同
func TestFileSync(t *testing.T) {
t.Log("fileByteEq 1&2:", fileByteEq1())
t.Log("fileByteEq 1&3:", fileByteEq2())
}
func fileByteEq1() bool {
bs1, _ := util.ReadFile("log1.log")
bs2, _ := util.ReadFile("log2.log")
return bytes.Equal(bs1, bs2)
}
func fileByteEq2() bool {
bs1, _ := util.ReadFile("log1.log")
bs2, _ := util.ReadFile("log3.log")
return bytes.Equal(bs1, bs2)
}
通过系列测试,可以看到,调用TestFileSync
执行,每个节点13万并发写日志生成的3个文件内容完全一致。
log_test.go:55: fileByteEq 1&2: true
log_test.go:56: fileByteEq 1&3: true
通过raftx API,可以高效协同各个集群节点的数据操作,需要注意的是:该实现并非绝对保证各个节点的数据一致, 该实现没有进一步检查各个节点的日志文件的差异并实现数据同步等操作。 例如,当一个集群节点宕机后,它的内存数据被清空,当它重新连接上集群时,可能会丢失部分数据无法同步回来,因为raftx易失性数据的日志长度是有限制的,节点断开时间过长,可能缺失的数据将超过日志最大上限,此时,重连或新增的节点将无法同步完整的数据。
raftx 分布式易失性数据服务的特点是数据应当是相对短暂的。如果使用以上的方式开发分布式日志库,并且出现了宕机丢失数据的情况,应当手动将最新的日志数据同步到重启的节点,尽量缩小新加入节点数据与正常节点数据差异。再让该节点重新连上集群。