From ccd247cff22a9e6e15e90c23780ffcceba8d7779 Mon Sep 17 00:00:00 2001 From: wangw <1594593906@qq.com> Date: Fri, 5 Jul 2024 14:16:46 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=8B=E5=8D=95=20=E6=89=93=E7=A5=A8?= =?UTF-8?q?=E9=87=8D=E8=AF=95=E6=9C=BA=E5=88=B6=204=E7=A7=92=E9=87=8D?= =?UTF-8?q?=E8=AF=95=205=E6=AC=A1=205=E6=AC=A1=E5=90=8E=E4=B8=A2=E5=A4=B1?= =?UTF-8?q?=20=E4=B8=8B=E5=8D=95=20order=E6=95=B0=E6=8D=AE=20=E4=B8=8D?= =?UTF-8?q?=E5=8F=91=E7=BB=99=E7=94=9F=E6=88=90=E8=AE=A2=E5=8D=95=E7=9A=84?= =?UTF-8?q?=E7=94=A8=E6=88=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 3 +- .../netty/PushToAppChannelHandlerAdapter.java | 28 +++ .../PushToClientChannelHandlerAdapter.java | 204 +++++++++++------- .../cashierservice/service/CartService.java | 3 +- .../cashierservice/service/PayService.java | 2 +- 5 files changed, 161 insertions(+), 79 deletions(-) diff --git a/.gitignore b/.gitignore index 7425e2a..a637660 100644 --- a/.gitignore +++ b/.gitignore @@ -8,4 +8,5 @@ /.gradle/ /application.pid /target/* -**.jar \ No newline at end of file +**.jar +**.log \ No newline at end of file diff --git a/src/main/java/com/chaozhanggui/system/cashierservice/netty/PushToAppChannelHandlerAdapter.java b/src/main/java/com/chaozhanggui/system/cashierservice/netty/PushToAppChannelHandlerAdapter.java index e3ccf17..339735f 100644 --- a/src/main/java/com/chaozhanggui/system/cashierservice/netty/PushToAppChannelHandlerAdapter.java +++ b/src/main/java/com/chaozhanggui/system/cashierservice/netty/PushToAppChannelHandlerAdapter.java @@ -197,6 +197,34 @@ public class PushToAppChannelHandlerAdapter extends NettyChannelHandlerAdapter { sendMessage(ctx, str); } + + /** + * 发送给当前用户以外的用户 + */ + @Async + public void AppSendInfo(String message, String tableId, String userId) { + log.info("netty连接 发送消息 除当前用户 tableId:{} userId:{}", tableId, userId); + if (webSocketMap.containsKey(tableId)) { + ConcurrentHashMap webSockets = webSocketMap.get(tableId); + if (!webSockets.isEmpty()) { + for (String user : webSockets.keySet()) { + if (StringUtils.isNotBlank(userId) && userId.equals(user)) { + continue; + } + ChannelHandlerContext ctx = webSockets.get(user); + if (ctx != null) { + sendMesToApp(message, ctx); + } else { + log.info("netty连接 发送消息 除当前用户 userId:{} 失败", user); + } + } + } else { + log.info("netty连接 发送消息 除当前用户 tableId:{} 失败", tableId); + } + } + + } + @Async public void AppSendInfo(String message, String tableId,String userId, boolean userFlag) { log.info("netty连接 发送消息 tableId:{} userId:{} userFlag:{} message:{}",tableId,userId,userFlag, JSONUtil.toJSONString(message)); diff --git a/src/main/java/com/chaozhanggui/system/cashierservice/netty/PushToClientChannelHandlerAdapter.java b/src/main/java/com/chaozhanggui/system/cashierservice/netty/PushToClientChannelHandlerAdapter.java index ff709fc..f8a7825 100644 --- a/src/main/java/com/chaozhanggui/system/cashierservice/netty/PushToClientChannelHandlerAdapter.java +++ b/src/main/java/com/chaozhanggui/system/cashierservice/netty/PushToClientChannelHandlerAdapter.java @@ -2,17 +2,20 @@ package com.chaozhanggui.system.cashierservice.netty; import com.alibaba.fastjson.JSONObject; import com.chaozhanggui.system.cashierservice.netty.config.NettyChannelHandlerAdapter; -import com.chaozhanggui.system.cashierservice.util.JSONUtil; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; /** @@ -32,7 +35,10 @@ public class PushToClientChannelHandlerAdapter extends NettyChannelHandlerAdapte * [ctx, shopId:clientId] */ private static Map clientIdMap = new ConcurrentHashMap<>(); - + private static Map> retryQueue = new HashMap<>(); + private static ConcurrentHashMap retryCounts = new ConcurrentHashMap<>(); + private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3); + int maxRetryAttempts = 5; private String clientId = ""; private String shopId = ""; @@ -51,8 +57,8 @@ public class PushToClientChannelHandlerAdapter extends NettyChannelHandlerAdapte } @Override - public void channelInactive(ChannelHandlerContext ctx) { - log.info("netty连接client 长连接关闭:{}, {}",clientId,shopId); + public void channelInactive(ChannelHandlerContext ctx) { + log.info("netty连接client 长连接关闭:{}, {}", clientId, shopId); ctx.close(); removeCtx(ctx); } @@ -68,9 +74,6 @@ public class PushToClientChannelHandlerAdapter extends NettyChannelHandlerAdapte ConcurrentHashMap tableMap = webSocketMap.get(split[0]); if (tableMap != null && !tableMap.isEmpty() && tableMap.size() > 0) { tableMap.remove(split[1]); - if (tableMap.isEmpty() || tableMap.size() == 0) { - webSocketMap.remove(split[0]); - } } } clientIdMap.remove(ctx); @@ -92,43 +95,49 @@ public class PushToClientChannelHandlerAdapter extends NettyChannelHandlerAdapte JSONObject jsonObject = new JSONObject(); if (StringUtils.isNotEmpty(msg)) { jsonObject = JSONObject.parseObject(msg); - }else { - log.info("netty连接client 接收到空数据:{}",msg); + } else { + log.info("netty连接client 接收到空数据:{}", msg); } String type = jsonObject.getString("type"); - if(type.equals("heartbeat")){//心跳 - log.info("netty连接client 接收到心跳数据:shop:{} clientId:{} meg:{}",shopId,clientId,msg); - }else { + if (type.equals("heartbeat")) {//心跳 + log.info("netty连接client 接收到心跳数据:shop:{} clientId:{} meg:{}", shopId, clientId, msg); + } else { if (type.equals("connect")) { String clientId = jsonObject.getString("clientId"); String shopId = jsonObject.getString("shopId"); - if (StringUtils.isBlank(type) || StringUtils.isBlank(shopId) || StringUtils.isBlank(clientId)) { - log.info("netty连接client 建立连接请求失败:{}",jsonObject); + if (StringUtils.isBlank(type) || StringUtils.isBlank(shopId) || StringUtils.isBlank(clientId)) { + log.info("netty连接client 建立连接请求失败:{}", jsonObject); channelInactive(ctx); return; } - - log.info("netty连接client 接收到数据 建立连接参数 param:{}",jsonObject); - this.clientId=clientId; - this.shopId=shopId; - if (webSocketMap.containsKey(shopId)) { - ConcurrentHashMap clientSocketMap = webSocketMap.get(shopId); - ChannelHandlerContext channelHandlerContext = clientSocketMap.get(clientId); - if (channelHandlerContext != null) { - channelHandlerContext.close(); - } - clientSocketMap.put(clientId, ctx); - } else { - ConcurrentHashMap clientSocketMap = new ConcurrentHashMap<>(); - clientSocketMap.put(clientId, ctx); - webSocketMap.put(shopId,clientSocketMap); - } + log.info("netty连接client 接收到数据 建立连接参数 param:{}", jsonObject); + this.clientId = clientId; + this.shopId = shopId; + webSocketMap.computeIfAbsent(shopId, k -> new ConcurrentHashMap<>()).put(clientId, ctx); clientIdMap.put(ctx, shopId + ":" + clientId); JSONObject jsonObject1 = new JSONObject(); jsonObject1.put("status", "success"); jsonObject1.put("msg", "连接成功"); jsonObject1.put("type", "connect"); sendMesToApp(jsonObject1.toString(), ctx); + }else if(type.equals("send")){ + String orderNo = jsonObject.getString("orderNo"); +// log.info("netty连接client 接收到send 数据 param:{} orderNo:{} shopId:{}", jsonObject,orderNo,shopId); + if (retryQueue.containsKey(shopId)) { + // 获取内层Map + Queue queue = retryQueue.get(shopId); + if (queue != null) { + Iterator iterator = queue.iterator(); + while (iterator.hasNext()) { + JSONObject customObject = iterator.next(); +// log.info("netty连接client 接收到send 数据 retryQueue数据:{}", customObject); + if (customObject.getJSONObject("orderInfo").getString("orderNo").equals(orderNo)) { + iterator.remove(); + retryCounts.remove(orderNo); + } + } + } + } } } //业务逻辑代码处理框架。。。 @@ -139,53 +148,96 @@ public class PushToClientChannelHandlerAdapter extends NettyChannelHandlerAdapte sendMessage(ctx, str); } - /** - * @param message 发送的消息内容 - * @param shopId 店铺Id - * @param clientId 客户端Id - * @param userFlag - * 为true 单发给clientId - * 为false 群发 shopId为空 发给所有人 - */ @Async - public void AppSendInfo(String message, String shopId,String clientId, boolean userFlag) { - log.info("netty连接client 发送消息 shopId:{} clientId:{} userFlag:{} message:{}",shopId,clientId,userFlag, JSONUtil.toJSONString(message)); - if (userFlag) { - if (webSocketMap.containsKey(shopId)) { - ConcurrentHashMap webSockets = webSocketMap.get(shopId); - if(!webSockets.isEmpty()){ - if (StringUtils.isNotBlank(clientId)) { - ChannelHandlerContext ctx = webSockets.get(clientId); - if (ctx != null) { - sendMesToApp(message,ctx); - } - } + public void sendMesToApp(String shopId,String orderNo,JSONObject str, ChannelHandlerContext ctx) { + ctx.channel().writeAndFlush(new TextWebSocketFrame(str.toString())).addListener((GenericFutureListener>) future -> { + if (!future.isSuccess()) { + if (shopId != null) { + retryQueue.computeIfAbsent(shopId, k -> new ConcurrentLinkedQueue<>()).offer(str); + scheduleRetry(shopId,orderNo); } } - } else { - if (StringUtils.isEmpty(shopId)) { - // 向所有用户发送信息 - for (ConcurrentHashMap value : webSocketMap.values()) { - for (ChannelHandlerContext ctx : value.values()) { - sendMesToApp(message,ctx); - } - } - } else if (webSocketMap.containsKey(shopId)) { - ConcurrentHashMap webSockets = webSocketMap.get(shopId); - if(!webSockets.isEmpty()) { - for (String user : webSockets.keySet()) { - ChannelHandlerContext ctx = webSockets.get(user); - if (ctx != null) { - log.info("netty连接client 发送消息 桌码群发 clientId:{}",user); - sendMesToApp(message,ctx); - }else { - log.info("netty连接client 发送消息 桌码群发 clientId:{} 失败",user); - } - } - }else { - log.info("netty连接client 发送消息 桌码群发 clientId:{} 失败",clientId); - } + }); + } + + /** + * @param message 发送的消息内容 + * @param shopId 店铺Id + // * @param clientId 客户端Id + // * @param userFlag 为true 单发给clientId + * 为false 群发 shopId为空 发给所有人 + */ +// @Async +// public void AppSendInfo(String message, String shopId,String clientId, boolean userFlag) { +// log.info("netty连接client 发送消息 shopId:{} clientId:{} userFlag:{} message:{}",shopId,clientId,userFlag, JSONUtil.toJSONString(message)); +// if (userFlag) { +// if (webSocketMap.containsKey(shopId)) { +// ConcurrentHashMap webSockets = webSocketMap.get(shopId); +// if(!webSockets.isEmpty()){ +// if (StringUtils.isNotBlank(clientId)) { +// ChannelHandlerContext ctx = webSockets.get(clientId); +// if (ctx != null) { +// sendMesToApp(message,ctx); +// } +// } +// } +// } +// } else { +// if (StringUtils.isEmpty(shopId)) { +// // 向所有用户发送信息 +// for (ConcurrentHashMap value : webSocketMap.values()) { +// for (ChannelHandlerContext ctx : value.values()) { +// sendMesToApp(message,ctx); +// } +// } +// } else if (webSocketMap.containsKey(shopId)) { +// ConcurrentHashMap webSockets = webSocketMap.get(shopId); +// if((!webSockets.isEmpty() && !webSockets.keySet().isEmpty())) { +// for (String user : webSockets.keySet()) { +// ChannelHandlerContext ctx = webSockets.get(user); +// if (ctx != null) { +// log.info("netty连接client 发送消息 桌码群发 clientId:{}",user); +// sendMesToApp(message,ctx); +// }else { +// log.info("netty连接client 发送消息 桌码群发 clientId:{} 失败",user); +// } +// } +// }else { +// log.info("netty连接client 发送消息 桌码群发 clientId:{} 失败",clientId); +// } +// } +// } +// } + public void AppSendInfoV1(String shopId, String orderNo, JSONObject message) { + log.info("netty连接client 发送消息 shopId:{} clientId:{} userFlag:{} message:{}", shopId, message.get("orderInfo")); + retryQueue.computeIfAbsent(shopId, k -> new ConcurrentLinkedQueue<>()).offer(message); + scheduleRetry(shopId, orderNo); + ConcurrentHashMap webSockets = webSocketMap.get(shopId); + if (webSockets != null) { + for (ChannelHandlerContext ctx : webSockets.values()) { + sendMesToApp(shopId, orderNo, message, ctx); + log.info("netty连接client 向:{}发送消息", shopId); } } } + + // 定时重试方法 + private void scheduleRetry(String shopId,String orderNo) { +// log.info("定时重发"); + scheduler.schedule(() -> { + Queue shopRetryQueue = retryQueue.get(shopId); + if (shopRetryQueue != null) { + JSONObject message = shopRetryQueue.poll(); + if (message != null) { + AtomicInteger retryCount = retryCounts.computeIfAbsent(orderNo, k -> new AtomicInteger(0)); + int currentRetryCount = retryCount.incrementAndGet(); + if (currentRetryCount <= maxRetryAttempts) { + AppSendInfoV1(shopId,orderNo,message); + } else { + log.error("重试次数超过最大限制,放弃重试。shopId: {}, orderNo: {}", shopId, orderNo); + } + } + } + }, 4, TimeUnit.SECONDS); + } } \ No newline at end of file diff --git a/src/main/java/com/chaozhanggui/system/cashierservice/service/CartService.java b/src/main/java/com/chaozhanggui/system/cashierservice/service/CartService.java index 6a73a99..c7ee8ea 100644 --- a/src/main/java/com/chaozhanggui/system/cashierservice/service/CartService.java +++ b/src/main/java/com/chaozhanggui/system/cashierservice/service/CartService.java @@ -796,7 +796,8 @@ public class CartService { jsonObject12.put("amount", BigDecimal.ZERO); jsonObject12.put("data", new JSONArray()); - PushToAppChannelHandlerAdapter.getInstance().AppSendInfo(jsonObject12.toString(), jsonObject.getString("tableId").concat("-").concat(shopId), "", false); +// PushToAppChannelHandlerAdapter.getInstance().AppSendInfo(jsonObject12.toString(), jsonObject.getString("tableId").concat("-").concat(shopId), "", false); + PushToAppChannelHandlerAdapter.getInstance().AppSendInfo(jsonObject12.toString(), jsonObject.getString("tableId").concat("-").concat(shopId), jsonObject.getString("userId")); redisUtil.saveMessage(RedisCst.ORDER_EXPIRED.concat(orderId.toString()), orderId.toString(), 60 * 16L); diff --git a/src/main/java/com/chaozhanggui/system/cashierservice/service/PayService.java b/src/main/java/com/chaozhanggui/system/cashierservice/service/PayService.java index 585aecc..e2d76ed 100644 --- a/src/main/java/com/chaozhanggui/system/cashierservice/service/PayService.java +++ b/src/main/java/com/chaozhanggui/system/cashierservice/service/PayService.java @@ -1208,7 +1208,7 @@ public class PayService { // e.printStackTrace(); // } // } - PushToClientChannelHandlerAdapter.getInstance().AppSendInfo(client.toString(), orderInfo.getShopId(), "", false); + PushToClientChannelHandlerAdapter.getInstance().AppSendInfoV1(orderInfo.getShopId(),orderInfo.getOrderNo(), client); }