如何快速构建可靠的分布式IM聊天系统


tlnetim 聊天项目是一个分布式 im demo,基于 tlnet http框架和 tldb数据库。tldb是一个高性能的分布式数据库,基于tldb可以快速构建分布式系统。

tlnetim 的开源程序:

tlnetim主要的功能:多聊天室多人聊天系统,水平扩展多服务器部署构建分布式

im.tlnet.topim2.tlnet.top 是分布式系统的两个不同的服务。

用户可以连接任意服务器。

除去部分存储数据的实现,im的逻辑代码实际只有几十行,基于tldb mq代码实现分布式的代码也只有几十行。

room, ok := wsmap.Get(ws)
    if !ok {
        if wa.ATYPE == LOGIN {
            if iu, ok := getUserInfo(wa.MSG); ok {
                room = strings.TrimSpace(wa.ROOM)
                store(ws, iu, room)
                //记录登录日志
                orm.Insert(&ImLog{UserId: iu.Id, Room: room, Time: TimeNow()})
                ws.Send(wsack{ATYPE: wa.ATYPE, USERNAME: iu.Name, ICON: iu.Icon, TIME: TimeNow(), ROOM: room}.toJson())
                immq.PubId(room, iu.Id)
                //返回好友列表
                if *UseRobot {
                    ws.Send(wsack{ATYPE: FRIEND, USERNAME: robot.Name, ICON: robot.Icon, LABEL: robot.Label}.toJson())
                }
                broadcastToSelf(&wsack{ATYPE: FRIEND}, ws, room)
                //通知好友
                broadcast(&wsack{ATYPE: FRIEND, USERNAME: iu.Name, TIME: TimeNow(), ICON: iu.Icon}, ws, room, true, true)
                //返回聊天室 最新N条数据
                if id, _ := orm.SelectIdByIdx[ImMessage]("Room", room); id > 0 {
                    startid := id - 20
                    if startid < 0 {
                        startid = 0
                    }
                    if ims, _ := orm.SelectByIdxLimit[ImMessage](startid, 21, "Room", room); ims != nil {
                        for _, im := range ims {
                            var u *ImUser
                            if im.UserId > 1<<60 {
                                u = robot
                            } else {
                                u, _ = orm.SelectById[ImUser](im.UserId)
                            }
                            if u != nil {
                                ws.Send(wsack{ATYPE: MSG, USERNAME: u.Name, ICON: u.Icon, MSG: im.Content, TIME: im.Time}.toJson())
                            }
                        }
                    }
                }
            } else {
                ws.Send(wsack{ATYPE: NOPASS}.toJson())
            }
        }
    } else if wa.ATYPE == MSG {
        iu, _ := getIu(room, ws)
        t := TimeNow()
        //保存聊天信息
        if _, err := orm.Insert(&ImMessage{UserId: iu.Id, Content: wa.MSG, Time: t, Room: room}); err == nil {
            //发送聊天数据
            broadcast(&wsack{ATYPE: MSG, USERNAME: iu.Name, MSG: wa.MSG, TIME: t, ICON: iu.Icon}, nil, room, true, false)
        }
    }

tlnet将服务器的websocket封装为 三个阶段:

  • 打开连接阶段:OnOpen
  • 读到信息阶段:WS.Read()
  • 链接关闭或出错阶段:OnError

这些封装,让websocket的用法与使用普通的http服务基本一致:

    wc = &tlnet.WebsocketConfig{}
    //websocket断开时,触发OnError。删除wsmap中的连接
    wc.OnError = func(self *tlnet.Websocket) {
        if r, ok := wsmap.Get(self); ok {
            if u, ok := getIu(r, self); ok {
                //掉线通知
                broadcast(&wsack{ATYPE: LOGOUT, USERNAME: u.Name}, nil, r, true, true)
            }
        }
    }
    //wc.OnOpen 用在连接成功时调用
    
    //hc.WS.Read() 读取websocket接收的数据
    var wa wsack
	if err := json.Unmarshal(hc.WS.Read(), &wa); err == nil { 
		parse(wa, hc.WS) //解析并处理信息
	}

基于tldb的MQ消息订阅发布,简洁地实现分布式构建

