diff --git a/pom.xml b/pom.xml index a05c7c9..a79dd7e 100644 --- a/pom.xml +++ b/pom.xml @@ -559,33 +559,6 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - org.springframework.boot spring-boot-maven-plugin diff --git a/src/main/java/com/zhangmeng/tools/netty/NettyClient.java b/src/main/java/com/zhangmeng/tools/netty/NettyClient.java index 11eedd6..7c8b5ff 100644 --- a/src/main/java/com/zhangmeng/tools/netty/NettyClient.java +++ b/src/main/java/com/zhangmeng/tools/netty/NettyClient.java @@ -6,10 +6,7 @@ import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; -import io.netty.handler.codec.http.DefaultHttpHeaders; -import io.netty.handler.codec.http.HttpClientCodec; -import io.netty.handler.codec.http.HttpObjectAggregator; -import io.netty.handler.codec.http.HttpServerCodec; +import io.netty.handler.codec.http.*; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker; import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory; @@ -28,9 +25,8 @@ import java.net.URISyntaxException; @Slf4j public class NettyClient { - public enum Type{ - Ws("ws"), - Wss("wss"); + public enum Type { + Ws("ws"), Wss("wss"); private String name; public String getName() { @@ -52,14 +48,13 @@ public class NettyClient { private String params; - private Channel channel; + private Channel channel = null; - private final EventLoopGroup workerLoopGroup = new NioEventLoopGroup(); + private EventLoopGroup workerLoopGroup = null; + private ChannelFuture future = null; public void start() { - - WebSocketClientHandshaker handshaker = this.getWebSocketClientHandshaker(); - var businessHandler = new WebsocketClientHandler(handshaker); + workerLoopGroup = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(workerLoopGroup); bootstrap.channel(NioSocketChannel.class); @@ -75,58 +70,40 @@ public class NettyClient { // 方便大文件传输,不过实质上都是短的文本数据 ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler()); // 定义业务处理器 - ch.pipeline().addLast("businessHandler", businessHandler); + ch.pipeline().addLast("businessHandler", new WebsocketClientHandler()); } }); - ChannelFuture future = bootstrap.connect(); - future.addListener((ChannelFuture futureListener) -> { - if (futureListener.isSuccess()) { - log.info("客户端连接成功"); - } else { - log.info("客户端连接失败"); - } - }); - try { - future.sync(); - } catch (Exception exception) { - log.info(exception.getMessage()); - } - this.channel = future.channel(); - handshaker.handshake(channel); try { - businessHandler.sync(); - } catch (InterruptedException exception) { - log.info(exception.getMessage()); + URI websocketURI = new URI(String.format("ws://%s:%d/websocket?%s", serverIp, serverPort,params)); + log.info(String.format("ws://%s:%d/", serverIp, serverPort)); + HttpHeaders httpHeaders = new DefaultHttpHeaders(); + //进行握手 + WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(websocketURI, WebSocketVersion.V13, (String)null, true,httpHeaders); + channel = bootstrap.connect(websocketURI.getHost(), websocketURI.getPort()).sync().channel(); + WebsocketClientHandler handler = (WebsocketClientHandler)channel.pipeline().get("businessHandler"); + handler.setHandshaker(handshaker); + // 通过它构造握手响应消息返回给客户端, + // 同时将WebSocket相关的编码和解码类动态添加到ChannelPipeline中,用于WebSocket消息的编解码, + // 添加WebSocketEncoder和WebSocketDecoder之后,服务端就可以自动对WebSocket消息进行编解码了 + handshaker.handshake(channel); + //阻塞等待是否握手成功 + future = handler.handshakeFuture().sync(); + + future.addListener((ChannelFuture futureListener) -> { + if (futureListener.isSuccess()) { + log.info("客户端连接成功"); + } else { + log.info("客户端连接失败"); + } + }); + + } catch (Exception e) { + e.printStackTrace(); } } - private String generateWebsocketUrl(String ip, int port, String relativePath,String params) { -// return String.format("ws://%s:%d/%s?%s", ip, port, relativePath,params); -// return String.format("ws://%s:%d?%s", ip, port,params); -// return String.format("ws://%s:%d", ip, port); - return String.format("ws://%s:%d/%s", ip, port, relativePath); - } - - public static final String WEBSOCKET_PREFIX_PATH = "websocket"; - - /** - * 进行三报文握手中的第一握手,由客户端发起 - * - * @since 2021-12-2 - */ - private WebSocketClientHandshaker getWebSocketClientHandshaker() { - URI websocketUri = null; - try { -// websocketUri = new URI(this.generateWebsocketUrl(this.serverIp, this.serverPort, WEBSOCKET_PREFIX_PATH,this.params)); - websocketUri = new URI(this.generateWebsocketUrl(this.serverIp, this.serverPort, WEBSOCKET_PREFIX_PATH,null)); - log.info(websocketUri.toString()); - } catch (URISyntaxException exception) { - exception.printStackTrace(); // FIXME:日志 - } - return WebSocketClientHandshakerFactory.newHandshaker(websocketUri, WebSocketVersion.V13, null, false, new DefaultHttpHeaders()); - } public void send(String msg) { channel.writeAndFlush(new TextWebSocketFrame(msg)); @@ -154,7 +131,7 @@ public class NettyClient { return this; } - public NettyClient setParams(String params){ + public NettyClient setParams(String params) { this.params = params; return this; } diff --git a/src/main/java/com/zhangmeng/tools/netty/WebsocketClientHandler.java b/src/main/java/com/zhangmeng/tools/netty/WebsocketClientHandler.java index bfc145f..758f4d5 100644 --- a/src/main/java/com/zhangmeng/tools/netty/WebsocketClientHandler.java +++ b/src/main/java/com/zhangmeng/tools/netty/WebsocketClientHandler.java @@ -1,87 +1,81 @@ package com.zhangmeng.tools.netty; import com.zhangmeng.tools.controller.NettyClientController; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelInboundHandlerAdapter; -import io.netty.channel.ChannelPromise; -import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.*; +import io.netty.handler.codec.http.*; import io.netty.handler.codec.http.websocketx.*; import io.netty.util.CharsetUtil; +import io.netty.util.concurrent.Future; import lombok.extern.slf4j.Slf4j; +import java.text.MessageFormat; +import java.util.HashMap; +import java.util.Map; + /** * @author : 芊芊墨客 * @version : 1.0 * @date : 2023-02-27 09:20 */ @Slf4j -public class WebsocketClientHandler extends ChannelInboundHandlerAdapter { +public class WebsocketClientHandler extends SimpleChannelInboundHandler { - private WebSocketClientHandshaker handshaker; + WebSocketClientHandshaker handshaker; + ChannelPromise handshakeFuture; - private ChannelPromise channelPromise; + public void handlerAdded(ChannelHandlerContext ctx) { + this.handshakeFuture = ctx.newPromise(); + } + public WebSocketClientHandshaker getHandshaker() { + return handshaker; + } - public WebsocketClientHandler(WebSocketClientHandshaker handshaker) { + public void setHandshaker(WebSocketClientHandshaker handshaker) { this.handshaker = handshaker; } - @Override - public void channelRead(ChannelHandlerContext ctx, Object obj) { - if (!this.handshaker.isHandshakeComplete()) { // 如果三报文握手的流程还没有走完 - // 如果现在进行的是三报文握手中的第三握手 - finishHandshake(ctx, (FullHttpResponse) obj); - } else if (obj instanceof FullHttpResponse response) { - var msg = String.format("Unexpected FullHttpResponse, status=%s, content=%s", response.status(), response.content().toString(CharsetUtil.UTF_8)); - System.out.println(msg); // FIXME:日志 - } else if (obj instanceof WebSocketFrame) { - // TODO:如果需要服务器反馈信息,可在此添加业务 - handleWebSocketResponse(ctx, (WebSocketFrame) obj); - } // 此分支不应该发生 + public ChannelPromise getHandshakeFuture() { + return handshakeFuture; + } + public void setHandshakeFuture(ChannelPromise handshakeFuture) { + this.handshakeFuture = handshakeFuture; + } + + public ChannelFuture handshakeFuture() { + return this.handshakeFuture; } @Override - public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { - ctx.close(); - } - - @Override - public void handlerAdded(ChannelHandlerContext ctx) { - this.channelPromise = ctx.newPromise(); - } - - public ChannelFuture sync() throws InterruptedException { - return this.channelPromise.sync(); - } - - private void finishHandshake(ChannelHandlerContext ctx, FullHttpResponse response) { - try { - this.handshaker.finishHandshake(ctx.channel(), response); - //设置成功 - this.channelPromise.setSuccess(); - } catch (WebSocketHandshakeException exception) { - FullHttpResponse rsp = response; - String errorMsg = String.format("WebSocket Client failed to connect, status=%s, reason=%s", rsp.status(), rsp.content().toString(CharsetUtil.UTF_8)); - this.channelPromise.setFailure(new Exception(errorMsg)); // TODO:日志 - } - } - - private void handleWebSocketResponse(ChannelHandlerContext ctx, WebSocketFrame frame) { - + protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception { // 关闭请求 if (frame instanceof CloseWebSocketFrame) { - handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); + if (handshaker == null) { + sendErrorMessage(ctx, "不存在的客户端连接!"); + } else { + handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); + } return; } - // ping请求 if (frame instanceof PingWebSocketFrame) { ctx.channel().write(new PongWebSocketFrame(frame.content().retain())); return; } - String text = ((TextWebSocketFrame) frame).text(); - log.info("text:{}", text); - NettyClientController.add_message(text); + // 只支持文本格式,不支持二进制消息 + if (!(frame instanceof TextWebSocketFrame)) { + sendErrorMessage(ctx, "仅支持文本(Text)格式,不支持二进制消息"); + } + String request = ((TextWebSocketFrame) frame).text(); + log.info(request); + } + + private void sendErrorMessage(ChannelHandlerContext ctx, String errorMsg) { + String responseJson = new ResponseJson() + .error(errorMsg) + .toString(); + ctx.channel().writeAndFlush(new TextWebSocketFrame(responseJson)); } } \ No newline at end of file diff --git a/src/main/java/com/zhangmeng/tools/utils/JsoupUtil.java b/src/main/java/com/zhangmeng/tools/utils/JsoupUtil.java index 9830dfb..e391861 100644 --- a/src/main/java/com/zhangmeng/tools/utils/JsoupUtil.java +++ b/src/main/java/com/zhangmeng/tools/utils/JsoupUtil.java @@ -45,17 +45,17 @@ public class JsoupUtil { .post(); } - public static void main(String[] args) { - try { - Document document = sendGet("https://so.gushiwen.cn/shiwenv_45c396367f59.aspx", 5000); - Element body = document.body(); - Element sonsyuanwen = body.getElementById("sonsyuanwen"); - String title = sonsyuanwen.getElementsByTag("h1").get(0).text(); - String user_name = sonsyuanwen.getElementsByTag("p").get(0).getElementsByTag("a").get(0).text(); - String date_name = sonsyuanwen.getElementsByTag("p").get(0).getElementsByTag("a").get(1).text(); - String content = sonsyuanwen.getElementsByClass("cont").get(0).getElementsByClass("contson").text(); - } catch (IOException e) { - e.printStackTrace(); - } - } +// public static void main(String[] args) { +// try { +// Document document = sendGet("https://so.gushiwen.cn/shiwenv_45c396367f59.aspx", 5000); +// Element body = document.body(); +// Element sonsyuanwen = body.getElementById("sonsyuanwen"); +// String title = sonsyuanwen.getElementsByTag("h1").get(0).text(); +// String user_name = sonsyuanwen.getElementsByTag("p").get(0).getElementsByTag("a").get(0).text(); +// String date_name = sonsyuanwen.getElementsByTag("p").get(0).getElementsByTag("a").get(1).text(); +// String content = sonsyuanwen.getElementsByClass("cont").get(0).getElementsByClass("contson").text(); +// } catch (IOException e) { +// e.printStackTrace(); +// } +// } }