2025年2月26日18:05:48

master
qmstyle 2025-02-26 18:05:55 +08:00
parent d839ff96ad
commit d40cbf69d1
8 changed files with 253 additions and 62 deletions

View File

@ -25,4 +25,7 @@ public interface Connection {
//获取远程客户端地址信息 //获取远程客户端地址信息
public InetAddress RemoteAddr(); public InetAddress RemoteAddr();
//直接将Message数据发送数据给远程的TCP客户端
public void SendMsg(int msgId ,byte[] data);
} }

View File

@ -11,4 +11,6 @@ public interface Request {
public byte[] GetData(); //获取请求消息的数据 public byte[] GetData(); //获取请求消息的数据
public int GetMsgID(); //获取请求消息的ID
} }

View File

@ -3,9 +3,8 @@ package com.zhangmeng.service.impl;
import cn.hutool.log.Log; import cn.hutool.log.Log;
import cn.hutool.log.LogFactory; import cn.hutool.log.LogFactory;
import com.zhangmeng.handler.HandlerApi; import com.zhangmeng.handler.HandlerApi;
import com.zhangmeng.service.Connection; import com.zhangmeng.service.*;
import com.zhangmeng.service.Request; import org.apache.tomcat.util.http.fileupload.IOUtils;
import com.zhangmeng.service.Router;
import java.io.*; import java.io.*;
import java.net.InetAddress; import java.net.InetAddress;
@ -74,6 +73,31 @@ public class ConnectionImpl implements Connection {
} }
} }
//直接将Message数据发送数据给远程的TCP客户端
@Override
public void SendMsg(int msgId, byte[] data) {
if (this.isClosed) {
throw new RuntimeException("Connection closed when send msg");
}
//将data封包并且发送
DataPack dp = new DataPackImpl();
Message message = new MessageImpl();
message.SetMsgId(msgId);
message.SetData(data);
message.SetDataLen(data.length);
byte[] msg = dp.Pack(message);
//写回客户端
try {
this.Conn.getOutputStream().write(msg);
} catch (IOException e) {
// throw new RuntimeException(e);
close();
}
}
//处理conn读数据的Goroutine //处理conn读数据的Goroutine
public void StartReader() { public void StartReader() {
log.info("Reader Goroutine is running"); log.info("Reader Goroutine is running");
@ -81,12 +105,31 @@ public class ConnectionImpl implements Connection {
while (exit) { while (exit) {
//读取我们最大的数据到buf中 //读取我们最大的数据到buf中
try { try {
String str = reader.readLine(); // String str = reader.readLine();
//调用当前链接业务(这里执行的是当前conn的绑定的handle方法) //调用当前链接业务(这里执行的是当前conn的绑定的handle方法)
// this.handleAPI.handle(this.writer, str.getBytes()); // this.handleAPI.handle(this.writer, str.getBytes());
//创建解析包对象
DataPack dp = new DataPackImpl();
byte[] headData = new byte[dp.GetHeadLen()];
IOUtils.readFully(inputStream, headData, 0, dp.GetHeadLen());
//拆包得到msgid 和 datalen 放在msg中
byte[] data ;
Message unpack = dp.Unpack(headData);
if (unpack == null) {
int dataLen = unpack.GetDataLen();
if (dataLen > 0) {
data = new byte[dataLen];
IOUtils.readFully(inputStream, data, 0, dataLen);
//调用当前链接业务(这里执行的是当前conn的绑定的handle方法)
unpack.SetData(data);
}
}
//构建request对象,并调用router的PreHandle,Handle,PostHandle方法 //构建request对象,并调用router的PreHandle,Handle,PostHandle方法
Request request = new RequestImpl(this, str.getBytes()); Request request = new RequestImpl(this, unpack);
this.router.PreHandle(request); this.router.PreHandle(request);
this.router.Handle(request); this.router.Handle(request);

View File

@ -1,10 +1,16 @@
package com.zhangmeng.service.impl; package com.zhangmeng.service.impl;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zhangmeng.service.DataPack; import com.zhangmeng.service.DataPack;
import com.zhangmeng.service.Message; import com.zhangmeng.service.Message;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.*;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import com.zhangmeng.utils.ByteBufferUtil;
/** /**
* @author zm * @author zm
@ -13,6 +19,8 @@ import java.nio.ByteBuffer;
*/ */
public class DataPackImpl implements DataPack { public class DataPackImpl implements DataPack {
private final Log log = LogFactory.get();
@Override @Override
public int GetHeadLen() { public int GetHeadLen() {
return 8; return 8;
@ -21,24 +29,102 @@ public class DataPackImpl implements DataPack {
@Override @Override
public byte[] Pack(Message msg) { public byte[] Pack(Message msg) {
ByteBuffer byteBuffer = ByteBuffer.allocate(16); DataMsg dataMsg = new DataMsg();
dataMsg.setSequenceId(110);
dataMsg.setData(msg.GetData());
dataMsg.setDataLen(msg.GetDataLen());
//写dataLen try {
byteBuffer.putInt(msg.GetDataLen()); ByteBuf out = Unpooled.buffer();
//写msgID // 1. 4 字节的魔数
byteBuffer.putInt(msg.GetMsgId()); out.writeBytes(new byte[]{1, 2, 3, 4});
// 2. 1 字节的版本,
//写data out.writeByte(1);
byteBuffer.put(msg.GetData()); // 3. 1 字节的序列化方式 jdk 0 , json 1
out.writeByte(0);
// 4. 1 字节的指令类型
byte[] data = byteBuffer.array(); out.writeByte(0);
// 5. 4 个字节
out.writeInt(dataMsg.getSequenceId());
// 无意义,对齐填充
out.writeByte(0xff);
// 6. 获取内容的字节数组
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(dataMsg);
byte[] bytes = bos.toByteArray();
// 7. 长度
out.writeInt(bytes.length);
// 8. 写入内容
out.writeBytes(bytes);
byte[] data = out.array();
ByteBufferUtil.debugBuf(data);
out.release();
return data; return data;
} catch (Exception e){
e.printStackTrace();
}
return null;
} }
@Override @Override
public Message Unpack(byte[] data) { public Message Unpack(byte[] data) {
return null;
try {
ByteBuf in = Unpooled.buffer(data.length);
in.writeBytes(data);
int magicNum = in.readInt();
byte version = in.readByte();
byte serializerType = in.readByte();
byte messageType = in.readByte();
int sequenceId = in.readInt();
in.readByte();
int length = in.readInt();
byte[] bytes = new byte[length];
in.readBytes(bytes, 0, length);
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
DataMsg dataMsg = (DataMsg) ois.readObject();
log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
log.debug("{}", dataMsg);
in.release();
Message message = new MessageImpl();
message.SetData(dataMsg.getData());
message.SetDataLen(dataMsg.getDataLen());
return message;
} catch (IOException | ClassNotFoundException e) {
throw new RuntimeException(e);
}
}
public static class DataMsg implements Serializable {
private int sequenceId;
private byte[] data;
private int dataLen;
public int getSequenceId() {
return sequenceId;
}
public void setSequenceId(int sequenceId) {
this.sequenceId = sequenceId;
}
public byte[] getData() {
return data;
}
public void setData(byte[] data) {
this.data = data;
}
public int getDataLen() {
return dataLen;
}
public void setDataLen(int dataLen) {
this.dataLen = dataLen;
}
} }
} }

View File

@ -5,6 +5,7 @@ import cn.hutool.log.LogFactory;
import com.zhangmeng.service.Request; import com.zhangmeng.service.Request;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.Socket; import java.net.Socket;
@ -17,21 +18,21 @@ public class PingRouter extends BaseRouterImpl {
private final Log log = LogFactory.get(); private final Log log = LogFactory.get();
@Override // @Override
public void PreHandle(Request request) { // public void PreHandle(Request request) {
log.info("Call Router PreHandle"); // log.info("Call Router PreHandle");
Socket socket = request.GetConnection().GetTCPConnection(); // Socket socket = request.GetConnection().GetTCPConnection();
//
try { // try {
OutputStream outputStream = socket.getOutputStream(); // OutputStream outputStream = socket.getOutputStream();
outputStream.write("before ping ....".getBytes()); // outputStream.write("before ping ....".getBytes());
outputStream.flush(); // outputStream.flush();
} catch (IOException e) { // } catch (IOException e) {
// throw new RuntimeException(e); //// throw new RuntimeException(e);
log.error("call back ping ping ping error"); // log.error("call back ping ping ping error");
} // }
//
} // }
@Override @Override
public void Handle(Request request) { public void Handle(Request request) {
@ -39,6 +40,8 @@ public class PingRouter extends BaseRouterImpl {
Socket socket = request.GetConnection().GetTCPConnection(); Socket socket = request.GetConnection().GetTCPConnection();
try { try {
OutputStream outputStream = socket.getOutputStream(); OutputStream outputStream = socket.getOutputStream();
InputStream inputStream = socket.getInputStream();
log.info("recv from client : msgId=", request.GetMsgID(), ", data=", new String(request.GetData()));
outputStream.write(" ping ....".getBytes()); outputStream.write(" ping ....".getBytes());
outputStream.flush(); outputStream.flush();
} catch (IOException e) { } catch (IOException e) {
@ -47,17 +50,17 @@ public class PingRouter extends BaseRouterImpl {
} }
} }
@Override // @Override
public void PostHandle(Request request) { // public void PostHandle(Request request) {
log.info("Call Router PostHandle"); // log.info("Call Router PostHandle");
Socket socket = request.GetConnection().GetTCPConnection(); // Socket socket = request.GetConnection().GetTCPConnection();
try { // try {
OutputStream outputStream = socket.getOutputStream(); // OutputStream outputStream = socket.getOutputStream();
outputStream.write("after ping ....".getBytes()); // outputStream.write("after ping ....".getBytes());
outputStream.flush(); // outputStream.flush();
} catch (IOException e) { // } catch (IOException e) {
// throw new RuntimeException(e); //// throw new RuntimeException(e);
log.error("call back ping ping ping error"); // log.error("call back ping ping ping error");
} // }
} // }
} }

View File

@ -1,6 +1,7 @@
package com.zhangmeng.service.impl; package com.zhangmeng.service.impl;
import com.zhangmeng.service.Connection; import com.zhangmeng.service.Connection;
import com.zhangmeng.service.Message;
import com.zhangmeng.service.Request; import com.zhangmeng.service.Request;
/** /**
@ -12,20 +13,40 @@ public class RequestImpl implements Request {
private Connection connection; private Connection connection;
private byte[] data; // private byte[] data;
public RequestImpl(Connection connection, byte[] data) { private Message message;
this.connection = connection;
this.data = data;
}
@Override @Override
public Connection GetConnection() { public Connection GetConnection() {
return this.connection; return this.connection;
} }
public Message getMessage() {
return message;
}
public void setMessage(Message message) {
this.message = message;
}
public RequestImpl(Connection connection, Message message) {
this.connection = connection;
this.message = message;
}
//使用message 代替 data
// public RequestImpl(Connection connection, byte[] data) {
// this.connection = connection;
// this.data = data;
// }
@Override @Override
public byte[] GetData() { public byte[] GetData() {
return this.data; return this.message.GetData();
}
public int GetMsgID() {
return this.message.GetMsgId();
} }
} }

View File

@ -1,6 +1,7 @@
package com.zhangmeng.utils; package com.zhangmeng.utils;
import io.netty.buffer.ByteBuf;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@ -91,6 +92,14 @@ public class ByteBufferUtil {
buffer.limit(oldlimit); buffer.limit(oldlimit);
} }
public static void debugBuf(byte[] bytes){
int capacity = bytes.length;
ByteBuffer byteBuffer = ByteBuffer.allocate(capacity);
byteBuffer.put(bytes);
debugAll(byteBuffer);
}
/** /**
* *
* @param buffer * @param buffer

View File

@ -1,5 +1,13 @@
package com.zhangmeng; package com.zhangmeng;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zhangmeng.service.DataPack;
import com.zhangmeng.service.Message;
import com.zhangmeng.service.impl.DataPackImpl;
import com.zhangmeng.service.impl.MessageImpl;
import org.apache.tomcat.util.http.fileupload.IOUtils;
import java.io.*; import java.io.*;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.Socket; import java.net.Socket;
@ -12,6 +20,8 @@ import java.net.UnknownHostException;
*/ */
public class AppTest3 { public class AppTest3 {
private static final Log log = LogFactory.get();
public static void main(String[] args) { public static void main(String[] args) {
Socket socket = null; Socket socket = null;
BufferedReader in = null; BufferedReader in = null;
@ -20,22 +30,36 @@ public class AppTest3 {
try { try {
//创建 Socket 对象,指定服务器端的 IP 与端口 //创建 Socket 对象,指定服务器端的 IP 与端口
socket = new Socket(InetAddress.getLocalHost(), 9999); socket = new Socket(InetAddress.getLocalHost(), 9999);
OutputStream outputStream = socket.getOutputStream();
InputStream inputStream = socket.getInputStream();
//获取 scoket 的输入输出流接收和发送信息 //获取 scoket 的输入输出流接收和发送信息
in = new BufferedReader(new InputStreamReader(socket.getInputStream())); in = new BufferedReader(new InputStreamReader(inputStream));
out = new BufferedWriter(new out = new BufferedWriter(new OutputStreamWriter(outputStream));
OutputStreamWriter(socket.getOutputStream()));
br = new BufferedReader(new InputStreamReader(System.in)); br = new BufferedReader(new InputStreamReader(System.in));
while (true) { while (true) {
//发送信息 //发送信息
String str = "br.readLine()";
out.write(str + "\n"); DataPack dp = new DataPackImpl();
out.flush(); Message msg = new MessageImpl();
//如果输入的信息为“end”则终止连接 msg.SetMsgId(0);
if (str.equals("end")) { msg.SetData("hello world\n".getBytes());
break; msg.SetDataLen(msg.GetData().length);
byte[] pack = dp.Pack(msg);
outputStream.write(pack);
outputStream.flush();
byte[] headData = new byte[dp.GetHeadLen()];
IOUtils.readFully(inputStream, headData, 0, dp.GetHeadLen());
Message unpack = dp.Unpack(headData);
if (unpack!= null){
if (unpack.GetDataLen()>0) {
byte[] data = new byte[unpack.GetDataLen()];
IOUtils.readFully(inputStream, data, 0, unpack.GetDataLen());
log.info("==> Recv Msg: ID=", unpack.GetMsgId(), ", len=", unpack.GetDataLen(), ", data=", new String(unpack.GetData()));
} }
//否则,接收并输出服务器端信息 }
System.out.println("服务器端说:" + in.readLine());
Thread.sleep(1000); Thread.sleep(1000);
} }
} catch (UnknownHostException e) { } catch (UnknownHostException e) {