2023年3月3日17:15:41

master
zhangmeng 2023-03-03 17:15:54 +08:00
parent 8bfc462af7
commit 0f12ae23df
4 changed files with 94 additions and 150 deletions

27
pom.xml
View File

@ -559,33 +559,6 @@
<build>
<plugins>
<!-- <plugin>-->
<!-- <groupId>org.apache.maven.plugins</groupId>-->
<!-- <artifactId>maven-assembly-plugin</artifactId>-->
<!-- <version>3.3.0</version>-->
<!-- <configuration>-->
<!-- <archive>-->
<!-- <manifest>-->
<!-- &lt;!&ndash; <mainClass>入口类main 方法所在位置</mainClass> &ndash;&gt;-->
<!-- <mainClass>com.zhangmeng.tools.JavaFxToolsApplication</mainClass>-->
<!-- </manifest>-->
<!-- </archive>-->
<!-- <descriptorRefs>-->
<!-- &lt;!&ndash; jar 携带依赖 &ndash;&gt;-->
<!-- <descriptorRef>jar-with-dependencies</descriptorRef>-->
<!-- </descriptorRefs>-->
<!-- </configuration>-->
<!-- <executions>-->
<!-- <execution>-->
<!-- <phase>package</phase>-->
<!-- <goals>-->
<!-- <goal>single</goal>-->
<!-- </goals>-->
<!-- </execution>-->
<!-- </executions>-->
<!-- </plugin>-->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>

View File

