100 lines
2.9 KiB
Go
100 lines
2.9 KiB
Go
package znet
|
|
|
|
import (
|
|
"fmt"
|
|
"strconv"
|
|
"zinx/utils"
|
|
"zinx/ziface"
|
|
)
|
|
|
|
/**
|
|
消息模块的实现层
|
|
*/
|
|
|
|
type MsgHandler struct {
|
|
|
|
//存放每个msgId 对应的处理方法
|
|
Apis map[uint32]ziface.IRouter
|
|
|
|
//负责worker 取任务的消息对列
|
|
TaskQueue []chan ziface.IRequest
|
|
|
|
//业务工作池的数量
|
|
WorkPoolSize uint32
|
|
}
|
|
|
|
// 添加初始化方法
|
|
func NewMsgHandler() *MsgHandler {
|
|
return &MsgHandler{
|
|
Apis: make(map[uint32]ziface.IRouter),
|
|
WorkPoolSize: utils.GlobalObject.WokePoolSize,
|
|
TaskQueue: make([]chan ziface.IRequest, utils.GlobalObject.WokePoolSize),
|
|
}
|
|
}
|
|
|
|
// 调度/执行队形router 消息处理方法
|
|
func (msgHandler *MsgHandler) DoMsgHandler(request ziface.IRequest) {
|
|
//request 获取msgId
|
|
router, ok := msgHandler.Apis[request.GetMsgId()]
|
|
if !ok {
|
|
fmt.Println("api msg id = ", request.GetMsgId(), " is not found , need register")
|
|
return
|
|
}
|
|
//根据路由调用对应的业务
|
|
router.PreHandle(request)
|
|
router.Handle(request)
|
|
router.PostHandle(request)
|
|
}
|
|
|
|
// 为消息添加具体的处理模块
|
|
func (msgHandler *MsgHandler) AddRouter(msgId uint32, router ziface.IRouter) {
|
|
//判断msg 绑定的api 是否存在
|
|
if _, ok := msgHandler.Apis[msgId]; ok {
|
|
//id已经注册过
|
|
panic("repeat api , msgId = " + strconv.Itoa(int(msgId)))
|
|
}
|
|
|
|
//添加对应关系
|
|
msgHandler.Apis[msgId] = router
|
|
fmt.Println("add api msgid = ", msgId, " successful")
|
|
}
|
|
|
|
// StartWorkPool 启动一个worker 工作池,(开启工作池只能发生一次,一个zinx 框架只能有一个工作池)
|
|
func (msgHandler *MsgHandler) StartWorkPool() {
|
|
|
|
//根据workpoolsize分别开启worker,每个worker 用一个go 承载
|
|
for i := 0; i < int(msgHandler.WorkPoolSize); i++ {
|
|
|
|
//1.当前worker对应channel 的消息队列 , 开启空间 , 第0个worker 对应 第0 个channel..
|
|
msgHandler.TaskQueue[i] = make(chan ziface.IRequest, utils.GlobalObject.MaxWorkerTaskLen)
|
|
|
|
fmt.Println("i = ", i, "taskQueue = ", msgHandler.TaskQueue[i])
|
|
|
|
//启动当前worker,阻塞等待消息从channel中传递过来
|
|
go msgHandler.StartOneWorker(i, msgHandler.TaskQueue[i])
|
|
}
|
|
}
|
|
|
|
// StartOneWorker 启动一个worker 工作流程
|
|
func (msgHandler *MsgHandler) StartOneWorker(workerId int, taskQueue chan ziface.IRequest) {
|
|
fmt.Println("Worker ID = ", workerId, " is starting ...")
|
|
|
|
//不断阻塞等待消息
|
|
for {
|
|
select {
|
|
case request := <-taskQueue:
|
|
msgHandler.DoMsgHandler(request)
|
|
}
|
|
}
|
|
}
|
|
|
|
// 将消息交给taskQueue, 由worker 处理
|
|
func (msgHander *MsgHandler) SendMsgTaskQueue(request ziface.IRequest) {
|
|
//将消息平均分配 给worker(根据ConnectionID 进行分配)
|
|
workerID := request.GetConnection().GetConnectionID() % msgHander.WorkPoolSize
|
|
fmt.Println("add connID = ", request.GetConnection().GetConnectionID(), " request MsgId = ", request.GetMsgId(), " to worker id =", workerID)
|
|
//将消息发送给对应worker的taskQUeue即可
|
|
msgHander.TaskQueue[workerID] <- request
|
|
fmt.Println("msgHander.TaskQueue[workerID] = ", workerID)
|
|
}
|