diff --git a/src/main/java/com/zhangmeng/service/Connection.java b/src/main/java/com/zhangmeng/service/Connection.java index 7071982..b1bcc6b 100644 --- a/src/main/java/com/zhangmeng/service/Connection.java +++ b/src/main/java/com/zhangmeng/service/Connection.java @@ -25,4 +25,7 @@ public interface Connection { //获取远程客户端地址信息 public InetAddress RemoteAddr(); + + //直接将Message数据发送数据给远程的TCP客户端 + public void SendMsg(int msgId ,byte[] data); } diff --git a/src/main/java/com/zhangmeng/service/Request.java b/src/main/java/com/zhangmeng/service/Request.java index eef7e1f..7f30adb 100644 --- a/src/main/java/com/zhangmeng/service/Request.java +++ b/src/main/java/com/zhangmeng/service/Request.java @@ -11,4 +11,6 @@ public interface Request { public byte[] GetData(); //获取请求消息的数据 + public int GetMsgID(); //获取请求消息的ID + } diff --git a/src/main/java/com/zhangmeng/service/impl/ConnectionImpl.java b/src/main/java/com/zhangmeng/service/impl/ConnectionImpl.java index 80ee298..d58b2aa 100644 --- a/src/main/java/com/zhangmeng/service/impl/ConnectionImpl.java +++ b/src/main/java/com/zhangmeng/service/impl/ConnectionImpl.java @@ -3,9 +3,8 @@ package com.zhangmeng.service.impl; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.zhangmeng.handler.HandlerApi; -import com.zhangmeng.service.Connection; -import com.zhangmeng.service.Request; -import com.zhangmeng.service.Router; +import com.zhangmeng.service.*; +import org.apache.tomcat.util.http.fileupload.IOUtils; import java.io.*; import java.net.InetAddress; @@ -74,6 +73,31 @@ public class ConnectionImpl implements Connection { } } + //直接将Message数据发送数据给远程的TCP客户端 + @Override + public void SendMsg(int msgId, byte[] data) { + if (this.isClosed) { + throw new RuntimeException("Connection closed when send msg"); + } + //将data封包,并且发送 + DataPack dp = new DataPackImpl(); + + Message message = new MessageImpl(); + message.SetMsgId(msgId); + message.SetData(data); + message.SetDataLen(data.length); + + byte[] msg = dp.Pack(message); + + //写回客户端 + try { + this.Conn.getOutputStream().write(msg); + } catch (IOException e) { +// throw new RuntimeException(e); + close(); + } + } + //处理conn读数据的Goroutine public void StartReader() { log.info("Reader Goroutine is running"); @@ -81,12 +105,31 @@ public class ConnectionImpl implements Connection { while (exit) { //读取我们最大的数据到buf中 try { - String str = reader.readLine(); +// String str = reader.readLine(); //调用当前链接业务(这里执行的是当前conn的绑定的handle方法) // this.handleAPI.handle(this.writer, str.getBytes()); + //创建解析包对象 + DataPack dp = new DataPackImpl(); + byte[] headData = new byte[dp.GetHeadLen()]; + + IOUtils.readFully(inputStream, headData, 0, dp.GetHeadLen()); + + //拆包,得到msgid 和 datalen 放在msg中 + byte[] data ; + Message unpack = dp.Unpack(headData); + if (unpack == null) { + int dataLen = unpack.GetDataLen(); + if (dataLen > 0) { + data = new byte[dataLen]; + IOUtils.readFully(inputStream, data, 0, dataLen); + //调用当前链接业务(这里执行的是当前conn的绑定的handle方法) + unpack.SetData(data); + } + } + //构建request对象,并调用router的PreHandle,Handle,PostHandle方法 - Request request = new RequestImpl(this, str.getBytes()); + Request request = new RequestImpl(this, unpack); this.router.PreHandle(request); this.router.Handle(request); diff --git a/src/main/java/com/zhangmeng/service/impl/DataPackImpl.java b/src/main/java/com/zhangmeng/service/impl/DataPackImpl.java index de8d6de..6167f0d 100644 --- a/src/main/java/com/zhangmeng/service/impl/DataPackImpl.java +++ b/src/main/java/com/zhangmeng/service/impl/DataPackImpl.java @@ -1,10 +1,16 @@ package com.zhangmeng.service.impl; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; import com.zhangmeng.service.DataPack; import com.zhangmeng.service.Message; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.io.*; import java.nio.ByteBuffer; +import com.zhangmeng.utils.ByteBufferUtil; /** * @author zm @@ -13,6 +19,8 @@ import java.nio.ByteBuffer; */ public class DataPackImpl implements DataPack { + private final Log log = LogFactory.get(); + @Override public int GetHeadLen() { return 8; @@ -21,24 +29,102 @@ public class DataPackImpl implements DataPack { @Override public byte[] Pack(Message msg) { - ByteBuffer byteBuffer = ByteBuffer.allocate(16); + DataMsg dataMsg = new DataMsg(); + dataMsg.setSequenceId(110); + dataMsg.setData(msg.GetData()); + dataMsg.setDataLen(msg.GetDataLen()); - //写dataLen - byteBuffer.putInt(msg.GetDataLen()); + try { + ByteBuf out = Unpooled.buffer(); - //写msgID - byteBuffer.putInt(msg.GetMsgId()); + // 1. 4 字节的魔数 + out.writeBytes(new byte[]{1, 2, 3, 4}); + // 2. 1 字节的版本, + out.writeByte(1); + // 3. 1 字节的序列化方式 jdk 0 , json 1 + out.writeByte(0); + // 4. 1 字节的指令类型 + out.writeByte(0); + // 5. 4 个字节 + out.writeInt(dataMsg.getSequenceId()); + // 无意义,对齐填充 + out.writeByte(0xff); + // 6. 获取内容的字节数组 + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutputStream oos = new ObjectOutputStream(bos); + oos.writeObject(dataMsg); + byte[] bytes = bos.toByteArray(); + // 7. 长度 + out.writeInt(bytes.length); + // 8. 写入内容 + out.writeBytes(bytes); + byte[] data = out.array(); + ByteBufferUtil.debugBuf(data); + out.release(); + return data; + } catch (Exception e){ + e.printStackTrace(); + } - //写data - byteBuffer.put(msg.GetData()); - - - byte[] data = byteBuffer.array(); - return data; + return null; } @Override public Message Unpack(byte[] data) { - return null; + + try { + ByteBuf in = Unpooled.buffer(data.length); + in.writeBytes(data); + int magicNum = in.readInt(); + byte version = in.readByte(); + byte serializerType = in.readByte(); + byte messageType = in.readByte(); + int sequenceId = in.readInt(); + in.readByte(); + int length = in.readInt(); + byte[] bytes = new byte[length]; + in.readBytes(bytes, 0, length); + ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes)); + DataMsg dataMsg = (DataMsg) ois.readObject(); + log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length); + log.debug("{}", dataMsg); + in.release(); + Message message = new MessageImpl(); + message.SetData(dataMsg.getData()); + message.SetDataLen(dataMsg.getDataLen()); + return message; + } catch (IOException | ClassNotFoundException e) { + throw new RuntimeException(e); + } + } + + public static class DataMsg implements Serializable { + private int sequenceId; + private byte[] data; + private int dataLen; + + public int getSequenceId() { + return sequenceId; + } + + public void setSequenceId(int sequenceId) { + this.sequenceId = sequenceId; + } + + public byte[] getData() { + return data; + } + + public void setData(byte[] data) { + this.data = data; + } + + public int getDataLen() { + return dataLen; + } + + public void setDataLen(int dataLen) { + this.dataLen = dataLen; + } } } diff --git a/src/main/java/com/zhangmeng/service/impl/PingRouter.java b/src/main/java/com/zhangmeng/service/impl/PingRouter.java index 6d07c2f..79e78e8 100644 --- a/src/main/java/com/zhangmeng/service/impl/PingRouter.java +++ b/src/main/java/com/zhangmeng/service/impl/PingRouter.java @@ -5,6 +5,7 @@ import cn.hutool.log.LogFactory; import com.zhangmeng.service.Request; import java.io.IOException; +import java.io.InputStream; import java.io.OutputStream; import java.net.Socket; @@ -17,21 +18,21 @@ public class PingRouter extends BaseRouterImpl { private final Log log = LogFactory.get(); - @Override - public void PreHandle(Request request) { - log.info("Call Router PreHandle"); - Socket socket = request.GetConnection().GetTCPConnection(); - - try { - OutputStream outputStream = socket.getOutputStream(); - outputStream.write("before ping ....".getBytes()); - outputStream.flush(); - } catch (IOException e) { -// throw new RuntimeException(e); - log.error("call back ping ping ping error"); - } - - } +// @Override +// public void PreHandle(Request request) { +// log.info("Call Router PreHandle"); +// Socket socket = request.GetConnection().GetTCPConnection(); +// +// try { +// OutputStream outputStream = socket.getOutputStream(); +// outputStream.write("before ping ....".getBytes()); +// outputStream.flush(); +// } catch (IOException e) { +//// throw new RuntimeException(e); +// log.error("call back ping ping ping error"); +// } +// +// } @Override public void Handle(Request request) { @@ -39,6 +40,8 @@ public class PingRouter extends BaseRouterImpl { Socket socket = request.GetConnection().GetTCPConnection(); try { OutputStream outputStream = socket.getOutputStream(); + InputStream inputStream = socket.getInputStream(); + log.info("recv from client : msgId=", request.GetMsgID(), ", data=", new String(request.GetData())); outputStream.write(" ping ....".getBytes()); outputStream.flush(); } catch (IOException e) { @@ -47,17 +50,17 @@ public class PingRouter extends BaseRouterImpl { } } - @Override - public void PostHandle(Request request) { - log.info("Call Router PostHandle"); - Socket socket = request.GetConnection().GetTCPConnection(); - try { - OutputStream outputStream = socket.getOutputStream(); - outputStream.write("after ping ....".getBytes()); - outputStream.flush(); - } catch (IOException e) { -// throw new RuntimeException(e); - log.error("call back ping ping ping error"); - } - } +// @Override +// public void PostHandle(Request request) { +// log.info("Call Router PostHandle"); +// Socket socket = request.GetConnection().GetTCPConnection(); +// try { +// OutputStream outputStream = socket.getOutputStream(); +// outputStream.write("after ping ....".getBytes()); +// outputStream.flush(); +// } catch (IOException e) { +//// throw new RuntimeException(e); +// log.error("call back ping ping ping error"); +// } +// } } diff --git a/src/main/java/com/zhangmeng/service/impl/RequestImpl.java b/src/main/java/com/zhangmeng/service/impl/RequestImpl.java index ebd6d77..63a65f1 100644 --- a/src/main/java/com/zhangmeng/service/impl/RequestImpl.java +++ b/src/main/java/com/zhangmeng/service/impl/RequestImpl.java @@ -1,6 +1,7 @@ package com.zhangmeng.service.impl; import com.zhangmeng.service.Connection; +import com.zhangmeng.service.Message; import com.zhangmeng.service.Request; /** @@ -12,20 +13,40 @@ public class RequestImpl implements Request { private Connection connection; - private byte[] data; +// private byte[] data; - public RequestImpl(Connection connection, byte[] data) { - this.connection = connection; - this.data = data; - } + private Message message; @Override public Connection GetConnection() { return this.connection; } + public Message getMessage() { + return message; + } + + public void setMessage(Message message) { + this.message = message; + } + + public RequestImpl(Connection connection, Message message) { + this.connection = connection; + this.message = message; + } + + //使用message 代替 data + // public RequestImpl(Connection connection, byte[] data) { +// this.connection = connection; +// this.data = data; +// } + @Override public byte[] GetData() { - return this.data; + return this.message.GetData(); + } + + public int GetMsgID() { + return this.message.GetMsgId(); } } diff --git a/src/main/java/com/zhangmeng/utils/ByteBufferUtil.java b/src/main/java/com/zhangmeng/utils/ByteBufferUtil.java index c1dc222..ccc2997 100644 --- a/src/main/java/com/zhangmeng/utils/ByteBufferUtil.java +++ b/src/main/java/com/zhangmeng/utils/ByteBufferUtil.java @@ -1,6 +1,7 @@ package com.zhangmeng.utils; +import io.netty.buffer.ByteBuf; import io.netty.util.internal.StringUtil; import java.nio.ByteBuffer; @@ -91,6 +92,14 @@ public class ByteBufferUtil { buffer.limit(oldlimit); } + + public static void debugBuf(byte[] bytes){ + int capacity = bytes.length; + ByteBuffer byteBuffer = ByteBuffer.allocate(capacity); + byteBuffer.put(bytes); + debugAll(byteBuffer); + } + /** * 打印可读取内容 * @param buffer diff --git a/src/test/java/com/zhangmeng/AppTest3.java b/src/test/java/com/zhangmeng/AppTest3.java index d912728..c6257ef 100644 --- a/src/test/java/com/zhangmeng/AppTest3.java +++ b/src/test/java/com/zhangmeng/AppTest3.java @@ -1,5 +1,13 @@ package com.zhangmeng; +import cn.hutool.log.Log; +import cn.hutool.log.LogFactory; +import com.zhangmeng.service.DataPack; +import com.zhangmeng.service.Message; +import com.zhangmeng.service.impl.DataPackImpl; +import com.zhangmeng.service.impl.MessageImpl; +import org.apache.tomcat.util.http.fileupload.IOUtils; + import java.io.*; import java.net.InetAddress; import java.net.Socket; @@ -12,6 +20,8 @@ import java.net.UnknownHostException; */ public class AppTest3 { + private static final Log log = LogFactory.get(); + public static void main(String[] args) { Socket socket = null; BufferedReader in = null; @@ -20,22 +30,36 @@ public class AppTest3 { try { //创建 Socket 对象,指定服务器端的 IP 与端口 socket = new Socket(InetAddress.getLocalHost(), 9999); + OutputStream outputStream = socket.getOutputStream(); + InputStream inputStream = socket.getInputStream(); //获取 scoket 的输入输出流接收和发送信息 - in = new BufferedReader(new InputStreamReader(socket.getInputStream())); - out = new BufferedWriter(new - OutputStreamWriter(socket.getOutputStream())); + in = new BufferedReader(new InputStreamReader(inputStream)); + out = new BufferedWriter(new OutputStreamWriter(outputStream)); br = new BufferedReader(new InputStreamReader(System.in)); while (true) { //发送信息 - String str = "br.readLine()"; - out.write(str + "\n"); - out.flush(); - //如果输入的信息为“end”则终止连接 - if (str.equals("end")) { - break; + + DataPack dp = new DataPackImpl(); + Message msg = new MessageImpl(); + msg.SetMsgId(0); + msg.SetData("hello world\n".getBytes()); + msg.SetDataLen(msg.GetData().length); + byte[] pack = dp.Pack(msg); + outputStream.write(pack); + outputStream.flush(); + + byte[] headData = new byte[dp.GetHeadLen()]; + IOUtils.readFully(inputStream, headData, 0, dp.GetHeadLen()); + + Message unpack = dp.Unpack(headData); + if (unpack!= null){ + if (unpack.GetDataLen()>0) { + byte[] data = new byte[unpack.GetDataLen()]; + IOUtils.readFully(inputStream, data, 0, unpack.GetDataLen()); + log.info("==> Recv Msg: ID=", unpack.GetMsgId(), ", len=", unpack.GetDataLen(), ", data=", new String(unpack.GetData())); + } } - //否则,接收并输出服务器端信息 - System.out.println("服务器端说:" + in.readLine()); + Thread.sleep(1000); } } catch (UnknownHostException e) {