netty client 2023年2月27日14:37:50

master
zhangmeng 2023-02-27 14:38:04 +08:00
parent 7321e83f27
commit d21890a5f8
9 changed files with 612 additions and 81 deletions

View File

@ -1,6 +1,10 @@
package com.zhangmeng.tools.controller;
import com.zhangmeng.tools.netty.Message;
import com.zhangmeng.tools.netty.NettyClient;
import com.zhangmeng.tools.utils.AlertUtils;
import com.zhangmeng.tools.utils.ExecutorUtils;
import com.zhangmeng.tools.utils.Multithreading;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
@ -9,7 +13,10 @@ import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringEncoder;
import javafx.application.Platform;
import javafx.beans.property.SimpleObjectProperty;
import javafx.beans.value.ChangeListener;
import javafx.beans.value.ObservableValue;
import javafx.collections.FXCollections;
import javafx.collections.ListChangeListener;
import javafx.collections.ObservableList;
import javafx.fxml.FXML;
import javafx.geometry.Pos;
@ -20,7 +27,9 @@ import javafx.util.Callback;
import javafx.util.StringConverter;
import lombok.extern.slf4j.Slf4j;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
/**
@ -58,9 +67,29 @@ public class NettyClientController {
@FXML
private Button connect;
private final SimpleObjectProperty<Channel> client = new SimpleObjectProperty<>();
@FXML
private TextArea send_msg;
private ObservableList<Data> list = FXCollections.observableArrayList();
private NettyClient nettyClient;
/**
*
*
* @since 2021-12-1
*/
private boolean isStarted;
private final ObservableList<Data> list = FXCollections.observableArrayList();
private static ObservableList<String> message_result_list = FXCollections.observableArrayList();
public static void clear_message(){
NettyClientController.message_result_list.clear();
}
public static void add_message(String msg){
NettyClientController.message_result_list.add(msg);
NettyClientController.message_result_list.add(System.lineSeparator());
}
private static final int socket_port = 3333;
private static final String socket_address = "127.0.0.1";
@ -91,13 +120,19 @@ public class NettyClientController {
}
}
public enum Type{
Ws,
Wss;
}
@FXML
public void initialize() {
message_result_list.addListener((ListChangeListener<String>) c -> {
if (c.wasAdded()) {
StringBuilder stringBuilder = new StringBuilder();
for (String s : message_result_list) {
stringBuilder.append(s);
}
receive(stringBuilder.toString());
}
});
param_list.setPlaceholder(new Label("没有数据"));
param_list.setEditable(true);
param_list.setCellFactory(new Callback<>() {
@ -135,28 +170,6 @@ public class NettyClientController {
value.setText(null);
});
connect.setOnAction(event -> {
String url = Type.Ws + "://" + url_address.getText() + ":" + port.getText();
log.info("url:{}",url);
if (client.getValue() == null){
new Thread(() -> {
netty_client(url_address.getText(),Integer.parseInt(port.getText()));
}).start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (client.getValue() != null){
AlertUtils.alert_msg("连接成功!");
}else {
AlertUtils.alert_warning("连接失败!");
}
}else {
AlertUtils.alert_warning("已连接至服务器!");
}
});
if (port.getText().length() == 0) {
port.setText(String.valueOf(socket_port));
}
@ -166,62 +179,52 @@ public class NettyClientController {
}
send.setOnAction(event -> {
Channel channel = client.getValue();
if (channel == null){
AlertUtils.alert_warning("请连接至服务器再试!");
return;
if (send_msg.getText().length() == 0){
AlertUtils.alert_warning("请输入内容再试!");
}
channel.writeAndFlush(result_show.getText());
String msg = send_msg.getText();
Multithreading.execute(() -> {
if (nettyClient == null){
this.nettyClient = NettyClient.getInstance();
this.nettyClient.setPort(socket_port);
this.nettyClient.setIp(socket_address);
StringBuilder params = new StringBuilder();
for (Data data : list) {
if (params.toString().equals("")){
params.append(data.key).append("=").append(data.value);
}else {
params.append("&").append(data.key).append("=").append(data.value);
}
}
this.nettyClient.setParams(params.toString());
log.info("params:{}",params.toString());
}
// 让客户端后台懒启动。避免先于服务端启动而引发异常
if (!isStarted) {
this.nettyClient.start();
isStarted = true;
}
Platform.runLater(() -> {
this.nettyClient.send(msg);
});
add_message(msg);
});
this.send_msg.requestFocus();
this.send_msg.clear(); // TODO
});
}
private void netty_client(String socket_address,int socket_port) {
log.info("netty client init .......");
NioEventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(group);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("client handler", new ChannelInboundHandlerAdapter() {
// 接收响应消息
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("msg: {}...............", msg);
}
// 在连接建立后触发 active 事件
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("channelActive.............");
}
public void receive(String msg) {
Platform.runLater(() -> {
this.result_show.appendText(msg);
});
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.info("exceptionCaught...................");
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.info("channelInactive...........");
}
});
}
});
Channel channel = bootstrap.connect(socket_address, socket_port).sync().channel();
client.setValue(channel);
channel.writeAndFlush("1111111111");
channel.closeFuture().sync();
} catch (Exception e) {
log.error("client error", e);
} finally {
try {
group.shutdownGracefully().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

View File

@ -0,0 +1,14 @@
package com.zhangmeng.tools.netty;
/**
* @author :
* @version : 1.0
* @date : 2023-02-27 10:02
*/
@FunctionalInterface
public interface Function {
/**
* @since 2021-10-3
*/
void run();
}

View File

@ -0,0 +1,23 @@
package com.zhangmeng.tools.netty;
import java.util.ArrayList;
import java.util.List;
/**
* @author :
* @version : 1.0
* @date : 2023-02-27 11:22
*/
public class Message {
public static List<String> message_result_list = new ArrayList<>();
public static void clear_message(){
Message.message_result_list.clear();
}
public static void add(String msg){
Message.message_result_list.add(msg);
Message.message_result_list.add(System.lineSeparator());
}
}

View File

@ -0,0 +1,161 @@
package com.zhangmeng.tools.netty;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultHttpHeaders;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
import io.netty.handler.codec.http.websocketx.WebSocketVersion;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;
import java.net.URI;
import java.net.URISyntaxException;
/**
* @author :
* @version : 1.0
* @date : 2023-02-27 09:15
*/
@Slf4j
public class NettyClient {
public enum Type{
Ws("ws"),
Wss("wss");
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
Type(String name) {
this.name = name;
}
}
private String serverIp;
private int serverPort;
private String params;
private Channel channel;
private final EventLoopGroup workerLoopGroup = new NioEventLoopGroup();
public void start() {
WebSocketClientHandshaker handshaker = this.getWebSocketClientHandshaker();
var businessHandler = new WebsocketClientHandler(handshaker);
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerLoopGroup);
bootstrap.channel(NioSocketChannel.class);
bootstrap.remoteAddress(serverIp, serverPort);
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
// HTTP编码解码器
ch.pipeline().addLast("http-codec", new HttpServerCodec());
// 把HTTP头、HTTP体拼成完整的HTTP请求
ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
// 方便大文件传输,不过实质上都是短的文本数据
ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
// 定义业务处理器
ch.pipeline().addLast("businessHandler", businessHandler);
}
});
ChannelFuture future = bootstrap.connect();
future.addListener((ChannelFuture futureListener) -> {
if (futureListener.isSuccess()) {
log.info("客户端连接成功");
} else {
log.info("客户端连接失败");
}
});
try {
future.sync();
} catch (Exception exception) {
log.info(exception.getMessage());
}
this.channel = future.channel();
handshaker.handshake(channel);
try {
businessHandler.sync();
} catch (InterruptedException exception) {
log.info(exception.getMessage());
}
}
private String generateWebsocketUrl(String ip, int port, String relativePath,String params) {
// return String.format("ws://%s:%d/%s?%s", ip, port, relativePath,params);
// return String.format("ws://%s:%d?%s", ip, port,params);
// return String.format("ws://%s:%d", ip, port);
return String.format("ws://%s:%d/%s", ip, port, relativePath);
}
public static final String WEBSOCKET_PREFIX_PATH = "websocket";
/**
*
*
* @since 2021-12-2
*/
private WebSocketClientHandshaker getWebSocketClientHandshaker() {
URI websocketUri = null;
try {
// websocketUri = new URI(this.generateWebsocketUrl(this.serverIp, this.serverPort, WEBSOCKET_PREFIX_PATH,this.params));
websocketUri = new URI(this.generateWebsocketUrl(this.serverIp, this.serverPort, WEBSOCKET_PREFIX_PATH,null));
log.info(websocketUri.toString());
} catch (URISyntaxException exception) {
exception.printStackTrace(); // FIXME日志
}
return WebSocketClientHandshakerFactory.newHandshaker(websocketUri, WebSocketVersion.V13, null, false, new DefaultHttpHeaders());
}
public void send(String msg) {
channel.writeAndFlush(new TextWebSocketFrame(msg));
}
public void destroy() {
this.workerLoopGroup.shutdownGracefully();
}
private NettyClient() {
super();
}
public static NettyClient getInstance() {
return new NettyClient();
}
public NettyClient setIp(String serverIp) {
this.serverIp = serverIp;
return this;
}
public NettyClient setPort(int serverPort) {
this.serverPort = serverPort;
return this;
}
public NettyClient setParams(String params){
this.params = params;
return this;
}
}

