@@ -2,17 +2,20 @@ package com.chaozhanggui.system.cashierservice.netty;
import com.alibaba.fastjson.JSONObject ;
import com.chaozhanggui.system.cashierservice.netty.config.NettyChannelHandlerAdapter ;
import com.chaozhanggui.system.cashierservice.util.JSONUtil ;
import io.netty.channel.ChannelHandler.Sharable ;
import io.netty.channel.ChannelHandlerContext ;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame ;
import lombok.extern.slf4j.Slf4j ;
import org.apache.commons.lang3.StringUtils ;
import org.springframework.scheduling.annotation.Async ;
import org.springframework.stereotype.Component ;
import java.util.HashMap ;
import java.util.Map ;
import java.util.concurrent.ConcurrentHashMap ;
import java.util.* ;
import java.util.concurrent.* ;
import java.util.concurrent.atomic.AtomicInteger ;
import io.netty.util.concurrent.Future ;
import io.netty.util.concurrent.GenericFutureListener ;
/**
@@ -32,7 +35,10 @@ public class PushToClientChannelHandlerAdapter extends NettyChannelHandlerAdapte
* [ctx, shopId:clientId]
*/
private static Map < ChannelHandlerContext , String > clientIdMap = new ConcurrentHashMap < > ( ) ;
private static Map < String , Queue < JSONObject > > retryQueue = new HashMap < > ( ) ;
private static ConcurrentHashMap < String , AtomicInteger > retryCounts = new ConcurrentHashMap < > ( ) ;
private ScheduledExecutorService scheduler = Executors . newScheduledThreadPool ( 3 ) ;
int maxRetryAttempts = 5 ;
private String clientId = " " ;
private String shopId = " " ;
@@ -51,8 +57,8 @@ public class PushToClientChannelHandlerAdapter extends NettyChannelHandlerAdapte
}
@Override
public void channelInactive ( ChannelHandlerContext ctx ) {
log . info ( " netty连接client 长连接关闭:{}, {} " , clientId , shopId ) ;
public void channelInactive ( ChannelHandlerContext ctx ) {
log . info ( " netty连接client 长连接关闭:{}, {} " , clientId , shopId ) ;
ctx . close ( ) ;
removeCtx ( ctx ) ;
}
@@ -68,9 +74,6 @@ public class PushToClientChannelHandlerAdapter extends NettyChannelHandlerAdapte
ConcurrentHashMap < String , ChannelHandlerContext > tableMap = webSocketMap . get ( split [ 0 ] ) ;
if ( tableMap ! = null & & ! tableMap . isEmpty ( ) & & tableMap . size ( ) > 0 ) {
tableMap . remove ( split [ 1 ] ) ;
if ( tableMap . isEmpty ( ) | | tableMap . size ( ) = = 0 ) {
webSocketMap . remove ( split [ 0 ] ) ;
}
}
}
clientIdMap . remove ( ctx ) ;
@@ -92,43 +95,49 @@ public class PushToClientChannelHandlerAdapter extends NettyChannelHandlerAdapte
JSONObject jsonObject = new JSONObject ( ) ;
if ( StringUtils . isNotEmpty ( msg ) ) {
jsonObject = JSONObject . parseObject ( msg ) ;
} else {
log . info ( " netty连接client 接收到空数据:{} " , msg ) ;
} else {
log . info ( " netty连接client 接收到空数据:{} " , msg ) ;
}
String type = jsonObject . getString ( " type " ) ;
if ( type . equals ( " heartbeat " ) ) { //心跳
log . info ( " netty连接client 接收到心跳数据: shop:{} clientId:{} meg:{} " , shopId , clientId , msg ) ;
} else {
if ( type . equals ( " heartbeat " ) ) { //心跳
log . info ( " netty连接client 接收到心跳数据: shop:{} clientId:{} meg:{} " , shopId , clientId , msg ) ;
} else {
if ( type . equals ( " connect " ) ) {
String clientId = jsonObject . getString ( " clientId " ) ;
String shopId = jsonObject . getString ( " shopId " ) ;
if ( StringUtils . isBlank ( type ) | | StringUtils . isBlank ( shopId ) | | StringUtils . isBlank ( clientId ) ) {
log . info ( " netty连接client 建立连接请求失败:{} " , jsonObject ) ;
if ( StringUtils . isBlank ( type ) | | StringUtils . isBlank ( shopId ) | | StringUtils . isBlank ( clientId ) ) {
log . info ( " netty连接client 建立连接请求失败:{} " , jsonObject ) ;
channelInactive ( ctx ) ;
return ;
}
log . info ( " netty连接client 接收到数据 建立连接参数 param:{} " , jsonObject ) ;
this . clientId = client Id;
this . shopId = shopId ;
if ( webSocketMap . containsKey ( shopId ) ) {
ConcurrentHashMap < String , ChannelHandlerContext > clientSocketMap = webSocketMap . get ( shopId ) ;
ChannelHandlerContext channelHandlerContext = clientSocketMap . get ( clientId ) ;
if ( channelHandlerContext ! = null ) {
channelHandlerContext . close ( ) ;
}
clientSocketMap . put ( clientId , ctx ) ;
} else {
ConcurrentHashMap < String , ChannelHandlerContext > clientSocketMap = new ConcurrentHashMap < > ( ) ;
clientSocketMap . put ( clientId , ctx ) ;
webSocketMap . put ( shopId , clientSocketMap ) ;
}
log . info ( " netty连接client 接收到数据 建立连接参数 param:{} " , jsonObject ) ;
this . clientId = clientId ;
this . shopId = shop Id;
webSocketMap . computeIfAbsent ( shopId , k - > new ConcurrentHashMap < > ( ) ) . put ( clientId , ctx ) ;
clientIdMap . put ( ctx , shopId + " : " + clientId ) ;
JSONObject jsonObject1 = new JSONObject ( ) ;
jsonObject1 . put ( " status " , " success " ) ;
jsonObject1 . put ( " msg " , " 连接成功 " ) ;
jsonObject1 . put ( " type " , " connect " ) ;
sendMesToApp ( jsonObject1 . toString ( ) , ctx ) ;
} else if ( type . equals ( " send " ) ) {
String orderNo = jsonObject . getString ( " orderNo " ) ;
// log.info("netty连接client 接收到send 数据 param:{} orderNo:{} shopId:{}", jsonObject,orderNo,shopId);
if ( retryQueue . containsKey ( shopId ) ) {
// 获取内层Map
Queue < JSONObject > queue = retryQueue . get ( shopId ) ;
if ( queue ! = null ) {
Iterator < JSONObject > iterator = queue . iterator ( ) ;
while ( iterator . hasNext ( ) ) {
JSONObject customObject = iterator . next ( ) ;
// log.info("netty连接client 接收到send 数据 retryQueue数据: {}", customObject);
if ( customObject . getJSONObject ( " orderInfo " ) . getString ( " orderNo " ) . equals ( orderNo ) ) {
iterator . remove ( ) ;
retryCounts . remove ( orderNo ) ;
}
}
}
}
}
}
//业务逻辑代码处理框架。。。
@@ -139,53 +148,96 @@ public class PushToClientChannelHandlerAdapter extends NettyChannelHandlerAdapte
sendMessage ( ctx , str ) ;
}
/**
* @param message 发送的消息内容
* @param shopId 店铺Id
* @param clientId 客户端Id
* @param userFlag
* 为true 单发给clientId
* 为false 群发 shopId为空 发给所有人
*/
@Async
public void AppSendInfo ( String message , String shopId , String clientId , boolean userFlag ) {
log . info ( " netty连接client 发送消息 shopId:{} clientId:{} userFlag:{} message:{} " , shopId , clientId , userFlag , JSONUtil . toJSONString ( message ) ) ;
if ( userFlag ) {
if ( webSocketMap . containsKey ( shopId ) ) {
ConcurrentHashMap < String , ChannelHandlerContext > webSockets = webSocketMap . get ( shopId ) ;
if ( ! webSockets . isEmpty ( ) ) {
if ( StringUtils . isNotBlank ( clientId ) ) {
ChannelHandlerContext ctx = webSockets . get ( clientId ) ;
if ( ctx ! = null ) {
sendMesToApp ( message , ctx ) ;
}
}
public void sendMesToApp ( String shopId , String orderNo , JSONObject str , ChannelHandlerContext ctx ) {
ctx . channel ( ) . writeAndFlush ( new TextWebSocketFrame ( str . toString ( ) ) ) . addListener ( ( GenericFutureListener < Future < ? super Void > > ) future - > {
if ( ! future . isSuccess ( ) ) {
if ( shopId ! = null ) {
retryQueue . computeIfAbsent ( shopId , k - > new ConcurrentLinkedQueue < > ( ) ) . offer ( str ) ;
scheduleRetry ( shopId , orderNo ) ;
}
}
} else {
if ( StringUtils . isEmpty ( shopId ) ) {
// 向所有用户发送信息
for ( ConcurrentHashMap < String , ChannelHandlerContext > value : webSocketMap . values ( ) ) {
for ( ChannelHandlerContext ctx : value . values ( ) ) {
sendMesToApp ( message , ctx ) ;
}
}
} else if ( webSocketMap . containsKey ( shopId ) ) {
ConcurrentHashMap < String , ChannelHandlerContext > webSockets = webSocketMap . get ( shopId ) ;
if ( ! webSockets . isEmpty ( ) ) {
for ( String user : webSockets . keySet ( ) ) {
ChannelHandlerContext ctx = webSockets . get ( user ) ;
if ( ctx ! = null ) {
log . info ( " netty连接client 发送消息 桌码群发 clientId:{} " , user ) ;
sendMesToApp ( message , ctx ) ;
} else {
log . info ( " netty连接client 发送消息 桌码群发 clientId:{} 失败 " , user ) ;
}
}
} else {
log . info ( " netty连接client 发送消息 桌码群发 clientId:{} 失败 " , clientId ) ;
}
} ) ;
}
/**
* @param message 发送的消息内容
* @param shopId 店铺Id
// * @param clientId 客户端Id
// * @param userFlag 为true 单发给clientId
* 为false 群发 shopId为空 发给所有人
*/
// @Async
// public void AppSendInfo(String message, String shopId,String clientId, boolean userFlag) {
// log.info("netty连接client 发送消息 shopId:{} clientId:{} userFlag:{} message:{}",shopId,clientId,userFlag, JSONUtil.toJSONString(message)) ;
// if (userFlag) {
// if (webSocketMap.containsKey(shopId)) {
// ConcurrentHashMap<String, ChannelHandlerContext> webSockets = webSocketMap.get(shopId) ;
// if(!webSockets.isEmpty()) {
// if (StringUtils.isNotBlank(clientId)) {
// ChannelHandlerContext ctx = webSockets.get(clientId);
// if (ctx != null) {
// sendMesToApp(message,ctx);
// }
// }
// }
// }
// } else {
// if (StringUtils.isEmpty(shopId)) {
// // 向所有用户发送信息
// for (ConcurrentHashMap<String, ChannelHandlerContext> value : webSocketMap.values()) {
// for (ChannelHandlerContext ctx : value.values()) {
// sendMesToApp(message,ctx);
// }
// }
// } else if (webSocketMap.containsKey(shopId)) {
// ConcurrentHashMap<String, ChannelHandlerContext> webSockets = webSocketMap.get(shopId);
// if((!webSockets.isEmpty() && !webSockets.keySet().isEmpty())) {
// for (String user : webSockets.keySet()) {
// ChannelHandlerContext ctx = webSockets.get(user);
// if (ctx != null) {
// log.info("netty连接client 发送消息 桌码群发 clientId:{}",user);
// sendMesToApp(message,ctx);
// }else {
// log.info("netty连接client 发送消息 桌码群发 clientId:{} 失败",user);
// }
// }
// }else {
// log.info("netty连接client 发送消息 桌码群发 clientId:{} 失败",clientId);
// }
// }
// }
// }
public void AppSendInfoV1 ( String shopId , String orderNo , JSONObject message ) {
log . info ( " netty连接client 发送消息 shopId:{} clientId:{} userFlag:{} message:{} " , shopId , message . get ( " orderInfo " ) ) ;
retryQueue . computeIfAbsent ( shopId , k - > new ConcurrentLinkedQueue < > ( ) ) . offer ( message ) ;
scheduleRetry ( shopId , orderNo ) ;
ConcurrentHashMap < String , ChannelHandlerContext > webSockets = webSocketMap . get ( shopId ) ;
if ( webSockets ! = null ) {
for ( ChannelHandlerContext ctx : webSockets . values ( ) ) {
sendMesToApp ( shopId , orderNo , message , ctx ) ;
log . info ( " netty连接client 向:{}发送消息 " , shopId ) ;
}
}
}
// 定时重试方法
private void scheduleRetry ( String shopId , String orderNo ) {
// log.info("定时重发");
scheduler . schedule ( ( ) - > {
Queue < JSONObject > shopRetryQueue = retryQueue . get ( shopId ) ;
if ( shopRetryQueue ! = null ) {
JSONObject message = shopRetryQueue . poll ( ) ;
if ( message ! = null ) {
AtomicInteger retryCount = retryCounts . computeIfAbsent ( orderNo , k - > new AtomicInteger ( 0 ) ) ;
int currentRetryCount = retryCount . incrementAndGet ( ) ;
if ( currentRetryCount < = maxRetryAttempts ) {
AppSendInfoV1 ( shopId , orderNo , message ) ;
} else {
log . error ( " 重试次数超过最大限制, 放弃重试。shopId: {}, orderNo: {} " , shopId , orderNo ) ;
}
}
}
} , 4 , TimeUnit . SECONDS ) ;
}
}