package znet import ( "errors" "fmt" "io" "net" "sync" "zinx/utils" "zinx/ziface" ) // Connection 链接模块 type Connection struct { //当前连接隶属于那个server TcpServer ziface.IServer //当前链接的socket tcp 套接字 Conn *net.TCPConn //链接的id ConnID uint32 //当前链接的状态 isClosed bool ////当前链接锁绑定的处理业务的api //handleApi ziface.HandleFunc //告知当前链接的退出/停止 (由reading 告诉 writering 退出) ExitChan chan bool //无缓冲的管道,用于读写goroutine之间的 消息通信 msgChan chan []byte //消息处理的msgId 处理业务对应的api MsgHandler ziface.IMsgHandler //连接属性集合 property map[string]interface{} //保护连接属性的锁 propertyLock sync.RWMutex } // NewConnection 初始化链接 func NewConnection(server ziface.IServer, conn *net.TCPConn, connID uint32, handler ziface.IMsgHandler) *Connection { c := &Connection{ TcpServer: server, Conn: conn, ConnID: connID, isClosed: false, MsgHandler: handler, msgChan: make(chan []byte), ExitChan: make(chan bool, 1), property: make(map[string]interface{}), } //将conn 加入到 connManager 中 c.TcpServer.GetConnMgr().Add(c) return c } func (c *Connection) StartReader() { fmt.Println("reading goroutine is running ...") defer fmt.Println("connID = ", c.ConnID, " reading is exit , remote addr is =", c.Conn.RemoteAddr().String()) defer c.Stop() for { //读取数据 //buf := make([]byte, utils.GlobalObject.MaxPackageSize) // //_, err := c.Conn.Read(buf) //if err != nil { // fmt.Println("conn read err") // continue //} //创建一个拆包解包对象 dp := NewDataPack() //读取客户端的8个字节 headData := make([]byte, dp.GetHeadLen()) //拆包得到msgId 和 msgDataLen if _, err := io.ReadFull(c.GetTcpConnection(), headData); err != nil { fmt.Println("read msg head error", err) break } msg, err := dp.UnPack(headData) if err != nil { fmt.Println("UnPack error", err) break } var data []byte if msg.GetDataLen() > 0 { data = make([]byte, msg.GetDataLen()) if _, err := io.ReadFull(c.GetTcpConnection(), data); err != nil { fmt.Println("read msg data error", err) } } msg.SetData(data) //调用链接处理的api //if err = c.handleApi(c.Conn, buf, len); err != nil { // fmt.Println("connID = ", c.ConnID, "handle is err = ", err) //} //的到当前conn 数据的request 请求数据 req := Request{ conn: c, msg: msg, } //go func(request ziface.IRequest) { // //从路由中,找到注册绑定的conn对应的router 调用 // c.Router.PreHandle(request) // c.Router.Handle(request) // c.Router.PostHandle(request) //}(&req) if utils.GlobalObject.WokePoolSize > 0 { //已经开启工作池,将消息发给工作池 c.MsgHandler.SendMsgTaskQueue(&req) } else { //根据绑定msgID找到对应处理api业务执行 go c.MsgHandler.DoMsgHandler(&req) } } } // StartWriter写消息 goroutine, 专门发送给客户端消息的模块 func (c *Connection) StartWriter() { fmt.Println("[Writer GoRoutine is Running...]") defer fmt.Println(c.RemoteAddr().String(), " conn writering exit") //不断阻塞的等待channel的消息 for { select { case data := <-c.msgChan: if _, err := c.Conn.Write(data); err != nil { fmt.Println("send data error ", err) return } case <-c.ExitChan: //代表已经退出,writer 也要退出 return } } } // Start 启动链接 func (c *Connection) Start() { fmt.Println("conn start() ...conn:", c.ConnID) //处理业务数据 go c.StartReader() go c.StartWriter() //按照开发者传递进来的 创建连接之后需要调用的处理业务对应执行hook 函数 c.TcpServer.CallOnConnStart(c) } // Stop 关闭连接 func (c *Connection) Stop() { fmt.Println("conn stop .....,connID:", c.ConnID) if c.isClosed == true { return } c.isClosed = true //调用开发者注册销毁连接之前执行的hook 函数 c.TcpServer.CallOnConnStop(c) //关闭连接 c.Conn.Close() //告知writer 关闭 c.ExitChan <- true //将conn 删除掉 c.TcpServer.GetConnMgr().Remove(c) //关闭资源 close(c.ExitChan) close(c.msgChan) } // GetTcpConnection 获取当前链接绑定的socket func (c *Connection) GetTcpConnection() *net.TCPConn { return c.Conn } // GetConnectionID 获取当前链接模块的ID func (c *Connection) GetConnectionID() uint32 { return c.ConnID } // RemoteAddr 获取远程客户点的tcp 状态 ip port func (c *Connection) RemoteAddr() net.Addr { return c.Conn.RemoteAddr() } // Send 发送数据,将数据发送给远程客户端,先封包在发送 func (c *Connection) SendMsg(msgId uint32, data []byte) error { if c.isClosed == true { return errors.New("connection close when send msg") } //将data 在封包 dp := NewDataPack() binaryMsg, err := dp.Pack(NewMessage(msgId, data)) if err != nil { fmt.Println("pack msg id = ", msgId) return errors.New("pack err msg") } //将数据发送到客户端 //if _, err := c.Conn.Write(binaryMsg); err != nil { // fmt.Println("write msg id = ", msgId) // return errors.New("write msg err") //} c.msgChan <- binaryMsg return nil } // 设置连接属性 func (c *Connection) SetProperty(key string, value interface{}) { c.propertyLock.Lock() defer c.propertyLock.Unlock() //添加一个连接属性 c.property[key] = value } // 获取连接属性 func (c *Connection) GetProperty(key string) (interface{}, error) { c.propertyLock.RLock() defer c.propertyLock.RUnlock() if value, ok := c.property[key]; ok { return value, nil } else { return nil, errors.New("not property found") } } // 移除连接属性 func (c *Connection) RemoveProperty(key string) { c.propertyLock.Lock() defer c.propertyLock.Unlock() //删除 delete(c.property, key) }