集成 mqtt 部分逻辑

This commit is contained in:
gong
2025-11-18 16:30:55 +08:00
parent 106968fb1f
commit 97d038f159
10 changed files with 800 additions and 7 deletions

View File

@@ -5,6 +5,7 @@ import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.transaction.annotation.EnableTransactionManagement;
/**
@@ -15,6 +16,7 @@ import org.springframework.transaction.annotation.EnableTransactionManagement;
@EnableTransactionManagement
@MapperScan("com.czg.service.system.mapper")
@EnableDubbo
@EnableScheduling
public class SystemApplication {
public static void main(String[] args) {

View File

@@ -0,0 +1,87 @@
package com.czg.mqtt;
import com.czg.mqtt.config.MqttServerProperties;
import com.czg.mqtt.handler.MqttServerHandler;
import com.czg.mqtt.manager.DeviceStatusManager;
import com.czg.mqtt.manager.MqttSubscriptionManager;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import javax.annotation.PreDestroy;
/**
* Netty 服务
*
* @author yjjie
*/
@Slf4j
@Component
@RequiredArgsConstructor
public class MqttNettyServer implements CommandLineRunner {
private final MqttServerProperties properties;
// 注入共享组件
private final MqttSubscriptionManager subscriptionManager;
private final DeviceStatusManager deviceStatusManager;
private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup;
@Override
public void run(String... args) throws Exception {
bossGroup = new NioEventLoopGroup(properties.getBossGroupThreads());
workerGroup = new NioEventLoopGroup(properties.getWorkerGroupThreads());
try {
ServerBootstrap bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
// 关键修改:每次连接创建新的 MqttServerHandler 实例,注入共享组件
ch.pipeline()
.addLast("mqttDecoder", new MqttDecoder())
.addLast("mqttEncoder", MqttEncoder.INSTANCE)
.addLast("mqttServerHandler", new MqttServerHandler(subscriptionManager, deviceStatusManager)); // 新实例
}
});
ChannelFuture future = bootstrap.bind(properties.getPort()).sync();
log.info("MQTT服务启动成功端口{}", properties.getPort());
future.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
@PreDestroy
public void stop() {
log.info("MQTT服务开始关闭...");
if (workerGroup != null) {
workerGroup.shutdownGracefully();
}
if (bossGroup != null) {
bossGroup.shutdownGracefully();
}
log.info("MQTT服务关闭完成");
}
}

View File

@@ -0,0 +1,22 @@
package com.czg.mqtt.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* MQTT 配置
*
* @author yjjie
*/
@Data
@Component
@ConfigurationProperties(prefix = "mqtt.server")
public class MqttServerProperties {
// MQTT 服务端口(默认 1883MQTT 标准端口)
private int port = 1883;
// Netty boss 线程数(默认 CPU 核心数)
private int bossGroupThreads = Runtime.getRuntime().availableProcessors();
// Netty worker 线程数(默认 CPU 核心数 * 2
private int workerGroupThreads = Runtime.getRuntime().availableProcessors() * 2;
}

View File

