在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
开源软件名称:AvenirMQ开源软件地址:https://gitee.com/onlyyyy/AvenirMQ开源软件介绍:AvenirMQ一个轻量化的消息中间件 安装需要较高版本Node.js,推荐V14,支持跨平台 git clone https://gitee.com/onlyyyy/AvenirMQ.git cd AvenirMQ pm2 start AvenirMQ 或先安装pm2:npm i pm2 -g 并提供Nodejs版操作库:https://www.npmjs.com/package/avenirmq 代码结构
技术特点
技术实现
用户信息保存在user.json中 async login(data) { if (!data.name || !data.password) { throw ('INVALID_LOGIN'); } let password = delQuotation(libcu.cipher.AesDecode(data.password)); toLog("password = ", password); if (!this.userList[data.name] || this.userList[data.name].password != password) { throw ('INVALID_LOGIN'); } //其他的就成功了 let sign = getSign(data.name, data.password); toLog("生成签名", sign); return sign;} 解析routingkey //解析绑定时的from.to.keyparseKey(keys) { let arr = keys.split('.'); toLog('arr = ', arr); if (arr.length != 3) { throw ("BAD_KEYS"); } return { send: arr[0], to: arr[1], type: arr[2], }} 加解密: 使用libcu.cipher.AesEncode/AesDecode函数进行加解密。 密钥参见libcu库。 用户增删改查暂略。
async add2ConnectPool(data, sign, client) { this.signPool[sign] = { conn: client, name: data.name, createTime: moment().valueOf(), }; throw ({ code: SUCCESS, data: sign });} 将连接保存到对象中,下次发送消息的时候会优先选用连接发送,失败的话就会更新连接池
async AvenirMQSend(msg, type) { //20210110先写个简单版的 不用promise.all发送消息 for (let i = 0; i < msg.length; i++) { try { ······ if(type != 'gc') { for (let j = 0; j < bind.length; j++) { //20210116增加对类型的判断 if ((bind[j].type === sub.type || bind[j].type === AvenirMQ_ALL) && (bind[j].receive === sub.to)) { //说明这是要发送的消息 let info = { ip: bind[j].ip, port: bind[j].port, }; let conn = null; if (this.connPoll[bind[j].ip] && this.connPoll[bind[j].ip][bind[j].port]) { conn = this.connPoll[bind[j].ip][bind[j].port].conn; } let newSub = JSON.parse(JSON.stringify(sub)); await this.send(text, conn, info, newSub, type, i); } else { toLog("存在无人接收的信息", msg[i]); } } } else { let conn = null; if (this.connPoll[gcInfo.ip] && this.connPoll[gcInfo.ip][gcInfo.port]) { conn = this.connPoll[gcInfo.ip][gcInfo.port].conn; } await this.send(text, conn, gcInfo, sub, type, i); } ···· } catch (error) { toLog("AvenirMQSend error->",error); } }} 将gc与普通的消息放在一个函数中处理。 接口协议格式
{ type:'login', name:'test', password:'AES',} 返回值 { code:0, message:'success', data:'sign'}
//key为send.to.type的结构 表示自己的键值与接收的键值 以及接收的消息类型//告诉AvenirMQ自己的连接信息{ "type":"addUser", "name":"test", "password":"123456" "key":"a.b.rpc" "ip":"127.0.0.1" "port":13000,} 返回值 { "code":0, "message":"success",}
{ "type":"deleteUser", "name":"test", } 返回值 { "code":0, "message":"success",}
//key为send.to.type的结构 send : 接收名称为send的发送方消息,to:发送给谁 type:消息类型//告诉AvenirMQ自己的连接信息{ "type":"updateUser", "name":"test", "password":"123456" "key":"a.b.rpc" "ip":"127.0.0.1" "port":13000,} 返回值 { "code":0, "message":"success",}
//生产者->消费者的概念{ sign:"test", type:"send", data:"hello world"} 返回值 { "code":0, "message":"success",}
{ sign:'test', type:'setKey', data: { name:"test", key:"a.b.rpc", }} 返回值 { code:0, message:'success',}
{ code:0, message:'success', sender:'发送方的名字', data:'消息',}
{ type:'userList',} 返回值 { code:0, message:'success', data:[a,b],} 配置文件示例[main]ip=127.0.0.1port=52013[mq]#用户文件路径userFileName=./user.json#是否输出日志ifConsoleLog=true#连接超时时间timeOut=10#是否为长连接keepAlive=true#AvenirMQ发消息的超时时间(秒)MQTimeOut=3#AvenirMQ重发消息的周期(秒) 范围 2-50MQResend=2#重试次数retryTime=5 结语本项目照着RabbitMQ的思想简单地实现了一个消息中间件,不过没有使用AMQP协议,而只是简单的tcp处理,只能后期再优化了。 不过当这个项目在我脑海中浮现,我就认为我应该通过努力将它编写出来。目前AvenirMQ也达到了能用的程度了,这一路上学到的知识也是久久难忘的。 技术没有高低贵贱之分,脑海中如果有想法的话,我们要做的就是去把它实现。 百舸争流,奋楫者先。 编程之路漫漫修远兮,吾将上下而求索。 谢谢。 |
请发表评论