Home  /  tldb  /  tldb MQ客户端使用


TLDB MQ客户端使用

tldb有 java,python,go,js等语言实现的客户端,各客户端使用的方式统一:实例化与调用函数名是一致的

  1. 各个客户端都支持断线重连
  2. 各个客户端都实现一个SimpleClient ,可以直接使用
  3. tldb mq 支持 聚合接收数据如何使用tldb MQjava如何使用tldb MQ

go客户端 https://github.com/donnie4w/tlmq-go


            新建mq客户端实例
                sc := cli.SimpleClient{Url: "ws://127.0.0.1:5100", Auth: "mymq=123"}
            
            调用Connect()函数连接服务器
                sc.Connect()
            
            SimpleClient结构体中
                1.Url 为websocket 的访问连接,如:ws://127.0.0.1:5100
                若服务器启动TLS安全链接,则Url应为:wss://127.0.0.1:5100
                2.Auth 为mq链接用户名密码,用等于号连接起来
                3.Origin 为 http请求头origin,默认值:http://tldb-mq
            
            服务器向MQ客户端发送的数据会分发到以下函数
                1.PullByteHandler 客户端拉取数据异步返回 *MqBean 数据对象
                2.PullJsonHandler  客户端拉取数据异步返回 *JMqBean 数据对象
                3.PubByteHandler 客户端接收其他PubByte发布的 *MqBean 数据对象
                4.PubJsonHandler 客户端接收其他PubJson发布的 *JMqBean 数据对象
                5.PubMemHandler 客户端接收其他PubMem发布的 *MqBean 内存数据对象
                6.AckHandler 服务器对客户端发送的每条数据的ACK回执
                7.ErrHandler  服务器返回请求处理错误的错误码;如1301用户名密码错误
            
            PubByte发布的数据,将由PubByteHandler 接收处理
            PubJson发布的数据,将由PubJsonHandler 接收处理
            PubMem发布的数据,将由PubMemHandler 接收处理
            
            
            各个处理函数具体实现
                sc.PubByteHandler(func(mb *MqBean) { logging.Debug("PubByte >> ", mb) })
                sc.PubJsonHandler(func(jmb *JMqBean) { logging.Debug("PubJson >> ", jmb) })
                sc.PubMemHandler(func(jmb *JMqBean) { logging.Debug("PubMem >> ", jmb) })
                sc.PullByteHandler(func(mb *MqBean) { logging.Debug("PullByte >> ", mb) })
                sc.PullJsonHandler(func(jmb *JMqBean) { logging.Debug("PullJson >> ", jmb) })
                sc.AckHandler(func(id int64) { logging.Debug("ack >> ", id) })
                sc.ErrHandler(func(code int64) { logging.Error("err code >> ", code) })
            
            
            实际使用中,只需实现需要处理的函数,其他函数可无需实现;如:
                sc.PubByteHandler(func(mb *MqBean) { logging.Debug("PubByte >> ", mb) })
            订阅sc.Sub("usertable")
            所有执行sc.PubByte("usertable", []byte("this is go usertable byte >>"))发布的数据均会发送到 PubByteHandler 函数中
            
            订阅topic
                sc.Sub("usertable")
            删除订阅
                sc.SubCancel("usertable")
            
            发布topic
            1.sc.PubByte("usertable", []byte("this is go usertable byte"))
                发布数据由PubByteHandler函数接收处理
            
            2.sc.PubJson("usertable", []byte("this is go usertable json"))
                发布数据由PubJsonHandler函数接收处理
            
            3.sc.PubMem("usertable", []byte("this is go usertable mem"))
                发布数据由PubMemHandler函数接收处理
                该接口不存数据,所以JMqBean中ID为0
            
            异步拉取数据
                sc.PullByte("usertable",1)
                sc.PullJson("usertable",2)
                PullByte拉取数据由 PullByteHandler函数接收处理
                PullJson拉取数据由 PullJsonHandler函数接收处理
            
            拉取topic的当前ID值,如:
                sc.PullIdSync("usertable")
                PullIdSync 拉取 topic 的当前ID值
            
            同步拉取数据
                mb,err:=sc.PullByteSync("usertable",1)
                jmb,err:=sc.PullJsonSync("usertable",2)
            
            
            聚合接收数据
                sc.MergeOn(10)
                参数10表示10M,设定服务器发送协议数据压缩前大小上限
            
            启动客户端回执
                sc.RecvAckOn(60)
                客户端ack确认收到信息,否则服务器不认为信息已送达,将一直发送, 60s 设定服务器重发时间
            


java客户端 https://github.com/donnie4w/tlmq-j


             实例化
             SimpleClient sc = new SimpleClient("ws://127.0.0.1:5000", "mymq=123");
             连接服务器
             sc.connect();
            
             实现处理函数:
             sc.pubByteHandler((mb) -> {});
             sc.pubJsonHandler ((mb) -> {});
             sc.pullByteHandler ((mb) -> {});
             sc.pubMemHandler ((mb) -> {});
             sc.pullJsonHandler ((mb) -> {});
             sc.errHandler ((code) -> {});
             sc.ackHandler ((id) -> {});
            
             订阅发布所有函数与go一致
            


python客户端 https://github.com/donnie4w/tlmq-py


            实例化
            sc = SimpleClient("wss://127.0.0.1:5000", "mymq=123")
            连接服务器
            sc.connect()
            该函数阻塞,可以启动新的线程;如:
             _thread.start_new_thread(sc.connect, ())
            
            设置处理函数
                sc.pullByteHandler(lambda mb: logging.debug("PullByteHandler >> " + str(mb)))
                sc.pullJsonHandler(lambda mb: logging.debug("PullJsonHandler >> " + str(mb)))
                sc.pubByteHandler(lambda mb: logging.debug("PubByteHandler >> " + str(mb)))
                sc.pubJsonHandler(lambda mb:  logging.debug("PubJsonHandler >> " + str(mb)))
                sc.pubMemHandler(lambda mb:  logging.debug("PubMemHandler >> " + str(mb)))
                sc.ackHandler(lambda aid: logging.debug("ack id>> " + str(aid)))
                sc.errHandler(lambda code: logging.error("err code >>" + str(code)))
                sc.before(lambda : logging.debug("before >>"))
             订阅发布所有函数与go一致
            


js客户端 https://github.com/donnie4w/tlmq-js


                //实例化
                var mc = new mqCli("ws://localhost:5100", "mymq=123");
                //实现接收订阅信息的方法 
                mc.PubJsonHandler = function (data) { console.log("pubjson data>>>" + data); };
                //实现接收订阅信息的方法
                mc.PubMemHandler = function (data) { console.log("pubmem data>>>" + data); };
                //连接服务器
                mc.connect();
                //订阅topic
                mc.sub('111');
                //发布topic
                mc.pubJson('111','hello js pubjson');
                //发布topic,内存模式
                mc.pubMem('111','hello js pubmem');
                //循环调用ping函数,间隔4秒
                setInterval("mc.ping()", 4000);