@@ -0,0 +1,490 @@
package com.czg.mqtt.handler;
import com.czg.mqtt.manager.DeviceStatusManager;
import com.czg.mqtt.manager.MqttSubscriptionManager;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.mqtt.*;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
* 实现 MQTT 消息处理器
* 自定义 ChannelInboundHandlerAdapter处理 MQTT 协议的核心消息类型:
* CONNECT客户端连接请求
* PUBLISH客户端发布消息
* SUBSCRIBE客户端订阅主题
* UNSUBSCRIBE客户端取消订阅
* DISCONNECT客户端断开连接
* PINGREQ客户端心跳请求
* PUBRELQoS2 第二步确认请求
*
* @author yjjie
*/
@Slf4j
public class MqttServerHandler extends ChannelInboundHandlerAdapter {
// 注入设备状态管理器(单例)
private final DeviceStatusManager deviceStatusManager;
// 并发安全的 Map缓存 Channel连接→ clientId设备唯一标识的映射
private final Map<Channel, String> channelClientIdMap = new ConcurrentHashMap<>();
// 注入共享的订阅关系管理器(单例)
private final MqttSubscriptionManager subscriptionManager;
// JSON 序列化工具(用于构造回复消息格式)
private final ObjectMapper objectMapper = new ObjectMapper();
// 创建一个调度线程池(可以全局复用)
private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(4);
// 构造函数(接收共享组件)
public MqttServerHandler(MqttSubscriptionManager subscriptionManager, DeviceStatusManager deviceStatusManager) {
this.subscriptionManager = subscriptionManager;
this.deviceStatusManager = deviceStatusManager;
}
// -------------------------- 连接相关 --------------------------
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.info("客户端连接成功:{}", ctx.channel().remoteAddress());
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
Channel channel = ctx.channel();
// 通过 Channel 找 clientId
String clientId = channelClientIdMap.get(channel);
if (clientId != null) {
// 标记离线
deviceStatusManager.markDeviceOffline(clientId);
// 清理映射
channelClientIdMap.remove(channel);
// 清理订阅(原有逻辑)
subscriptionManager.removeAllSubscriptions(channel);
}
log.info("客户端断开连接:{}", ctx.channel().remoteAddress());
super.channelInactive(ctx);
}
// -------------------------- 消息处理 --------------------------
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof MqttMessage mqttMsg) {
MqttFixedHeader fixedHeader = mqttMsg.fixedHeader();
MqttMessageType msgType = fixedHeader.messageType();
// 新增:处理 QoS2 第二步 - 客户端发送的 PUBREL 消息
if (msgType == MqttMessageType.PUBREL) {
handlePubRel(ctx, (MqttMessageIdVariableHeader) mqttMsg.variableHeader());
return;
}
// 根据消息类型分发处理
switch (msgType) {
case CONNECT:
handleConnect(ctx, (MqttConnectMessage) mqttMsg);
break;
case PUBLISH:
handlePublish(ctx, (MqttPublishMessage) mqttMsg);
break;
case SUBSCRIBE:
handleSubscribe(ctx, (MqttSubscribeMessage) mqttMsg);
break;
case UNSUBSCRIBE:
handleUnsubscribe(ctx, (MqttUnsubscribeMessage) mqttMsg);
break;
case DISCONNECT:
handleDisconnect(ctx);
break;
case PINGREQ:
handlePingReq(ctx);
break;
default:
log.warn("不支持的MQTT消息类型{}", msgType);
break;
}
}
}
// -------------------------- 具体消息处理逻辑 --------------------------
/**
* 处理 PINGREQ 消息(客户端心跳请求)
* 响应 PINGRESP 消息以维持连接
* 更新设备在线状态
*/
private void handlePingReq(ChannelHandlerContext ctx) {
Channel channel = ctx.channel();
// 关键:通过 Channel 关联 clientId
String clientId = channelClientIdMap.get(channel);
if (clientId != null) {
// 更新设备最后 Ping 时间(维持在线)
deviceStatusManager.updateDevicePing(clientId);
log.info("收到设备心跳clientId={}, 地址={}", clientId, channel.remoteAddress());
} else {
log.warn("收到未知设备心跳:地址={}(未找到 clientId可能连接未完成", channel.remoteAddress());
}
// 构造 PINGRESP 消息固定头PINGRESP无可变头和负载
MqttFixedHeader pingRespHeader = new MqttFixedHeader(
// 消息类型PINGRESP
MqttMessageType.PINGRESP,
// 是否为重复消息
false,
// QoS
MqttQoS.AT_MOST_ONCE,
// 是否保留
false,
// 剩余长度PINGRESP 无可变头和负载剩余长度为0
0
);
MqttMessage pingRespMsg = new MqttMessage(pingRespHeader);
// 回复 PINGRESP
ctx.writeAndFlush(pingRespMsg);
log.info("已向客户端 {} 发送 PINGRESP 心跳响应", ctx.channel().remoteAddress());
}
/**
* 处理 CONNECT 消息(设备连接请求):缓存 clientId ↔ Channel 映射,注册设备状态
*/
private void handleConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) {
MqttConnectPayload payload = msg.payload();
// 设备唯一标识(客户端必须携带)
String clientId = payload.clientIdentifier();
// 设备地址
String remoteAddress = ctx.channel().remoteAddress().toString();
// 1. 校验 clientId非空MQTT3.1.1 要求)
if (clientId == null || clientId.isEmpty()) {
sendConnectAck(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_CLIENT_IDENTIFIER_NOT_VALID);
log.error("设备连接拒绝clientId 为空,地址={}", remoteAddress);
return;
}
// 2. 缓存 Channel ↔ clientId 映射(核心:后续通过 Channel 找设备)
Channel channel = ctx.channel();
channelClientIdMap.put(channel, clientId);
// 3. 注册设备状态(标记为在线)
deviceStatusManager.registerDevice(clientId, remoteAddress);
// 4. 响应连接成功(原有逻辑)
sendConnectAck(ctx, MqttConnectReturnCode.CONNECTION_ACCEPTED);
log.info("设备连接成功clientId={}, 地址={}, 协议版本={}",
clientId, remoteAddress, msg.variableHeader().version());
}
private void handlePublish(ChannelHandlerContext ctx, MqttPublishMessage msg) {
String topic = msg.variableHeader().topicName();
ByteBuf payloadBuf = msg.payload();
byte[] payload = new byte[payloadBuf.readableBytes()];
payloadBuf.readBytes(payload);
// 释放ByteBuf避免内存泄漏
payloadBuf.release();
String content = new String(payload, StandardCharsets.UTF_8);
// 获取固定头存储QoS
MqttFixedHeader fixedHeader = msg.fixedHeader();
MqttPublishVariableHeader variableHeader = msg.variableHeader();
MqttQoS qosLevel = fixedHeader.qosLevel();
int messageId = variableHeader.packetId();
// 日志输出包含QoS等级和消息ID
log.info("收到发布消息topic={}, QoS={}, 消息ID={}, 内容={}, 客户端={}",
topic, qosLevel.value(), messageId, content, ctx.channel().remoteAddress());
// -------------------------- 核心新增:收到 con 主题消息,回复 7941610A 主题 --------------------------
if ("con".equals(topic)) {
// 延迟 2 秒执行
SCHEDULER.schedule(() -> {
sendResponseToClient(ctx, "微信到账,十二元");
}, 2, TimeUnit.SECONDS);
}
// 按QoS等级回复确认重点实现QoS2
switch (qosLevel) {
// QoS0自定义主题回复确认
case AT_MOST_ONCE:
sendQos0Ack(ctx, topic, messageId, content);
break;
// QoS1回复 PUBACK协议要求
case AT_LEAST_ONCE:
sendPubAck(ctx, messageId);
break;
// QoS2第一步回复 PUBREC协议要求
case EXACTLY_ONCE:
sendPubRec(ctx, messageId);
log.info("已处理QoS2消息发送PUBREC确认消息ID={}", messageId);
break;
default:
log.warn("不支持的QoS等级{}", qosLevel);
}
// 转发消息逻辑(不变)
Set<Channel> subscribers = subscriptionManager.getSubscribers(topic);
if (subscribers.isEmpty()) {
log.debug("主题 {} 无订阅者,消息丢弃", topic);
return;
}
MqttPublishMessage forwardMsg = buildForwardPublishMessage(msg, payload);
for (Channel subscriber : subscribers) {
if (subscriber.isActive() && subscriber != ctx.channel()) {
subscriber.writeAndFlush(forwardMsg.retain());
log.debug("转发消息到客户端 {}topic={}, 内容={}",
subscriber.remoteAddress(), topic, content);
}
}
}
/**
* 向设备指定格式的回复消息
*/
private void sendResponseToClient(ChannelHandlerContext ctx, String msg) {
try {
Channel channel = ctx.channel();
// 关键:通过 Channel 关联 clientId
String clientId = channelClientIdMap.get(channel);
// 1. 构造回复消息内容(按要求格式)
ResponseMessage responseMsg = new ResponseMessage();
responseMsg.setMsg(msg);
responseMsg.setSn("超管");
responseMsg.setName("超管");
responseMsg.setPly(51);
// 2. 序列化为 JSON 字符串
String jsonContent = objectMapper.writeValueAsString(responseMsg);
log.info("向主题 {} 发布回复消息:{}", clientId, jsonContent);
// 3. 构造 MQTT 发布消息QoS2与客户端发送的等级一致确保可靠送达
MqttFixedHeader fixedHeader = new MqttFixedHeader(
// 消息类型PUBLISH
MqttMessageType.PUBLISH,
// 不重复
false,
// QoS2与客户端一致确保可靠
MqttQoS.EXACTLY_ONCE,
// 不保留
false,
// 剩余长度Netty自动计算
0
);
// 消息ID生成随机不重复的ID1-65535之间
int responseMsgId = (int) (Math.random() * 65535) + 1;
MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(clientId, responseMsgId);
ByteBuf payload = Unpooled.wrappedBuffer(jsonContent.getBytes(StandardCharsets.UTF_8));
// 4. 发布消息(所有订阅 7941610A 主题的客户端都会收到)
MqttPublishMessage publishMsg = new MqttPublishMessage(fixedHeader, variableHeader, payload);
ctx.writeAndFlush(publishMsg);
log.info("已向主题 {} 发送回复消息消息ID{}", clientId, responseMsgId);
} catch (JsonProcessingException e) {
log.error("构造回复消息失败:", e);
}
}
/**
* 回复消息实体类用于构造指定JSON格式
*/
@Data
private static class ResponseMessage {
private String msg;
private String sn;
private String name;
private int ply;
}
/**
* 处理 QoS2 第二步:客户端发送的 PUBREL 消息,回复 PUBCOMP 完成最终确认
* 用 MqttMessage 构造,避免依赖 MqttPubCompMessage
*/
private void handlePubRel(ChannelHandlerContext ctx, MqttMessageIdVariableHeader variableHeader) {
int messageId = variableHeader.messageId();
log.info("收到客户端 PUBREL 消息QoS2第二步客户端={}, 消息ID={}",
ctx.channel().remoteAddress(), messageId);
// 构造 PUBCOMP 固定头(消息类型为 PUBCOMP
MqttFixedHeader pubCompFixedHeader = new MqttFixedHeader(
// 关键:消息类型指定为 PUBCOMP
MqttMessageType.PUBCOMP,
// 不重复
false,
// QoS0
MqttQoS.AT_MOST_ONCE,
// 不保留
false,
// 可变头长度消息ID占2字节Netty可自动计算填2更明确
2
);
// 构造 PUBCOMP 消息(固定头 + 消息ID可变头
MqttMessage pubCompMsg = new MqttMessage(pubCompFixedHeader, variableHeader);
ctx.writeAndFlush(pubCompMsg);
log.info("发送PUBCOMPQoS2最终确认到客户端 {}消息ID={}",
ctx.channel().remoteAddress(), messageId);
}
private void handleSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage msg) {
Channel clientChannel = ctx.channel();
msg.payload().topicSubscriptions().forEach(sub -> {
String topic = sub.topicFilter();
log.info("客户端订阅主题client={}, topic={}, 请求QoS={}",
clientChannel.remoteAddress(), topic, sub.qualityOfService().value());
// 调用共享管理器添加订阅
subscriptionManager.addSubscription(topic, clientChannel);
});
// 响应订阅成功SUBACK
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0
);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(msg.variableHeader().messageId());
MqttSubAckPayload payload = new MqttSubAckPayload(
msg.payload().topicSubscriptions().stream()
// 按客户端请求QoS返回
.map(sub -> sub.qualityOfService().value())
.toList()
);
MqttSubAckMessage subAckMsg = new MqttSubAckMessage(fixedHeader, variableHeader, payload);
ctx.writeAndFlush(subAckMsg);
log.info("订阅响应已发送client={}, 消息ID={}", clientChannel.remoteAddress(), variableHeader.messageId());
}
private void handleUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage msg) {
Channel clientChannel = ctx.channel();
msg.payload().topics().forEach(topic -> {
log.info("客户端取消订阅client={}, topic={}", clientChannel.remoteAddress(), topic);
// 调用共享管理器移除订阅
subscriptionManager.removeSubscription(topic, clientChannel);
});
// 响应取消订阅成功UNSUBACK
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0
);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(msg.variableHeader().messageId());
MqttUnsubAckMessage unsubAckMsg = new MqttUnsubAckMessage(fixedHeader, variableHeader);
ctx.writeAndFlush(unsubAckMsg);
}
private void handleDisconnect(ChannelHandlerContext ctx) {
log.info("客户端主动断开连接:{}", ctx.channel().remoteAddress());
ctx.close();
}
// -------------------------- 工具方法 --------------------------
private void sendConnectAck(ChannelHandlerContext ctx, MqttConnectReturnCode returnCode) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0
);
MqttConnAckVariableHeader variableHeader = new MqttConnAckVariableHeader(returnCode, false);
ctx.writeAndFlush(new MqttConnAckMessage(fixedHeader, variableHeader));
if (returnCode != MqttConnectReturnCode.CONNECTION_ACCEPTED) {
ctx.close();
}
}
private MqttPublishMessage buildForwardPublishMessage(MqttPublishMessage originalMsg, byte[] payload) {
MqttFixedHeader fixedHeader = originalMsg.fixedHeader();
MqttPublishVariableHeader variableHeader = originalMsg.variableHeader();
ByteBuf forwardPayload = Unpooled.wrappedBuffer(payload);
return new MqttPublishMessage(fixedHeader, variableHeader, forwardPayload);
}
// -------------------------- 异常处理 --------------------------
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("客户端处理异常:{},原因:{}", ctx.channel().remoteAddress(), cause.getMessage(), cause);
ctx.close();
}
// -------------------------- 不同QoS的回复实现 --------------------------
/**
* QoS0 自定义确认:发布一个「原主题/ack」的消息告知客户端已收到
*/
private void sendQos0Ack(ChannelHandlerContext ctx, String originalTopic, int messageId, String content) {
String ackTopic = originalTopic + "/ack";
String ackContent = String.format("服务端已收到消息ID=%d, 内容=%s", messageId, content);
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.PUBLISH, false, MqttQoS.AT_MOST_ONCE, false, 0
);
// QoS0 消息ID为0
MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(ackTopic, 0);
ByteBuf ackPayload = Unpooled.wrappedBuffer(ackContent.getBytes(StandardCharsets.UTF_8));
MqttPublishMessage ackMsg = new MqttPublishMessage(fixedHeader, variableHeader, ackPayload);
ctx.writeAndFlush(ackMsg);
log.debug("发送QoS0确认消息到客户端 {}topic={}, 内容={}",
ctx.channel().remoteAddress(), ackTopic, ackContent);
}
/**
* QoS1 协议确认:回复 PUBACK 消息带原消息ID
*/
private void sendPubAck(ChannelHandlerContext ctx, int messageId) {
MqttFixedHeader fixedHeader = new MqttFixedHeader(
MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2
);
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
MqttPubAckMessage pubAckMsg = new MqttPubAckMessage(fixedHeader, variableHeader);
ctx.writeAndFlush(pubAckMsg);
log.debug("发送PUBACK确认QoS1到客户端 {}消息ID={}",
ctx.channel().remoteAddress(), messageId);
}
/**
* QoS2 第一步确认:回复 PUBREC 消息带原消息ID
* 用 MqttMessage 构造,避免依赖 MqttPubRecMessage
*/
private void sendPubRec(ChannelHandlerContext ctx, int messageId) {
// 构造 PUBREC 固定头(消息类型为 PUBREC
MqttFixedHeader pubRecFixedHeader = new MqttFixedHeader(
// 关键:消息类型指定为 PUBREC
MqttMessageType.PUBREC,
// 不重复
false,
// QoS0
MqttQoS.AT_MOST_ONCE,
// 不保留
false,
// 可变头长度消息ID占2字节
2
);
// 构造 PUBREC 消息(固定头 + 消息ID可变头
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
MqttMessage pubRecMsg = new MqttMessage(pubRecFixedHeader, variableHeader);
ctx.writeAndFlush(pubRecMsg);
log.info("发送PUBREC确认QoS2第一步到客户端 {}消息ID={}",
ctx.channel().remoteAddress(), messageId);
}
}

