From d32086be0b2567f28a92befa4c2406fcecf62105 Mon Sep 17 00:00:00 2001 From: qmstyle Date: Thu, 27 Feb 2025 11:15:17 +0800 Subject: [PATCH] =?UTF-8?q?2025=E5=B9=B42=E6=9C=8827=E6=97=A511:15:07?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/zhangmeng/service/DataPack.java | 4 +- .../service/impl/BaseRouterImpl.java | 4 +- .../service/impl/ConnectionImpl.java | 26 +++----- .../zhangmeng/service/impl/DataPackImpl.java | 56 ++++++++-------- .../zhangmeng/service/impl/PingRouter.java | 6 +- .../com/zhangmeng/utils/ByteBufferUtil.java | 64 ++++++++++++++++++- src/test/java/com/zhangmeng/AppTest3.java | 29 ++------- src/test/java/com/zhangmeng/DataPackTest.java | 35 ++++++++++ 8 files changed, 152 insertions(+), 72 deletions(-) create mode 100644 src/test/java/com/zhangmeng/DataPackTest.java diff --git a/src/main/java/com/zhangmeng/service/DataPack.java b/src/main/java/com/zhangmeng/service/DataPack.java index 6b51be2..7f3b693 100644 --- a/src/main/java/com/zhangmeng/service/DataPack.java +++ b/src/main/java/com/zhangmeng/service/DataPack.java @@ -1,5 +1,7 @@ package com.zhangmeng.service; +import io.netty.buffer.ByteBuf; + /** * 封包数据和拆包数据 * 直接面向TCP连接中的数据流,为传输数据添加头部信息,用于处理TCP粘包问题。 @@ -13,6 +15,6 @@ public interface DataPack { public byte[] Pack(Message msg); //封包方法 - public Message Unpack(byte[] data); //拆包方法 + public Message Unpack(ByteBuf byteBuf); //拆包方法 } diff --git a/src/main/java/com/zhangmeng/service/impl/BaseRouterImpl.java b/src/main/java/com/zhangmeng/service/impl/BaseRouterImpl.java index ae28215..bc7031b 100644 --- a/src/main/java/com/zhangmeng/service/impl/BaseRouterImpl.java +++ b/src/main/java/com/zhangmeng/service/impl/BaseRouterImpl.java @@ -16,7 +16,7 @@ public class BaseRouterImpl implements Router { @Override public void PreHandle(Request request) { - log.info("PreHandle ======================================"); + //log.info("PreHandle ======================================"); } @Override @@ -26,6 +26,6 @@ public class BaseRouterImpl implements Router { @Override public void PostHandle(Request request) { - log.info("PostHandle ======================================"); +// log.info("PostHandle ======================================"); } } diff --git a/src/main/java/com/zhangmeng/service/impl/ConnectionImpl.java b/src/main/java/com/zhangmeng/service/impl/ConnectionImpl.java index d58b2aa..598f9c6 100644 --- a/src/main/java/com/zhangmeng/service/impl/ConnectionImpl.java +++ b/src/main/java/com/zhangmeng/service/impl/ConnectionImpl.java @@ -4,6 +4,9 @@ import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.zhangmeng.handler.HandlerApi; import com.zhangmeng.service.*; +import com.zhangmeng.utils.ByteBufferUtil; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import org.apache.tomcat.util.http.fileupload.IOUtils; import java.io.*; @@ -105,28 +108,18 @@ public class ConnectionImpl implements Connection { while (exit) { //读取我们最大的数据到buf中 try { -// String str = reader.readLine(); + //调用当前链接业务(这里执行的是当前conn的绑定的handle方法) // this.handleAPI.handle(this.writer, str.getBytes()); //创建解析包对象 DataPack dp = new DataPackImpl(); - byte[] headData = new byte[dp.GetHeadLen()]; - IOUtils.readFully(inputStream, headData, 0, dp.GetHeadLen()); - + ByteBuf inData = ByteBufferUtil.getByteBuf(inputStream, -1); + //拆包,得到msgid 和 datalen 放在msg中 byte[] data ; - Message unpack = dp.Unpack(headData); - if (unpack == null) { - int dataLen = unpack.GetDataLen(); - if (dataLen > 0) { - data = new byte[dataLen]; - IOUtils.readFully(inputStream, data, 0, dataLen); - //调用当前链接业务(这里执行的是当前conn的绑定的handle方法) - unpack.SetData(data); - } - } + Message unpack = dp.Unpack(inData); //构建request对象,并调用router的PreHandle,Handle,PostHandle方法 Request request = new RequestImpl(this, unpack); @@ -136,8 +129,9 @@ public class ConnectionImpl implements Connection { this.router.PostHandle(request); // log.info("recv buf : " + str); - writer.write("\n"); - writer.flush(); +// writer.write("\n"); +// writer.flush(); + ByteBufferUtil.sendMsg(3,"\n".getBytes(),outputStream); } catch (IOException e) { // throw new RuntimeException(e); diff --git a/src/main/java/com/zhangmeng/service/impl/DataPackImpl.java b/src/main/java/com/zhangmeng/service/impl/DataPackImpl.java index 6167f0d..4de4891 100644 --- a/src/main/java/com/zhangmeng/service/impl/DataPackImpl.java +++ b/src/main/java/com/zhangmeng/service/impl/DataPackImpl.java @@ -10,6 +10,7 @@ import io.netty.buffer.Unpooled; import java.io.*; import java.nio.ByteBuffer; + import com.zhangmeng.utils.ByteBufferUtil; /** @@ -30,7 +31,7 @@ public class DataPackImpl implements DataPack { public byte[] Pack(Message msg) { DataMsg dataMsg = new DataMsg(); - dataMsg.setSequenceId(110); + dataMsg.setSequenceId(0); dataMsg.setData(msg.GetData()); dataMsg.setDataLen(msg.GetDataLen()); @@ -62,7 +63,7 @@ public class DataPackImpl implements DataPack { ByteBufferUtil.debugBuf(data); out.release(); return data; - } catch (Exception e){ + } catch (Exception e) { e.printStackTrace(); } @@ -70,32 +71,35 @@ public class DataPackImpl implements DataPack { } @Override - public Message Unpack(byte[] data) { + public Message Unpack(ByteBuf in) { - try { - ByteBuf in = Unpooled.buffer(data.length); - in.writeBytes(data); - int magicNum = in.readInt(); - byte version = in.readByte(); - byte serializerType = in.readByte(); - byte messageType = in.readByte(); - int sequenceId = in.readInt(); - in.readByte(); - int length = in.readInt(); - byte[] bytes = new byte[length]; - in.readBytes(bytes, 0, length); - ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes)); - DataMsg dataMsg = (DataMsg) ois.readObject(); - log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length); - log.debug("{}", dataMsg); - in.release(); - Message message = new MessageImpl(); - message.SetData(dataMsg.getData()); - message.SetDataLen(dataMsg.getDataLen()); - return message; - } catch (IOException | ClassNotFoundException e) { - throw new RuntimeException(e); + if (in != null) { + + try { + int magicNum = in.readInt(); + byte version = in.readByte(); + byte serializerType = in.readByte(); + byte messageType = in.readByte(); + int sequenceId = in.readInt(); + in.readByte(); + int length = in.readInt(); + byte[] bytes = new byte[length]; + in.readBytes(bytes, 0, length); + ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes)); + DataMsg dataMsg = (DataMsg) ois.readObject(); + log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length); + log.debug("{}", dataMsg); + in.release(); + Message message = new MessageImpl(); + message.SetData(dataMsg.getData()); + message.SetDataLen(dataMsg.getDataLen()); + return message; + } catch (IOException | ClassNotFoundException e) { +// throw new RuntimeException(e); + log.info(e.getMessage()); + } } + return null; } public static class DataMsg implements Serializable { diff --git a/src/main/java/com/zhangmeng/service/impl/PingRouter.java b/src/main/java/com/zhangmeng/service/impl/PingRouter.java index 79e78e8..2636c66 100644 --- a/src/main/java/com/zhangmeng/service/impl/PingRouter.java +++ b/src/main/java/com/zhangmeng/service/impl/PingRouter.java @@ -3,6 +3,7 @@ package com.zhangmeng.service.impl; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.zhangmeng.service.Request; +import com.zhangmeng.utils.ByteBufferUtil; import java.io.IOException; import java.io.InputStream; @@ -41,9 +42,8 @@ public class PingRouter extends BaseRouterImpl { try { OutputStream outputStream = socket.getOutputStream(); InputStream inputStream = socket.getInputStream(); - log.info("recv from client : msgId=", request.GetMsgID(), ", data=", new String(request.GetData())); - outputStream.write(" ping ....".getBytes()); - outputStream.flush(); + log.info("recv from client : msgId={}, data={}", request.GetMsgID(), new String(request.GetData())); + ByteBufferUtil.sendMsg(1, "ping ping ping\n".getBytes(), outputStream); } catch (IOException e) { // throw new RuntimeException(e); log.error("call back ping ping ping error"); diff --git a/src/main/java/com/zhangmeng/utils/ByteBufferUtil.java b/src/main/java/com/zhangmeng/utils/ByteBufferUtil.java index ccc2997..a86c1aa 100644 --- a/src/main/java/com/zhangmeng/utils/ByteBufferUtil.java +++ b/src/main/java/com/zhangmeng/utils/ByteBufferUtil.java @@ -1,9 +1,20 @@ package com.zhangmeng.utils; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zhangmeng.service.DataPack; +import com.zhangmeng.service.Message; +import com.zhangmeng.service.impl.DataPackImpl; +import com.zhangmeng.service.impl.MessageImpl; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.util.internal.StringUtil; +import org.apache.tomcat.util.http.fileupload.IOUtils; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.nio.ByteBuffer; import static io.netty.util.internal.MathUtil.isOutOfBounds; @@ -16,6 +27,9 @@ import static io.netty.util.internal.StringUtil.NEWLINE; * @version: 1.0 */ public class ByteBufferUtil { + + public static final int DEFAULT_CAPACITY = 1024; + private static final char[] BYTE2CHAR = new char[256]; private static final char[] HEXDUMP_TABLE = new char[256 * 4]; private static final String[] HEXPADDING = new String[16]; @@ -79,6 +93,7 @@ public class ByteBufferUtil { /** * 打印所有内容 + * * @param buffer */ public static void debugAll(ByteBuffer buffer) { @@ -93,7 +108,7 @@ public class ByteBufferUtil { } - public static void debugBuf(byte[] bytes){ + public static void debugBuf(byte[] bytes) { int capacity = bytes.length; ByteBuffer byteBuffer = ByteBuffer.allocate(capacity); byteBuffer.put(bytes); @@ -102,6 +117,7 @@ public class ByteBufferUtil { /** * 打印可读取内容 + * * @param buffer */ public static void debugRead(ByteBuffer buffer) { @@ -190,5 +206,51 @@ public class ByteBufferUtil { public static short getUnsignedByte(ByteBuffer buffer, int index) { return (short) (buffer.get(index) & 0xFF); } + + private static final Log log = LogFactory.get(); + + public static ByteBuf getByteBuf(InputStream inputStream, int bufSize) { + + + if (bufSize <= 0) { + bufSize = DEFAULT_CAPACITY; + } + + try { + ByteBuf inData = Unpooled.buffer(); + byte[] buffer = new byte[bufSize]; + int len = inputStream.read(buffer); + inData.writeBytes(buffer, 0, len); + return inData; + } catch (IOException e) { + //throw new RuntimeException(e); + log.error(e.getMessage()); + } + return null; + } + + public static void sendMsg(int msgId, byte[] data, OutputStream outputStream) throws IOException { + DataPack dp = new DataPackImpl(); + Message msg = new MessageImpl(); + msg.SetMsgId(msgId); + msg.SetData(data); + msg.SetDataLen(msg.GetData().length); + byte[] pack = dp.Pack(msg); + outputStream.write(pack); + outputStream.flush(); + } + + public static void recvMsg(InputStream inputStream, int bufSize) { + + if (bufSize <= 0) { + bufSize = DEFAULT_CAPACITY; + } + + DataPack dp = new DataPackImpl(); + //接收信息 + ByteBuf headBuf = ByteBufferUtil.getByteBuf(inputStream, bufSize); + Message unpack = dp.Unpack(headBuf); + log.info("==> Recv Msg: ID={}, len={},data={}", unpack.GetMsgId(), unpack.GetDataLen(), new String(unpack.GetData())); + } } diff --git a/src/test/java/com/zhangmeng/AppTest3.java b/src/test/java/com/zhangmeng/AppTest3.java index c6257ef..3e2320c 100644 --- a/src/test/java/com/zhangmeng/AppTest3.java +++ b/src/test/java/com/zhangmeng/AppTest3.java @@ -6,6 +6,9 @@ import com.zhangmeng.service.DataPack; import com.zhangmeng.service.Message; import com.zhangmeng.service.impl.DataPackImpl; import com.zhangmeng.service.impl.MessageImpl; +import com.zhangmeng.utils.ByteBufferUtil; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import org.apache.tomcat.util.http.fileupload.IOUtils; import java.io.*; @@ -38,32 +41,12 @@ public class AppTest3 { br = new BufferedReader(new InputStreamReader(System.in)); while (true) { //发送信息 - - DataPack dp = new DataPackImpl(); - Message msg = new MessageImpl(); - msg.SetMsgId(0); - msg.SetData("hello world\n".getBytes()); - msg.SetDataLen(msg.GetData().length); - byte[] pack = dp.Pack(msg); - outputStream.write(pack); - outputStream.flush(); - - byte[] headData = new byte[dp.GetHeadLen()]; - IOUtils.readFully(inputStream, headData, 0, dp.GetHeadLen()); - - Message unpack = dp.Unpack(headData); - if (unpack!= null){ - if (unpack.GetDataLen()>0) { - byte[] data = new byte[unpack.GetDataLen()]; - IOUtils.readFully(inputStream, data, 0, unpack.GetDataLen()); - log.info("==> Recv Msg: ID=", unpack.GetMsgId(), ", len=", unpack.GetDataLen(), ", data=", new String(unpack.GetData())); - } - } + ByteBufferUtil.sendMsg(0, "Hello, I'm Client!".getBytes(), outputStream); + //接收信息 + ByteBufferUtil.recvMsg(inputStream, -1); Thread.sleep(1000); } - } catch (UnknownHostException e) { - e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { diff --git a/src/test/java/com/zhangmeng/DataPackTest.java b/src/test/java/com/zhangmeng/DataPackTest.java new file mode 100644 index 0000000..f2298cb --- /dev/null +++ b/src/test/java/com/zhangmeng/DataPackTest.java @@ -0,0 +1,35 @@ +package com.zhangmeng; + +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zhangmeng.service.DataPack; +import com.zhangmeng.service.Message; +import com.zhangmeng.service.impl.DataPackImpl; +import com.zhangmeng.service.impl.MessageImpl; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + +/** + * @author zm + * @date 2025/2/27 10:03 + * @version: 1.0 + */ +public class DataPackTest { + private static final Log log = LogFactory.get(); + public static void main(String[] args) { + DataPack dp = new DataPackImpl(); + Message msg = new MessageImpl(); + msg.SetMsgId(0); + msg.SetData("hello world\n".getBytes()); + msg.SetDataLen(msg.GetData().length); + byte[] pack = dp.Pack(msg); + System.out.println("pack.length:=================" + pack.length); + + // 解包 + ByteBuf byteBuf = Unpooled.wrappedBuffer(pack); + Message unpack = dp.Unpack(byteBuf); + log.info("==> Recv Msg: ID={}, len={}, data={}",unpack.GetMsgId(), unpack.GetDataLen(), new String(unpack.GetData())); + + } + +}