如何开发一个分布式日志系统


开发一个分布式日志系统是一个复杂的任务,涉及多个方面的设计和技术决策。以下是一些关键步骤和考虑因素

1. 一般需求
  • 持久性:确保日志条目不会因为单点故障而丢失。
  • 一致性:保证所有节点上的日志最终一致。
  • 可用性:即使部分节点失效,系统仍然可以处理日志写入和读取。
  • 性能:支持高吞吐量的日志写入和快速的查询响应时间。
  • 扩展性:能够随着数据增长或用户数量增加而水平扩展。
2. 设计架构
  • 分布式一致性协议
    • Paxos/Raft/ZAB/Raftx:这些协议用于解决分布式系统中的领导者选举和命令提交问题,确保在多数派节点达成一致的情况下,日志条目的顺序是全局一致的。
  • 数据分区与复制
    • 分片(Sharding):将日志按照某种规则划分为多个分片,每个分片由一组节点负责,以实现负载均衡和水平扩展。
    • 副本(Replication):为每个分片创建多个副本,放置在不同的节点上,以增强系统的容错能力和数据持久性
3. 核心功能
  • 日志追加(Log Append):实现高效且原子性的日志追加操作,确保每次写入都是不可分割的整体。
  • 日志同步(Log Syncing):设计一种机制让Leader将日志同步到Follower节点,同时保证数据的一致性和完整性



本文将基于这些需求,利用 raftx 协议,为分布式日志系统提供一个基础性框架,使用raftx作为一致性协议的实现来确保跨节点的日志数据一致性和顺序性

优点
  1. 一致性:通过使用 Raftx 协议,该系统能够保证日志条目的全局顺序一致性。
  2. 简单易懂:代码结构清晰,易于理解。
  3. 功能完整:实现了基本的日志记录功能,并且支持多种日志级别(Debug, Info, Warn, Error)。
  4. 并发处理:提供了并发写入日志的能力,适用并发场景。
缺点
  1. 缺乏故障恢复机制:当一个节点重新加入集群时,需要有一个有效的机制来同步缺失的数据,否则可能丢失数据。
  2. 监控与报警:不具备自我监控能力。


raftx的介绍,可以参考:Githubwiki

核心代码实现

	//注册Debug的事件监听
	cl.raft.MemWatch([]byte{0}, func(key, value []byte, watchType raft.WatchType) {
		cl.log.Debug(string(value))
	}, true, raft.ADD, raft.UPDATE)
  • 这部分代码是Debug方法的核心逻辑实现
  • 实现逻辑为通过监听键值的创建与修改即 ADD,UPDATE的事件,捕获写日志的操作,并调用本地日志工具,将事件触发的日志数据,写入本地日志文件,由于raftx保证了各个节点数据的有序性,因此,日志数据也是有序的,这样保证了各个节点的日志数据是相同的。


以下是完整的代码

分布式日志库 logx: 代码地址
import (
	"github.com/donnie4w/go-logger/logger"
	"github.com/donnie4w/raftx"
	"github.com/donnie4w/raftx/raft"
)

type Logx struct {
	log   *logger.Logging
	raftx raftx.Raftx
}

func NewLogx(filePath string, listen string, peers []string) (r *Logx) {
	log := logger.NewLogger().SetOption(&logger.Option{FileOption: &logger.FileSizeMode{Filename: filePath, Maxsize: 1 << 30}, Format: logger.FORMAT_DATE | logger.FORMAT_SHORTFILENAME})
	rx := raftx.NewRaftx(&raft.Config{ListenAddr: listen, PeerAddr: peers})
	go rx.Open() //启动raftx服务
	r = &Logx{log: log, raftx: rx}
	go func() {
		rx.WaitRun() //等待raftx集群可正常服务
		r.init()
	}()
	return
}

func (lx *Logx) init() {
	//注册Debug的事件监听
	lx.raftx.MemWatch([]byte{0}, func(key, value []byte, watchType raft.WatchType) {
		lx.log.Debug(string(value))
	}, true, raft.ADD, raft.UPDATE)

	//注册Info的事件监听
	lx.raftx.MemWatch([]byte{1}, func(key, value []byte, watchType raft.WatchType) {
		lx.log.Info(string(value))
	}, true, raft.ADD, raft.UPDATE)

	//注册Warn的事件监听
	lx.raftx.MemWatch([]byte{2}, func(key, value []byte, watchType raft.WatchType) {
		lx.log.Warn(string(value))
	}, true, raft.ADD, raft.UPDATE)

	//注册Error的事件监听
	lx.raftx.MemWatch([]byte{3}, func(key, value []byte, watchType raft.WatchType) {
		lx.log.Error(string(value))
	}, true, raft.ADD, raft.UPDATE)
}

