diff --git a/src/main/java/com/zhangmeng/TcpApplication.java b/src/main/java/com/zhangmeng/TcpApplication.java index e08745d..514997f 100644 --- a/src/main/java/com/zhangmeng/TcpApplication.java +++ b/src/main/java/com/zhangmeng/TcpApplication.java @@ -1,10 +1,9 @@ package com.zhangmeng; +import com.zhangmeng.callBack.DoConnectionBegin; +import com.zhangmeng.callBack.DoConnectionLost; import com.zhangmeng.service.MsgHandle; -import com.zhangmeng.service.impl.HelloRouter; -import com.zhangmeng.service.impl.MsgHandleImpl; -import com.zhangmeng.service.impl.PingRouter; -import com.zhangmeng.service.impl.ServerImpl; +import com.zhangmeng.service.impl.*; import com.zhangmeng.utils.Globalobj; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @@ -37,6 +36,9 @@ public class TcpApplication { MsgHandle msgHandle =new MsgHandleImpl(); server.setMsgHandle(msgHandle); + server.setOnConnStart(new DoConnectionBegin()); + server.setOnConnStop(new DoConnectionLost()); + server.setConnMgr(new ConnManagerImpl()); server.AddRouter(0,new PingRouter()); server.AddRouter(1,new HelloRouter()); server.Start(); diff --git a/src/main/java/com/zhangmeng/callBack/DoConnectionBegin.java b/src/main/java/com/zhangmeng/callBack/DoConnectionBegin.java new file mode 100644 index 0000000..f648d2b --- /dev/null +++ b/src/main/java/com/zhangmeng/callBack/DoConnectionBegin.java @@ -0,0 +1,26 @@ +package com.zhangmeng.callBack; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zhangmeng.service.Connection; +import com.zhangmeng.utils.ByteBufferUtil; + +import java.io.IOException; + +/** + * @author zm + * @date 2025/2/27 15:19 + * @version: 1.0 + */ +public class DoConnectionBegin { + private final Log log = LogFactory.get(); + public void run(Connection connection){ + log.info("DoConnecionBegin is Called ... "); + try { + ByteBufferUtil.sendMsg(2, "DoConnection BEGIN...".getBytes(), connection.GetTCPConnection().getOutputStream()); + } catch (IOException e) { + log.error(e.getMessage()); + } + } + +} diff --git a/src/main/java/com/zhangmeng/callBack/DoConnectionLost.java b/src/main/java/com/zhangmeng/callBack/DoConnectionLost.java new file mode 100644 index 0000000..b702041 --- /dev/null +++ b/src/main/java/com/zhangmeng/callBack/DoConnectionLost.java @@ -0,0 +1,19 @@ +package com.zhangmeng.callBack; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zhangmeng.service.Connection; + +/** + * @author zm + * @date 2025/2/27 15:20 + * @version: 1.0 + */ +public class DoConnectionLost { + + private final Log log = LogFactory.get(); + + public void run(Connection connection){ + log.info("DoConneciotnLost is Called ... "); + } +} diff --git a/src/main/java/com/zhangmeng/service/ConnManager.java b/src/main/java/com/zhangmeng/service/ConnManager.java new file mode 100644 index 0000000..c64a4bc --- /dev/null +++ b/src/main/java/com/zhangmeng/service/ConnManager.java @@ -0,0 +1,21 @@ +package com.zhangmeng.service; + +/** + * @author zm + * @date 2025/2/27 14:49 + * @version: 1.0 + */ +public interface ConnManager { + + public void Add(Connection conn); //添加链接 + + public void Remove(Connection conn); //删除连接 + + public Connection Get(int connID); //利用ConnID获取链接 + + public int Len(); //获取当前连接 + + public void ClearConn(); //删除并停止所有链接 + + +} diff --git a/src/main/java/com/zhangmeng/service/Server.java b/src/main/java/com/zhangmeng/service/Server.java index 7bcbba6..13a003f 100644 --- a/src/main/java/com/zhangmeng/service/Server.java +++ b/src/main/java/com/zhangmeng/service/Server.java @@ -1,5 +1,8 @@ package com.zhangmeng.service; +import com.zhangmeng.callBack.DoConnectionBegin; +import com.zhangmeng.callBack.DoConnectionLost; + /** * @author zm * @Description: 服务器接口 @@ -18,5 +21,20 @@ public interface Server { public void Serve(); //路由功能:给当前服务注册一个路由业务方法,供客户端链接处理使用 - public void AddRouter( int msgId,Router router); + public void AddRouter(int msgId, Router router); + + //设置该Server的连接创建时Hook函数 + public void SetOnConnStart(DoConnectionBegin onConnStart); + + //设置该Server的连接断开时的Hook函数 + public void SetOnConnStop(DoConnectionLost onConnStop); + + //调用连接OnConnStart Hook函数 + public void CallOnConnStart(Connection connection); + + //调用连接OnConnStop Hook函数 + public void CallOnConnStop(Connection connection); + + public ConnManager GetConnMgr();//得到链接管理 + } diff --git a/src/main/java/com/zhangmeng/service/impl/ConnManagerImpl.java b/src/main/java/com/zhangmeng/service/impl/ConnManagerImpl.java new file mode 100644 index 0000000..680054c --- /dev/null +++ b/src/main/java/com/zhangmeng/service/impl/ConnManagerImpl.java @@ -0,0 +1,79 @@ +package com.zhangmeng.service.impl; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zhangmeng.service.ConnManager; +import com.zhangmeng.service.Connection; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * @author zm + * @date 2025/2/27 14:49 + * @version: 1.0 + */ +public class ConnManagerImpl implements ConnManager { + + private final Log log = LogFactory.get(); + + private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); + private Map connections = new HashMap<>(); //管理的连接信息 + + @Override + public void Add(Connection conn) { + //保护共享资源Map 加写锁 + rwl.writeLock().lock(); // 获取写锁 + try { + log.info("connection add to ConnManager successfully: conn num = {}", connections.size()); + //将conn连接添加到ConnMananger中 + this.connections.put(conn.GetConnID(), conn); + } finally { + rwl.writeLock().unlock(); // 释放写锁 + } + } + + @Override + public void Remove(Connection conn) { + //保护共享资源Map 加写锁 + rwl.writeLock().lock(); // 获取写锁 + try { + //删除连接信息 + this.connections.remove(conn.GetConnID()); + log.info("connection Remove ConnID=",conn.GetConnID(), " successfully: conn num = ", connections.size()); + } finally { + rwl.writeLock().unlock(); // 释放写锁 + } + } + + @Override + public Connection Get(int connID) { + + //保护共享资源Map 加读锁 + rwl.readLock().lock(); // 获取读锁 + try { + //获取指定连接信息 + return this.connections.get(connID); + } finally { + rwl.readLock().unlock(); // 释放读锁 + } + } + + @Override + public int Len() { + return this.connections.size(); + } + + @Override + public void ClearConn() { + //保护共享资源Map 加写锁 + rwl.writeLock().lock(); // 获取写锁 + try { + //清空连接信息 + this.connections.clear(); + } finally { + rwl.writeLock().unlock(); // 释放写锁 + } + } +} diff --git a/src/main/java/com/zhangmeng/service/impl/ConnectionImpl.java b/src/main/java/com/zhangmeng/service/impl/ConnectionImpl.java index b13350a..e86649b 100644 --- a/src/main/java/com/zhangmeng/service/impl/ConnectionImpl.java +++ b/src/main/java/com/zhangmeng/service/impl/ConnectionImpl.java @@ -44,9 +44,12 @@ public class ConnectionImpl implements Connection { private OutputStream outputStream; private InputStream inputStream; + //当前Conn属于哪个Server + private Server TcpServer; //当前conn属于哪个server,在conn初始化的时候添加即可 + //创建连接的方法 - public ConnectionImpl(Socket conn, int connID, MsgHandle msgHandler) { + public ConnectionImpl(Server tcpServer,Socket conn, int connID, MsgHandle msgHandler) { this.Conn = conn; this.ConnID = connID; this.isClosed = false; @@ -64,6 +67,9 @@ public class ConnectionImpl implements Connection { } catch (IOException e) { throw new RuntimeException(e); } + this.TcpServer = tcpServer; + //将新创建的Conn添加到链接管理中 + this.TcpServer.GetConnMgr().Add(this); //将当前新创建的连接添加到ConnManager中 } public void close() { @@ -72,6 +78,10 @@ public class ConnectionImpl implements Connection { writer.close(); outputStream.close(); inputStream.close(); + + //将链接从连接管理器中删除 + this.TcpServer.GetConnMgr().Remove(this); //删除conn从ConnManager中 + Conn.close(); } catch (IOException e) { throw new RuntimeException(e); @@ -85,7 +95,7 @@ public class ConnectionImpl implements Connection { throw new RuntimeException("Connection closed when send msg"); } //将data封包,并且发送 - DataPack dp = new DataPackImpl(); + DataPack dp = new DataPackImpl(); Message message = new MessageImpl(); message.SetMsgId(msgId); @@ -120,7 +130,7 @@ public class ConnectionImpl implements Connection { ByteBuf inData = ByteBufferUtil.getByteBuf(inputStream, -1); //拆包,得到msgid 和 datalen 放在msg中 - byte[] data ; + byte[] data; Message unpack = dp.Unpack(inData); //构建request对象,并调用router的PreHandle,Handle,PostHandle方法 @@ -135,7 +145,7 @@ public class ConnectionImpl implements Connection { // log.info("recv buf : " + str); // writer.write("\n"); // writer.flush(); - ByteBufferUtil.sendMsg(3,"\n".getBytes(),outputStream); + ByteBufferUtil.sendMsg(3, "\n".getBytes(), outputStream); } catch (IOException e) { // throw new RuntimeException(e); @@ -152,13 +162,29 @@ public class ConnectionImpl implements Connection { @Override public void Start() { + + //================== + //按照用户传递进来的创建连接时需要处理的业务,执行钩子方法 + this.TcpServer.CallOnConnStart(this); + //================== + log.info("Start ConnectionImpl .............................................."); StartReader(); + } @Override public void Stop() { log.info("Stop ConnectionImpl .............................................."); + + //================== + //如果用户注册了该链接的关闭回调业务,那么在此刻应该显示调用 + this.TcpServer.CallOnConnStop(this); + //================== + + //将链接从连接管理器中删除 + this.TcpServer.GetConnMgr().Remove(this); //删除conn从ConnManager中 + //关闭当前连接 this.close(); } diff --git a/src/main/java/com/zhangmeng/service/impl/ServerImpl.java b/src/main/java/com/zhangmeng/service/impl/ServerImpl.java index 3aefdaa..736b603 100644 --- a/src/main/java/com/zhangmeng/service/impl/ServerImpl.java +++ b/src/main/java/com/zhangmeng/service/impl/ServerImpl.java @@ -9,12 +9,12 @@ import cn.hutool.log.StaticLog; import cn.hutool.socket.aio.AioServer; import cn.hutool.socket.aio.AioSession; import cn.hutool.socket.aio.SimpleIoAction; +import com.zhangmeng.callBack.DoConnectionBegin; +import com.zhangmeng.callBack.DoConnectionLost; import com.zhangmeng.callBack.HandlerApiCallBack; import com.zhangmeng.handler.ClientHandler; -import com.zhangmeng.service.Connection; -import com.zhangmeng.service.MsgHandle; -import com.zhangmeng.service.Router; -import com.zhangmeng.service.Server; +import com.zhangmeng.service.*; +import com.zhangmeng.utils.Globalobj; import java.io.*; import java.net.ServerSocket; @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; + /** * @author zm * @date 2025/2/21 15:04 @@ -48,6 +49,14 @@ public class ServerImpl implements Server { public static Map socketList = new HashMap<>(); + //该Server的连接创建时Hook函数 + public DoConnectionBegin OnConnStart; + //该Server的连接断开时的Hook函数 + public DoConnectionLost OnConnStop; + + //当前Server的链接管理器 + private ConnManager ConnMgr; + @Override public void Start() { log.info("[START] Server listenner at Name: {}, IPVersion {}, is starting", config.getName(), config.getIPVersion()); @@ -64,7 +73,16 @@ public class ServerImpl implements Server { log.info("key=====================================: " + key); //new ClientHandler(accept).start(); log.info("[ACCEPT] Accept a client at IP: {}, Port {}", accept.getInetAddress().getHostAddress(), accept.getPort()); - Connection conn = new ConnectionImpl(accept,cid,this.msgHandle); + + //3.2 设置服务器最大连接控制,如果超过最大连接,那么则关闭此新的连接 + if (this.ConnMgr.Len() >= Globalobj.getInstance().getMaxConn()) { + accept.close(); + log.info("[ACCEPT] The number of connections exceeds the maximum limit, close the new connection"); + continue; + } + + + Connection conn = new ConnectionImpl(this,accept, cid, this.msgHandle); conn.Start(); cid++; } @@ -78,6 +96,8 @@ public class ServerImpl implements Server { @Override public void Stop() { log.info("[STOP] Server listenner at IP: {}, Port {}, is stoped", config.getIP(), config.getPort()); + //将其他需要清理的连接信息或者其他信息 也要一并停止或者清理 + this.ConnMgr.ClearConn(); try { serverSocket.close(); } catch (IOException e) { @@ -85,6 +105,32 @@ public class ServerImpl implements Server { } } + @Override + public void SetOnConnStart(DoConnectionBegin onConnStart) { + this.OnConnStart = onConnStart; + } + + @Override + public void SetOnConnStop(DoConnectionLost onConnStop) { + this.OnConnStop = onConnStop; + } + + @Override + public void CallOnConnStart(Connection connection) { + if (this.OnConnStart != null) { + log.info("[ONCONNSTART] Call OnConnStart"); + this.OnConnStart.run(connection); + } + } + + @Override + public void CallOnConnStop(Connection connection) { + if (this.OnConnStop != null){ + log.info("[ONCONNSTOP] Call OnConnStop"); + this.OnConnStop.run(connection); + } + } + @Override public void Serve() { @@ -96,6 +142,11 @@ public class ServerImpl implements Server { log.info("[ADD ROUTER] Add a router to server"); } + @Override + public ConnManager GetConnMgr() { + return this.ConnMgr; + } + public void setConfig(Config config) { this.config = config; } @@ -161,4 +212,28 @@ public class ServerImpl implements Server { public void setMsgHandle(MsgHandle msgHandle) { this.msgHandle = msgHandle; } + + public DoConnectionBegin getOnConnStart() { + return OnConnStart; + } + + public void setOnConnStart(DoConnectionBegin onConnStart) { + OnConnStart = onConnStart; + } + + public DoConnectionLost getOnConnStop() { + return OnConnStop; + } + + public void setOnConnStop(DoConnectionLost onConnStop) { + OnConnStop = onConnStop; + } + + public ConnManager getConnMgr() { + return ConnMgr; + } + + public void setConnMgr(ConnManager connMgr) { + ConnMgr = connMgr; + } }