View File

@ -0,0 +1,84 @@
package com.zhangmeng.tools.netty;
import com.alibaba.fastjson.JSONObject;
import org.springframework.http.HttpStatus;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
public class ResponseJson extends HashMap<String, Object> {
private static final long serialVersionUID = 1L;
private static final Integer SUCCESS_STATUS = 200;
private static final Integer ERROR_STATUS = -1;
private static final String SUCCESS_MSG = "一切正常";
public ResponseJson() {
super();
}
public ResponseJson(int code) {
super();
setStatus(code);
}
public ResponseJson(HttpStatus status) {
super();
setStatus(status.value());
setMsg(status.getReasonPhrase());
}
public ResponseJson success() {
put("msg", SUCCESS_MSG);
put("status", SUCCESS_STATUS);
return this;
}
public ResponseJson success(String msg) {
put("msg", msg);
put("status", SUCCESS_STATUS);
return this;
}
public ResponseJson error(String msg) {
put("msg", msg);
put("status", ERROR_STATUS);
return this;
}
public ResponseJson setData(String key, Object obj) {
@SuppressWarnings("unchecked")
HashMap<String, Object> data = (HashMap<String, Object>) get("data");
if (data == null) {
data = new HashMap<String, Object>();
put("data", data);
}
data.put(key, obj);
return this;
}
public ResponseJson setStatus(int status) {
put("status", status);
return this;
}
public ResponseJson setMsg(String msg) {
put("msg", msg);
return this;
}
public ResponseJson setValue(String key, Object val) {
put(key, val);
return this;
}
/**
* JSON
*/
@Override
public String toString() {
return JSONObject.toJSONString(this);
}
}