View File

@@ -0,0 +1,111 @@
package com.czg.mqtt.manager;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 设备在线状态管理器(单例)
* 维护 clientId → 设备状态的映射,线程安全
* @author yjjie
* @date 2025/11/18 15:33
*/
@Slf4j
@Component
public class DeviceStatusManager {
// 并发安全的 Mapkey=clientId设备唯一标识value=设备状态
private final Map<String, DeviceStatus> deviceStatusMap = new ConcurrentHashMap<>();
/**
* 设备状态实体
*/
@Data
public static class DeviceStatus {
private String clientId; // 设备唯一标识
private boolean online; // 在线状态true=在线false=离线)
private long lastPingTime; // 最后一次 Ping 时间(时间戳,毫秒)
private long connectTime; // 连接建立时间(时间戳,毫秒)
private String remoteAddress; // 设备IP:端口
public DeviceStatus(String clientId, String remoteAddress) {
this.clientId = clientId;
this.online = true;
this.connectTime = System.currentTimeMillis();
this.lastPingTime = this.connectTime; // 初始 Ping 时间=连接时间
this.remoteAddress = remoteAddress;
}
}
/**
* 设备连接成功时:注册设备状态
*/
public void registerDevice(String clientId, String remoteAddress) {
DeviceStatus status = new DeviceStatus(clientId, remoteAddress);
deviceStatusMap.put(clientId, status);
System.out.printf("设备注册上线clientId=%s, 地址=%s%n", clientId, remoteAddress);
}
/**
* 收到 Ping 消息时:更新设备最后活跃时间(维持在线状态)
*/
public void updateDevicePing(String clientId) {
DeviceStatus status = deviceStatusMap.get(clientId);
if (status != null) {
status.setLastPingTime(System.currentTimeMillis());
// 刷新为在线状态
status.setOnline(true);
System.out.printf("设备 Ping 刷新clientId=%s, 最后活跃时间=%d%n",
clientId, status.getLastPingTime());
}
}
/**
* 设备断开连接时:标记设备离线
*/
public void markDeviceOffline(String clientId) {
DeviceStatus status = deviceStatusMap.get(clientId);
if (status != null) {
status.setOnline(false);
System.out.printf("设备离线clientId=%s%n", clientId);
// 可选:保留离线状态一段时间,或直接移除(根据业务需求)
// deviceStatusMap.remove(clientId);
}
}
/**
* 查询设备状态
*/
public DeviceStatus getDeviceStatus(String clientId) {
return deviceStatusMap.get(clientId);
}
/**
* 定时清理长时间离线的设备
* offlineThresholdMs = 4 * 60 * 1000 = 14400000 毫秒 = 4分钟
*/
@Scheduled(cron = "0 * * * * ?") // 每分钟第0秒执行如 10:01:00、10:02:00
public void cleanOfflineDevices() {
// 4分钟无Ping则清理
long offlineThresholdMs = 4 * 60 * 1000;
long now = System.currentTimeMillis();
deviceStatusMap.entrySet().removeIf(entry -> {
DeviceStatus status = entry.getValue();
// 最后一次Ping距今超过4分钟
boolean needRemove = (now - status.getLastPingTime()) > offlineThresholdMs;
if (needRemove) {
markDeviceOffline(entry.getKey());
log.info("清理离线设备clientId={}, 最后Ping时间={}, 离线时长={}",
entry.getKey(),
status.getLastPingTime(),
(now - status.getLastPingTime()) / 1000);
}
return needRemove;
});
}
}

