在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
开源软件名称:MyRpc开源软件地址:https://gitee.com/mjuyangyousong/MyRpc开源软件介绍:MyRpcEMAIL: [email protected]POWERED BY Yousong Yang编译环境编译器需支持:C++17 如何开始1.CLONE项目2.mkdir build3.cd build4.cmake .. -DCMAKE_CXX_COMPILER={支持C++17的编译器}5.make6.初期运行 生成配置文件7.关闭服务 修改配置文件 性能测试测试平台: VMware虚拟机15 core:4 mem:8G 主机:thinpad E14 r5 4500 内存16gbzeromq: muduo:50字节 单程延迟46us 50000字节 单程延迟64us MyRpc:50字节 单程延迟43us 50000字节 单程延迟62us 150000字节 单程延迟112us压力测试 SERVER 100CPU 单线程 1500字节数据包 转发回Client QPS 90000+Log: 单线程 80字节信息 9341520+qpsRpc: 单对单序列RPC 解析RPC 调用RPC SERVER: 序列化RPC+发送包 一次调用一个数据包 连续发送 每次65.601ns Client: 接受包+解析RPC+调用 每次40.739ns 如何使用RPC:RPC消息包默认格式: 单位字节|-------------------------------------------------------------------------------------------------------------|| 消息包大小 4 | 消息类型(定义在dataformat) 4 | 调用PRC序号 4 | 消息数据大小 4 | 是否调用回调 1 | 消息数据 -- ||-------------------------------------------------------------------------------------------------------------|connect->Read 返回对端一次Write的内容 即:消息类型(定义在dataformat) 调用PRC序号 消息数据大小 是否调用回调 消息数据 or 任何数据一次Write对应一次Read 需要口头规定消息包最大大小 以免一次Read无法读尽Rpc调用过程:对端注册一个RPCRpc.Init_fun(type(口头约定的类型),fun_address 可以是lambda 可以是函数地址,size 希望的接收到的参数数据包大小(消息数据大小),回调函数(可选))发送端发送一个RPCMessage::Make_rpc(connnet,(口头约定的类型),缓冲区,是否调用回调,参数...) // Server: Main(): myrpc::Message message = myprc::Message(); // 初始化messgae net_tools::Config config("Server"); // 加载配置 net_tools::Log log; // 初始化LOG net_tools::Tcpserver tcp; // 创建TCP服务 tcp.Set_connect_over_callback(conn_func); // 设置回调 tcp.Initserver(true,2,0); // 注册TCP服务 tcp.Startserver(); // 开始服务 conn_func(net_tools::net::Connect* connect): connect->Set_read_callback(read_func); // 设置收到消息包回调(可选) connect->Set_close_callback(close_func); // 设置对端Close回调(可选) connect->Set_lose_callback(lose_func); // 设置心跳包失效回调(可选) char buf[512] = {}; // 用户缓冲区 myrpc::Message::Make_rpc(connect,3,buf,true,12,13,14); //调用 3号RPC调用 缓冲区为buf 调用回调函数 参数为 (int)12 13 14 myrpc::Message::Make_rpc(connect,2,buf,true,15); //调用 2号RPC调用 缓冲区为buf 调用回调函数 参数为 (int)15 // Client: Main(): myrpc::Message message = myprc::Message(); // 初始化messgae net_tools::Config config("Client"); // 加载配置 net_tools::Log log; // 初始化LOG net_tools::Tcpclient tcp; // 创建TCP服务 message.Get_rpc()->Init_fun(3,myrpc::base::Rpc::Test,sizeof_fun(myrpc::base::Rpc::Test),[](net_tools::net::Connect*,void*,unsigned int){NT_LOG_INFO << "rpc_test" << NT_LOG_ENDL;}); message.Get_rpc()->Init_fun(2,Test,sizeof_fun(Test),[](net_tools::net::Connect*,void*,unsigned int){NT_LOG_INFO << "rpc" << NT_LOG_ENDL;}); { int b = 0; auto fun = [=](int a)->void{NT_LOG_INFO << a << b << NT_LOG_ENDL;}; message->Get_rpc()->Init_fun(1,fun); //接口会保存该lambda 延长生命周期 } tcp.Set_connect_over_callback(conn_func); // 设置回调 tcp.Startclient(true); // 循环尝试连接 pause(); char buf[200] = {}; // 缓冲区 read_func(net_tools::net::Connect* connect) connect->Read(buf,200); // 读取消息 buf缓冲区地址 200 大小 message->Consume_message(connect,buf,200); // 消费消息 工程目录 OLD├── base // 网络库 base| ├── base_buffer // 能移动的buffer 零拷贝发送│ ├── bplustree // 模板B+树 Timebplustree调用│ ├── channel // Channel 包装每一个event事件│ ├── channelpool // 管理一个线程内所有Channel│ ├── condition // 封装列posix的条件变量│ ├── config // 解析配置 JSON│ ├── count_down_cond // 带计数器的条件变量│ ├── cpuinfo // 分析平台CPU│ ├── epoll // 封装列linux epoll│ ├── eventloop // 事件循环│ ├── eventloopthread // 包装了事件循环的线程│ ├── eventloopthreadpool // IO线程池│ ├── function // 引入std::function│ ├── hash // 闭散列HASH│ ├── json // 包装列JSONCPP│ ├── logbuffer // LOG的缓冲区│ ├── log // LOG调用│ ├── logfile // LOG写入的文件│ ├── logstream // LOG iostream风格调用│ ├── logthread // LOG线程│ ├── mempool // 分配固定大小的内存池│ ├── mutex // 封装posix 互斥量│ ├── noncopyable // 基类 不允许左值拷贝构造│ ├── thread // 包装了posix线程│ ├── threadpool // 简易逻辑线程池│ ├── timebplustree // 管理单个线程单次定时任务│ ├── timeevent // 单词定时任务│ ├── timeeventset // 管理单个对象的单次定时任务│ ├── timeevery // 循环定时任务│ ├── timeeverymap // 管理单个对象打循环定时任务│ ├── timequeue // 管理单个线程打循环任务时间轮│ ├── timer // 包装了timefd定时器├── rpc // Rpc框架│ ├── dataformat // 信息包格式定义│ ├── dmq // 分布式消息队列│ ├── message // 信息包注册消费中心│ ├── nodeformat // 节点格式定义│ ├── rpc // rpc框架│ ├── raft // raft实现分布式(未实现)│ ├── rpccall // 元编程Rpc序列 反序列│ ├── serverfinder // 服务发现├── net // 网络库 net│ ├── accept // 封装posix accpet│ ├── buffer // TCP连接缓冲区| ├── buffer_reader // base_buffer迭代器│ ├── connect // 单个TCP连接│ ├── connectpool // 管理单个线程的CONNECT对象│ ├── heartbeat // 心跳系统 FOR SERVER│ ├── socket // 封装posix socket调用│ ├── tcpclient // TCP client主体or单次连接│ ├── tcpserver // TCP server└── user // 一些示例 Future:eventloop已经过时 该网络库为过时的框架网络库使用eventloop使得代码需要非阻塞 而很多的业务逻辑需要并行的rpc调用 过分的回调函数实现异步 所以说要使用应用层的线程来改造网络库至少lock_free 追求wait_free 应用层的线程也容易实现负载均衡 避免一个调用卡住后面的调用导致超时 task_steal等epoll的模型也能转换为io_uring 减少进入内核的次数 从而提高吞吐量 现阶段的rpc是在eventloop上进行的 只能实现丑陋的回调异步 需要应用层线程来实现join 从而让业务逻辑能够简化并行处理多个rpc返回值的问题而且可以方便的终止rpc 通过一个rpc的返回决定另外的rpc的处理方式rpc也需要实现可扩展的格式 这在现基础上 可以使用数据包附带偏移量的指示 按照原来预定的设想 通过编译期的参数推导实现入参 框架集成pb 简化依赖项为了可扩展性 protobuf可选支持服务发现需要raft的支持 保证服务发现的高可用集成管理的接口 不停机配置 维护 目标:底层网络库io_uring+应用层线程+集成协议的rpc+raft支持的服务发现+raft接口拓展为集群功能+可选的解耦中间件(类似kafka的消息队列 减少整个大集群连接的拓扑复杂度) 下一步:重构网络库应用层线程支持rpc协议更新高可用的服务发现raft接口拓展机器为集群功能往上搭建DMQ 消息队列实现最终的满足业务开发绝大多数需求的rpc架构 MyRpc 如何学习推荐书籍: 工程简介MyRpcRpc:自研Rpc框架 两步实现Rpc调用 RPC:使用元编程实现RPC序列反序列以及调用 自制function 实现类型擦除 相对于多态的类型擦除 调用速度提升7倍 faster_function 相对于多态实现的rpccall 速度提升15倍 qps达到 22m 不使用JSON等序列化工具 支持不定长 支持lambda(带捕获参数) 函数指针 自定义类型 回调函数 网络库:参考muduo的事件驱动型网络库 相对于muduo添加了定时器树,心跳树功能 两种定时任务策略 定时器树,TIMERFD 适合作为接入,解析数据包,转发服务器,IO密集 LOG系统c++ iostream风格 缓冲区系统内部采用内存池 隔离用户与系统调用 简化用户业务逻辑 支持base_buffer零拷贝 内存池仿ptmalloc 为缓冲区提供大块内存 逻辑线程池任务队列 用以执行用户注册的业务函数 IO线程池EPOLL监听注册IO描述符 定时器树用B+树作为底层 维护收到连接注册的定时期任务或在用户调用的定时期任务 定时器循环任务map以任务名为key 注册循环定时器任务 每个任务一个timefd 当定时任务多时 可用前缀树优化 心跳树使用类B+树 通过划分心跳时间片 高效管理连接心跳 Config使用json解析配置 |
请发表评论