View File

@ -0,0 +1,87 @@
package com.zhangmeng.tools.netty;
import com.zhangmeng.tools.controller.NettyClientController;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
/**
* @author :
* @version : 1.0
* @date : 2023-02-27 09:20
*/
@Slf4j
public class WebsocketClientHandler extends ChannelInboundHandlerAdapter {
private WebSocketClientHandshaker handshaker;
private ChannelPromise channelPromise;
public WebsocketClientHandler(WebSocketClientHandshaker handshaker) {
this.handshaker = handshaker;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object obj) {
if (!this.handshaker.isHandshakeComplete()) { // 如果三报文握手的流程还没有走完
// 如果现在进行的是三报文握手中的第三握手
finishHandshake(ctx, (FullHttpResponse) obj);
} else if (obj instanceof FullHttpResponse response) {
var msg = String.format("Unexpected FullHttpResponse, status=%s, content=%s", response.status(), response.content().toString(CharsetUtil.UTF_8));
System.out.println(msg); // FIXME日志
} else if (obj instanceof WebSocketFrame) {
// TODO如果需要服务器反馈信息可在此添加业务
handleWebSocketResponse(ctx, (WebSocketFrame) obj);
} // 此分支不应该发生
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
this.channelPromise = ctx.newPromise();
}
public ChannelFuture sync() throws InterruptedException {
return this.channelPromise.sync();
}
private void finishHandshake(ChannelHandlerContext ctx, FullHttpResponse response) {
try {
this.handshaker.finishHandshake(ctx.channel(), response);
//设置成功
this.channelPromise.setSuccess();
} catch (WebSocketHandshakeException exception) {
FullHttpResponse rsp = response;
String errorMsg = String.format("WebSocket Client failed to connect, status=%s, reason=%s", rsp.status(), rsp.content().toString(CharsetUtil.UTF_8));
this.channelPromise.setFailure(new Exception(errorMsg)); // TODO日志
}
}
private void handleWebSocketResponse(ChannelHandlerContext ctx, WebSocketFrame frame) {
// 关闭请求
if (frame instanceof CloseWebSocketFrame) {
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
return;
}
// ping请求
if (frame instanceof PingWebSocketFrame) {
ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
return;
}
String text = ((TextWebSocketFrame) frame).text();
log.info("text:{}", text);
NettyClientController.add_message(text);
}
}

View File

