• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

AvenirMQ: 用Node.js实现一个消息中间件

原作者: [db:作者] 来自: 网络 收藏 邀请

开源软件名称:

AvenirMQ

开源软件地址:

https://gitee.com/onlyyyy/AvenirMQ

开源软件介绍:

AvenirMQ

一个轻量化的消息中间件

安装

需要较高版本Node.js,推荐V14,支持跨平台

git clone https://gitee.com/onlyyyy/AvenirMQ.git

cd AvenirMQ

pm2 start AvenirMQ

或先安装pm2:npm i pm2 -g

并提供Nodejs版操作库:https://www.npmjs.com/package/avenirmq

代码结构

  • AvenirMQ.js 启动的主进程
  • dtest.js 测试用函数
  • run.ini 配置文件
  • user.json 用户文件
  • curl.js cli程序
  • /core AvenirMQ核心模块
  • /AvenirMQ 提供的Nodejs版操作库
  • clien1-4.js 测试用的接收方

技术特点

  • 1.使用签名进行消息分发,支持用户管理。
  • 2.通过routingkey来实现用户的消息绑定。
  • 3.支持连接池,并支持智能的连接淘汰策略。
  • 4.使用生产者/消费者模型。
  • 5.支持消息重发,死信回收
  • 6.灵活的策略配置
  • 7.提供cli程序进行管理

技术实现

  1. 用户管理

用户信息保存在user.json中

async login(data) {    if (!data.name || !data.password) {        throw ('INVALID_LOGIN');    }    let password = delQuotation(libcu.cipher.AesDecode(data.password));    toLog("password = ", password);    if (!this.userList[data.name] || this.userList[data.name].password != password) {        throw ('INVALID_LOGIN');    }    //其他的就成功了    let sign = getSign(data.name, data.password);    toLog("生成签名", sign);    return sign;}

解析routingkey

//解析绑定时的from.to.keyparseKey(keys) {    let arr = keys.split('.');    toLog('arr = ', arr);    if (arr.length != 3) {        throw ("BAD_KEYS");    }    return {        send: arr[0],        to: arr[1],        type: arr[2],    }}

加解密: 使用libcu.cipher.AesEncode/AesDecode函数进行加解密。

密钥参见libcu库。

用户增删改查暂略。

  1. 连接池
async add2ConnectPool(data, sign, client) {    this.signPool[sign] = {        conn: client,        name: data.name,        createTime: moment().valueOf(),    };    throw ({ code: SUCCESS, data: sign });}

将连接保存到对象中,下次发送消息的时候会优先选用连接发送,失败的话就会更新连接池

  1. 消息发送-死信处理
async AvenirMQSend(msg, type) {    //20210110先写个简单版的 不用promise.all发送消息    for (let i = 0; i < msg.length; i++) {        try {            ······            if(type != 'gc') {                for (let j = 0; j < bind.length; j++) {                    //20210116增加对类型的判断                    if ((bind[j].type === sub.type || bind[j].type === AvenirMQ_ALL)                        && (bind[j].receive === sub.to)) {                        //说明这是要发送的消息                        let info = {                            ip: bind[j].ip,                            port: bind[j].port,                        };                        let conn = null;                        if (this.connPoll[bind[j].ip] && this.connPoll[bind[j].ip][bind[j].port]) {                            conn = this.connPoll[bind[j].ip][bind[j].port].conn;                        }                        let newSub = JSON.parse(JSON.stringify(sub));                        await this.send(text, conn, info, newSub, type, i);                    } else {                        toLog("存在无人接收的信息", msg[i]);                    }                }            } else {                let conn = null;                if (this.connPoll[gcInfo.ip] && this.connPoll[gcInfo.ip][gcInfo.port]) {                    conn = this.connPoll[gcInfo.ip][gcInfo.port].conn;                }                await this.send(text, conn, gcInfo, sub, type, i);            }                            ····        } catch (error) {            toLog("AvenirMQSend error->",error);        }            }}

将gc与普通的消息放在一个函数中处理。

接口协议格式

  1. 登录获得签名
{    type:'login',    name:'test',    password:'AES',}

返回值

{    code:0,    message:'success',    data:'sign'}
  1. 新建用户
//key为send.to.type的结构 表示自己的键值与接收的键值 以及接收的消息类型//告诉AvenirMQ自己的连接信息{    "type":"addUser",    "name":"test",      "password":"123456"    "key":"a.b.rpc"    "ip":"127.0.0.1"    "port":13000,}

返回值

{    "code":0,    "message":"success",}
  1. 删除用户
{    "type":"deleteUser",    "name":"test",  }

返回值

{    "code":0,    "message":"success",}
  1. 修改用户信息
//key为send.to.type的结构 send : 接收名称为send的发送方消息,to:发送给谁 type:消息类型//告诉AvenirMQ自己的连接信息{    "type":"updateUser",    "name":"test",      "password":"123456"    "key":"a.b.rpc"    "ip":"127.0.0.1"    "port":13000,}

返回值

{    "code":0,    "message":"success",}
  1. 发送消息
//生产者->消费者的概念{    sign:"test",    type:"send",    data:"hello world"}

返回值

{    "code":0,    "message":"success",}
  1. 修改绑定的键值
{    sign:'test',    type:'setKey',    data: {        name:"test",        key:"a.b.rpc",    }}

返回值

{    code:0,    message:'success',}
  1. 收到消息(无需请求,需要用户方起一个tcp服务器)
{    code:0,    message:'success',    sender:'发送方的名字',    data:'消息',}
  1. 获取用户列表
{    type:'userList',}

返回值

{    code:0,    message:'success',    data:[a,b],}

配置文件示例

[main]ip=127.0.0.1port=52013[mq]#用户文件路径userFileName=./user.json#是否输出日志ifConsoleLog=true#连接超时时间timeOut=10#是否为长连接keepAlive=true#AvenirMQ发消息的超时时间(秒)MQTimeOut=3#AvenirMQ重发消息的周期(秒) 范围 2-50MQResend=2#重试次数retryTime=5

结语

本项目照着RabbitMQ的思想简单地实现了一个消息中间件,不过没有使用AMQP协议,而只是简单的tcp处理,只能后期再优化了。

不过当这个项目在我脑海中浮现,我就认为我应该通过努力将它编写出来。目前AvenirMQ也达到了能用的程度了,这一路上学到的知识也是久久难忘的。

技术没有高低贵贱之分,脑海中如果有想法的话,我们要做的就是去把它实现。

百舸争流,奋楫者先。

编程之路漫漫修远兮,吾将上下而求索。

谢谢。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap