client 标签打印 长链接

会员绑定
This commit is contained in:
2024-06-21 09:18:56 +08:00
parent 4d7e6c3650
commit 4a4c5623aa
15 changed files with 493 additions and 21 deletions

View File

@@ -0,0 +1,35 @@
package com.chaozhanggui.system.cashierservice.netty;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Slf4j
public class ConnectionDebouncerHandler extends ChannelDuplexHandler {
private static final Map<Channel, Long> lastConnectionTimes = new ConcurrentHashMap<>();
private static final long debounceIntervalMillis = 5*1000; // 防抖时间间隔,单位:毫秒
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
long currentTimeMillis = System.currentTimeMillis();
Long lastConnectionTime = lastConnectionTimes.get(channel);
if (lastConnectionTime == null || (currentTimeMillis - lastConnectionTime) > debounceIntervalMillis) {
// 允许新连接
lastConnectionTimes.put(channel, currentTimeMillis);
super.channelActive(ctx); // 将事件传递给下一个处理器
} else {
channel.close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}

View File

@@ -28,7 +28,7 @@ public class HeartbeatHandler extends ChannelDuplexHandler {
super.userEventTriggered(ctx, evt);
} else if (event.state() == IdleState.WRITER_IDLE) {
// log.info("发送心跳");
ctx.channel().writeAndFlush(new TextWebSocketFrame("Heartbeat")).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
ctx.channel().writeAndFlush(new TextWebSocketFrame("{\"type\":\"heartbeat\"}")).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
// ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
} else {

View File

@@ -11,14 +11,10 @@ import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.io.ClassPathResource;
import org.springframework.stereotype.Component;
import javax.net.ssl.SSLException;
import java.io.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.concurrent.TimeUnit;
@Component
@@ -35,6 +31,8 @@ public class PushToAppChannelInitializer extends ChannelInitializer<NioSocketCha
pipeline.addLast(new IdleStateHandler(30, 10, 120, TimeUnit.SECONDS));
pipeline.addLast(new HeartbeatHandler());
// 添加连接防抖处理器(没用)
// pipeline.addLast(new ConnectionDebouncerHandler());
//本地试调时使用
// SslContext sslContext = SslContextBuilder.forServer(loadResourceAsByteArrayInputStream("\\pem\\fullchain.pem"), loadResourceAsByteArrayInputStream("\\pem\\privkey.key")).build();
// pipeline.addLast(sslContext.newHandler(ch.alloc()));

View File

@@ -0,0 +1,187 @@
package com.chaozhanggui.system.cashierservice.netty;
import com.alibaba.fastjson.JSONObject;
import com.chaozhanggui.system.cashierservice.netty.config.NettyChannelHandlerAdapter;
import com.chaozhanggui.system.cashierservice.util.JSONUtil;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 客户端
*/
@Slf4j
@Sharable
@Component
public class PushToClientChannelHandlerAdapter extends NettyChannelHandlerAdapter {
/**
* [shopId, [clientId, ctx]]
*/
private static Map<String, ConcurrentHashMap<String, ChannelHandlerContext>> webSocketMap = new HashMap<>();
/**
* [ctx, shopId:clientId]
*/
private static Map<ChannelHandlerContext, String> clientIdMap = new ConcurrentHashMap<>();
private String clientId = "";
private String shopId = "";
public PushToClientChannelHandlerAdapter() {
}
public static PushToClientChannelHandlerAdapter getInstance() {
return new PushToClientChannelHandlerAdapter();
}
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("netty连接client 长连接激活");
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) {
log.info("netty连接client 长连接关闭:{}, {}",clientId,shopId);
ctx.close();
removeCtx(ctx);
}
/**
* 移除ctx
*/
private void removeCtx(ChannelHandlerContext ctx) {
// shopId:clientId
String key = clientIdMap.get(ctx);
if (StringUtils.isNotBlank(key)) {
String[] split = key.split(":");
ConcurrentHashMap<String, ChannelHandlerContext> tableMap = webSocketMap.get(split[0]);
if (tableMap != null && !tableMap.isEmpty() && tableMap.size() > 0) {
tableMap.remove(split[1]);
if (tableMap.isEmpty() || tableMap.size() == 0) {
webSocketMap.remove(split[0]);
}
}
}
clientIdMap.remove(ctx);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
super.channelReadComplete(ctx);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
super.exceptionCaught(ctx, cause);
removeCtx(ctx);
}
@Override
public void channelRead(ChannelHandlerContext ctx, String msg) {
JSONObject jsonObject = new JSONObject();
if (StringUtils.isNotEmpty(msg)) {
jsonObject = JSONObject.parseObject(msg);
}else {
log.info("netty连接client 接收到空数据:{}",msg);
}
String type = jsonObject.getString("type");
if(type.equals("heartbeat")){//心跳
log.info("netty连接client 接收到心跳数据shop:{} clientId:{} meg:{}",shopId,clientId,msg);
}else {
if (type.equals("connect")) {
String clientId = jsonObject.getString("clientId");
String shopId = jsonObject.getString("shopId");
if (StringUtils.isBlank(type) || StringUtils.isBlank(shopId) || StringUtils.isBlank(clientId)) {
log.info("netty连接client 建立连接请求失败:{}",jsonObject);
channelInactive(ctx);
return;
}
log.info("netty连接client 接收到数据 建立连接参数 param:{}",jsonObject);
this.clientId=clientId;
this.shopId=shopId;
if (webSocketMap.containsKey(shopId)) {
ConcurrentHashMap<String, ChannelHandlerContext> clientSocketMap = webSocketMap.get(shopId);
clientSocketMap.put(clientId, ctx);
} else {
ConcurrentHashMap<String, ChannelHandlerContext> clientSocketMap = new ConcurrentHashMap<>();
clientSocketMap.put(clientId, ctx);
webSocketMap.put(shopId,clientSocketMap);
}
clientIdMap.put(ctx, shopId + ":" + clientId);
JSONObject jsonObject1 = new JSONObject();
jsonObject1.put("status", "success");
jsonObject1.put("msg", "连接成功");
jsonObject1.put("type", "connect");
sendMesToApp(jsonObject1.toString(), ctx);
}
}
//业务逻辑代码处理框架。。。
ctx.flush();
}
public void sendMesToApp(String str, ChannelHandlerContext ctx) {
sendMessage(ctx, str);
}
/**
* @param message 发送的消息内容
* @param shopId 店铺Id
* @param clientId 客户端Id
* @param userFlag
* 为true 单发给clientId
* 为false 群发 shopId为空 发给所有人
*/
@Async
public void AppSendInfo(String message, String shopId,String clientId, boolean userFlag) {
log.info("netty连接client 发送消息 shopId:{} clientId:{} userFlag:{} message:{}",shopId,clientId,userFlag, JSONUtil.toJSONString(message));
if (userFlag) {
if (webSocketMap.containsKey(shopId)) {
ConcurrentHashMap<String, ChannelHandlerContext> webSockets = webSocketMap.get(shopId);
if(!webSockets.isEmpty()){
if (StringUtils.isNotBlank(clientId)) {
ChannelHandlerContext ctx = webSockets.get(clientId);
if (ctx != null) {
sendMesToApp(message,ctx);
}
}
}
}
} else {
if (StringUtils.isEmpty(shopId)) {
// 向所有用户发送信息
for (ConcurrentHashMap<String, ChannelHandlerContext> value : webSocketMap.values()) {
for (ChannelHandlerContext ctx : value.values()) {
sendMesToApp(message,ctx);
}
}
} else if (webSocketMap.containsKey(shopId)) {
ConcurrentHashMap<String, ChannelHandlerContext> webSockets = webSocketMap.get(shopId);
if(!webSockets.isEmpty()) {
for (String user : webSockets.keySet()) {
ChannelHandlerContext ctx = webSockets.get(user);
if (ctx != null) {
log.info("netty连接client 发送消息 桌码群发 clientId:{}",user);
sendMesToApp(message,ctx);
}else {
log.info("netty连接client 发送消息 桌码群发 clientId:{} 失败",user);
}
}
}else {
log.info("netty连接client 发送消息 桌码群发 clientId:{} 失败",clientId);
}
}
}
}
}

View File

@@ -0,0 +1,52 @@
package com.chaozhanggui.system.cashierservice.netty;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.net.ssl.SSLException;
import java.io.InputStream;
import java.util.concurrent.TimeUnit;
@Component
@Slf4j
public class PushToClientChannelInitializer extends ChannelInitializer<NioSocketChannel> {
@Override
protected void initChannel(NioSocketChannel ch) throws SSLException {
ChannelPipeline pipeline = ch.pipeline();
// 添加心跳处理器 多久没收到消息 断开 心跳时间(秒) 读写空闲时间(秒)
pipeline.addLast(new IdleStateHandler(30, 10, 120, TimeUnit.SECONDS));
pipeline.addLast(new HeartbeatHandler());
// 添加连接防抖处理器(没用)
// pipeline.addLast(new ConnectionDebouncerHandler());
//本地试调时使用 wss
// SslContext sslContext = SslContextBuilder.forServer(loadResourceAsByteArrayInputStream("\\pem\\fullchain.pem"), loadResourceAsByteArrayInputStream("\\pem\\privkey.key")).build();
// pipeline.addLast(sslContext.newHandler(ch.alloc()));
// 添加HttpServerCodec用于处理HTTP编解码
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new ChunkedWriteHandler());
pipeline.addLast(new HttpObjectAggregator(65536));
pipeline.addLast(new WebSocketServerProtocolHandler("/client"));
ch.pipeline().addLast(new PushToClientChannelHandlerAdapter());
}
public static InputStream loadResourceAsByteArrayInputStream(String path) {
InputStream inputStream = PushToClientChannelInitializer.class.getClassLoader().getResourceAsStream(path);
return inputStream;
}
}

View File

@@ -1,6 +1,7 @@
package com.chaozhanggui.system.cashierservice.netty.config;
import com.chaozhanggui.system.cashierservice.netty.PushToAppChannelInitializer;
import com.chaozhanggui.system.cashierservice.netty.PushToClientChannelInitializer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@@ -18,6 +19,9 @@ public class NettyConfig {
@Value("${netty.server.port}")
private int port;
@Value("${netty.server.client-port}")
private int clientPort;
/**
* 接收者的线程数
*/
@@ -31,8 +35,11 @@ public class NettyConfig {
@Resource
private PushToAppChannelInitializer pushToAppChannelInitializer;
@Resource
private PushToClientChannelInitializer pushToClientChannelInitializer;
@PostConstruct
public void startCameraNetty() {
NettyUtils.getInstance().initNetty(port, parentGroupThreads, childGroupThreads, pushToAppChannelInitializer);
NettyUtils.getInstance().initNetty(clientPort, parentGroupThreads, childGroupThreads, pushToClientChannelInitializer);
}
}