@ -0,0 +1,100 @@
package com.zhangmeng.tools.utils;
import javafx.concurrent.Task;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @author :
* @version : 1.0
* @date : 2023-02-27 09:57
*/
public class ExecutorUtils {
/**
* 使 volatile 使
*/
/**
* executor null 使 executor.shutdown();
*
* @since 2021-9-28
*/
private static volatile ExecutorService executor;
private static final Object EXECUTOR_LOCK = new Object();
/**
* tasks null task使 task.cancel();
*
* @since 2021-9-28
*/
private static volatile List<Task<Object>> tasks;
private static final Object TASKS_LOCK = new Object();
/**
*
*
* 使
*
* @since 2021-9-28
*/
public static ExecutorService getExecutor() {
// 第一重判断
if (ExecutorUtils.executor == null) {
// 上锁
synchronized (EXECUTOR_LOCK) {
// 第二重判断
if (ExecutorUtils.executor == null) {
// 设置初始线程个数,大致为 6
ExecutorUtils.executor = Executors.newFixedThreadPool(6);
}
}
}
return ExecutorUtils.executor;
}
/**
*
*
* @since 2021-9-28
*/
public static List<Task<Object>> getTasks() {
// 第一重判断
if (ExecutorUtils.tasks == null) {
// 上锁
synchronized (TASKS_LOCK) {
// 第二重判断
if (ExecutorUtils.tasks == null) {
ExecutorUtils.tasks = new ArrayList<>();
}
}
}
return ExecutorUtils.tasks;
}
/**
*
*
*
* 线
*
* @since 2021-9-28
*/
public static void multithreadingClosed() {
if (ExecutorUtils.tasks != null) {
for (var task : ExecutorUtils.tasks) {
if (!task.isDone()) {
task.cancel();
}
}
}
if (ExecutorUtils.executor != null) {
ExecutorUtils.executor.shutdown();
}
}
}

View File

@ -0,0 +1,58 @@
package com.zhangmeng.tools.utils;
import java.util.concurrent.Future;
import com.zhangmeng.tools.netty.Function;
import javafx.concurrent.Task;
/**
* @author :
* @version : 1.0
* @date : 2023-02-27 10:00
*/
public class Multithreading {
/**
*
*
* @since 2021-10-3
* @lastModified 2021-10-10
*/
public static void execute(Function function) {
/**
* 线
*/
Task<Object> task = new Task<>() {
@Override
protected Integer call() {
function.run();
return null; // 因为此处不需要结果反馈,所以返回 null
}
};
ExecutorUtils.getTasks().add(task);
ExecutorUtils.getExecutor().execute(task);
}
/**
*
*
* @since 2021-10-10
*/
public static Future<?> submit(Function function) {
/**
* 线
*/
Task<Object> task = new Task<>() {
@Override
protected Object call() {
function.run();
return null; // 因为此处暂时没定好应该反馈什么东西,所以返回 null
}
};
ExecutorUtils.getTasks().add(task);
return ExecutorUtils.getExecutor().submit(task);
}
}

View File

@ -21,9 +21,10 @@
<Text layoutX="588.0" layoutY="107.0" strokeType="OUTSIDE" strokeWidth="0.0" text="(value)" />
<Label layoutX="798.0" layoutY="94.0" text="参数列表:" />
<ListView fx:id="param_list" layoutX="888.0" layoutY="44.0" prefHeight="200.0" prefWidth="241.0" AnchorPane.leftAnchor="888.0" AnchorPane.rightAnchor="71.0" AnchorPane.topAnchor="45.0" />
<TextArea fx:id="result_show" layoutX="98.0" layoutY="260.0" prefHeight="200.0" prefWidth="1031.0" AnchorPane.bottomAnchor="189.0" AnchorPane.leftAnchor="98.0" AnchorPane.rightAnchor="71.0" AnchorPane.topAnchor="260.0" />
<Button fx:id="send" layoutX="568.0" layoutY="510.0" mnemonicParsing="false" text="发送" AnchorPane.bottomAnchor="114.0" />
<TextArea fx:id="send_msg" layoutX="98.0" layoutY="255.0" prefHeight="114.0" prefWidth="1031.0" AnchorPane.bottomAnchor="280.0" AnchorPane.leftAnchor="98.0" AnchorPane.rightAnchor="71.0" AnchorPane.topAnchor="255.0" />
<Button fx:id="send" layoutX="653.0" layoutY="379.0" mnemonicParsing="false" text="发送" AnchorPane.bottomAnchor="245.0" AnchorPane.leftAnchor="653.0" />
<Button fx:id="add" layoutX="653.0" layoutY="90.0" mnemonicParsing="false" text="添加" />
<Button fx:id="connect" layoutX="422.0" layoutY="173.0" mnemonicParsing="false" text="连接" />
<TextArea fx:id="result_show" layoutX="98.0" layoutY="410.0" prefHeight="200.0" prefWidth="1031.0" AnchorPane.bottomAnchor="39.0" AnchorPane.leftAnchor="98.0" AnchorPane.rightAnchor="71.0" />
</children>
</AnchorPane>