处理 设备在线 发消息

This commit is contained in:
gong
2025-11-24 16:37:56 +08:00
parent e76a85b521
commit 7146128b4c
15 changed files with 156 additions and 38 deletions

View File

@@ -24,6 +24,13 @@
<artifactId>cash-common-log</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.czg</groupId>
<artifactId>cash-common-mq</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>com.czg</groupId>
<artifactId>system-service</artifactId>

View File

@@ -1,6 +1,5 @@
package com.czg.controller.admin;
import com.czg.annotation.SaAdminCheckPermission;
import com.czg.resp.CzgResult;
import com.czg.system.dto.SysDevicesDTO;
import com.czg.system.dto.SysDevicesPageDTO;
@@ -29,7 +28,7 @@ public class SysDeviceController {
* 新增设备
*/
@PostMapping
@SaAdminCheckPermission(value = "devices:add", name = "新增设备")
// @SaAdminCheckPermission(value = "devices:add", name = "新增设备")
public CzgResult<Long> addDevice(@RequestBody @Validated(InsertGroup.class) SysDevicesDTO param) {
return CzgResult.success(sysDevicesService.addDevice(param));
}
@@ -38,7 +37,7 @@ public class SysDeviceController {
* 修改设备
*/
@PutMapping
@SaAdminCheckPermission(value = "devices:update", name = "修改设备")
// @SaAdminCheckPermission(value = "devices:update", name = "修改设备")
public CzgResult<Long> updateDevice(@RequestBody @Validated(UpdateGroup.class) SysDevicesDTO param) {
return CzgResult.success(sysDevicesService.updateDevice(param));
}
@@ -47,7 +46,7 @@ public class SysDeviceController {
* 删除设备
*/
@DeleteMapping("/{id}")
@SaAdminCheckPermission(value = "devices:delete", name = "删除设备")
// @SaAdminCheckPermission(value = "devices:delete", name = "删除设备")
public CzgResult<Long> deleteDevice(@PathVariable Long id) {
return CzgResult.success(sysDevicesService.deleteDevice(id));
}
@@ -56,8 +55,8 @@ public class SysDeviceController {
* 查询设备分页
*/
@GetMapping("page")
@SaAdminCheckPermission(value = "devices:page", name = "查询设备分页")
// @SaAdminCheckPermission(value = "devices:page", name = "查询设备分页")
public CzgResult<Page<SysDevicesDTO>> queryDevice(SysDevicesPageDTO param) {
return CzgResult.success(sysDevicesService.queryDevice(param));
return CzgResult.success(sysDevicesService.queryDevicePage(param));
}
}

View File

@@ -41,6 +41,7 @@ public class MqttServerHandler extends ChannelInboundHandlerAdapter {
// 并发安全的 Map缓存 Channel连接→ clientId设备唯一标识的映射
private final Map<Channel, String> channelClientIdMap = new ConcurrentHashMap<>();
private final Map<String, Channel> clientIdChannelMap = new ConcurrentHashMap<>();
// 注入共享的订阅关系管理器(单例)
private final MqttSubscriptionManager subscriptionManager;
@@ -50,6 +51,7 @@ public class MqttServerHandler extends ChannelInboundHandlerAdapter {
// 创建一个调度线程池(可以全局复用)
private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(4);
// 构造函数(接收共享组件)
public MqttServerHandler(MqttSubscriptionManager subscriptionManager, DeviceStatusManager deviceStatusManager) {
this.subscriptionManager = subscriptionManager;
@@ -66,18 +68,20 @@ public class MqttServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
// 通过 Channel 找 clientId
String clientId = channelClientIdMap.get(channel);
if (clientId != null) {
// 标记离线
deviceStatusManager.markDeviceOffline(clientId);
// 清理映射
// 移除两个映射
channelClientIdMap.remove(channel);
// 清理订阅(原有逻辑)
subscriptionManager.removeAllSubscriptions(channel);
clientIdChannelMap.remove(clientId);
// 标记设备离线
deviceStatusManager.markDeviceOffline(clientId);
log.info("设备断开连接已标记离线clientId={}, 地址={}", clientId, channel.remoteAddress());
} else {
log.warn("未知 Channel 断开:{}", channel.remoteAddress());
}
log.info("客户端断开连接:{}", ctx.channel().remoteAddress());
super.channelInactive(ctx);
}
@@ -167,29 +171,39 @@ public class MqttServerHandler extends ChannelInboundHandlerAdapter {
*/
private void handleConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
MqttConnectPayload payload = msg.payload();
// 设备唯一标识(客户端必须携带)
String clientId = payload.clientIdentifier();
// 设备地址
String remoteAddress = ctx.channel().remoteAddress().toString();
// 1. 校验 clientId非空MQTT3.1.1 要求)
if (clientId == null || clientId.isEmpty()) {
sendConnectAck(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_CLIENT_IDENTIFIER_NOT_VALID);
log.error("设备连接拒绝clientId 为空,地址={}", remoteAddress);
return;
}
// 2. 缓存 Channel ↔ clientId 映射(核心:后续通过 Channel 找设备)
// 🔒 检查是否已经存在该 clientId 的连接
if (clientIdChannelMap.containsKey(clientId)) {
Channel existingChannel = clientIdChannelMap.get(clientId);
log.warn("设备重复连接已拒绝clientId={}, 原连接={}, 新连接={}",
clientId, existingChannel.remoteAddress(), ctx.channel().remoteAddress());
// 返回 "服务器不可用" 或 "已存在连接" 错误码,拒绝新连接
sendConnectAck(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_SERVER_UNAVAILABLE);
ctx.close(); // 主动关闭多余的连接
return;
}
// ✅ 当前连接是合法的、唯一的,继续处理
Channel channel = ctx.channel();
channelClientIdMap.put(channel, clientId);
// 维护反向映射
clientIdChannelMap.put(clientId, channel);
// 3. 注册设备状态(标记为在线)
// 注册设备状态
deviceStatusManager.registerDevice(clientId, remoteAddress);
// 4. 响应连接成功(原有逻辑)
// 回复 CONNACK
sendConnectAck(ctx, MqttConnectReturnCode.CONNECTION_ACCEPTED);
log.info("设备连接成功clientId={}, 地址={}, 协议版本={}",
clientId, remoteAddress, msg.variableHeader().version());
log.info("设备连接成功clientId={}, 地址={}, 协议版本={}", clientId, remoteAddress, msg.variableHeader().version());
}
private void handlePublish(ChannelHandlerContext ctx, MqttPublishMessage msg) {
@@ -211,12 +225,16 @@ public class MqttServerHandler extends ChannelInboundHandlerAdapter {
log.info("收到发布消息topic={}, QoS={}, 消息ID={}, 内容={}, 客户端={}",
topic, qosLevel.value(), messageId, content, ctx.channel().remoteAddress());
// -------------------------- 核心新增:收到 con 主题消息,回复 7941610A 主题 --------------------------
// -------------------------- 收到 con 主题消息,回复 --------------------------
if ("con".equals(topic)) {
// 延迟 2 秒执行
SCHEDULER.schedule(() -> {
sendResponseToClient(ctx, "微信到账,十二元");
}, 2, TimeUnit.SECONDS);
SCHEDULER.schedule(() -> sendResponseToClient(ctx, "处理成功"), 2, TimeUnit.SECONDS);
// SCHEDULER.schedule(() -> deviceStatusManager.sendMqMsg(content), 0, TimeUnit.SECONDS);
try {
deviceStatusManager.sendMqMsg(content);
} catch (Exception e) {
e.printStackTrace();
}
}
// 按QoS等级回复确认重点实现QoS2

View File

@@ -1,5 +1,9 @@
package com.czg.mqtt.manager;
import com.alibaba.fastjson2.JSONObject;
import com.czg.config.RabbitPublisher;
import com.czg.system.service.SysDevicesService;
import jakarta.annotation.Resource;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
@@ -11,6 +15,7 @@ import java.util.concurrent.ConcurrentHashMap;
/**
* 设备在线状态管理器(单例)
* 维护 clientId → 设备状态的映射,线程安全
*
* @author yjjie
* @date 2025/11/18 15:33
*/
@@ -20,6 +25,12 @@ public class DeviceStatusManager {
// 并发安全的 Mapkey=clientId设备唯一标识value=设备状态
private final Map<String, DeviceStatus> deviceStatusMap = new ConcurrentHashMap<>();
@Resource
private SysDevicesService sysDevicesService;
@Resource
private RabbitPublisher rabbitPublisher;
/**
* 设备状态实体
*/
@@ -46,7 +57,9 @@ public class DeviceStatusManager {
public void registerDevice(String clientId, String remoteAddress) {
DeviceStatus status = new DeviceStatus(clientId, remoteAddress);
deviceStatusMap.put(clientId, status);
System.out.printf("设备注册上线clientId=%s, 地址=%s%n", clientId, remoteAddress);
log.info("设备注册上线clientId={}, 地址={}", clientId, remoteAddress);
sysDevicesService.updateDeviceOnlineStatus(clientId, 1);
}
/**
@@ -58,8 +71,9 @@ public class DeviceStatusManager {
status.setLastPingTime(System.currentTimeMillis());
// 刷新为在线状态
status.setOnline(true);
System.out.printf("设备 Ping 刷新clientId=%s, 最后活跃时间=%d%n",
clientId, status.getLastPingTime());
log.info("设备上线clientId={}, 最后Ping时间={}", clientId, status.getLastPingTime());
sysDevicesService.updateDeviceOnlineStatus(clientId, 1);
}
}
@@ -71,8 +85,10 @@ public class DeviceStatusManager {
if (status != null) {
status.setOnline(false);
System.out.printf("设备离线clientId=%s%n", clientId);
// 可选:保留离线状态一段时间,或直接移除(根据业务需求)
// deviceStatusMap.remove(clientId);
log.info("设备下线clientId={}, 最后Ping时间={}", clientId, status.getLastPingTime());
deviceStatusMap.remove(clientId);
sysDevicesService.updateDeviceOnlineStatus(clientId, 0);
}
}
@@ -83,6 +99,11 @@ public class DeviceStatusManager {
return deviceStatusMap.get(clientId);
}
public void sendMqMsg(String content) {
JSONObject jsonObject = JSONObject.parseObject(content);
rabbitPublisher.sendOrderProductStatusMsg(jsonObject.getString("msg"));
}
/**
* 定时清理长时间离线的设备
* offlineThresholdMs = 4 * 60 * 1000 = 14400000 毫秒 = 4分钟

View File

@@ -26,6 +26,12 @@ spring:
server-addr: 121.40.109.122:8848
namespace: 237e1905-0a66-4375-9bb6-a51c3c034aca
rabbitmq:
host: 121.40.109.122
port: 5672
username: chaozg
password: chaozg123
dubbo:
application:
name: system-server

View File

@@ -26,6 +26,12 @@ spring:
server-addr: 121.40.109.122:8848
namespace: 237e1905-0a66-4375-9bb6-a51c3c034aca
rabbitmq:
host: 121.40.109.122
port: 5672
username: chaozg
password: chaozg123
dubbo:
application:
name: system-server

View File

@@ -26,6 +26,12 @@ spring:
server-addr: 121.40.109.122:8848
namespace: 237e1905-0a66-4375-9bb6-a51c3c034aca
rabbitmq:
host: 121.40.109.122
port: 5672
username: chaozg
password: chaozg123
dubbo:
application:
name: system-server