Home / tldb / tldb MQ客户端使用
TLDB MQ客户端使用
tldb有 java,python,go,js等语言实现的客户端,各客户端使用的方式统一:实例化与调用函数名是一致的
- 各个客户端都支持断线重连
- 各个客户端都实现一个SimpleClient ,可以直接使用
- tldb mq 支持 聚合接收数据如何使用tldb MQjava如何使用tldb MQ
go客户端 https://github.com/donnie4w/tlmq-go
新建mq客户端实例
sc := cli.SimpleClient{Url: "ws://127.0.0.1:5100", Auth: "mymq=123"}
调用Connect()函数连接服务器
sc.Connect()
SimpleClient结构体中
1.Url 为websocket 的访问连接,如:ws://127.0.0.1:5100
若服务器启动TLS安全链接,则Url应为:wss://127.0.0.1:5100
2.Auth 为mq链接用户名密码,用等于号连接起来
3.Origin 为 http请求头origin,默认值:http://tldb-mq
服务器向MQ客户端发送的数据会分发到以下函数
1.PullByteHandler 客户端拉取数据异步返回 *MqBean 数据对象
2.PullJsonHandler 客户端拉取数据异步返回 *JMqBean 数据对象
3.PubByteHandler 客户端接收其他PubByte发布的 *MqBean 数据对象
4.PubJsonHandler 客户端接收其他PubJson发布的 *JMqBean 数据对象
5.PubMemHandler 客户端接收其他PubMem发布的 *MqBean 内存数据对象
6.AckHandler 服务器对客户端发送的每条数据的ACK回执
7.ErrHandler 服务器返回请求处理错误的错误码;如1301用户名密码错误
PubByte发布的数据,将由PubByteHandler 接收处理
PubJson发布的数据,将由PubJsonHandler 接收处理
PubMem发布的数据,将由PubMemHandler 接收处理
各个处理函数具体实现
sc.PubByteHandler(func(mb *MqBean) { logging.Debug("PubByte >> ", mb) })
sc.PubJsonHandler(func(jmb *JMqBean) { logging.Debug("PubJson >> ", jmb) })
sc.PubMemHandler(func(jmb *JMqBean) { logging.Debug("PubMem >> ", jmb) })
sc.PullByteHandler(func(mb *MqBean) { logging.Debug("PullByte >> ", mb) })
sc.PullJsonHandler(func(jmb *JMqBean) { logging.Debug("PullJson >> ", jmb) })
sc.AckHandler(func(id int64) { logging.Debug("ack >> ", id) })
sc.ErrHandler(func(code int64) { logging.Error("err code >> ", code) })
实际使用中,只需实现需要处理的函数,其他函数可无需实现;如:
sc.PubByteHandler(func(mb *MqBean) { logging.Debug("PubByte >> ", mb) })
订阅sc.Sub("usertable")
所有执行sc.PubByte("usertable", []byte("this is go usertable byte >>"))发布的数据均会发送到 PubByteHandler 函数中
订阅topic
sc.Sub("usertable")
删除订阅
sc.SubCancel("usertable")
发布topic
1.sc.PubByte("usertable", []byte("this is go usertable byte"))
发布数据由PubByteHandler函数接收处理
2.sc.PubJson("usertable", []byte("this is go usertable json"))
发布数据由PubJsonHandler函数接收处理
3.sc.PubMem("usertable", []byte("this is go usertable mem"))
发布数据由PubMemHandler函数接收处理
该接口不存数据,所以JMqBean中ID为0
异步拉取数据
sc.PullByte("usertable",1)
sc.PullJson("usertable",2)
PullByte拉取数据由 PullByteHandler函数接收处理
PullJson拉取数据由 PullJsonHandler函数接收处理
拉取topic的当前ID值,如:
sc.PullIdSync("usertable")
PullIdSync 拉取 topic 的当前ID值
同步拉取数据
mb,err:=sc.PullByteSync("usertable",1)
jmb,err:=sc.PullJsonSync("usertable",2)
聚合接收数据
sc.MergeOn(10)
参数10表示10M,设定服务器发送协议数据压缩前大小上限
启动客户端回执
sc.RecvAckOn(60)
客户端ack确认收到信息,否则服务器不认为信息已送达,将一直发送, 60s 设定服务器重发时间
java客户端 https://github.com/donnie4w/tlmq-j
实例化
SimpleClient sc = new SimpleClient("ws://127.0.0.1:5000", "mymq=123");
连接服务器
sc.connect();
实现处理函数:
sc.pubByteHandler((mb) -> {});
sc.pubJsonHandler ((mb) -> {});
sc.pullByteHandler ((mb) -> {});
sc.pubMemHandler ((mb) -> {});
sc.pullJsonHandler ((mb) -> {});
sc.errHandler ((code) -> {});
sc.ackHandler ((id) -> {});
订阅发布所有函数与go一致
python客户端 https://github.com/donnie4w/tlmq-py
实例化
sc = SimpleClient("wss://127.0.0.1:5000", "mymq=123")
连接服务器
sc.connect()
该函数阻塞,可以启动新的线程;如:
_thread.start_new_thread(sc.connect, ())
设置处理函数
sc.pullByteHandler(lambda mb: logging.debug("PullByteHandler >> " + str(mb)))
sc.pullJsonHandler(lambda mb: logging.debug("PullJsonHandler >> " + str(mb)))
sc.pubByteHandler(lambda mb: logging.debug("PubByteHandler >> " + str(mb)))
sc.pubJsonHandler(lambda mb: logging.debug("PubJsonHandler >> " + str(mb)))
sc.pubMemHandler(lambda mb: logging.debug("PubMemHandler >> " + str(mb)))
sc.ackHandler(lambda aid: logging.debug("ack id>> " + str(aid)))
sc.errHandler(lambda code: logging.error("err code >>" + str(code)))
sc.before(lambda : logging.debug("before >>"))
订阅发布所有函数与go一致
js客户端 https://github.com/donnie4w/tlmq-js
//实例化
var mc = new mqCli("ws://localhost:5100", "mymq=123");
//实现接收订阅信息的方法
mc.PubJsonHandler = function (data) { console.log("pubjson data>>>" + data); };
//实现接收订阅信息的方法
mc.PubMemHandler = function (data) { console.log("pubmem data>>>" + data); };
//连接服务器
mc.connect();
//订阅topic
mc.sub('111');
//发布topic
mc.pubJson('111','hello js pubjson');
//发布topic,内存模式
mc.pubMem('111','hello js pubmem');
//循环调用ping函数,间隔4秒
setInterval("mc.ping()", 4000);