View File

@@ -0,0 +1,46 @@
package com.czg.mqtt.manager;
import io.netty.channel.Channel;
import org.springframework.stereotype.Component;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* MQTT 订阅关系管理器(单例,所有 Handler 共享)
*
* @author yjjie
*/
@Component
public class MqttSubscriptionManager {
// 主题 -> 订阅的客户端 Channel线程安全
private final ConcurrentMap<String, Set<Channel>> topicSubscribers = new ConcurrentHashMap<>();
// 添加订阅(主题-客户端绑定)
public void addSubscription(String topic, Channel channel) {
topicSubscribers.computeIfAbsent(topic, k -> ConcurrentHashMap.newKeySet())
.add(channel);
}
// 移除订阅(客户端断开时调用)
public void removeSubscription(String topic, Channel channel) {
Set<Channel> channels = topicSubscribers.get(topic);
if (channels != null) {
channels.remove(channel);
if (channels.isEmpty()) {
topicSubscribers.remove(topic);
}
}
}
// 移除客户端的所有订阅(客户端断开时调用)
public void removeAllSubscriptions(Channel channel) {
topicSubscribers.forEach((topic, channels) -> channels.remove(channel));
}
// 获取主题的所有订阅者
public Set<Channel> getSubscribers(String topic) {
return topicSubscribers.getOrDefault(topic, Set.of());
}
}

