This commit is contained in:
韩鹏辉
2024-03-21 10:22:29 +08:00
parent 1c47f567d8
commit b77eacdccb
270 changed files with 32916 additions and 0 deletions

View File

@@ -0,0 +1,314 @@
package com.chaozhanggui.system.cashierservice.socket;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.chaozhanggui.system.cashierservice.config.WebSocketCustomEncoding;
import com.chaozhanggui.system.cashierservice.dao.TbShopTableMapper;
import com.chaozhanggui.system.cashierservice.entity.TbCashierCart;
import com.chaozhanggui.system.cashierservice.entity.TbShopTable;
import com.chaozhanggui.system.cashierservice.exception.MsgException;
import com.chaozhanggui.system.cashierservice.rabbit.RabbitProducer;
import com.chaozhanggui.system.cashierservice.redis.RedisCst;
import com.chaozhanggui.system.cashierservice.redis.RedisUtil;
import com.chaozhanggui.system.cashierservice.util.JSONUtil;
import com.chaozhanggui.system.cashierservice.util.SpringUtils;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
@ServerEndpoint(value = "/websocket/table/{tableId}/{shopId}/{userId}", encoders = WebSocketCustomEncoding.class)
@Component
@Slf4j
@Data
public class AppWebSocketServer {
@Resource
private RabbitProducer a;
//注入为空
public static RabbitProducer rabbitProducer;
@PostConstruct
public void b() {
rabbitProducer = this.a;
}
private RedisUtil redisUtils = SpringUtils.getBean(RedisUtil.class);
private TbShopTableMapper shopTableMapper = SpringUtils.getBean(TbShopTableMapper.class);
/**
* concurrent包的线程安全Set用来存放每个客户端对应的MyWebSocket对象。
*/
//一个 AppWebSocketServer 就是一个用户一个tableId下有一个 List<AppWebSocketServer> 也就是多个用户
private static ConcurrentHashMap<String, List<AppWebSocketServer>> webSocketMap = new ConcurrentHashMap<>();
public static ConcurrentHashMap<String, Set<String>> userMap = new ConcurrentHashMap<>();
private static ConcurrentHashMap<String, AppWebSocketServer> userSocketMap = new ConcurrentHashMap<>();
//购物车的记录,用于第一次扫码的人同步购物车
private static ConcurrentHashMap<String, List<JSONObject>> recordMap = new ConcurrentHashMap<>();
private static ConcurrentHashMap<String, Session> sessionMap = new ConcurrentHashMap<>();
/**
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
private Session session;
/**
* 接收tableId
*/
private String tableId = "";
private String shopId = "";
private String userId = "";
/**
* 用来标识这个用户需要接收同步的购物车信息
*/
private volatile AtomicBoolean sync = new AtomicBoolean(true);
private volatile AtomicBoolean createOrder = new AtomicBoolean(false);
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session, @PathParam("tableId") String tableId, @PathParam("shopId") String shopId, @PathParam("userId") String userId) {
this.session = session;
this.tableId = tableId;
this.shopId = shopId;
this.userId = userId;
try {
TbShopTable shopTable = shopTableMapper.selectQRcode(tableId);
if (Objects.isNull(shopTable)) {
JSONObject jsonObject1 = new JSONObject();
jsonObject1.put("status", "fail");
jsonObject1.put("msg", "桌码不存在");
jsonObject1.put("type", "addCart");
jsonObject1.put("data", new ArrayList<>());
jsonObject1.put("amount", BigDecimal.ZERO);
sendMessage(jsonObject1);
onClose();
}
if (webSocketMap.containsKey(tableId + "-" + shopId)) {
List<AppWebSocketServer> serverList = webSocketMap.get(tableId + "-" + shopId);
serverList.add(this);
} else {
List<AppWebSocketServer> serverList = new ArrayList<>();
serverList.add(this);
webSocketMap.put(tableId + "-" + shopId, serverList);
}
if (userMap.containsKey(tableId + "-" + shopId)) {
Set<String> userSet = userMap.get(tableId + "-" + shopId);
userSet.add(userId);
} else {
Set<String> userSet = new HashSet<>();
userSet.add(userId);
userMap.put(tableId + "-" + shopId,userSet);
}
userSocketMap.put(userId, this);
// sessionMap.put(userId,session);
String mes = redisUtils.getMessage(RedisCst.TABLE_CART.concat(tableId + "-" + shopId));
if (StringUtils.isEmpty(mes)) {
JSONObject jsonObject1 = new JSONObject();
jsonObject1.put("status", "success");
jsonObject1.put("msg", "成功");
jsonObject1.put("type", "addCart");
jsonObject1.put("data", new ArrayList<>());
jsonObject1.put("amount", BigDecimal.ZERO);
sendMessage(jsonObject1);
} else {
JSONObject jsonObject1 = new JSONObject();
jsonObject1.put("status", "success");
jsonObject1.put("msg", "成功");
jsonObject1.put("type", "addCart");
BigDecimal amount = BigDecimal.ZERO;
JSONArray jsonArray = JSON.parseArray(redisUtils.getMessage(RedisCst.TABLE_CART.concat(tableId + "-" + shopId)));
for (int i = 0; i < jsonArray.size(); i++) {
JSONObject object = jsonArray.getJSONObject(i);
amount = amount.add(object.getBigDecimal("totalAmount"));
}
jsonObject1.put("amount", amount);
jsonObject1.put("data", jsonArray);
sendMessage(jsonObject1);
}
// sendMessage(recordMap.get(tableId));
} catch (IOException e) {
log.error("用户:" + tableId + ",网络异常!!!!!!");
}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
if (webSocketMap.containsKey(tableId + "-" + shopId)) {
List<AppWebSocketServer> serverList = webSocketMap.get(tableId + "-" + shopId);
if (serverList.isEmpty()) {
webSocketMap.remove(tableId + "-" + shopId);
}
serverList.remove(this);
}
if (userMap.containsKey(tableId + "-" + shopId)){
Set<String> userSet = userMap.get(tableId + "-" + shopId);
if (userSet.isEmpty()){
userMap.remove(tableId + "-" + shopId);
}
userSet.remove(userId);
}
}
public static void onClosed(String user) throws IOException {
Session session1 = sessionMap.get(user);
session1.close();
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) {
System.out.println(message);
//可以群发消息
//消息保存到数据库、redis
if (StringUtils.isNotBlank(message) && !message.equals("undefined")) {
try {
//解析发送的报文
JSONObject jsonObject = new JSONObject();
if (StringUtils.isNotEmpty(message)) {
jsonObject = JSONObject.parseObject(message);
}
//追加发送人(防止串改)
jsonObject.put("tableId", this.tableId);
jsonObject.put("shopId", this.shopId);
//这里采用责任链模式,每一个处理器对应一个前段发过来的请,这里还可以用工厂模式来生成对象
// ChangeHandler changeHandler = new ChangeHandler();
// CreateOrderHandler createOrderHandler = new CreateOrderHandler();
// SyncHandler syncHandler = new SyncHandler();
// ClearHandler clearHandler = new ClearHandler();
// OtherHandler otherHandler = new OtherHandler();
//
// changeHandler.addNextHandler(syncHandler).addNextHandler(createOrderHandler).addNextHandler(clearHandler).addNextHandler(otherHandler);
// changeHandler.handleRequest(webSocketMap,jsonObject,recordMap,this);
if ("sku".equals(jsonObject.getString("type"))){
boolean exist = redisUtils.exists(RedisCst.TABLE_CART.concat(jsonObject.getString("tableId").concat("-").concat(shopId)));
Integer num = 0;
if (exist){
JSONArray array = JSON.parseArray(redisUtils.getMessage(RedisCst.TABLE_CART.concat(jsonObject.getString("tableId").concat("-").concat(shopId))));
for (int i = 0; i < array.size(); i++) {
JSONObject object = array.getJSONObject(i);
if (object.getString("skuId").equals(jsonObject.getString("skuId"))) {
num = object.getIntValue("totalNumber");
break;
}
}
}
JSONObject jsonObject1 = new JSONObject();
jsonObject1.put("status", "success");
jsonObject1.put("msg", "成功");
jsonObject1.put("type", "sku");
jsonObject1.put("data", new ArrayList<>());
jsonObject1.put("amount", num);
sendMessage(jsonObject1);
}else {
rabbitProducer.putCart(jsonObject.toJSONString());
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 发生错误时候
*
* @param session
* @param error
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("用户错误:" + this.tableId + ",原因:" + error.getMessage());
error.printStackTrace();
}
/**
* 实现服务器主动推送
*/
public void sendMessage(Object message) throws IOException {
//加入线程锁
synchronized (session) {
try {
//同步发送信息
this.session.getBasicRemote().sendObject(message);
} catch (Exception e) {
log.error("服务器推送失败:" + e.getMessage());
}
}
}
/**
* 发送自定义消息
* */
/**
* 发送自定义消息
*
* @param message 发送的信息
* @param tableId 如果为null默认发送所有
* @throws IOException
*/
public static void AppSendInfo(Object message, String tableId, boolean userFlag) throws IOException {
if (userFlag) {
if (userSocketMap.containsKey(tableId)) {
AppWebSocketServer server = userSocketMap.get(tableId);
server.sendMessage(message);
} else {
log.error("请求的userId:" + tableId + "不在该服务器上");
}
} else {
if (StringUtils.isEmpty(tableId)) {
// 向所有用户发送信息
for (List<AppWebSocketServer> serverList : webSocketMap.values()) {
for (AppWebSocketServer server : serverList) {
server.sendMessage(message);
}
}
} else if (webSocketMap.containsKey(tableId)) {
// 发送给指定用户信息
List<AppWebSocketServer> serverList = webSocketMap.get(tableId);
for (AppWebSocketServer server : serverList) {
server.sendMessage(message);
}
} else {
log.error("请求的tableId:" + tableId + "不在该服务器上");
}
}
}
public static synchronized ConcurrentHashMap<String, List<AppWebSocketServer>> getWebSocketMap() {
return AppWebSocketServer.webSocketMap;
}
public static synchronized ConcurrentHashMap<String, List<JSONObject>> getRecordMap() {
return AppWebSocketServer.recordMap;
}
}

View File

@@ -0,0 +1,27 @@
package com.chaozhanggui.system.cashierservice.socket;
import org.springframework.boot.web.servlet.ServletContextInitializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
/**
* WebSocket配置
*/
@Configuration
public class WebSocketConfig implements ServletContextInitializer {
@Bean
public ServerEndpointExporter serverEndpointExporter () {
return new ServerEndpointExporter();
}
@Override
public void onStartup(ServletContext servletContext) throws ServletException {
}
}

View File

@@ -0,0 +1,113 @@
package com.chaozhanggui.system.cashierservice.socket;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicInteger;
@Component
@ServerEndpoint(value = "/ws")
public class WebSocketServer {
//与某个客户端的连接会话,需要通过它来给客户端发送数据
private Session session;
private static final AtomicInteger OnlineCount = new AtomicInteger(0);
private static CopyOnWriteArraySet<Session> SessionSet = new CopyOnWriteArraySet<>();
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session) {
SessionSet.add(session);
this.session = session;
int cnt = OnlineCount.incrementAndGet(); // 在线数加1
System.out.println("有连接加入,当前连接数为:" + cnt);
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
SessionSet.remove(this.session);
int cnt = OnlineCount.decrementAndGet();
System.out.println("有连接关闭,当前连接数为:" + cnt);
}
/**
* 收到客户端消息后调用的方法
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) {
System.out.println(message);
BroadCastInfo(message);
}
/**
* 出现错误
* @param error
*/
@OnError
public void onError(Throwable error) {
error.printStackTrace();
}
/**
* 发送消息
*
* @param session
* @param message
*/
public static void SendMessage(Session session, String message) {
try {
if (session.isOpen()) {
session.getBasicRemote().sendText(message);
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 群发消息
*
* @param message
* @throws IOException
*/
public static void BroadCastInfo(String message) {
for (Session session : SessionSet) {
SendMessage(session, message);
}
}
/**
* 指定Session发送消息
*
* @param sessionId
* @param message
* @throws IOException
*/
public static void SendMessage(String message, String sessionId) {
Session session = null;
for (Session s : SessionSet) {
if (s.getId().equals(sessionId)) {
session = s;
break;
}
}
if (session != null) {
SendMessage(session, message);
}
}
}