func (lx *Logx) Debug(value []byte) error {
	return lx.raftx.MemCommand([]byte{0}, value, 0, raft.MEM_PUT)
}

func (lx *Logx) Info(value []byte) error {
	return lx.raftx.MemCommand([]byte{1}, value, 0, raft.MEM_PUT)
}

func (lx *Logx) Warn(value []byte) error {
	return lx.raftx.MemCommand([]byte{2}, value, 0, raft.MEM_PUT)
}

func (lx *Logx) Error(value []byte) error {
	return lx.raftx.MemCommand([]byte{3}, value, 0, raft.MEM_PUT)
}


测试代码:以下模拟3个集群节点:

var c1 *Logx
var c2 *Logx
var c3 *Logx

func newlog1() *Logx {
	return NewLogx("log1.log", ":20001", []string{"127.0.0.1:20001", "127.0.0.1:20002", "127.0.0.1:20003"})
}
func newlog2() *Logx {
	return NewLogx("log2.log", ":20002", []string{"127.0.0.1:20001", "127.0.0.1:20002", "127.0.0.1:20003"})
}
func newlog3() *Logx {
	return NewLogx("log3.log", ":20003", []string{"127.0.0.1:20001", "127.0.0.1:20002", "127.0.0.1:20003"})
}


压测调用
func Benchmark_logx(b *testing.B) {
	for i := 0; i < b.N; i++ {
		c1.Debug([]byte("hello--------------->" + strconv.Itoa(i)))        //模拟c1节点打印日志
		c2.Info([]byte("world--------------->" + strconv.Itoa(i)))         //模拟c2节点打印日志
		c3.Warn([]byte("hello raftx--------------->" + strconv.Itoa(i)))   //模拟c3节点打印日志
	}
}


并发测试调用
func Test_Parallel(t *testing.T) {
	for i := range  1 << 17 { //这里将模拟每个节点 13万并发写日志数据
		go func() {
			e1 := c1.Debug([]byte("hello--------------->" + strconv.Itoa(i)))        //模拟c1节点打印日志
			e2 := c2.Info([]byte("world--------------->" + strconv.Itoa(i)))         //模拟c2节点打印日志
			e3 := c3.Warn([]byte("hello raftx--------------->" + strconv.Itoa(i)))   //模拟c3节点打印日志
			if e1 != nil || e2 != nil || e3 != nil {
				t.Log(e1, e2, e3)
			}
		}()
	}
	time.Sleep(30 * time.Second) //根据实际环境设置等待时间,确保所有节点都执行结束
	TestFileSync(t) //检查各个节点生成的日志文件是否相同
}

注意:

  • 如果测试完过早退出,有的节点内部还没有执行完毕,可能导致日志不完整,出现日志缺少数据的情况



对比日志文件数据是否相同

func TestFileSync(t *testing.T) {
	t.Log("fileByteEq 1&2:", fileByteEq1())
	t.Log("fileByteEq 1&3:", fileByteEq2())
}

func fileByteEq1() bool {
	bs1, _ := util.ReadFile("log1.log")
	bs2, _ := util.ReadFile("log2.log")
	return bytes.Equal(bs1, bs2)
}

func fileByteEq2() bool {
	bs1, _ := util.ReadFile("log1.log")
	bs2, _ := util.ReadFile("log3.log")
	return bytes.Equal(bs1, bs2)
}


通过系列测试,可以看到,调用TestFileSync执行,每个节点13万并发写日志生成的3个文件内容完全一致。

    log_test.go:55: fileByteEq 1&2: true
    log_test.go:56: fileByteEq 1&3: true


总结

通过raftx API,可以高效协同各个集群节点的数据操作,需要注意的是:该实现并非绝对保证各个节点的数据一致, 该实现没有进一步检查各个节点的日志文件的差异并实现数据同步等操作。  例如,当一个集群节点宕机后,它的内存数据被清空,当它重新连接上集群时,可能会丢失部分数据无法同步回来,因为raftx易失性数据的日志长度是有限制的,节点断开时间过长,可能缺失的数据将超过日志最大上限,此时,重连或新增的节点将无法同步完整的数据。

raftx 分布式易失性数据服务的特点是数据应当是相对短暂的。如果使用以上的方式开发分布式日志库,并且出现了宕机丢失数据的情况,应当手动将最新的日志数据同步到重启的节点,尽量缩小新加入节点数据与正常节点数据差异。再让该节点重新连上集群。