View File

@@ -2,17 +2,17 @@
spring:
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://rm-bp1kn7h89nz62cno1ro.mysql.rds.aliyuncs.com:3306/czg_cashier?useUnicode=true&characterEncoding=utf-8
username: cashier
password: Cashier@1@
url: jdbc:mysql://192.168.1.42:3306/czg_cashier?useUnicode=true&characterEncoding=utf-8
username: root
password: Chaozg123.
data:
redis:
host: 121.40.109.122
host: 192.168.1.42
port: 6379
password: chaozg123
password: Chaozg123.
timeout: 1000
database: 2
database: 0
lettuce:
pool:
min-idle: 0

View File

@@ -26,4 +26,9 @@ dubbo:
filter: traceProviderFilter
consumer:
filter: traceConsumerFilter
check: false
check: false
mqtt:
server:
port: 1883

View File

@@ -41,6 +41,7 @@
<weixin.java.miniapp.version>3.8.0</weixin.java.miniapp.version>
<pinyin.version>2.5.1</pinyin.version>
<IJPay.version>2.9.10</IJPay.version>
<netty.version>4.1.128.Final</netty.version>
</properties>
<dependencyManagement>
@@ -242,6 +243,20 @@
<artifactId>excel-spring-boot-starter</artifactId>
<version>3.4.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>${netty.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/io.netty/netty-codec-mqtt -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-mqtt</artifactId>
<version>${netty.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

View File

@@ -17,6 +17,21 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<!-- MQTT 依赖Spring Integration MQTT -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-mqtt</artifactId>
</dependency>
</dependencies>