在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
工作池与消息队列框架Woker Pool:工作池中有固定数量的协程,每一个协程对应一个消息任务队列。 消息任务队列:消息任务队列,本质就是go中的缓冲信道,任务在缓冲信道中传输,等待被处理。 TaskQueue:消息任务队列的集合,本质就是 client Handler Reader:在这里假设为客户端请求的处理方法,将请求对象或者任务传到某一个消息任务队列。 clent Handler Writer:客户端返回响应的执行程序。
简单工作池与消息队列的实现package main import ( "fmt" "time" ) //任务:大致思路,所有任务类都必须实现Task接口,所以用户需要创建task类 type ITask interface { write() reader() } type task struct { c string } func (t *task)write(){ fmt.Println("写入字符",t.c) } func (t *task)reader(){ fmt.Println("接收字符",t.c) } func Newtask(c string)*task { return &task{ c: c, } } //限制 //1.worker工作池的任务队列的最大值 //2.任务队列中任务的最大数量 //协程池 type WorkerPool struct { cap int//工作池中协程的数量 tasksSize int//任务队列中最大任务的容量 TaskQueue []chan ITask //信道集合 } //启动一个worker工作池,开启工作池只能发生一次 func (W *WorkerPool)StartWorkPool(){ //根据任务队列的大小,分别开启worker,每个worker用go来承载,每一个worker对应一个任务队列 for i:=0;i<W.cap;i++{ //为每个worker开辟缓冲信道(任务队列) W.TaskQueue[i] = make(chan ITask,W.tasksSize) //启动worker,阻塞等待任务从channel中到来 go W.StartOneWorker(i,W.TaskQueue[i]) } } func (W *WorkerPool)StartOneWorker(id int,taskqueue chan ITask){ for{ select { case request :=<- taskqueue: //如果有消息过来,则处理业务 request.write() request.reader() default: continue } } } func (W *WorkerPool)Put(task ITask){ W.TaskQueue[0] <- task } func New(cap int,len int)*WorkerPool{ return &WorkerPool{ cap:cap, tasksSize:len, TaskQueue:make([]chan ITask,cap), } } func main(){ b := make(chan bool) Pool := New(1,10) //创建工作池,池中只有一个协程,每个协程对应最大任务数为10个 go Pool.StartWorkPool() task1 :=Newtask("hello1") //task2 :=Newtask("hello2") //task3 :=Newtask("hello3") //task4 :=Newtask("hello4") time.Sleep(time.Second) //第一种方法测试 //Pool.Put(task1) //Pool.Put(task2) //Pool.Put(task3) //Pool.Put(task4) //第二种方式测试 go func(){ for{ Pool.Put(task1) time.Sleep(time.Second) } }() <-b } 执行结果: 写入字符 hello1 接收字符 hello1 写入字符 hello1 接收字符 hello1 写入字符 hello1 接收字符 hello1 写入字符 hello1 接收字符 hello1 写入字符 hello1 接收字符 hello1 写入字符 hello1 接收字符 hello1 写入字符 hello1 接收字符 hello1
Tcp服务器使用工作池与消息队列初始化 初始化工作池对象,cap消息队列数量,前面说了,消息队列与work数量一致,也限制了工作池中协程的数量,tasksSize为每一个消息队列的最大容量,TaskQueue:make([]chan IConnection,cap)创建了消息队列的集合。 func NewWorkerPool(cap int,len int)*WorkerPool{ return &WorkerPool{ cap:cap, tasksSize:len, TaskQueue:make([]chan IConnection,cap), } } 启动工作池与消息队列 每一个任务队列对应一个协程,在这里我们为每一个worker开辟了任务队列。 //启动一个worker工作池,开启工作池只能发生一次 func (W *WorkerPool)StartWorkPool(){ //根据任务队列的大小,分别开启worker,每个worker用go来承载,每一个worker对应一个任务队列 for i:=0;i<W.cap;i++{ //为每个worker开辟缓冲信道(任务队列) W.TaskQueue[i] = make(chan IConnection,W.tasksSize) //启动worker,阻塞等待任务从channel中到来 go W.StartOneWorker(i,W.TaskQueue[i]) } } 等待任务执行 func (W *WorkerPool)StartOneWorker(id int,taskqueue chan IConnection){ for{ select { case request :=<- taskqueue: //如果有消息过来,则处理业务 request.Start() default: continue } } }
package znet //限制 //1.worker工作池的任务队列的最大值 //2.任务队列中任务的最大数量 //协程池 type WorkerPool struct { cap int tasksSize int TaskQueue []chan IConnection //信道集合 } //启动一个worker工作池,开启工作池只能发生一次 func (W *WorkerPool)StartWorkPool(){ //根据任务队列的大小,分别开启worker,每个worker用go来承载,每一个worker对应一个任务队列 for i:=0;i<W.cap;i++{ //为每个worker开辟缓冲信道(任务队列) W.TaskQueue[i] = make(chan IConnection,W.tasksSize) //启动worker,阻塞等待任务从channel中到来 go W.StartOneWorker(i,W.TaskQueue[i]) } } func (W *WorkerPool)StartOneWorker(id int,taskqueue chan IConnection){ for{ select { case request :=<- taskqueue: //如果有消息过来,则处理业务 request.Start() default: continue } } } //将任务公平的分发,使用取模(客户端链接与工作池的协程数) func (W *WorkerPool)Put(Connection IConnection){ index := Connection.GetConnID()%uint32(W.cap) W.TaskQueue[index] <- Connection } func NewWorkerPool(cap int,len int)*WorkerPool{ return &WorkerPool{ cap:cap, tasksSize:len, TaskQueue:make([]chan IConnection,cap), } }
|
请发表评论