@ -6,10 +6,7 @@ import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
@ -28,9 +25,8 @@ import java.net.URISyntaxException;
@Slf4j
public class NettyClient {
public enum Type{
Ws("ws"),
Wss("wss");
public enum Type {
Ws("ws"), Wss("wss");
private String name;
public String getName() {
@ -52,14 +48,13 @@ public class NettyClient {
private String params;
private Channel channel;
private Channel channel = null;
private final EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
private EventLoopGroup workerLoopGroup = null;
private ChannelFuture future = null;
public void start() {
WebSocketClientHandshaker handshaker = this.getWebSocketClientHandshaker();
var businessHandler = new WebsocketClientHandler(handshaker);
workerLoopGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerLoopGroup);
bootstrap.channel(NioSocketChannel.class);
@ -75,58 +70,40 @@ public class NettyClient {
// 方便大文件传输,不过实质上都是短的文本数据
ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
// 定义业务处理器
ch.pipeline().addLast("businessHandler", businessHandler);
ch.pipeline().addLast("businessHandler", new WebsocketClientHandler());
}
});
ChannelFuture future = bootstrap.connect();
future.addListener((ChannelFuture futureListener) -> {
if (futureListener.isSuccess()) {
log.info("客户端连接成功");
} else {
log.info("客户端连接失败");
}
});
try {
future.sync();
} catch (Exception exception) {
log.info(exception.getMessage());
}
this.channel = future.channel();
handshaker.handshake(channel);
try {
businessHandler.sync();
} catch (InterruptedException exception) {
log.info(exception.getMessage());
URI websocketURI = new URI(String.format("ws://%s:%d/websocket?%s", serverIp, serverPort,params));
log.info(String.format("ws://%s:%d/", serverIp, serverPort));
HttpHeaders httpHeaders = new DefaultHttpHeaders();
//进行握手
WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(websocketURI, WebSocketVersion.V13, (String)null, true,httpHeaders);
channel = bootstrap.connect(websocketURI.getHost(), websocketURI.getPort()).sync().channel();
WebsocketClientHandler handler = (WebsocketClientHandler)channel.pipeline().get("businessHandler");
handler.setHandshaker(handshaker);
// 通过它构造握手响应消息返回给客户端,
// 同时将WebSocket相关的编码和解码类动态添加到ChannelPipeline中用于WebSocket消息的编解码
// 添加WebSocketEncoder和WebSocketDecoder之后服务端就可以自动对WebSocket消息进行编解码了
handshaker.handshake(channel);
//阻塞等待是否握手成功
future = handler.handshakeFuture().sync();
future.addListener((ChannelFuture futureListener) -> {
if (futureListener.isSuccess()) {
log.info("客户端连接成功");
} else {
log.info("客户端连接失败");
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
private String generateWebsocketUrl(String ip, int port, String relativePath,String params) {
// return String.format("ws://%s:%d/%s?%s", ip, port, relativePath,params);
// return String.format("ws://%s:%d?%s", ip, port,params);
// return String.format("ws://%s:%d", ip, port);
return String.format("ws://%s:%d/%s", ip, port, relativePath);
}
public static final String WEBSOCKET_PREFIX_PATH = "websocket";
/**
*
*
* @since 2021-12-2
*/
private WebSocketClientHandshaker getWebSocketClientHandshaker() {
URI websocketUri = null;
try {
// websocketUri = new URI(this.generateWebsocketUrl(this.serverIp, this.serverPort, WEBSOCKET_PREFIX_PATH,this.params));
websocketUri = new URI(this.generateWebsocketUrl(this.serverIp, this.serverPort, WEBSOCKET_PREFIX_PATH,null));
log.info(websocketUri.toString());
} catch (URISyntaxException exception) {
exception.printStackTrace(); // FIXME日志
}
return WebSocketClientHandshakerFactory.newHandshaker(websocketUri, WebSocketVersion.V13, null, false, new DefaultHttpHeaders());
}
public void send(String msg) {
channel.writeAndFlush(new TextWebSocketFrame(msg));
@ -154,7 +131,7 @@ public class NettyClient {
return this;
}
public NettyClient setParams(String params){
public NettyClient setParams(String params) {
this.params = params;
return this;
}

View File

@ -1,87 +1,81 @@
package com.zhangmeng.tools.netty;
import com.zhangmeng.tools.controller.NettyClientController;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.codec.http.*;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.Future;
import lombok.extern.slf4j.Slf4j;
import java.text.MessageFormat;
import java.util.HashMap;
import java.util.Map;
/**
* @author :
* @version : 1.0
* @date : 2023-02-27 09:20
*/
@Slf4j
public class WebsocketClientHandler extends ChannelInboundHandlerAdapter {
public class WebsocketClientHandler extends SimpleChannelInboundHandler<WebSocketFrame> {
private WebSocketClientHandshaker handshaker;
WebSocketClientHandshaker handshaker;
ChannelPromise handshakeFuture;
private ChannelPromise channelPromise;
public void handlerAdded(ChannelHandlerContext ctx) {
this.handshakeFuture = ctx.newPromise();
}
public WebSocketClientHandshaker getHandshaker() {
return handshaker;
}
public WebsocketClientHandler(WebSocketClientHandshaker handshaker) {
public void setHandshaker(WebSocketClientHandshaker handshaker) {
this.handshaker = handshaker;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object obj) {
if (!this.handshaker.isHandshakeComplete()) { // 如果三报文握手的流程还没有走完
// 如果现在进行的是三报文握手中的第三握手
finishHandshake(ctx, (FullHttpResponse) obj);
} else if (obj instanceof FullHttpResponse response) {
var msg = String.format("Unexpected FullHttpResponse, status=%s, content=%s", response.status(), response.content().toString(CharsetUtil.UTF_8));
System.out.println(msg); // FIXME日志
} else if (obj instanceof WebSocketFrame) {
// TODO如果需要服务器反馈信息可在此添加业务
handleWebSocketResponse(ctx, (WebSocketFrame) obj);
} // 此分支不应该发生
public ChannelPromise getHandshakeFuture() {
return handshakeFuture;
}
public void setHandshakeFuture(ChannelPromise handshakeFuture) {
this.handshakeFuture = handshakeFuture;
}
public ChannelFuture handshakeFuture() {
return this.handshakeFuture;
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
this.channelPromise = ctx.newPromise();
}
public ChannelFuture sync() throws InterruptedException {
return this.channelPromise.sync();
}
private void finishHandshake(ChannelHandlerContext ctx, FullHttpResponse response) {
try {
this.handshaker.finishHandshake(ctx.channel(), response);
//设置成功
this.channelPromise.setSuccess();
} catch (WebSocketHandshakeException exception) {
FullHttpResponse rsp = response;
String errorMsg = String.format("WebSocket Client failed to connect, status=%s, reason=%s", rsp.status(), rsp.content().toString(CharsetUtil.UTF_8));
this.channelPromise.setFailure(new Exception(errorMsg)); // TODO日志
}
}
private void handleWebSocketResponse(ChannelHandlerContext ctx, WebSocketFrame frame) {
protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
// 关闭请求
if (frame instanceof CloseWebSocketFrame) {
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
if (handshaker == null) {
sendErrorMessage(ctx, "不存在的客户端连接!");
} else {
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
}
return;
}
// ping请求
if (frame instanceof PingWebSocketFrame) {
ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
return;
}
String text = ((TextWebSocketFrame) frame).text();
log.info("text:{}", text);
NettyClientController.add_message(text);
// 只支持文本格式,不支持二进制消息
if (!(frame instanceof TextWebSocketFrame)) {
sendErrorMessage(ctx, "仅支持文本(Text)格式,不支持二进制消息");
}
String request = ((TextWebSocketFrame) frame).text();
log.info(request);
}
private void sendErrorMessage(ChannelHandlerContext ctx, String errorMsg) {
String responseJson = new ResponseJson()
.error(errorMsg)
.toString();
ctx.channel().writeAndFlush(new TextWebSocketFrame(responseJson));
}
}

View File

@ -45,17 +45,17 @@ public class JsoupUtil {
.post();
}
public static void main(String[] args) {
try {
Document document = sendGet("https://so.gushiwen.cn/shiwenv_45c396367f59.aspx", 5000);
Element body = document.body();
Element sonsyuanwen = body.getElementById("sonsyuanwen");
String title = sonsyuanwen.getElementsByTag("h1").get(0).text();
String user_name = sonsyuanwen.getElementsByTag("p").get(0).getElementsByTag("a").get(0).text();
String date_name = sonsyuanwen.getElementsByTag("p").get(0).getElementsByTag("a").get(1).text();
String content = sonsyuanwen.getElementsByClass("cont").get(0).getElementsByClass("contson").text();
} catch (IOException e) {
e.printStackTrace();
}
}
// public static void main(String[] args) {
// try {
// Document document = sendGet("https://so.gushiwen.cn/shiwenv_45c396367f59.aspx", 5000);
// Element body = document.body();
// Element sonsyuanwen = body.getElementById("sonsyuanwen");
// String title = sonsyuanwen.getElementsByTag("h1").get(0).text();
// String user_name = sonsyuanwen.getElementsByTag("p").get(0).getElementsByTag("a").get(0).text();
// String date_name = sonsyuanwen.getElementsByTag("p").get(0).getElementsByTag("a").get(1).text();
// String content = sonsyuanwen.getElementsByClass("cont").get(0).getElementsByClass("contson").text();
// } catch (IOException e) {
// e.printStackTrace();
// }
// }
}