2025年2月27日15:46:25 增加连接管理 / 开始结束回调函数

master
qmstyle 2025-02-27 15:46:49 +08:00
parent 3b19085717
commit 71258c81c0
8 changed files with 280 additions and 14 deletions

View File

@ -1,10 +1,9 @@
package com.zhangmeng;
import com.zhangmeng.callBack.DoConnectionBegin;
import com.zhangmeng.callBack.DoConnectionLost;
import com.zhangmeng.service.MsgHandle;
import com.zhangmeng.service.impl.HelloRouter;
import com.zhangmeng.service.impl.MsgHandleImpl;
import com.zhangmeng.service.impl.PingRouter;
import com.zhangmeng.service.impl.ServerImpl;
import com.zhangmeng.service.impl.*;
import com.zhangmeng.utils.Globalobj;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@ -37,6 +36,9 @@ public class TcpApplication {
MsgHandle msgHandle =new MsgHandleImpl();
server.setMsgHandle(msgHandle);
server.setOnConnStart(new DoConnectionBegin());
server.setOnConnStop(new DoConnectionLost());
server.setConnMgr(new ConnManagerImpl());
server.AddRouter(0,new PingRouter());
server.AddRouter(1,new HelloRouter());
server.Start();

View File

@ -0,0 +1,26 @@
package com.zhangmeng.callBack;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zhangmeng.service.Connection;
import com.zhangmeng.utils.ByteBufferUtil;
import java.io.IOException;
/**
* @author zm
* @date 2025/2/27 15:19
* @version: 1.0
*/
public class DoConnectionBegin {
private final Log log = LogFactory.get();
public void run(Connection connection){
log.info("DoConnecionBegin is Called ... ");
try {
ByteBufferUtil.sendMsg(2, "DoConnection BEGIN...".getBytes(), connection.GetTCPConnection().getOutputStream());
} catch (IOException e) {
log.error(e.getMessage());
}
}
}

View File

@ -0,0 +1,19 @@
package com.zhangmeng.callBack;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zhangmeng.service.Connection;
/**
* @author zm
* @date 2025/2/27 15:20
* @version: 1.0
*/
public class DoConnectionLost {
private final Log log = LogFactory.get();
public void run(Connection connection){
log.info("DoConneciotnLost is Called ... ");
}
}

View File

@ -0,0 +1,21 @@
package com.zhangmeng.service;
/**
* @author zm
* @date 2025/2/27 14:49
* @version: 1.0
*/
public interface ConnManager {
public void Add(Connection conn); //添加链接
public void Remove(Connection conn); //删除连接
public Connection Get(int connID); //利用ConnID获取链接
public int Len(); //获取当前连接
public void ClearConn(); //删除并停止所有链接
}

View File

@ -1,5 +1,8 @@
package com.zhangmeng.service;
import com.zhangmeng.callBack.DoConnectionBegin;
import com.zhangmeng.callBack.DoConnectionLost;
/**
* @author zm
* @Description:
@ -18,5 +21,20 @@ public interface Server {
public void Serve();
//路由功能:给当前服务注册一个路由业务方法,供客户端链接处理使用
public void AddRouter( int msgId,Router router);
public void AddRouter(int msgId, Router router);
//设置该Server的连接创建时Hook函数
public void SetOnConnStart(DoConnectionBegin onConnStart);
//设置该Server的连接断开时的Hook函数
public void SetOnConnStop(DoConnectionLost onConnStop);
//调用连接OnConnStart Hook函数
public void CallOnConnStart(Connection connection);
//调用连接OnConnStop Hook函数
public void CallOnConnStop(Connection connection);
public ConnManager GetConnMgr();//得到链接管理
}

View File

@ -0,0 +1,79 @@
package com.zhangmeng.service.impl;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zhangmeng.service.ConnManager;
import com.zhangmeng.service.Connection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* @author zm
* @date 2025/2/27 14:49
* @version: 1.0
*/
public class ConnManagerImpl implements ConnManager {
private final Log log = LogFactory.get();
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private Map<Integer, Connection> connections = new HashMap<>(); //管理的连接信息
@Override
public void Add(Connection conn) {
//保护共享资源Map 加写锁
rwl.writeLock().lock(); // 获取写锁
try {
log.info("connection add to ConnManager successfully: conn num = {}", connections.size());
//将conn连接添加到ConnMananger中
this.connections.put(conn.GetConnID(), conn);
} finally {
rwl.writeLock().unlock(); // 释放写锁
}
}
@Override
public void Remove(Connection conn) {
//保护共享资源Map 加写锁
rwl.writeLock().lock(); // 获取写锁
try {
//删除连接信息
this.connections.remove(conn.GetConnID());
log.info("connection Remove ConnID=",conn.GetConnID(), " successfully: conn num = ", connections.size());
} finally {
rwl.writeLock().unlock(); // 释放写锁
}
}
@Override
public Connection Get(int connID) {
//保护共享资源Map 加读锁
rwl.readLock().lock(); // 获取读锁
try {
//获取指定连接信息
return this.connections.get(connID);
} finally {
rwl.readLock().unlock(); // 释放读锁
}
}
@Override
public int Len() {
return this.connections.size();
}
@Override
public void ClearConn() {
//保护共享资源Map 加写锁
rwl.writeLock().lock(); // 获取写锁
try {
//清空连接信息
this.connections.clear();
} finally {
rwl.writeLock().unlock(); // 释放写锁
}
}
}

View File

@ -44,9 +44,12 @@ public class ConnectionImpl implements Connection {
private OutputStream outputStream;
private InputStream inputStream;
//当前Conn属于哪个Server
private Server TcpServer; //当前conn属于哪个server在conn初始化的时候添加即可
//创建连接的方法
public ConnectionImpl(Socket conn, int connID, MsgHandle msgHandler) {
public ConnectionImpl(Server tcpServer,Socket conn, int connID, MsgHandle msgHandler) {
this.Conn = conn;
this.ConnID = connID;
this.isClosed = false;
@ -64,6 +67,9 @@ public class ConnectionImpl implements Connection {
} catch (IOException e) {
throw new RuntimeException(e);
}
this.TcpServer = tcpServer;
//将新创建的Conn添加到链接管理中
this.TcpServer.GetConnMgr().Add(this); //将当前新创建的连接添加到ConnManager中
}
public void close() {
@ -72,6 +78,10 @@ public class ConnectionImpl implements Connection {
writer.close();
outputStream.close();
inputStream.close();
//将链接从连接管理器中删除
this.TcpServer.GetConnMgr().Remove(this); //删除conn从ConnManager中
Conn.close();
} catch (IOException e) {
throw new RuntimeException(e);
@ -85,7 +95,7 @@ public class ConnectionImpl implements Connection {
throw new RuntimeException("Connection closed when send msg");
}
//将data封包并且发送
DataPack dp = new DataPackImpl();
DataPack dp = new DataPackImpl();
Message message = new MessageImpl();
message.SetMsgId(msgId);
@ -120,7 +130,7 @@ public class ConnectionImpl implements Connection {
ByteBuf inData = ByteBufferUtil.getByteBuf(inputStream, -1);
//拆包得到msgid 和 datalen 放在msg中
byte[] data ;
byte[] data;
Message unpack = dp.Unpack(inData);
//构建request对象,并调用router的PreHandle,Handle,PostHandle方法
@ -135,7 +145,7 @@ public class ConnectionImpl implements Connection {
// log.info("recv buf : " + str);
// writer.write("\n");
// writer.flush();
ByteBufferUtil.sendMsg(3,"\n".getBytes(),outputStream);
ByteBufferUtil.sendMsg(3, "\n".getBytes(), outputStream);
} catch (IOException e) {
// throw new RuntimeException(e);
@ -152,13 +162,29 @@ public class ConnectionImpl implements Connection {
@Override
public void Start() {
//==================
//按照用户传递进来的创建连接时需要处理的业务,执行钩子方法
this.TcpServer.CallOnConnStart(this);
//==================
log.info("Start ConnectionImpl ..............................................");
StartReader();
}
@Override
public void Stop() {
log.info("Stop ConnectionImpl ..............................................");
//==================
//如果用户注册了该链接的关闭回调业务,那么在此刻应该显示调用
this.TcpServer.CallOnConnStop(this);
//==================
//将链接从连接管理器中删除
this.TcpServer.GetConnMgr().Remove(this); //删除conn从ConnManager中
//关闭当前连接
this.close();
}

View File

@ -9,12 +9,12 @@ import cn.hutool.log.StaticLog;
import cn.hutool.socket.aio.AioServer;
import cn.hutool.socket.aio.AioSession;
import cn.hutool.socket.aio.SimpleIoAction;
import com.zhangmeng.callBack.DoConnectionBegin;
import com.zhangmeng.callBack.DoConnectionLost;
import com.zhangmeng.callBack.HandlerApiCallBack;
import com.zhangmeng.handler.ClientHandler;
import com.zhangmeng.service.Connection;
import com.zhangmeng.service.MsgHandle;
import com.zhangmeng.service.Router;
import com.zhangmeng.service.Server;
import com.zhangmeng.service.*;
import com.zhangmeng.utils.Globalobj;
import java.io.*;
import java.net.ServerSocket;
@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
/**
* @author zm
* @date 2025/2/21 15:04
@ -48,6 +49,14 @@ public class ServerImpl implements Server {
public static Map<String, Socket> socketList = new HashMap<>();
//该Server的连接创建时Hook函数
public DoConnectionBegin OnConnStart;
//该Server的连接断开时的Hook函数
public DoConnectionLost OnConnStop;
//当前Server的链接管理器
private ConnManager ConnMgr;
@Override
public void Start() {
log.info("[START] Server listenner at Name: {}, IPVersion {}, is starting", config.getName(), config.getIPVersion());
@ -64,7 +73,16 @@ public class ServerImpl implements Server {
log.info("key=====================================: " + key);
//new ClientHandler(accept).start();
log.info("[ACCEPT] Accept a client at IP: {}, Port {}", accept.getInetAddress().getHostAddress(), accept.getPort());
Connection conn = new ConnectionImpl(accept,cid,this.msgHandle);
//3.2 设置服务器最大连接控制,如果超过最大连接,那么则关闭此新的连接
if (this.ConnMgr.Len() >= Globalobj.getInstance().getMaxConn()) {
accept.close();
log.info("[ACCEPT] The number of connections exceeds the maximum limit, close the new connection");
continue;
}
Connection conn = new ConnectionImpl(this,accept, cid, this.msgHandle);
conn.Start();
cid++;
}
@ -78,6 +96,8 @@ public class ServerImpl implements Server {
@Override
public void Stop() {
log.info("[STOP] Server listenner at IP: {}, Port {}, is stoped", config.getIP(), config.getPort());
//将其他需要清理的连接信息或者其他信息 也要一并停止或者清理
this.ConnMgr.ClearConn();
try {
serverSocket.close();
} catch (IOException e) {
@ -85,6 +105,32 @@ public class ServerImpl implements Server {
}
}
@Override
public void SetOnConnStart(DoConnectionBegin onConnStart) {
this.OnConnStart = onConnStart;
}
@Override
public void SetOnConnStop(DoConnectionLost onConnStop) {
this.OnConnStop = onConnStop;
}
@Override
public void CallOnConnStart(Connection connection) {
if (this.OnConnStart != null) {
log.info("[ONCONNSTART] Call OnConnStart");
this.OnConnStart.run(connection);
}
}
@Override
public void CallOnConnStop(Connection connection) {
if (this.OnConnStop != null){
log.info("[ONCONNSTOP] Call OnConnStop");
this.OnConnStop.run(connection);
}
}
@Override
public void Serve() {
@ -96,6 +142,11 @@ public class ServerImpl implements Server {
log.info("[ADD ROUTER] Add a router to server");
}
@Override
public ConnManager GetConnMgr() {
return this.ConnMgr;
}
public void setConfig(Config config) {
this.config = config;
}
@ -161,4 +212,28 @@ public class ServerImpl implements Server {
public void setMsgHandle(MsgHandle msgHandle) {
this.msgHandle = msgHandle;
}
public DoConnectionBegin getOnConnStart() {
return OnConnStart;
}
public void setOnConnStart(DoConnectionBegin onConnStart) {
OnConnStart = onConnStart;
}
public DoConnectionLost getOnConnStop() {
return OnConnStop;
}
public void setOnConnStop(DoConnectionLost onConnStop) {
OnConnStop = onConnStop;
}
public ConnManager getConnMgr() {
return ConnMgr;
}
public void setConnMgr(ConnManager connMgr) {
ConnMgr = connMgr;
}
}