From d6bacf3452bb47db0c9795e58ab86904d231a592 Mon Sep 17 00:00:00 2001 From: zm <1334717033@qq.com> Date: Wed, 29 May 2024 18:05:56 +0800 Subject: [PATCH] =?UTF-8?q?2024=E5=B9=B45=E6=9C=8829=E6=97=A518:05:44?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 3 + utils/globalobj.go | 92 ++++++++++++++ ziface/iconnManager.go | 22 ++++ ziface/iconnection.go | 37 ++++++ ziface/idatapack.go | 18 +++ ziface/imessage.go | 22 ++++ ziface/imessageHandler.go | 22 ++++ ziface/irequest.go | 16 +++ ziface/irouter.go | 17 +++ ziface/iserver.go | 32 +++++ znet/connManager.go | 87 +++++++++++++ znet/connection.go | 260 ++++++++++++++++++++++++++++++++++++++ znet/datapack.go | 71 +++++++++++ znet/message.go | 49 +++++++ znet/messageHandler.go | 99 +++++++++++++++ znet/request.go | 27 ++++ znet/router.go | 21 +++ znet/server.go | 188 +++++++++++++++++++++++++++ 18 files changed, 1083 insertions(+) create mode 100644 go.mod create mode 100644 utils/globalobj.go create mode 100644 ziface/iconnManager.go create mode 100644 ziface/iconnection.go create mode 100644 ziface/idatapack.go create mode 100644 ziface/imessage.go create mode 100644 ziface/imessageHandler.go create mode 100644 ziface/irequest.go create mode 100644 ziface/irouter.go create mode 100644 ziface/iserver.go create mode 100644 znet/connManager.go create mode 100644 znet/connection.go create mode 100644 znet/datapack.go create mode 100644 znet/message.go create mode 100644 znet/messageHandler.go create mode 100644 znet/request.go create mode 100644 znet/router.go create mode 100644 znet/server.go diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..12e4767 --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module zinx + +go 1.20 diff --git a/utils/globalobj.go b/utils/globalobj.go new file mode 100644 index 0000000..483aa00 --- /dev/null +++ b/utils/globalobj.go @@ -0,0 +1,92 @@ +package utils + +import ( + "encoding/json" + "fmt" + "os" + "zinx/ziface" +) + +/** +存储有关zinx 框架的全局参数 , 供其他模块使用 +参数可以通过 zinx.json 由用户配置 +*/ + +type GlobalObj struct { + + /** + Server + */ + TcpServer ziface.IServer //当前zinx 全局的server 对象 + Host string //当前服务器监听的ip + TcpPort int //当前服务器监听的端口 + Name string //当前服务的名称 + + /** + zinx + */ + Version string //当前zinx 的版本号 + MaxConn int //当前服务器允许的最大连接数 + MaxPackageSize uint32 //当前zinx 框架数据包的最大值 + WokePoolSize uint32 //当前业务工作worker 池的goroutine 数量 + MaxWorkerTaskLen uint32 // zinx 框架最多允许用户开辟多少个worker (限定条件) +} + +/* +GlobalObject 定义一个全局对外的globalobj 对象 +*/ +var GlobalObject *GlobalObj + +/* + 提供一个init方法, 初始化 GlobalObject +*/ + +func init() { + + //默认配置 + GlobalObject = &GlobalObj{ + Name: "ZinxServer", + Host: "0.0.0.0", + Version: "V0.4", + TcpPort: 8999, + MaxConn: 1000, + MaxPackageSize: 4096, + WokePoolSize: 10, + MaxWorkerTaskLen: 1024, + } + + //尝试从配置文件加载 config/zinx.json + GlobalObject.Reload() +} + +/* +Reload 从config/zinx.json 中加载用户配置 +*/ +func (g *GlobalObj) Reload() { + data, err := os.ReadFile("conf/zinx.json") + if err != nil { + fmt.Println("conf/zinx.json 读取错误!") + panic(err) + } + err = json.Unmarshal(data, &GlobalObject) + if err != nil { + fmt.Println("conf/zinx.json 解析错误!") + panic(err) + } + + // 打开 JSON 文件 + //file, err := os.Open("conf/zinx.json") + //if err != nil { + // fmt.Println(err) + // return + //} + //defer file.Close() + //// 解码 JSON 数据 + //decoder := json.NewDecoder(file) + //err = decoder.Decode(&GlobalObject) + //if err != nil { + // fmt.Println(err) + // return + //} + //fmt.Println(GlobalObject) +} diff --git a/ziface/iconnManager.go b/ziface/iconnManager.go new file mode 100644 index 0000000..aff7316 --- /dev/null +++ b/ziface/iconnManager.go @@ -0,0 +1,22 @@ +package ziface + +/* +IConnManager 连接管理模块抽象层 +*/ +type IConnManager interface { + + //添加连接 + Add(conn IConnection) + + //删除连接 + Remove(conn IConnection) + + //根据connID 获取连接 + + Get(connID uint32) (IConnection, error) + //得到当前的连接总数 + Len() int + + //清楚或终止所有连接 + ClearConn() +} diff --git a/ziface/iconnection.go b/ziface/iconnection.go new file mode 100644 index 0000000..7d9a00a --- /dev/null +++ b/ziface/iconnection.go @@ -0,0 +1,37 @@ +package ziface + +import "net" + +// IConnection 定义链接的抽象层 +type IConnection interface { + + // Start 启动链接 + Start() + + // Stop 关闭连接 + Stop() + + // GetTcpConnection 获取当前链接绑定的socket + GetTcpConnection() *net.TCPConn + + // GetConnectionID 获取当前链接模块的ID + GetConnectionID() uint32 + + // RemoteAddr 获取远程客户点的tcp 状态 ip port + RemoteAddr() net.Addr + + // SendMsg 发送数据,将数据发送给远程客户端 + SendMsg(msgId uint32, data []byte) error + + //设置连接属性 + SetProperty(key string, value interface{}) + + //获取连接属性 + GetProperty(key string) (interface{}, error) + + //移除连接属性 + RemoveProperty(key string) +} + +// 定义一个处理链接的方法 +type HandleFunc func(*net.TCPConn, []byte, int) error diff --git a/ziface/idatapack.go b/ziface/idatapack.go new file mode 100644 index 0000000..b3b16dc --- /dev/null +++ b/ziface/idatapack.go @@ -0,0 +1,18 @@ +package ziface + +/* +IDataPack +封包,拆包 +直接面向tcp中的流,用于处理tcp 粘包问题 +*/ +type IDataPack interface { + + //获取包的头长度方法 + GetHeadLen() uint32 + + //封包方法 + Pack(msg IMessage) ([]byte, error) + + //拆包方法 + UnPack([]byte) (IMessage, error) +} diff --git a/ziface/imessage.go b/ziface/imessage.go new file mode 100644 index 0000000..4ba1cc0 --- /dev/null +++ b/ziface/imessage.go @@ -0,0 +1,22 @@ +package ziface + +/* +* +将消息封装到message 中,定义抽象接口 +*/ +type IMessage interface { + + //获取消息的id + GetMsgId() uint32 + //获取消息的长度 + GetDataLen() uint32 + //获取消息的内容 + GetData() []byte + + //设置消息的id + SetMsgId(uint32) + //设置消息的长度 + SetDataLen(uint32) + //设置消息的内容 + SetData([]byte) +} diff --git a/ziface/imessageHandler.go b/ziface/imessageHandler.go new file mode 100644 index 0000000..010bc69 --- /dev/null +++ b/ziface/imessageHandler.go @@ -0,0 +1,22 @@ +package ziface + +/* +* +消息管理层抽象 +*/ +type IMsgHandler interface { + + //调度/执行队形router 消息处理方法 + DoMsgHandler(request IRequest) + + //为消息添加具体的处理模块 + AddRouter(msgId uint32, router IRouter) + + //启动work 工作池 + StartWorkPool() + + //启动一个worker 工作流程 + //StartOneWorker(workerId int, taskQueue chan IRequest) + + SendMsgTaskQueue(request IRequest) +} diff --git a/ziface/irequest.go b/ziface/irequest.go new file mode 100644 index 0000000..24225bf --- /dev/null +++ b/ziface/irequest.go @@ -0,0 +1,16 @@ +package ziface + +/** +封装客户端请求的链接信息 +*/ + +type IRequest interface { + //获取连接 + GetConnection() IConnection + + //请求的消息数据 + GetData() []byte + + //获取请求消息的id + GetMsgId() uint32 +} diff --git a/ziface/irouter.go b/ziface/irouter.go new file mode 100644 index 0000000..53d144c --- /dev/null +++ b/ziface/irouter.go @@ -0,0 +1,17 @@ +package ziface + +/** +路由抽象接口 + 路由里的接口都是IRequest +*/ + +type IRouter interface { + //在处理conn 业务之前的钩子方法 hook + PreHandle(request IRequest) + + //在处理conn 业务的主方法hook + Handle(request IRequest) + + //在处理conn业务之后的钩子方法 + PostHandle(request IRequest) +} diff --git a/ziface/iserver.go b/ziface/iserver.go new file mode 100644 index 0000000..73279a5 --- /dev/null +++ b/ziface/iserver.go @@ -0,0 +1,32 @@ +package ziface + +// 定义一个服务器接口 +type IServer interface { + + //启动服务器 + Start() + + //停止服务器 + Stop() + + //运行服务器 + Serve() + + //路由功能:给当前服务注册一个路由业务方法,供客户端链接处理使用 + AddRouter(msgId uint32, router IRouter) + + //获取当前server 的连接管理器 + GetConnMgr() IConnManager + + //注册OnConnStat 钩子函数方法 + SetOnConnStart(func(connection IConnection)) + + //注册OnConnStop 钩子函数方法 + SetOnConnStop(func(connection IConnection)) + + //调用OnConnStat 钩子函数方法 + CallOnConnStart(connection IConnection) + + //调用OnConnStop 钩子函数方法 + CallOnConnStop(connection IConnection) +} diff --git a/znet/connManager.go b/znet/connManager.go new file mode 100644 index 0000000..67c4087 --- /dev/null +++ b/znet/connManager.go @@ -0,0 +1,87 @@ +package znet + +import ( + "errors" + "fmt" + "sync" + "zinx/ziface" +) + +/* + 连接管理模块 +*/ + +type ConnManager struct { + + //管理的连接集合 + connections map[uint32]ziface.IConnection + + //保护连接集合的读写锁 + connLock sync.RWMutex +} + +// NewConnManager 创建当前连接的方法 +func NewConnManager() *ConnManager { + return &ConnManager{ + connections: make(map[uint32]ziface.IConnection), + } +} + +// Add 添加连接 +func (connMgr *ConnManager) Add(conn ziface.IConnection) { + //保护共享资源map ,加写锁 + connMgr.connLock.Lock() + defer connMgr.connLock.Unlock() + + //将conn 加入到connmanager 中 + connMgr.connections[conn.GetConnectionID()] = conn + + fmt.Println("connID = ", conn.GetConnectionID(), "add connection add to connmanager successfully : conn num = ", connMgr.Len()) +} + +// Remove 删除连接 +func (connMgr *ConnManager) Remove(conn ziface.IConnection) { + //保护共享资源map ,加写锁 + connMgr.connLock.Lock() + defer connMgr.connLock.Unlock() + + //删除链接 + delete(connMgr.connections, conn.GetConnectionID()) + fmt.Println("connID = ", conn.GetConnectionID(), "delete connection from connmanager successfully : conn num = ", connMgr.Len()) +} + +// 根据connID 获取连接 +func (connMgr *ConnManager) Get(connID uint32) (ziface.IConnection, error) { + //保护共享资源map ,加写锁 + connMgr.connLock.RLock() + defer connMgr.connLock.RUnlock() + + if conn, ok := connMgr.connections[connID]; ok { + //找到了 + return conn, nil + } else { + return nil, errors.New("connection is not found ") + } +} + +// 得到当前的连接总数 +func (connMgr *ConnManager) Len() int { + return len(connMgr.connections) +} + +// 清楚或终止所有连接 +func (connMgr *ConnManager) ClearConn() { + //保护共享资源map ,加写锁 + connMgr.connLock.Lock() + defer connMgr.connLock.Unlock() + + //删除conn 饼停止conn 的工作 + for connID, conn := range connMgr.connections { + //停止 + conn.Stop() + //删除 + delete(connMgr.connections, connID) + } + + fmt.Println("clear all connection successfully ! conn num = ", connMgr.Len()) +} diff --git a/znet/connection.go b/znet/connection.go new file mode 100644 index 0000000..08a0250 --- /dev/null +++ b/znet/connection.go @@ -0,0 +1,260 @@ +package znet + +import ( + "errors" + "fmt" + "io" + "net" + "sync" + "zinx/utils" + "zinx/ziface" +) + +// Connection 链接模块 +type Connection struct { + + //当前连接隶属于那个server + TcpServer ziface.IServer + + //当前链接的socket tcp 套接字 + Conn *net.TCPConn + + //链接的id + ConnID uint32 + + //当前链接的状态 + isClosed bool + + ////当前链接锁绑定的处理业务的api + //handleApi ziface.HandleFunc + + //告知当前链接的退出/停止 (由reading 告诉 writering 退出) + ExitChan chan bool + + //无缓冲的管道,用于读写goroutine之间的 消息通信 + msgChan chan []byte + + //消息处理的msgId 处理业务对应的api + MsgHandler ziface.IMsgHandler + + //连接属性集合 + property map[string]interface{} + + //保护连接属性的锁 + propertyLock sync.RWMutex +} + +// NewConnection 初始化链接 +func NewConnection(server ziface.IServer, conn *net.TCPConn, connID uint32, handler ziface.IMsgHandler) *Connection { + c := &Connection{ + TcpServer: server, + Conn: conn, + ConnID: connID, + isClosed: false, + MsgHandler: handler, + msgChan: make(chan []byte), + ExitChan: make(chan bool, 1), + property: make(map[string]interface{}), + } + + //将conn 加入到 connManager 中 + c.TcpServer.GetConnMgr().Add(c) + return c +} + +func (c *Connection) StartReader() { + fmt.Println("reading goroutine is running ...") + defer fmt.Println("connID = ", c.ConnID, " reading is exit , remote addr is =", c.Conn.RemoteAddr().String()) + defer c.Stop() + + for { + //读取数据 + //buf := make([]byte, utils.GlobalObject.MaxPackageSize) + // + //_, err := c.Conn.Read(buf) + //if err != nil { + // fmt.Println("conn read err") + // continue + //} + + //创建一个拆包解包对象 + dp := NewDataPack() + + //读取客户端的8个字节 + headData := make([]byte, dp.GetHeadLen()) + + //拆包得到msgId 和 msgDataLen + if _, err := io.ReadFull(c.GetTcpConnection(), headData); err != nil { + fmt.Println("read msg head error", err) + break + } + + msg, err := dp.UnPack(headData) + if err != nil { + fmt.Println("UnPack error", err) + break + } + var data []byte + if msg.GetDataLen() > 0 { + data = make([]byte, msg.GetDataLen()) + if _, err := io.ReadFull(c.GetTcpConnection(), data); err != nil { + fmt.Println("read msg data error", err) + } + } + + msg.SetData(data) + + //调用链接处理的api + //if err = c.handleApi(c.Conn, buf, len); err != nil { + // fmt.Println("connID = ", c.ConnID, "handle is err = ", err) + //} + + //的到当前conn 数据的request 请求数据 + req := Request{ + conn: c, + msg: msg, + } + + //go func(request ziface.IRequest) { + // //从路由中,找到注册绑定的conn对应的router 调用 + // c.Router.PreHandle(request) + // c.Router.Handle(request) + // c.Router.PostHandle(request) + //}(&req) + + if utils.GlobalObject.WokePoolSize > 0 { + //已经开启工作池,将消息发给工作池 + c.MsgHandler.SendMsgTaskQueue(&req) + } else { + //根据绑定msgID找到对应处理api业务执行 + go c.MsgHandler.DoMsgHandler(&req) + } + } +} + +// StartWriter写消息 goroutine, 专门发送给客户端消息的模块 +func (c *Connection) StartWriter() { + fmt.Println("[Writer GoRoutine is Running...]") + + defer fmt.Println(c.RemoteAddr().String(), " conn writering exit") + + //不断阻塞的等待channel的消息 + for { + select { + case data := <-c.msgChan: + if _, err := c.Conn.Write(data); err != nil { + fmt.Println("send data error ", err) + return + } + case <-c.ExitChan: + //代表已经退出,writer 也要退出 + return + } + } +} + +// Start 启动链接 +func (c *Connection) Start() { + fmt.Println("conn start() ...conn:", c.ConnID) + //处理业务数据 + go c.StartReader() + go c.StartWriter() + + //按照开发者传递进来的 创建连接之后需要调用的处理业务对应执行hook 函数 + c.TcpServer.CallOnConnStart(c) +} + +// Stop 关闭连接 +func (c *Connection) Stop() { + fmt.Println("conn stop .....,connID:", c.ConnID) + + if c.isClosed == true { + return + } + c.isClosed = true + + //调用开发者注册销毁连接之前执行的hook 函数 + c.TcpServer.CallOnConnStop(c) + + //关闭连接 + c.Conn.Close() + + //告知writer 关闭 + c.ExitChan <- true + + //将conn 删除掉 + c.TcpServer.GetConnMgr().Remove(c) + + //关闭资源 + close(c.ExitChan) + close(c.msgChan) +} + +// GetTcpConnection 获取当前链接绑定的socket +func (c *Connection) GetTcpConnection() *net.TCPConn { + return c.Conn +} + +// GetConnectionID 获取当前链接模块的ID +func (c *Connection) GetConnectionID() uint32 { + return c.ConnID +} + +// RemoteAddr 获取远程客户点的tcp 状态 ip port +func (c *Connection) RemoteAddr() net.Addr { + return c.Conn.RemoteAddr() +} + +// Send 发送数据,将数据发送给远程客户端,先封包在发送 +func (c *Connection) SendMsg(msgId uint32, data []byte) error { + + if c.isClosed == true { + return errors.New("connection close when send msg") + } + //将data 在封包 + dp := NewDataPack() + binaryMsg, err := dp.Pack(NewMessage(msgId, data)) + + if err != nil { + fmt.Println("pack msg id = ", msgId) + return errors.New("pack err msg") + } + + //将数据发送到客户端 + //if _, err := c.Conn.Write(binaryMsg); err != nil { + // fmt.Println("write msg id = ", msgId) + // return errors.New("write msg err") + //} + c.msgChan <- binaryMsg + return nil +} + +// 设置连接属性 +func (c *Connection) SetProperty(key string, value interface{}) { + c.propertyLock.Lock() + defer c.propertyLock.Unlock() + + //添加一个连接属性 + c.property[key] = value +} + +// 获取连接属性 +func (c *Connection) GetProperty(key string) (interface{}, error) { + c.propertyLock.RLock() + defer c.propertyLock.RUnlock() + + if value, ok := c.property[key]; ok { + return value, nil + } else { + return nil, errors.New("not property found") + } +} + +// 移除连接属性 +func (c *Connection) RemoveProperty(key string) { + c.propertyLock.Lock() + defer c.propertyLock.Unlock() + + //删除 + delete(c.property, key) +} diff --git a/znet/datapack.go b/znet/datapack.go new file mode 100644 index 0000000..47b5697 --- /dev/null +++ b/znet/datapack.go @@ -0,0 +1,71 @@ +package znet + +import ( + "bytes" + "encoding/binary" + "errors" + "zinx/utils" + "zinx/ziface" +) + +/** +封包拆包的具体模块 +*/ + +type DataPack struct { +} + +// 初始化示例 +func NewDataPack() *DataPack { + return &DataPack{} +} + +// 获取包的头长度方法 +func (d *DataPack) GetHeadLen() uint32 { + + //datalen uint32(4个字节) +id (4个字节) + return 8 +} + +// 封包方法 +func (d *DataPack) Pack(msg ziface.IMessage) ([]byte, error) { + //创建bytes字节缓冲 + dataBuf := bytes.NewBuffer([]byte{}) + //将datalen 写入buf + if err := binary.Write(dataBuf, binary.LittleEndian, msg.GetDataLen()); err != nil { + return nil, err + } + //将msgid写入buf + if err := binary.Write(dataBuf, binary.LittleEndian, msg.GetMsgId()); err != nil { + return nil, err + } + //将data写入buf + if err := binary.Write(dataBuf, binary.LittleEndian, msg.GetData()); err != nil { + return nil, err + } + return dataBuf.Bytes(), nil +} + +// 拆包方法 先读head 长度 ,根据head 长度读取内容 +func (d *DataPack) UnPack(binaryData []byte) (ziface.IMessage, error) { + //创建bytes字节缓冲 + dataBuf := bytes.NewReader(binaryData) + + //直解压 head 获取datalen 与 msgid + msg := &Message{} + + //读取datalen + if err := binary.Read(dataBuf, binary.LittleEndian, &msg.DataLen); err != nil { + return nil, err + } + //读取msgId + if err := binary.Read(dataBuf, binary.LittleEndian, &msg.Id); err != nil { + return nil, err + } + //判断datalen是否超出最大包的限制 + if utils.GlobalObject.MaxPackageSize > 0 && msg.DataLen > utils.GlobalObject.MaxPackageSize { + return nil, errors.New("too large data recv") + } + + return msg, nil +} diff --git a/znet/message.go b/znet/message.go new file mode 100644 index 0000000..7865a0c --- /dev/null +++ b/znet/message.go @@ -0,0 +1,49 @@ +package znet + +type Message struct { + + //消息的id + Id uint32 + //消息的内容的长度 + DataLen uint32 + //消息的内容 + Data []byte +} + +func NewMessage(id uint32, data []byte) *Message { + return &Message{ + Id: id, + Data: data, + DataLen: uint32(len(data)), + } +} + +// GetMsgId 获取消息的id +func (m *Message) GetMsgId() uint32 { + return m.Id +} + +// GetDataLen 获取消息的长度 +func (m *Message) GetDataLen() uint32 { + return m.DataLen +} + +// GetData 获取消息的内容 +func (m *Message) GetData() []byte { + return m.Data +} + +// SetMsgId 设置消息的id +func (m *Message) SetMsgId(id uint32) { + m.Id = id +} + +// SetDataLen 设置消息的长度 +func (m *Message) SetDataLen(dataLen uint32) { + m.DataLen = dataLen +} + +// SetData 设置消息的内容 +func (m *Message) SetData(data []byte) { + m.Data = data +} diff --git a/znet/messageHandler.go b/znet/messageHandler.go new file mode 100644 index 0000000..dbc0ff2 --- /dev/null +++ b/znet/messageHandler.go @@ -0,0 +1,99 @@ +package znet + +import ( + "fmt" + "strconv" + "zinx/utils" + "zinx/ziface" +) + +/** +消息模块的实现层 +*/ + +type MsgHandler struct { + + //存放每个msgId 对应的处理方法 + Apis map[uint32]ziface.IRouter + + //负责worker 取任务的消息对列 + TaskQueue []chan ziface.IRequest + + //业务工作池的数量 + WorkPoolSize uint32 +} + +// 添加初始化方法 +func NewMsgHandler() *MsgHandler { + return &MsgHandler{ + Apis: make(map[uint32]ziface.IRouter), + WorkPoolSize: utils.GlobalObject.WokePoolSize, + TaskQueue: make([]chan ziface.IRequest, utils.GlobalObject.WokePoolSize), + } +} + +// 调度/执行队形router 消息处理方法 +func (msgHandler *MsgHandler) DoMsgHandler(request ziface.IRequest) { + //request 获取msgId + router, ok := msgHandler.Apis[request.GetMsgId()] + if !ok { + fmt.Println("api msg id = ", request.GetMsgId(), " is not found , need register") + return + } + //根据路由调用对应的业务 + router.PreHandle(request) + router.Handle(request) + router.PostHandle(request) +} + +// 为消息添加具体的处理模块 +func (msgHandler *MsgHandler) AddRouter(msgId uint32, router ziface.IRouter) { + //判断msg 绑定的api 是否存在 + if _, ok := msgHandler.Apis[msgId]; ok { + //id已经注册过 + panic("repeat api , msgId = " + strconv.Itoa(int(msgId))) + } + + //添加对应关系 + msgHandler.Apis[msgId] = router + fmt.Println("add api msgid = ", msgId, " successful") +} + +// StartWorkPool 启动一个worker 工作池,(开启工作池只能发生一次,一个zinx 框架只能有一个工作池) +func (msgHandler *MsgHandler) StartWorkPool() { + + //根据workpoolsize分别开启worker,每个worker 用一个go 承载 + for i := 0; i < int(msgHandler.WorkPoolSize); i++ { + + //1.当前worker对应channel 的消息队列 , 开启空间 , 第0个worker 对应 第0 个channel.. + msgHandler.TaskQueue[i] = make(chan ziface.IRequest, utils.GlobalObject.MaxWorkerTaskLen) + + fmt.Println("i = ", i, "taskQueue = ", msgHandler.TaskQueue[i]) + + //启动当前worker,阻塞等待消息从channel中传递过来 + go msgHandler.StartOneWorker(i, msgHandler.TaskQueue[i]) + } +} + +// StartOneWorker 启动一个worker 工作流程 +func (msgHandler *MsgHandler) StartOneWorker(workerId int, taskQueue chan ziface.IRequest) { + fmt.Println("Worker ID = ", workerId, " is starting ...") + + //不断阻塞等待消息 + for { + select { + case request := <-taskQueue: + msgHandler.DoMsgHandler(request) + } + } +} + +// 将消息交给taskQueue, 由worker 处理 +func (msgHander *MsgHandler) SendMsgTaskQueue(request ziface.IRequest) { + //将消息平均分配 给worker(根据ConnectionID 进行分配) + workerID := request.GetConnection().GetConnectionID() % msgHander.WorkPoolSize + fmt.Println("add connID = ", request.GetConnection().GetConnectionID(), " request MsgId = ", request.GetMsgId(), " to worker id =", workerID) + //将消息发送给对应worker的taskQUeue即可 + msgHander.TaskQueue[workerID] <- request + fmt.Println("msgHander.TaskQueue[workerID] = ", workerID) +} diff --git a/znet/request.go b/znet/request.go new file mode 100644 index 0000000..3330a62 --- /dev/null +++ b/znet/request.go @@ -0,0 +1,27 @@ +package znet + +import "zinx/ziface" + +type Request struct { + + //客户端链接 + conn ziface.IConnection + + //客户端的请求数据 + msg ziface.IMessage +} + +// GetConnection 获取连接 +func (r *Request) GetConnection() ziface.IConnection { + return r.conn +} + +// GetData 请求的消息数据 +func (r *Request) GetData() []byte { + return r.msg.GetData() +} + +// 获取请求消息的id +func (r *Request) GetMsgId() uint32 { + return r.msg.GetMsgId() +} diff --git a/znet/router.go b/znet/router.go new file mode 100644 index 0000000..3d0d6dd --- /dev/null +++ b/znet/router.go @@ -0,0 +1,21 @@ +package znet + +import "zinx/ziface" + +type BaseRouter struct { +} + +// PreHandle 在处理conn 业务之前的钩子方法 hook +func (br *BaseRouter) PreHandle(request ziface.IRequest) { + +} + +// Handle 在处理conn 业务的主方法hook +func (br *BaseRouter) Handle(request ziface.IRequest) { + +} + +// PostHandle 在处理conn业务之后的钩子方法 +func (br *BaseRouter) PostHandle(request ziface.IRequest) { + +} diff --git a/znet/server.go b/znet/server.go new file mode 100644 index 0000000..3b1d776 --- /dev/null +++ b/znet/server.go @@ -0,0 +1,188 @@ +package znet + +import ( + "errors" + "fmt" + "net" + "zinx/utils" + "zinx/ziface" +) + +// Server /* +/** +IServer 的接口实现,定义一个Serve 的服务器模块 +*/ +type Server struct { + + //服务器名称 + Name string + + //服务器绑定的ip版本 + IPVersion string + + //服务器监听的ip地址 + IP string + + //f服务器监听的端口 + Port int + + //当前Server由用户绑定的回调router,也就是Server注册的链接对应的处理业务 + MsgHandler ziface.IMsgHandler + + //该server 的连接管理器 + ConnMgr ziface.IConnManager + + //该server创建之后自动调用的hook 函数 -- OnConnStart + OnConnStart func(conn ziface.IConnection) + + //该server销毁连接之前自动调用的hook 函数 -- OnConnStop + OnConnStop func(conn ziface.IConnection) +} + +//实现IServer 接口 + +func (s *Server) Start() { + fmt.Printf("[Zinx GlobalObject] Server Name : %s,Ip : %s,Port : %d\n", + utils.GlobalObject.Name, utils.GlobalObject.Host, utils.GlobalObject.TcpPort) + fmt.Printf("[Zinx GlobalObject] Version Name : %s,MaxConn : %d,MaxPackageSize : %d\n", + utils.GlobalObject.Version, utils.GlobalObject.MaxConn, utils.GlobalObject.MaxPackageSize) + fmt.Printf("[Start] Server Listenner at Ip :%s ,Port %d, is startring \n", s.IP, s.Port) + go func() { + + //开启worker 消息队列几工作池 + s.MsgHandler.StartWorkPool() + + //1.获取tcp地址 + addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d", s.IP, s.Port)) + if err != nil { + fmt.Println("resplve tcp addr error :", err) + } + + //2.监听服务器地址 + listenner, err := net.ListenTCP(s.IPVersion, addr) + if err != nil { + fmt.Println("listen ", s.IPVersion, " error", err) + } + fmt.Println("start zinx server ", s.Name, "successful listenning") + + //3.阻塞等待客户端链接,处理客户端连接请求 + + var cid uint32 + cid = 0 + for { + //如果有客户端连接,阻塞返回 + conn, err := listenner.AcceptTCP() + if err != nil { + fmt.Println("accept error", err) + continue + } + ////已经建立客户端连接,处理数据 + //go func() { + // for { + // buf := make([]byte, 512) + // len, err2 := conn.Read(buf) + // if err2 != nil { + // fmt.Println("recv buf error", err) + // continue + // } + // + // fmt.Printf("recv buf:%s,len:%d\n", string(buf[:len]), len) + // //回显功能 + // if _, err := conn.Write(buf[:len]); err != nil { + // fmt.Println("write back buf error ", err) + // continue + // } + // } + //}() + + //设置最大连接个数判断,如果超过最大连接,那么关闭次新的连接 + if s.ConnMgr.Len() >= utils.GlobalObject.MaxConn { + + //todo 给客户端响应超出最大连接数,错误包 + fmt.Println("Too Many Connections MaxConn = ", utils.GlobalObject.MaxConn) + conn.Close() + continue + } + + //处理新连接的业务和方法与conn 进行绑定 的到链接模块 + dealConn := NewConnection(s, conn, cid, s.MsgHandler) + cid++ + go dealConn.Start() + } + }() +} + +func CallBackToClient(conn *net.TCPConn, data []byte, cnt int) error { + //回显业务 + fmt.Println("[Conn Handle] CallBackToClient ... ") + if _, err := conn.Write(data[:cnt]); err != nil { + fmt.Println("write back buf err ", err) + return errors.New("CallBackToClient error") + } + return nil +} + +func (s *Server) Stop() { + //将服务的资源,状态,链接信息进行停止,或回收 + + fmt.Println("[Stop] zinx Server name = ", s.Name) + s.ConnMgr.ClearConn() +} + +func (s *Server) Serve() { + //启动server + s.Start() + + //TODO 启动服务之后一些扩展业务 + + //阻塞 + select {} +} + +func (s *Server) AddRouter(msgId uint32, router ziface.IRouter) { + s.MsgHandler.AddRouter(msgId, router) + fmt.Println("add router successful") +} + +// NewServer 初始化serve +func NewServer() ziface.IServer { + s := &Server{ + Name: utils.GlobalObject.Name, + IPVersion: "tcp4", + IP: utils.GlobalObject.Host, + Port: utils.GlobalObject.TcpPort, + MsgHandler: NewMsgHandler(), + ConnMgr: NewConnManager(), + } + return s +} + +func (s *Server) GetConnMgr() ziface.IConnManager { + return s.ConnMgr +} + +// SetOnConnStart 注册OnConnStat 钩子函数方法 +func (s *Server) SetOnConnStart(hookFunc func(connection ziface.IConnection)) { + s.OnConnStart = hookFunc +} + +// SetOnConnStop 注册OnConnStop 钩子函数方法 +func (s *Server) SetOnConnStop(hookFunc func(connection ziface.IConnection)) { + s.OnConnStop = hookFunc +} + +// CallOnConnStart 调用OnConnStat 钩子函数方法 +func (s *Server) CallOnConnStart(conn ziface.IConnection) { + if s.OnConnStart != nil { + fmt.Println("-----> CallOnConnStart() ....") + s.OnConnStart(conn) + } +} + +// CallOnConnStop 调用OnConnStop 钩子函数方法 +func (s *Server) CallOnConnStop(conn ziface.IConnection) { + if s.OnConnStop != nil { + fmt.Println("-----> CallOnConnStop() ....") + s.OnConnStop(conn) + } +}