zinx/znet/connection.go

261 lines
5.8 KiB
Go

package znet
import (
"errors"
"fmt"
"io"
"net"
"sync"
"zinx/utils"
"zinx/ziface"
)
// Connection 链接模块
type Connection struct {
//当前连接隶属于那个server
TcpServer ziface.IServer
//当前链接的socket tcp 套接字
Conn *net.TCPConn
//链接的id
ConnID uint32
//当前链接的状态
isClosed bool
////当前链接锁绑定的处理业务的api
//handleApi ziface.HandleFunc
//告知当前链接的退出/停止 (由reading 告诉 writering 退出)
ExitChan chan bool
//无缓冲的管道,用于读写goroutine之间的 消息通信
msgChan chan []byte
//消息处理的msgId 处理业务对应的api
MsgHandler ziface.IMsgHandler
//连接属性集合
property map[string]interface{}
//保护连接属性的锁
propertyLock sync.RWMutex
}
// NewConnection 初始化链接
func NewConnection(server ziface.IServer, conn *net.TCPConn, connID uint32, handler ziface.IMsgHandler) *Connection {
c := &Connection{
TcpServer: server,
Conn: conn,
ConnID: connID,
isClosed: false,
MsgHandler: handler,
msgChan: make(chan []byte),
ExitChan: make(chan bool, 1),
property: make(map[string]interface{}),
}
//将conn 加入到 connManager 中
c.TcpServer.GetConnMgr().Add(c)
return c
}
func (c *Connection) StartReader() {
fmt.Println("reading goroutine is running ...")
defer fmt.Println("connID = ", c.ConnID, " reading is exit , remote addr is =", c.Conn.RemoteAddr().String())
defer c.Stop()
for {
//读取数据
//buf := make([]byte, utils.GlobalObject.MaxPackageSize)
//
//_, err := c.Conn.Read(buf)
//if err != nil {
// fmt.Println("conn read err")
// continue
//}
//创建一个拆包解包对象
dp := NewDataPack()
//读取客户端的8个字节
headData := make([]byte, dp.GetHeadLen())
//拆包得到msgId 和 msgDataLen
if _, err := io.ReadFull(c.GetTcpConnection(), headData); err != nil {
fmt.Println("read msg head error", err)
break
}
msg, err := dp.UnPack(headData)
if err != nil {
fmt.Println("UnPack error", err)
break
}
var data []byte
if msg.GetDataLen() > 0 {
data = make([]byte, msg.GetDataLen())
if _, err := io.ReadFull(c.GetTcpConnection(), data); err != nil {
fmt.Println("read msg data error", err)
}
}
msg.SetData(data)
//调用链接处理的api
//if err = c.handleApi(c.Conn, buf, len); err != nil {
// fmt.Println("connID = ", c.ConnID, "handle is err = ", err)
//}
//的到当前conn 数据的request 请求数据
req := Request{
conn: c,
msg: msg,
}
//go func(request ziface.IRequest) {
// //从路由中,找到注册绑定的conn对应的router 调用
// c.Router.PreHandle(request)
// c.Router.Handle(request)
// c.Router.PostHandle(request)
//}(&req)
if utils.GlobalObject.WokePoolSize > 0 {
//已经开启工作池,将消息发给工作池
c.MsgHandler.SendMsgTaskQueue(&req)
} else {
//根据绑定msgID找到对应处理api业务执行
go c.MsgHandler.DoMsgHandler(&req)
}
}
}
// StartWriter写消息 goroutine, 专门发送给客户端消息的模块
func (c *Connection) StartWriter() {
fmt.Println("[Writer GoRoutine is Running...]")
defer fmt.Println(c.RemoteAddr().String(), " conn writering exit")
//不断阻塞的等待channel的消息
for {
select {
case data := <-c.msgChan:
if _, err := c.Conn.Write(data); err != nil {
fmt.Println("send data error ", err)
return
}
case <-c.ExitChan:
//代表已经退出,writer 也要退出
return
}
}
}
// Start 启动链接
func (c *Connection) Start() {
fmt.Println("conn start() ...conn:", c.ConnID)
//处理业务数据
go c.StartReader()
go c.StartWriter()
//按照开发者传递进来的 创建连接之后需要调用的处理业务对应执行hook 函数
c.TcpServer.CallOnConnStart(c)
}
// Stop 关闭连接
func (c *Connection) Stop() {
fmt.Println("conn stop .....,connID:", c.ConnID)
if c.isClosed == true {
return
}
c.isClosed = true
//调用开发者注册销毁连接之前执行的hook 函数
c.TcpServer.CallOnConnStop(c)
//关闭连接
c.Conn.Close()
//告知writer 关闭
c.ExitChan <- true
//将conn 删除掉
c.TcpServer.GetConnMgr().Remove(c)
//关闭资源
close(c.ExitChan)
close(c.msgChan)
}
// GetTcpConnection 获取当前链接绑定的socket
func (c *Connection) GetTcpConnection() *net.TCPConn {
return c.Conn
}
// GetConnectionID 获取当前链接模块的ID
func (c *Connection) GetConnectionID() uint32 {
return c.ConnID
}
// RemoteAddr 获取远程客户点的tcp 状态 ip port
func (c *Connection) RemoteAddr() net.Addr {
return c.Conn.RemoteAddr()
}
// Send 发送数据,将数据发送给远程客户端,先封包在发送
func (c *Connection) SendMsg(msgId uint32, data []byte) error {
if c.isClosed == true {
return errors.New("connection close when send msg")
}
//将data 在封包
dp := NewDataPack()
binaryMsg, err := dp.Pack(NewMessage(msgId, data))
if err != nil {
fmt.Println("pack msg id = ", msgId)
return errors.New("pack err msg")
}
//将数据发送到客户端
//if _, err := c.Conn.Write(binaryMsg); err != nil {
// fmt.Println("write msg id = ", msgId)
// return errors.New("write msg err")
//}
c.msgChan <- binaryMsg
return nil
}
// 设置连接属性
func (c *Connection) SetProperty(key string, value interface{}) {
c.propertyLock.Lock()
defer c.propertyLock.Unlock()
//添加一个连接属性
c.property[key] = value
}
// 获取连接属性
func (c *Connection) GetProperty(key string) (interface{}, error) {
c.propertyLock.RLock()
defer c.propertyLock.RUnlock()
if value, ok := c.property[key]; ok {
return value, nil
} else {
return nil, errors.New("not property found")
}
}
// 移除连接属性
func (c *Connection) RemoveProperty(key string) {
c.propertyLock.Lock()
defer c.propertyLock.Unlock()
//删除
delete(c.property, key)
}