diff --git a/cash-api/system-server/pom.xml b/cash-api/system-server/pom.xml index edaf6d676..9be1eeb85 100644 --- a/cash-api/system-server/pom.xml +++ b/cash-api/system-server/pom.xml @@ -24,6 +24,13 @@ cash-common-log ${project.version} + + + com.czg + cash-common-mq + 1.0.0 + + com.czg system-service diff --git a/cash-api/system-server/src/main/java/com/czg/controller/admin/SysDeviceController.java b/cash-api/system-server/src/main/java/com/czg/controller/admin/SysDeviceController.java index 4392c4a27..5d36b8bc4 100644 --- a/cash-api/system-server/src/main/java/com/czg/controller/admin/SysDeviceController.java +++ b/cash-api/system-server/src/main/java/com/czg/controller/admin/SysDeviceController.java @@ -1,6 +1,5 @@ package com.czg.controller.admin; -import com.czg.annotation.SaAdminCheckPermission; import com.czg.resp.CzgResult; import com.czg.system.dto.SysDevicesDTO; import com.czg.system.dto.SysDevicesPageDTO; @@ -29,7 +28,7 @@ public class SysDeviceController { * 新增设备 */ @PostMapping - @SaAdminCheckPermission(value = "devices:add", name = "新增设备") +// @SaAdminCheckPermission(value = "devices:add", name = "新增设备") public CzgResult addDevice(@RequestBody @Validated(InsertGroup.class) SysDevicesDTO param) { return CzgResult.success(sysDevicesService.addDevice(param)); } @@ -38,7 +37,7 @@ public class SysDeviceController { * 修改设备 */ @PutMapping - @SaAdminCheckPermission(value = "devices:update", name = "修改设备") +// @SaAdminCheckPermission(value = "devices:update", name = "修改设备") public CzgResult updateDevice(@RequestBody @Validated(UpdateGroup.class) SysDevicesDTO param) { return CzgResult.success(sysDevicesService.updateDevice(param)); } @@ -47,7 +46,7 @@ public class SysDeviceController { * 删除设备 */ @DeleteMapping("/{id}") - @SaAdminCheckPermission(value = "devices:delete", name = "删除设备") +// @SaAdminCheckPermission(value = "devices:delete", name = "删除设备") public CzgResult deleteDevice(@PathVariable Long id) { return CzgResult.success(sysDevicesService.deleteDevice(id)); } @@ -56,8 +55,8 @@ public class SysDeviceController { * 查询设备分页 */ @GetMapping("page") - @SaAdminCheckPermission(value = "devices:page", name = "查询设备分页") +// @SaAdminCheckPermission(value = "devices:page", name = "查询设备分页") public CzgResult> queryDevice(SysDevicesPageDTO param) { - return CzgResult.success(sysDevicesService.queryDevice(param)); + return CzgResult.success(sysDevicesService.queryDevicePage(param)); } } diff --git a/cash-api/system-server/src/main/java/com/czg/mqtt/handler/MqttServerHandler.java b/cash-api/system-server/src/main/java/com/czg/mqtt/handler/MqttServerHandler.java index cadf1d530..87918269f 100644 --- a/cash-api/system-server/src/main/java/com/czg/mqtt/handler/MqttServerHandler.java +++ b/cash-api/system-server/src/main/java/com/czg/mqtt/handler/MqttServerHandler.java @@ -41,6 +41,7 @@ public class MqttServerHandler extends ChannelInboundHandlerAdapter { // 并发安全的 Map:缓存 Channel(连接)→ clientId(设备唯一标识)的映射 private final Map channelClientIdMap = new ConcurrentHashMap<>(); + private final Map clientIdChannelMap = new ConcurrentHashMap<>(); // 注入共享的订阅关系管理器(单例) private final MqttSubscriptionManager subscriptionManager; @@ -50,6 +51,7 @@ public class MqttServerHandler extends ChannelInboundHandlerAdapter { // 创建一个调度线程池(可以全局复用) private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(4); + // 构造函数(接收共享组件) public MqttServerHandler(MqttSubscriptionManager subscriptionManager, DeviceStatusManager deviceStatusManager) { this.subscriptionManager = subscriptionManager; @@ -66,18 +68,20 @@ public class MqttServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); - // 通过 Channel 找 clientId String clientId = channelClientIdMap.get(channel); + if (clientId != null) { - // 标记离线 - deviceStatusManager.markDeviceOffline(clientId); - // 清理映射 + // 移除两个映射 channelClientIdMap.remove(channel); - // 清理订阅(原有逻辑) - subscriptionManager.removeAllSubscriptions(channel); + clientIdChannelMap.remove(clientId); + + // 标记设备离线 + deviceStatusManager.markDeviceOffline(clientId); + log.info("设备断开连接,已标记离线:clientId={}, 地址={}", clientId, channel.remoteAddress()); + } else { + log.warn("未知 Channel 断开:{}", channel.remoteAddress()); } - log.info("客户端断开连接:{}", ctx.channel().remoteAddress()); super.channelInactive(ctx); } @@ -167,29 +171,39 @@ public class MqttServerHandler extends ChannelInboundHandlerAdapter { */ private void handleConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) { MqttConnectPayload payload = msg.payload(); - // 设备唯一标识(客户端必须携带) String clientId = payload.clientIdentifier(); - // 设备地址 String remoteAddress = ctx.channel().remoteAddress().toString(); - // 1. 校验 clientId(非空,MQTT3.1.1 要求) if (clientId == null || clientId.isEmpty()) { sendConnectAck(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_CLIENT_IDENTIFIER_NOT_VALID); log.error("设备连接拒绝:clientId 为空,地址={}", remoteAddress); return; } - // 2. 缓存 Channel ↔ clientId 映射(核心:后续通过 Channel 找设备) + // 🔒 检查是否已经存在该 clientId 的连接 + if (clientIdChannelMap.containsKey(clientId)) { + Channel existingChannel = clientIdChannelMap.get(clientId); + log.warn("设备重复连接,已拒绝:clientId={}, 原连接={}, 新连接={}", + clientId, existingChannel.remoteAddress(), ctx.channel().remoteAddress()); + + // 返回 "服务器不可用" 或 "已存在连接" 错误码,拒绝新连接 + sendConnectAck(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE); + ctx.close(); // 主动关闭多余的连接 + return; + } + + // ✅ 当前连接是合法的、唯一的,继续处理 Channel channel = ctx.channel(); channelClientIdMap.put(channel, clientId); + // 维护反向映射 + clientIdChannelMap.put(clientId, channel); - // 3. 注册设备状态(标记为在线) + // 注册设备状态 deviceStatusManager.registerDevice(clientId, remoteAddress); - // 4. 响应连接成功(原有逻辑) + // 回复 CONNACK sendConnectAck(ctx, MqttConnectReturnCode.CONNECTION_ACCEPTED); - log.info("设备连接成功:clientId={}, 地址={}, 协议版本={}", - clientId, remoteAddress, msg.variableHeader().version()); + log.info("设备连接成功:clientId={}, 地址={}, 协议版本={}", clientId, remoteAddress, msg.variableHeader().version()); } private void handlePublish(ChannelHandlerContext ctx, MqttPublishMessage msg) { @@ -211,12 +225,16 @@ public class MqttServerHandler extends ChannelInboundHandlerAdapter { log.info("收到发布消息:topic={}, QoS={}, 消息ID={}, 内容={}, 客户端={}", topic, qosLevel.value(), messageId, content, ctx.channel().remoteAddress()); - // -------------------------- 核心新增:收到 con 主题消息,回复 7941610A 主题 -------------------------- + // -------------------------- 收到 con 主题消息,回复 -------------------------- if ("con".equals(topic)) { // 延迟 2 秒执行 - SCHEDULER.schedule(() -> { - sendResponseToClient(ctx, "微信到账,十二元"); - }, 2, TimeUnit.SECONDS); + SCHEDULER.schedule(() -> sendResponseToClient(ctx, "处理成功"), 2, TimeUnit.SECONDS); +// SCHEDULER.schedule(() -> deviceStatusManager.sendMqMsg(content), 0, TimeUnit.SECONDS); + try { + deviceStatusManager.sendMqMsg(content); + } catch (Exception e) { + e.printStackTrace(); + } } // 按QoS等级回复确认(重点实现QoS2) diff --git a/cash-api/system-server/src/main/java/com/czg/mqtt/manager/DeviceStatusManager.java b/cash-api/system-server/src/main/java/com/czg/mqtt/manager/DeviceStatusManager.java index 2f254280e..f0c12c33b 100644 --- a/cash-api/system-server/src/main/java/com/czg/mqtt/manager/DeviceStatusManager.java +++ b/cash-api/system-server/src/main/java/com/czg/mqtt/manager/DeviceStatusManager.java @@ -1,5 +1,9 @@ package com.czg.mqtt.manager; +import com.alibaba.fastjson2.JSONObject; +import com.czg.config.RabbitPublisher; +import com.czg.system.service.SysDevicesService; +import jakarta.annotation.Resource; import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.scheduling.annotation.Scheduled; @@ -11,6 +15,7 @@ import java.util.concurrent.ConcurrentHashMap; /** * 设备在线状态管理器(单例) * 维护 clientId → 设备状态的映射,线程安全 + * * @author yjjie * @date 2025/11/18 15:33 */ @@ -20,6 +25,12 @@ public class DeviceStatusManager { // 并发安全的 Map:key=clientId(设备唯一标识),value=设备状态 private final Map deviceStatusMap = new ConcurrentHashMap<>(); + @Resource + private SysDevicesService sysDevicesService; + + @Resource + private RabbitPublisher rabbitPublisher; + /** * 设备状态实体 */ @@ -46,7 +57,9 @@ public class DeviceStatusManager { public void registerDevice(String clientId, String remoteAddress) { DeviceStatus status = new DeviceStatus(clientId, remoteAddress); deviceStatusMap.put(clientId, status); - System.out.printf("设备注册上线:clientId=%s, 地址=%s%n", clientId, remoteAddress); + log.info("设备注册上线:clientId={}, 地址={}", clientId, remoteAddress); + + sysDevicesService.updateDeviceOnlineStatus(clientId, 1); } /** @@ -58,8 +71,9 @@ public class DeviceStatusManager { status.setLastPingTime(System.currentTimeMillis()); // 刷新为在线状态 status.setOnline(true); - System.out.printf("设备 Ping 刷新:clientId=%s, 最后活跃时间=%d%n", - clientId, status.getLastPingTime()); + + log.info("设备上线:clientId={}, 最后Ping时间={}", clientId, status.getLastPingTime()); + sysDevicesService.updateDeviceOnlineStatus(clientId, 1); } } @@ -71,8 +85,10 @@ public class DeviceStatusManager { if (status != null) { status.setOnline(false); System.out.printf("设备离线:clientId=%s%n", clientId); - // 可选:保留离线状态一段时间,或直接移除(根据业务需求) - // deviceStatusMap.remove(clientId); + log.info("设备下线:clientId={}, 最后Ping时间={}", clientId, status.getLastPingTime()); + + deviceStatusMap.remove(clientId); + sysDevicesService.updateDeviceOnlineStatus(clientId, 0); } } @@ -83,6 +99,11 @@ public class DeviceStatusManager { return deviceStatusMap.get(clientId); } + public void sendMqMsg(String content) { + JSONObject jsonObject = JSONObject.parseObject(content); + rabbitPublisher.sendOrderProductStatusMsg(jsonObject.getString("msg")); + } + /** * 定时清理长时间离线的设备 * offlineThresholdMs = 4 * 60 * 1000 = 14400000 毫秒 = 4分钟 diff --git a/cash-api/system-server/src/main/resources/application-dev.yml b/cash-api/system-server/src/main/resources/application-dev.yml index d8063b188..5492bd057 100644 --- a/cash-api/system-server/src/main/resources/application-dev.yml +++ b/cash-api/system-server/src/main/resources/application-dev.yml @@ -26,6 +26,12 @@ spring: server-addr: 121.40.109.122:8848 namespace: 237e1905-0a66-4375-9bb6-a51c3c034aca + rabbitmq: + host: 121.40.109.122 + port: 5672 + username: chaozg + password: chaozg123 + dubbo: application: name: system-server diff --git a/cash-api/system-server/src/main/resources/application-prod.yml b/cash-api/system-server/src/main/resources/application-prod.yml index 2dd0c2264..21024df25 100644 --- a/cash-api/system-server/src/main/resources/application-prod.yml +++ b/cash-api/system-server/src/main/resources/application-prod.yml @@ -26,6 +26,12 @@ spring: server-addr: 121.40.109.122:8848 namespace: 237e1905-0a66-4375-9bb6-a51c3c034aca + rabbitmq: + host: 121.40.109.122 + port: 5672 + username: chaozg + password: chaozg123 + dubbo: application: name: system-server diff --git a/cash-api/system-server/src/main/resources/application-test.yml b/cash-api/system-server/src/main/resources/application-test.yml index ac4eb853b..b98b6e569 100644 --- a/cash-api/system-server/src/main/resources/application-test.yml +++ b/cash-api/system-server/src/main/resources/application-test.yml @@ -26,6 +26,12 @@ spring: server-addr: 121.40.109.122:8848 namespace: 237e1905-0a66-4375-9bb6-a51c3c034aca + rabbitmq: + host: 121.40.109.122 + port: 5672 + username: chaozg + password: chaozg123 + dubbo: application: name: system-server diff --git a/cash-common/cash-common-mq/src/main/java/com/czg/config/RabbitConfig.java b/cash-common/cash-common-mq/src/main/java/com/czg/config/RabbitConfig.java index 43591e1ae..73137b7d0 100644 --- a/cash-common/cash-common-mq/src/main/java/com/czg/config/RabbitConfig.java +++ b/cash-common/cash-common-mq/src/main/java/com/czg/config/RabbitConfig.java @@ -143,4 +143,14 @@ public class RabbitConfig { public Binding bindingBirthdayGiftSmsExchange(Queue birthdayGiftSmsQueue, DirectExchange exchange) { return BindingBuilder.bind(birthdayGiftSmsQueue).to(exchange).with(activeProfile + "-" + RabbitConstants.Queue.BIRTHDAY_GIFT_SMS_QUEUE); } + + //------------------------------------------------------ 订单商品状态 + @Bean + public Queue orderProductStatusQueue() { + return new Queue(activeProfile + "-" + RabbitConstants.Queue.ORDER_PRODUCT_STATUS_QUEUE, true); + } + @Bean + public Binding bindingOrderProductStatusExchange(Queue orderProductStatusQueue, DirectExchange exchange) { + return BindingBuilder.bind(orderProductStatusQueue).to(exchange).with(activeProfile + "-" + RabbitConstants.Queue.ORDER_PRODUCT_STATUS_QUEUE); + } } diff --git a/cash-common/cash-common-mq/src/main/java/com/czg/config/RabbitConstants.java b/cash-common/cash-common-mq/src/main/java/com/czg/config/RabbitConstants.java index 20e7c1fc7..d59ec931e 100644 --- a/cash-common/cash-common-mq/src/main/java/com/czg/config/RabbitConstants.java +++ b/cash-common/cash-common-mq/src/main/java/com/czg/config/RabbitConstants.java @@ -19,6 +19,11 @@ public interface RabbitConstants { public static final String CALL_TABLE_PRINT_QUEUE = "call.table.print.queue"; public static final String PRODUCT_INFO_CHANGE_QUEUE = "product.info.change.queue"; + /** + * 订单商品状态队列 + */ + public static final String ORDER_PRODUCT_STATUS_QUEUE = "order.product.status.queue"; + /** * 1,2,applySmsTemp 模版审核 * 1,2,sendMarkSms 发送营销短信 diff --git a/cash-common/cash-common-mq/src/main/java/com/czg/config/RabbitPublisher.java b/cash-common/cash-common-mq/src/main/java/com/czg/config/RabbitPublisher.java index b84444505..545c9c1d0 100644 --- a/cash-common/cash-common-mq/src/main/java/com/czg/config/RabbitPublisher.java +++ b/cash-common/cash-common-mq/src/main/java/com/czg/config/RabbitPublisher.java @@ -121,6 +121,13 @@ public class RabbitPublisher { sendMsg(RabbitConstants.Queue.BIRTHDAY_GIFT_SMS_QUEUE, param); } + /** + * 订单商品状态消息 + */ + public void sendOrderProductStatusMsg(String qrContent) { + sendMsg(RabbitConstants.Queue.ORDER_PRODUCT_STATUS_QUEUE, qrContent); + } + private void sendMsg(String queue, String msg) { log.info("开始发送mq消息,exchange:{}, queue: {}, msg: {}", activeProfile + "-" + RabbitConstants.Exchange.CASH_EXCHANGE, activeProfile + "-" + queue, msg); rabbitTemplate.convertAndSend(activeProfile + "-" + RabbitConstants.Exchange.CASH_EXCHANGE, activeProfile + "-" + queue, msg); diff --git a/cash-common/cash-common-service/src/main/java/com/czg/system/dto/SysDevicesDTO.java b/cash-common/cash-common-service/src/main/java/com/czg/system/dto/SysDevicesDTO.java index de11b2535..b6a46b887 100644 --- a/cash-common/cash-common-service/src/main/java/com/czg/system/dto/SysDevicesDTO.java +++ b/cash-common/cash-common-service/src/main/java/com/czg/system/dto/SysDevicesDTO.java @@ -59,7 +59,7 @@ public class SysDevicesDTO implements Serializable { /** * 在线状态:0 离线,1 在线 */ - private Integer unlineStatus; + private Integer onlineStatus; /** * 最后上线时间 diff --git a/cash-common/cash-common-service/src/main/java/com/czg/system/dto/SysDevicesPageDTO.java b/cash-common/cash-common-service/src/main/java/com/czg/system/dto/SysDevicesPageDTO.java index 820024a71..0e7d0adfe 100644 --- a/cash-common/cash-common-service/src/main/java/com/czg/system/dto/SysDevicesPageDTO.java +++ b/cash-common/cash-common-service/src/main/java/com/czg/system/dto/SysDevicesPageDTO.java @@ -29,7 +29,7 @@ public class SysDevicesPageDTO { /** * 在线状态:0 离线,1 在线 */ - private Integer unlineStatus; + private Integer onlineStatus; /** * 页码 diff --git a/cash-common/cash-common-service/src/main/java/com/czg/system/entity/SysDevices.java b/cash-common/cash-common-service/src/main/java/com/czg/system/entity/SysDevices.java index 1ca25ed49..12277195b 100644 --- a/cash-common/cash-common-service/src/main/java/com/czg/system/entity/SysDevices.java +++ b/cash-common/cash-common-service/src/main/java/com/czg/system/entity/SysDevices.java @@ -2,6 +2,7 @@ package com.czg.system.entity; import com.mybatisflex.annotation.Column; import com.mybatisflex.annotation.Id; +import com.mybatisflex.annotation.KeyType; import com.mybatisflex.annotation.Table; import java.io.Serializable; import java.time.LocalDateTime; @@ -32,7 +33,7 @@ public class SysDevices implements Serializable { /** * 主键 */ - @Id + @Id(keyType = KeyType.Auto) private Long id; /** @@ -53,7 +54,7 @@ public class SysDevices implements Serializable { /** * 在线状态:0 离线,1 在线 */ - private Integer unlineStatus; + private Integer onlineStatus; /** * 最后上线时间 diff --git a/cash-common/cash-common-service/src/main/java/com/czg/system/service/SysDevicesService.java b/cash-common/cash-common-service/src/main/java/com/czg/system/service/SysDevicesService.java index ef3fc179d..ced88580d 100644 --- a/cash-common/cash-common-service/src/main/java/com/czg/system/service/SysDevicesService.java +++ b/cash-common/cash-common-service/src/main/java/com/czg/system/service/SysDevicesService.java @@ -20,5 +20,7 @@ public interface SysDevicesService extends IService { Long deleteDevice(Long id); - Page queryDevice(SysDevicesPageDTO reqDTO); + Page queryDevicePage(SysDevicesPageDTO reqDTO); + + void updateDeviceOnlineStatus(String sn, Integer online); } diff --git a/cash-service/system-service/src/main/java/com/czg/service/system/service/impl/SysDevicesServiceImpl.java b/cash-service/system-service/src/main/java/com/czg/service/system/service/impl/SysDevicesServiceImpl.java index 99b363078..364abba0a 100644 --- a/cash-service/system-service/src/main/java/com/czg/service/system/service/impl/SysDevicesServiceImpl.java +++ b/cash-service/system-service/src/main/java/com/czg/service/system/service/impl/SysDevicesServiceImpl.java @@ -8,17 +8,22 @@ import com.czg.system.dto.SysDevicesDTO; import com.czg.system.dto.SysDevicesPageDTO; import com.czg.system.entity.SysDevices; import com.czg.system.service.SysDevicesService; +import com.czg.utils.PageUtil; import com.mybatisflex.core.paginate.Page; import com.mybatisflex.core.query.QueryWrapper; import com.mybatisflex.spring.service.impl.ServiceImpl; +import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; +import java.time.LocalDateTime; + /** * 设备管理 服务层实现。 * * @author ww * @since 2025-11-24 */ +@Slf4j @Service public class SysDevicesServiceImpl extends ServiceImpl implements SysDevicesService { @@ -50,7 +55,7 @@ public class SysDevicesServiceImpl extends ServiceImpl queryDevice(SysDevicesPageDTO reqDTO) { + public Page queryDevicePage(SysDevicesPageDTO reqDTO) { QueryWrapper wrapper = new QueryWrapper(); if (StrUtil.isNotBlank(reqDTO.getDeviceSn())) { wrapper.like(SysDevices::getDeviceSn, reqDTO.getDeviceSn()); @@ -79,7 +84,32 @@ public class SysDevicesServiceImpl extends ServiceImpl BeanUtil.toBean(devices, SysDevicesDTO.class)); + wrapper.orderBy(SysDevices::getOnlineStatus, false); + wrapper.orderBy(SysDevices::getId, false); + + return page(PageUtil.buildPage(), wrapper).map(devices -> BeanUtil.toBean(devices, SysDevicesDTO.class)); + } + + @Override + public void updateDeviceOnlineStatus(String sn, Integer online) { + SysDevices devices = getOne(query().eq(SysDevices::getDeviceSn, sn)); + if (devices == null) { + log.info("心跳设备不存在:sn={}", sn); + return; + } + + if (online == 0) { + devices.setOnlineStatus(0); + devices.setOfflineTime(LocalDateTime.now()); + } else { + devices.setOnlineStatus(1); + devices.setLastUnlineTime(LocalDateTime.now()); + devices.setOfflineTime(null); + } + updateById(devices); } }