this.mq = cli.NewMqClient("ws://127.0.0.1:5000", "mymq=123") //mq服务器地址与用户名密码
    if err := this.mq.Connect(); err != nil {                 //mq.Connect() 连接服务器
        panic("mq connect err:" + err.Error())
    }
    this.mq.MergeOn(1)              //设置服务器信息聚合发送到客户端,1表示数据包大小上限为1MB
    this.mq.Sub("immsg")            //订阅topic:immsg
    this.mq.Sub("id")               //订阅 topic:id
    this.mq.Sub(fmt.Sprint(nodeId)) //订阅本节点信息
    //处理订阅信息,接收发布函数PubMem()发送的数据,不存储信息
    this.mq.PubMemHandler(func(jmb *JMqBean) {
        defer MyRecover()
        var ms mqws
        json.Unmarshal([]byte(jmb.Msg), &ms)
        switch jmb.Topic {
        case "immsg":
            if ms.NodeId != nodeId {
                broadcast(ms.Wa[0], nil, ms.Room, false, false)
            }
        case "id":
            if m, ok := roomap.Get(ms.Room); ok {
                wss := make([]*wsack, 0)
                m.Range(func(_ *tlnet.Websocket, vu *ImUser) bool {
                    if ms.UserId != vu.Id {
                        wss = append(wss, &wsack{ATYPE: FRIEND, USERNAME: vu.Name, ICON: vu.Icon, LABEL: vu.Label})
                    }
                    return true
                })
                immq.PubInfo(ms.NodeId, ms.UserId, ms.Room, wss)
            }
        case fmt.Sprint(nodeId):
            if k, ok := wamap.Get(ms.UserId); ok {
                for _, v := range ms.Wa {
                    k.Range(func(w *tlnet.Websocket, _ int8) bool {
                        w.Send(v.toJson())
                        return true
                    })
                }
            }
        }
    })
    //处理订阅信息,这里使用json格式,接收发布函数PubJson()发送的数据,也可以使用 PubByteHandler()对应PubByte()
    this.mq.PubJsonHandler(func(jmb *JMqBean) {
        defer MyRecover()
        var ms mqws
        json.Unmarshal([]byte(jmb.Msg), &ms)
        switch jmb.Topic {
        case "immsg":
            if ms.NodeId != nodeId {
                broadcast(ms.Wa[0], nil, ms.Room, false, false)
            }
        }
    })

通知好友函数:

broadcast(&wsack{ATYPE: FRIEND, USERNAME: iu.Name, TIME: TimeNow(), ICON: iu.Icon}, ws, room, true, true)

broadcast实际是tlnetim实现的一个消息路由功能,将状态(上线,下线等),发出信息等进行广播,其中包括路由给其他分布式节点。tlnetim在发送给其他节点中的实现采用tldb MQ的订阅发布,主要用PubMem 与PubJson,这两个发布函数也是有区别的,PubMem 不存储发布的信息,一般用于状态信息的发布,比如登录信息,下线信息等。PubJson  发布的信息会记录在tldb中,保证信息不丢失。

针对在并发量比较大的分布式系统。tldb MQ对消息处理提供一些解决方案

this.mq.MergeOn(1)    //服务器信息聚合发送到客户端,1表示数据包大小上限为1MB
this.mq.SetZlib(true) //服务器消息压缩

MQ 提供消息的可靠性保证:客户端信息回执,拉取信息等:

this.mq.RecvAckOn(10) //消息回执与重发时间的设置,10表示如果客户端10内不回执服务器信息,则服务器会再次发送信息
this.mq.PullJsonSync("immsg",1) //拉取topic:immsg id为1的信息 json格式
this.mq.PullByteSync("immsg",1) //拉取topic:immsg id为1的信息 二进制格式
this.mq.PullIdSync("immsg") //拉取 topic:immsg的最大信息id

MQ支持各个客户端对 MergeOn  ,SetZlib, RecvAckOn这些功能 根据不同实际情况各自设定。

    SetZlib 是用zlib压缩消息发送,在大量使用zlib压缩时,服务器会消耗大量内存,所以并非每个消息都适合压缩发送,应该根据实际情况,如果消息体比较大,或采用聚合信息,也就是多少个消息聚合发送,总数据比较大,这时压缩信息会有比较好的效果,压缩比例较大,达到减小传输时间,提高吞掉量的效果。相反,如果消息体本身已经较小,压缩效果不佳,这时压缩消息反而增加服务器压力。

     RecvAckOn 是消息回执,保证消息不丢失。MQ服务器给节点推送信息时,节点会发Ack给MQ告知消息已收到,否则,当MQ服务器没有收到Ack时,会不断给节点推送信息;类似这样的功能,不管是tldb MQ还是其他MQ,节点的状态应该实时监控,如果节点压力过大,比如,某个服务节点cpu被打满了,此时,节点可能无法回复ack给mq服务器,mq服务器由于收不到回执,会出现大量信息积压。

    不开启RecvAckOn的情况下,tldb mq也提供了支持客户端信息不丢失的方法。如果不开启 RecvAckOn,对于同一信息,mq服务器只给节点发送一次。如果节点怀疑服务器信息没有到达,即信息丢了(需要客户端根据实际业务情况实现一个怀疑发现策略),可以通过拉取id函数PullIdSync拉取订阅主题的最大id,与本地的id比较,来判断本地是否有信息未读取到,通过拉取信息函数PullJsonSync或PullByteSync 将mq服务器的信息拉取到本地。


      即时通讯系统,根据不同业务要求,有不同的实现。tlnetim聊天IM在协议方面只是简单设计,只是demo,不适用复杂或完整的IM系统。完整的IM聊天协议可以参考xmpp即时通讯协议。



有任何问题或建议请Email:donnie4w@gmail.comhttp://tlnet.top/contact  发信给我,谢谢!