2025年2月27日11:15:07

master
qmstyle 2025-02-27 11:15:17 +08:00
parent d40cbf69d1
commit d32086be0b
8 changed files with 152 additions and 72 deletions

View File

@ -1,5 +1,7 @@
package com.zhangmeng.service; package com.zhangmeng.service;
import io.netty.buffer.ByteBuf;
/** /**
* *
* TCP,TCP * TCP,TCP
@ -13,6 +15,6 @@ public interface DataPack {
public byte[] Pack(Message msg); //封包方法 public byte[] Pack(Message msg); //封包方法
public Message Unpack(byte[] data); //拆包方法 public Message Unpack(ByteBuf byteBuf); //拆包方法
} }

View File

@ -16,7 +16,7 @@ public class BaseRouterImpl implements Router {
@Override @Override
public void PreHandle(Request request) { public void PreHandle(Request request) {
log.info("PreHandle ======================================"); //log.info("PreHandle ======================================");
} }
@Override @Override
@ -26,6 +26,6 @@ public class BaseRouterImpl implements Router {
@Override @Override
public void PostHandle(Request request) { public void PostHandle(Request request) {
log.info("PostHandle ======================================"); // log.info("PostHandle ======================================");
} }
} }

View File

@ -4,6 +4,9 @@ import cn.hutool.log.Log;
import cn.hutool.log.LogFactory; import cn.hutool.log.LogFactory;
import com.zhangmeng.handler.HandlerApi; import com.zhangmeng.handler.HandlerApi;
import com.zhangmeng.service.*; import com.zhangmeng.service.*;
import com.zhangmeng.utils.ByteBufferUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.tomcat.util.http.fileupload.IOUtils; import org.apache.tomcat.util.http.fileupload.IOUtils;
import java.io.*; import java.io.*;
@ -105,28 +108,18 @@ public class ConnectionImpl implements Connection {
while (exit) { while (exit) {
//读取我们最大的数据到buf中 //读取我们最大的数据到buf中
try { try {
// String str = reader.readLine();
//调用当前链接业务(这里执行的是当前conn的绑定的handle方法) //调用当前链接业务(这里执行的是当前conn的绑定的handle方法)
// this.handleAPI.handle(this.writer, str.getBytes()); // this.handleAPI.handle(this.writer, str.getBytes());
//创建解析包对象 //创建解析包对象
DataPack dp = new DataPackImpl(); DataPack dp = new DataPackImpl();
byte[] headData = new byte[dp.GetHeadLen()];
IOUtils.readFully(inputStream, headData, 0, dp.GetHeadLen()); ByteBuf inData = ByteBufferUtil.getByteBuf(inputStream, -1);
//拆包得到msgid 和 datalen 放在msg中 //拆包得到msgid 和 datalen 放在msg中
byte[] data ; byte[] data ;
Message unpack = dp.Unpack(headData); Message unpack = dp.Unpack(inData);
if (unpack == null) {
int dataLen = unpack.GetDataLen();
if (dataLen > 0) {
data = new byte[dataLen];
IOUtils.readFully(inputStream, data, 0, dataLen);
//调用当前链接业务(这里执行的是当前conn的绑定的handle方法)
unpack.SetData(data);
}
}
//构建request对象,并调用router的PreHandle,Handle,PostHandle方法 //构建request对象,并调用router的PreHandle,Handle,PostHandle方法
Request request = new RequestImpl(this, unpack); Request request = new RequestImpl(this, unpack);
@ -136,8 +129,9 @@ public class ConnectionImpl implements Connection {
this.router.PostHandle(request); this.router.PostHandle(request);
// log.info("recv buf : " + str); // log.info("recv buf : " + str);
writer.write("\n"); // writer.write("\n");
writer.flush(); // writer.flush();
ByteBufferUtil.sendMsg(3,"\n".getBytes(),outputStream);
} catch (IOException e) { } catch (IOException e) {
// throw new RuntimeException(e); // throw new RuntimeException(e);

View File

@ -10,6 +10,7 @@ import io.netty.buffer.Unpooled;
import java.io.*; import java.io.*;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import com.zhangmeng.utils.ByteBufferUtil; import com.zhangmeng.utils.ByteBufferUtil;
/** /**
@ -30,7 +31,7 @@ public class DataPackImpl implements DataPack {
public byte[] Pack(Message msg) { public byte[] Pack(Message msg) {
DataMsg dataMsg = new DataMsg(); DataMsg dataMsg = new DataMsg();
dataMsg.setSequenceId(110); dataMsg.setSequenceId(0);
dataMsg.setData(msg.GetData()); dataMsg.setData(msg.GetData());
dataMsg.setDataLen(msg.GetDataLen()); dataMsg.setDataLen(msg.GetDataLen());
@ -62,7 +63,7 @@ public class DataPackImpl implements DataPack {
ByteBufferUtil.debugBuf(data); ByteBufferUtil.debugBuf(data);
out.release(); out.release();
return data; return data;
} catch (Exception e){ } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
} }
@ -70,32 +71,35 @@ public class DataPackImpl implements DataPack {
} }
@Override @Override
public Message Unpack(byte[] data) { public Message Unpack(ByteBuf in) {
try { if (in != null) {
ByteBuf in = Unpooled.buffer(data.length);
in.writeBytes(data); try {
int magicNum = in.readInt(); int magicNum = in.readInt();
byte version = in.readByte(); byte version = in.readByte();
byte serializerType = in.readByte(); byte serializerType = in.readByte();
byte messageType = in.readByte(); byte messageType = in.readByte();
int sequenceId = in.readInt(); int sequenceId = in.readInt();
in.readByte(); in.readByte();
int length = in.readInt(); int length = in.readInt();
byte[] bytes = new byte[length]; byte[] bytes = new byte[length];
in.readBytes(bytes, 0, length); in.readBytes(bytes, 0, length);
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes)); ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
DataMsg dataMsg = (DataMsg) ois.readObject(); DataMsg dataMsg = (DataMsg) ois.readObject();
log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length); log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
log.debug("{}", dataMsg); log.debug("{}", dataMsg);
in.release(); in.release();
Message message = new MessageImpl(); Message message = new MessageImpl();
message.SetData(dataMsg.getData()); message.SetData(dataMsg.getData());
message.SetDataLen(dataMsg.getDataLen()); message.SetDataLen(dataMsg.getDataLen());
return message; return message;
} catch (IOException | ClassNotFoundException e) { } catch (IOException | ClassNotFoundException e) {
throw new RuntimeException(e); // throw new RuntimeException(e);
log.info(e.getMessage());
}
} }
return null;
} }
public static class DataMsg implements Serializable { public static class DataMsg implements Serializable {

View File

@ -3,6 +3,7 @@ package com.zhangmeng.service.impl;
import cn.hutool.log.Log; import cn.hutool.log.Log;
import cn.hutool.log.LogFactory; import cn.hutool.log.LogFactory;
import com.zhangmeng.service.Request; import com.zhangmeng.service.Request;
import com.zhangmeng.utils.ByteBufferUtil;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
@ -41,9 +42,8 @@ public class PingRouter extends BaseRouterImpl {
try { try {
OutputStream outputStream = socket.getOutputStream(); OutputStream outputStream = socket.getOutputStream();
InputStream inputStream = socket.getInputStream(); InputStream inputStream = socket.getInputStream();
log.info("recv from client : msgId=", request.GetMsgID(), ", data=", new String(request.GetData())); log.info("recv from client : msgId={}, data={}", request.GetMsgID(), new String(request.GetData()));
outputStream.write(" ping ....".getBytes()); ByteBufferUtil.sendMsg(1, "ping ping ping\n".getBytes(), outputStream);
outputStream.flush();
} catch (IOException e) { } catch (IOException e) {
// throw new RuntimeException(e); // throw new RuntimeException(e);
log.error("call back ping ping ping error"); log.error("call back ping ping ping error");

View File

@ -1,9 +1,20 @@
package com.zhangmeng.utils; package com.zhangmeng.utils;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zhangmeng.service.DataPack;
import com.zhangmeng.service.Message;
import com.zhangmeng.service.impl.DataPackImpl;
import com.zhangmeng.service.impl.MessageImpl;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.util.internal.StringUtil; import io.netty.util.internal.StringUtil;
import org.apache.tomcat.util.http.fileupload.IOUtils;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import static io.netty.util.internal.MathUtil.isOutOfBounds; import static io.netty.util.internal.MathUtil.isOutOfBounds;
@ -16,6 +27,9 @@ import static io.netty.util.internal.StringUtil.NEWLINE;
* @version: 1.0 * @version: 1.0
*/ */
public class ByteBufferUtil { public class ByteBufferUtil {
public static final int DEFAULT_CAPACITY = 1024;
private static final char[] BYTE2CHAR = new char[256]; private static final char[] BYTE2CHAR = new char[256];
private static final char[] HEXDUMP_TABLE = new char[256 * 4]; private static final char[] HEXDUMP_TABLE = new char[256 * 4];
private static final String[] HEXPADDING = new String[16]; private static final String[] HEXPADDING = new String[16];
@ -79,6 +93,7 @@ public class ByteBufferUtil {
/** /**
* *
*
* @param buffer * @param buffer
*/ */
public static void debugAll(ByteBuffer buffer) { public static void debugAll(ByteBuffer buffer) {
@ -93,7 +108,7 @@ public class ByteBufferUtil {
} }
public static void debugBuf(byte[] bytes){ public static void debugBuf(byte[] bytes) {
int capacity = bytes.length; int capacity = bytes.length;
ByteBuffer byteBuffer = ByteBuffer.allocate(capacity); ByteBuffer byteBuffer = ByteBuffer.allocate(capacity);
byteBuffer.put(bytes); byteBuffer.put(bytes);
@ -102,6 +117,7 @@ public class ByteBufferUtil {
/** /**
* *
*
* @param buffer * @param buffer
*/ */
public static void debugRead(ByteBuffer buffer) { public static void debugRead(ByteBuffer buffer) {
@ -190,5 +206,51 @@ public class ByteBufferUtil {
public static short getUnsignedByte(ByteBuffer buffer, int index) { public static short getUnsignedByte(ByteBuffer buffer, int index) {
return (short) (buffer.get(index) & 0xFF); return (short) (buffer.get(index) & 0xFF);
} }
private static final Log log = LogFactory.get();
public static ByteBuf getByteBuf(InputStream inputStream, int bufSize) {
if (bufSize <= 0) {
bufSize = DEFAULT_CAPACITY;
}
try {
ByteBuf inData = Unpooled.buffer();
byte[] buffer = new byte[bufSize];
int len = inputStream.read(buffer);
inData.writeBytes(buffer, 0, len);
return inData;
} catch (IOException e) {
//throw new RuntimeException(e);
log.error(e.getMessage());
}
return null;
}
public static void sendMsg(int msgId, byte[] data, OutputStream outputStream) throws IOException {
DataPack dp = new DataPackImpl();
Message msg = new MessageImpl();
msg.SetMsgId(msgId);
msg.SetData(data);
msg.SetDataLen(msg.GetData().length);
byte[] pack = dp.Pack(msg);
outputStream.write(pack);
outputStream.flush();
}
public static void recvMsg(InputStream inputStream, int bufSize) {
if (bufSize <= 0) {
bufSize = DEFAULT_CAPACITY;
}
DataPack dp = new DataPackImpl();
//接收信息
ByteBuf headBuf = ByteBufferUtil.getByteBuf(inputStream, bufSize);
Message unpack = dp.Unpack(headBuf);
log.info("==> Recv Msg: ID={}, len={},data={}", unpack.GetMsgId(), unpack.GetDataLen(), new String(unpack.GetData()));
}
} }

View File

@ -6,6 +6,9 @@ import com.zhangmeng.service.DataPack;
import com.zhangmeng.service.Message; import com.zhangmeng.service.Message;
import com.zhangmeng.service.impl.DataPackImpl; import com.zhangmeng.service.impl.DataPackImpl;
import com.zhangmeng.service.impl.MessageImpl; import com.zhangmeng.service.impl.MessageImpl;
import com.zhangmeng.utils.ByteBufferUtil;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.tomcat.util.http.fileupload.IOUtils; import org.apache.tomcat.util.http.fileupload.IOUtils;
import java.io.*; import java.io.*;
@ -38,32 +41,12 @@ public class AppTest3 {
br = new BufferedReader(new InputStreamReader(System.in)); br = new BufferedReader(new InputStreamReader(System.in));
while (true) { while (true) {
//发送信息 //发送信息
ByteBufferUtil.sendMsg(0, "Hello, I'm Client!".getBytes(), outputStream);
DataPack dp = new DataPackImpl(); //接收信息
Message msg = new MessageImpl(); ByteBufferUtil.recvMsg(inputStream, -1);
msg.SetMsgId(0);
msg.SetData("hello world\n".getBytes());
msg.SetDataLen(msg.GetData().length);
byte[] pack = dp.Pack(msg);
outputStream.write(pack);
outputStream.flush();
byte[] headData = new byte[dp.GetHeadLen()];
IOUtils.readFully(inputStream, headData, 0, dp.GetHeadLen());
Message unpack = dp.Unpack(headData);
if (unpack!= null){
if (unpack.GetDataLen()>0) {
byte[] data = new byte[unpack.GetDataLen()];
IOUtils.readFully(inputStream, data, 0, unpack.GetDataLen());
log.info("==> Recv Msg: ID=", unpack.GetMsgId(), ", len=", unpack.GetDataLen(), ", data=", new String(unpack.GetData()));
}
}
Thread.sleep(1000); Thread.sleep(1000);
} }
} catch (UnknownHostException e) {
e.printStackTrace();
} catch (IOException e) { } catch (IOException e) {
e.printStackTrace(); e.printStackTrace();
} catch (InterruptedException e) { } catch (InterruptedException e) {

View File

@ -0,0 +1,35 @@
package com.zhangmeng;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.zhangmeng.service.DataPack;
import com.zhangmeng.service.Message;
import com.zhangmeng.service.impl.DataPackImpl;
import com.zhangmeng.service.impl.MessageImpl;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
/**
* @author zm
* @date 2025/2/27 10:03
* @version: 1.0
*/
public class DataPackTest {
private static final Log log = LogFactory.get();
public static void main(String[] args) {
DataPack dp = new DataPackImpl();
Message msg = new MessageImpl();
msg.SetMsgId(0);
msg.SetData("hello world\n".getBytes());
msg.SetDataLen(msg.GetData().length);
byte[] pack = dp.Pack(msg);
System.out.println("pack.length:=================" + pack.length);
// 解包
ByteBuf byteBuf = Unpooled.wrappedBuffer(pack);
Message unpack = dp.Unpack(byteBuf);
log.info("==> Recv Msg: ID={}, len={}, data={}",unpack.GetMsgId(), unpack.GetDataLen(), new String(unpack.GetData()));
}
}