package znet import ( "fmt" "strconv" "zinx/utils" "zinx/ziface" ) /** 消息模块的实现层 */ type MsgHandler struct { //存放每个msgId 对应的处理方法 Apis map[uint32]ziface.IRouter //负责worker 取任务的消息对列 TaskQueue []chan ziface.IRequest //业务工作池的数量 WorkPoolSize uint32 } // 添加初始化方法 func NewMsgHandler() *MsgHandler { return &MsgHandler{ Apis: make(map[uint32]ziface.IRouter), WorkPoolSize: utils.GlobalObject.WokePoolSize, TaskQueue: make([]chan ziface.IRequest, utils.GlobalObject.WokePoolSize), } } // 调度/执行队形router 消息处理方法 func (msgHandler *MsgHandler) DoMsgHandler(request ziface.IRequest) { //request 获取msgId router, ok := msgHandler.Apis[request.GetMsgId()] if !ok { fmt.Println("api msg id = ", request.GetMsgId(), " is not found , need register") return } //根据路由调用对应的业务 router.PreHandle(request) router.Handle(request) router.PostHandle(request) } // 为消息添加具体的处理模块 func (msgHandler *MsgHandler) AddRouter(msgId uint32, router ziface.IRouter) { //判断msg 绑定的api 是否存在 if _, ok := msgHandler.Apis[msgId]; ok { //id已经注册过 panic("repeat api , msgId = " + strconv.Itoa(int(msgId))) } //添加对应关系 msgHandler.Apis[msgId] = router fmt.Println("add api msgid = ", msgId, " successful") } // StartWorkPool 启动一个worker 工作池,(开启工作池只能发生一次,一个zinx 框架只能有一个工作池) func (msgHandler *MsgHandler) StartWorkPool() { //根据workpoolsize分别开启worker,每个worker 用一个go 承载 for i := 0; i < int(msgHandler.WorkPoolSize); i++ { //1.当前worker对应channel 的消息队列 , 开启空间 , 第0个worker 对应 第0 个channel.. msgHandler.TaskQueue[i] = make(chan ziface.IRequest, utils.GlobalObject.MaxWorkerTaskLen) fmt.Println("i = ", i, "taskQueue = ", msgHandler.TaskQueue[i]) //启动当前worker,阻塞等待消息从channel中传递过来 go msgHandler.StartOneWorker(i, msgHandler.TaskQueue[i]) } } // StartOneWorker 启动一个worker 工作流程 func (msgHandler *MsgHandler) StartOneWorker(workerId int, taskQueue chan ziface.IRequest) { fmt.Println("Worker ID = ", workerId, " is starting ...") //不断阻塞等待消息 for { select { case request := <-taskQueue: msgHandler.DoMsgHandler(request) } } } // 将消息交给taskQueue, 由worker 处理 func (msgHander *MsgHandler) SendMsgTaskQueue(request ziface.IRequest) { //将消息平均分配 给worker(根据ConnectionID 进行分配) workerID := request.GetConnection().GetConnectionID() % msgHander.WorkPoolSize fmt.Println("add connID = ", request.GetConnection().GetConnectionID(), " request MsgId = ", request.GetMsgId(), " to worker id =", workerID) //将消息发送给对应worker的taskQUeue即可 msgHander.TaskQueue[workerID] <- request fmt.Println("msgHander.TaskQueue[workerID] = ", workerID) }