• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

MyRpc: c++高性能RPC框架

原作者: [db:作者] 来自: 网络 收藏 邀请

开源软件名称:

MyRpc

开源软件地址:

https://gitee.com/mjuyangyousong/MyRpc

开源软件介绍:

MyRpc

EMAIL: [email protected]

POWERED BY Yousong Yang

编译环境

编译器需支持:C++17

如何开始

1.CLONE项目2.mkdir build3.cd build4.cmake .. -DCMAKE_CXX_COMPILER={支持C++17的编译器}5.make6.初期运行 生成配置文件7.关闭服务 修改配置文件

性能测试

测试平台: VMware虚拟机15 core:4 mem:8G 主机:thinpad E14 r5 4500 内存16gbzeromq:  muduo:50字节 单程延迟46us        50000字节 单程延迟64us  MyRpc:50字节 单程延迟43us        50000字节 单程延迟62us        150000字节 单程延迟112us压力测试  SERVER 100CPU 单线程 1500字节数据包 转发回Client QPS 90000+Log:  单线程 80字节信息 9341520+qpsRpc:  单对单序列RPC 解析RPC 调用RPC    SERVER: 序列化RPC+发送包 一次调用一个数据包 连续发送 每次65.601ns    Client: 接受包+解析RPC+调用 每次40.739ns

如何使用

RPC:RPC消息包默认格式: 单位字节|-------------------------------------------------------------------------------------------------------------|| 消息包大小 4 | 消息类型(定义在dataformat) 4 | 调用PRC序号 4 | 消息数据大小 4 | 是否调用回调 1 | 消息数据 -- ||-------------------------------------------------------------------------------------------------------------|connect->Read 返回对端一次Write的内容 即:消息类型(定义在dataformat) 调用PRC序号 消息数据大小 是否调用回调 消息数据 or 任何数据一次Write对应一次Read 需要口头规定消息包最大大小 以免一次Read无法读尽Rpc调用过程:对端注册一个RPCRpc.Init_fun(type(口头约定的类型),fun_address 可以是lambda 可以是函数地址,size 希望的接收到的参数数据包大小(消息数据大小),回调函数(可选))发送端发送一个RPCMessage::Make_rpc(connnet,(口头约定的类型),缓冲区,是否调用回调,参数...)
    // Server:      Main():        myrpc::Message message = myprc::Message();  // 初始化messgae        net_tools::Config config("Server");         // 加载配置        net_tools::Log log;                         // 初始化LOG        net_tools::Tcpserver tcp;                   // 创建TCP服务        tcp.Set_connect_over_callback(conn_func);   // 设置回调        tcp.Initserver(true,2,0);                   // 注册TCP服务        tcp.Startserver();                          // 开始服务      conn_func(net_tools::net::Connect* connect):        connect->Set_read_callback(read_func);      // 设置收到消息包回调(可选)        connect->Set_close_callback(close_func);    // 设置对端Close回调(可选)        connect->Set_lose_callback(lose_func);      // 设置心跳包失效回调(可选)        char buf[512] = {};                         // 用户缓冲区        myrpc::Message::Make_rpc(connect,3,buf,true,12,13,14);  //调用 3号RPC调用 缓冲区为buf 调用回调函数 参数为 (int)12 13 14        myrpc::Message::Make_rpc(connect,2,buf,true,15);        //调用 2号RPC调用 缓冲区为buf 调用回调函数 参数为 (int)15    // Client:      Main():          myrpc::Message message = myprc::Message(); // 初始化messgae          net_tools::Config config("Client");         // 加载配置          net_tools::Log log;                         // 初始化LOG          net_tools::Tcpclient tcp;                   // 创建TCP服务          message.Get_rpc()->Init_fun(3,myrpc::base::Rpc::Test,sizeof_fun(myrpc::base::Rpc::Test),[](net_tools::net::Connect*,void*,unsigned int){NT_LOG_INFO << "rpc_test" << NT_LOG_ENDL;});          message.Get_rpc()->Init_fun(2,Test,sizeof_fun(Test),[](net_tools::net::Connect*,void*,unsigned int){NT_LOG_INFO << "rpc" << NT_LOG_ENDL;});          {            int b = 0;            auto fun = [=](int a)->void{NT_LOG_INFO << a << b << NT_LOG_ENDL;};            message->Get_rpc()->Init_fun(1,fun); //接口会保存该lambda 延长生命周期          }          tcp.Set_connect_over_callback(conn_func);   // 设置回调          tcp.Startclient(true);                      // 循环尝试连接          pause();          char buf[200] = {};                         // 缓冲区      read_func(net_tools::net::Connect* connect)          connect->Read(buf,200);                     // 读取消息 buf缓冲区地址 200 大小          message->Consume_message(connect,buf,200);  // 消费消息

工程目录 OLD

