zinx/znet/messageHandler.go

100 lines
2.9 KiB
Go

package znet
import (
"fmt"
"strconv"
"zinx/utils"
"zinx/ziface"
)
/**
消息模块的实现层
*/
type MsgHandler struct {
//存放每个msgId 对应的处理方法
Apis map[uint32]ziface.IRouter
//负责worker 取任务的消息对列
TaskQueue []chan ziface.IRequest
//业务工作池的数量
WorkPoolSize uint32
}
// 添加初始化方法
func NewMsgHandler() *MsgHandler {
return &MsgHandler{
Apis: make(map[uint32]ziface.IRouter),
WorkPoolSize: utils.GlobalObject.WokePoolSize,
TaskQueue: make([]chan ziface.IRequest, utils.GlobalObject.WokePoolSize),
}
}
// 调度/执行队形router 消息处理方法
func (msgHandler *MsgHandler) DoMsgHandler(request ziface.IRequest) {
//request 获取msgId
router, ok := msgHandler.Apis[request.GetMsgId()]
if !ok {
fmt.Println("api msg id = ", request.GetMsgId(), " is not found , need register")
return
}
//根据路由调用对应的业务
router.PreHandle(request)
router.Handle(request)
router.PostHandle(request)
}
// 为消息添加具体的处理模块
func (msgHandler *MsgHandler) AddRouter(msgId uint32, router ziface.IRouter) {
//判断msg 绑定的api 是否存在
if _, ok := msgHandler.Apis[msgId]; ok {
//id已经注册过
panic("repeat api , msgId = " + strconv.Itoa(int(msgId)))
}
//添加对应关系
msgHandler.Apis[msgId] = router
fmt.Println("add api msgid = ", msgId, " successful")
}
// StartWorkPool 启动一个worker 工作池,(开启工作池只能发生一次,一个zinx 框架只能有一个工作池)
func (msgHandler *MsgHandler) StartWorkPool() {
//根据workpoolsize分别开启worker,每个worker 用一个go 承载
for i := 0; i < int(msgHandler.WorkPoolSize); i++ {
//1.当前worker对应channel 的消息队列 , 开启空间 , 第0个worker 对应 第0 个channel..
msgHandler.TaskQueue[i] = make(chan ziface.IRequest, utils.GlobalObject.MaxWorkerTaskLen)
fmt.Println("i = ", i, "taskQueue = ", msgHandler.TaskQueue[i])
//启动当前worker,阻塞等待消息从channel中传递过来
go msgHandler.StartOneWorker(i, msgHandler.TaskQueue[i])
}
}
// StartOneWorker 启动一个worker 工作流程
func (msgHandler *MsgHandler) StartOneWorker(workerId int, taskQueue chan ziface.IRequest) {
fmt.Println("Worker ID = ", workerId, " is starting ...")
//不断阻塞等待消息
for {
select {
case request := <-taskQueue:
msgHandler.DoMsgHandler(request)
}
}
}
// 将消息交给taskQueue, 由worker 处理
func (msgHander *MsgHandler) SendMsgTaskQueue(request ziface.IRequest) {
//将消息平均分配 给worker(根据ConnectionID 进行分配)
workerID := request.GetConnection().GetConnectionID() % msgHander.WorkPoolSize
fmt.Println("add connID = ", request.GetConnection().GetConnectionID(), " request MsgId = ", request.GetMsgId(), " to worker id =", workerID)
//将消息发送给对应worker的taskQUeue即可
msgHander.TaskQueue[workerID] <- request
fmt.Println("msgHander.TaskQueue[workerID] = ", workerID)
}