├── base                         // 网络库 base|   ├── base_buffer             // 能移动的buffer 零拷贝发送│   ├── bplustree               // 模板B+树 Timebplustree调用│   ├── channel                 // Channel 包装每一个event事件│   ├── channelpool             // 管理一个线程内所有Channel│   ├── condition               // 封装列posix的条件变量│   ├── config                  // 解析配置 JSON│   ├── count_down_cond         // 带计数器的条件变量│   ├── cpuinfo                 // 分析平台CPU│   ├── epoll                   // 封装列linux epoll│   ├── eventloop               // 事件循环│   ├── eventloopthread         // 包装了事件循环的线程│   ├── eventloopthreadpool     // IO线程池│   ├── function                // 引入std::function│   ├── hash                    // 闭散列HASH│   ├── json                    // 包装列JSONCPP│   ├── logbuffer               // LOG的缓冲区│   ├── log                     // LOG调用│   ├── logfile                 // LOG写入的文件│   ├── logstream               // LOG iostream风格调用│   ├── logthread               // LOG线程│   ├── mempool                 // 分配固定大小的内存池│   ├── mutex                   // 封装posix 互斥量│   ├── noncopyable             // 基类 不允许左值拷贝构造│   ├── thread                  // 包装了posix线程│   ├── threadpool              // 简易逻辑线程池│   ├── timebplustree           // 管理单个线程单次定时任务│   ├── timeevent               // 单词定时任务│   ├── timeeventset            // 管理单个对象的单次定时任务│   ├── timeevery               // 循环定时任务│   ├── timeeverymap            // 管理单个对象打循环定时任务│   ├── timequeue               // 管理单个线程打循环任务时间轮│   ├── timer                   // 包装了timefd定时器├── rpc                          // Rpc框架│   ├── dataformat              // 信息包格式定义│   ├── dmq                     // 分布式消息队列│   ├── message                 // 信息包注册消费中心│   ├── nodeformat              // 节点格式定义│   ├── rpc                     // rpc框架│   ├── raft                    // raft实现分布式(未实现)│   ├── rpccall                 // 元编程Rpc序列 反序列│   ├── serverfinder            // 服务发现├── net                          // 网络库 net│   ├── accept                  // 封装posix accpet│   ├── buffer                  // TCP连接缓冲区|   ├── buffer_reader           // base_buffer迭代器│   ├── connect                 // 单个TCP连接│   ├── connectpool             // 管理单个线程的CONNECT对象│   ├── heartbeat               // 心跳系统 FOR SERVER│   ├── socket                  // 封装posix socket调用│   ├── tcpclient               // TCP client主体or单次连接│   ├── tcpserver               // TCP server└── user                         // 一些示例

Future:

eventloop已经过时 该网络库为过时的框架网络库使用eventloop使得代码需要非阻塞 而很多的业务逻辑需要并行的rpc调用 过分的回调函数实现异步 所以说要使用应用层的线程来改造网络库至少lock_free 追求wait_free 应用层的线程也容易实现负载均衡 避免一个调用卡住后面的调用导致超时 task_steal等epoll的模型也能转换为io_uring 减少进入内核的次数 从而提高吞吐量 现阶段的rpc是在eventloop上进行的 只能实现丑陋的回调异步 需要应用层线程来实现join 从而让业务逻辑能够简化并行处理多个rpc返回值的问题而且可以方便的终止rpc 通过一个rpc的返回决定另外的rpc的处理方式rpc也需要实现可扩展的格式 这在现基础上 可以使用数据包附带偏移量的指示 按照原来预定的设想 通过编译期的参数推导实现入参 框架集成pb 简化依赖项为了可扩展性 protobuf可选支持服务发现需要raft的支持 保证服务发现的高可用集成管理的接口 不停机配置 维护

目标:

底层网络库io_uring+应用层线程+集成协议的rpc+raft支持的服务发现+raft接口拓展为集群功能+可选的解耦中间件(类似kafka的消息队列 减少整个大集群连接的拓扑复杂度)

下一步:

重构网络库应用层线程支持rpc协议更新高可用的服务发现raft接口拓展机器为集群功能往上搭建DMQ 消息队列实现最终的满足业务开发绝大多数需求的rpc架构 MyRpc

如何学习

推荐书籍:

C++

服务器

操作系统

分布式

数据库

杂项

工程简介

MyRpc

Rpc:

自研Rpc框架 两步实现Rpc调用

RPC:使用元编程实现RPC序列反序列以及调用

自制function 实现类型擦除 相对于多态的类型擦除 调用速度提升7倍

faster_function 相对于多态实现的rpccall 速度提升15倍 qps达到 22m

不使用JSON等序列化工具 支持不定长 支持lambda(带捕获参数) 函数指针 自定义类型 回调函数

网络库:

参考muduo的事件驱动型网络库

相对于muduo添加了定时器树,心跳树功能

两种定时任务策略 定时器树,TIMERFD

适合作为接入,解析数据包,转发服务器,IO密集

LOG系统c++ iostream风格

缓冲区系统内部采用内存池 隔离用户与系统调用 简化用户业务逻辑 支持base_buffer零拷贝

内存池仿ptmalloc 为缓冲区提供大块内存

逻辑线程池任务队列 用以执行用户注册的业务函数

IO线程池EPOLL监听注册IO描述符

定时器树用B+树作为底层 维护收到连接注册的定时期任务或在用户调用的定时期任务

定时器循环任务map以任务名为key 注册循环定时器任务 每个任务一个timefd 当定时任务多时 可用前缀树优化

心跳树使用类B+树 通过划分心跳时间片 高效管理连接心跳

Config使用json解析配置


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
mqtt-go: 基于 go 语言开发的 mqtt broker发布时间:2022-03-25
下一篇:
workerman-queue: workerman 消息队列,基于Linux sysv 队列实现发布时间:2022-03-25
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap