diff --git a/cash-api/system-server/src/main/java/com/czg/SystemApplication.java b/cash-api/system-server/src/main/java/com/czg/SystemApplication.java index 763aa6845..5616ed785 100644 --- a/cash-api/system-server/src/main/java/com/czg/SystemApplication.java +++ b/cash-api/system-server/src/main/java/com/czg/SystemApplication.java @@ -5,6 +5,7 @@ import org.mybatis.spring.annotation.MapperScan; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.client.discovery.EnableDiscoveryClient; +import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.transaction.annotation.EnableTransactionManagement; /** @@ -15,6 +16,7 @@ import org.springframework.transaction.annotation.EnableTransactionManagement; @EnableTransactionManagement @MapperScan("com.czg.service.system.mapper") @EnableDubbo +@EnableScheduling public class SystemApplication { public static void main(String[] args) { diff --git a/cash-api/system-server/src/main/java/com/czg/mqtt/MqttNettyServer.java b/cash-api/system-server/src/main/java/com/czg/mqtt/MqttNettyServer.java new file mode 100644 index 000000000..349c32adb --- /dev/null +++ b/cash-api/system-server/src/main/java/com/czg/mqtt/MqttNettyServer.java @@ -0,0 +1,87 @@ +package com.czg.mqtt; + +import com.czg.mqtt.config.MqttServerProperties; +import com.czg.mqtt.handler.MqttServerHandler; +import com.czg.mqtt.manager.DeviceStatusManager; +import com.czg.mqtt.manager.MqttSubscriptionManager; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.handler.codec.mqtt.MqttDecoder; +import io.netty.handler.codec.mqtt.MqttEncoder; +import io.netty.handler.logging.LogLevel; +import io.netty.handler.logging.LoggingHandler; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.CommandLineRunner; +import org.springframework.stereotype.Component; + +import javax.annotation.PreDestroy; + +/** + * Netty 服务 + * + * @author yjjie + */ +@Slf4j +@Component +@RequiredArgsConstructor +public class MqttNettyServer implements CommandLineRunner { + private final MqttServerProperties properties; + // 注入共享组件 + private final MqttSubscriptionManager subscriptionManager; + + private final DeviceStatusManager deviceStatusManager; + + private EventLoopGroup bossGroup; + private EventLoopGroup workerGroup; + + @Override + public void run(String... args) throws Exception { + bossGroup = new NioEventLoopGroup(properties.getBossGroupThreads()); + workerGroup = new NioEventLoopGroup(properties.getWorkerGroupThreads()); + + try { + ServerBootstrap bootstrap = new ServerBootstrap() + .group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .option(ChannelOption.SO_BACKLOG, 128) + .childOption(ChannelOption.SO_KEEPALIVE, true) + .handler(new LoggingHandler(LogLevel.INFO)) + .childHandler(new ChannelInitializer() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + // 关键修改:每次连接创建新的 MqttServerHandler 实例,注入共享组件 + ch.pipeline() + .addLast("mqttDecoder", new MqttDecoder()) + .addLast("mqttEncoder", MqttEncoder.INSTANCE) + .addLast("mqttServerHandler", new MqttServerHandler(subscriptionManager, deviceStatusManager)); // 新实例 + } + }); + + ChannelFuture future = bootstrap.bind(properties.getPort()).sync(); + log.info("MQTT服务启动成功,端口:{}", properties.getPort()); + future.channel().closeFuture().sync(); + } finally { + workerGroup.shutdownGracefully(); + bossGroup.shutdownGracefully(); + } + } + + @PreDestroy + public void stop() { + log.info("MQTT服务开始关闭..."); + if (workerGroup != null) { + workerGroup.shutdownGracefully(); + } + if (bossGroup != null) { + bossGroup.shutdownGracefully(); + } + log.info("MQTT服务关闭完成"); + } +} diff --git a/cash-api/system-server/src/main/java/com/czg/mqtt/config/MqttServerProperties.java b/cash-api/system-server/src/main/java/com/czg/mqtt/config/MqttServerProperties.java new file mode 100644 index 000000000..36f51f868 --- /dev/null +++ b/cash-api/system-server/src/main/java/com/czg/mqtt/config/MqttServerProperties.java @@ -0,0 +1,22 @@ +package com.czg.mqtt.config; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +/** + * MQTT 配置 + * + * @author yjjie + */ +@Data +@Component +@ConfigurationProperties(prefix = "mqtt.server") +public class MqttServerProperties { + // MQTT 服务端口(默认 1883,MQTT 标准端口) + private int port = 1883; + // Netty boss 线程数(默认 CPU 核心数) + private int bossGroupThreads = Runtime.getRuntime().availableProcessors(); + // Netty worker 线程数(默认 CPU 核心数 * 2) + private int workerGroupThreads = Runtime.getRuntime().availableProcessors() * 2; +} diff --git a/cash-api/system-server/src/main/java/com/czg/mqtt/handler/MqttServerHandler.java b/cash-api/system-server/src/main/java/com/czg/mqtt/handler/MqttServerHandler.java new file mode 100644 index 000000000..cadf1d530 --- /dev/null +++ b/cash-api/system-server/src/main/java/com/czg/mqtt/handler/MqttServerHandler.java @@ -0,0 +1,490 @@ +package com.czg.mqtt.handler; + +import com.czg.mqtt.manager.DeviceStatusManager; +import com.czg.mqtt.manager.MqttSubscriptionManager; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.codec.mqtt.*; +import lombok.Data; +import lombok.extern.slf4j.Slf4j; + +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +/** + * 实现 MQTT 消息处理器 + * 自定义 ChannelInboundHandlerAdapter,处理 MQTT 协议的核心消息类型: + * CONNECT:客户端连接请求 + * PUBLISH:客户端发布消息 + * SUBSCRIBE:客户端订阅主题 + * UNSUBSCRIBE:客户端取消订阅 + * DISCONNECT:客户端断开连接 + * PINGREQ:客户端心跳请求 + * PUBREL:QoS2 第二步确认请求 + * + * @author yjjie + */ +@Slf4j +public class MqttServerHandler extends ChannelInboundHandlerAdapter { + // 注入设备状态管理器(单例) + private final DeviceStatusManager deviceStatusManager; + + // 并发安全的 Map:缓存 Channel(连接)→ clientId(设备唯一标识)的映射 + private final Map channelClientIdMap = new ConcurrentHashMap<>(); + + // 注入共享的订阅关系管理器(单例) + private final MqttSubscriptionManager subscriptionManager; + // JSON 序列化工具(用于构造回复消息格式) + private final ObjectMapper objectMapper = new ObjectMapper(); + + // 创建一个调度线程池(可以全局复用) + private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(4); + + // 构造函数(接收共享组件) + public MqttServerHandler(MqttSubscriptionManager subscriptionManager, DeviceStatusManager deviceStatusManager) { + this.subscriptionManager = subscriptionManager; + this.deviceStatusManager = deviceStatusManager; + } + + // -------------------------- 连接相关 -------------------------- + @Override + public void channelActive(ChannelHandlerContext ctx) throws Exception { + log.info("客户端连接成功:{}", ctx.channel().remoteAddress()); + super.channelActive(ctx); + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + Channel channel = ctx.channel(); + // 通过 Channel 找 clientId + String clientId = channelClientIdMap.get(channel); + if (clientId != null) { + // 标记离线 + deviceStatusManager.markDeviceOffline(clientId); + // 清理映射 + channelClientIdMap.remove(channel); + // 清理订阅(原有逻辑) + subscriptionManager.removeAllSubscriptions(channel); + } + + log.info("客户端断开连接:{}", ctx.channel().remoteAddress()); + super.channelInactive(ctx); + } + + // -------------------------- 消息处理 -------------------------- + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof MqttMessage mqttMsg) { + MqttFixedHeader fixedHeader = mqttMsg.fixedHeader(); + MqttMessageType msgType = fixedHeader.messageType(); + + // 新增:处理 QoS2 第二步 - 客户端发送的 PUBREL 消息 + if (msgType == MqttMessageType.PUBREL) { + handlePubRel(ctx, (MqttMessageIdVariableHeader) mqttMsg.variableHeader()); + return; + } + + // 根据消息类型分发处理 + switch (msgType) { + case CONNECT: + handleConnect(ctx, (MqttConnectMessage) mqttMsg); + break; + case PUBLISH: + handlePublish(ctx, (MqttPublishMessage) mqttMsg); + break; + case SUBSCRIBE: + handleSubscribe(ctx, (MqttSubscribeMessage) mqttMsg); + break; + case UNSUBSCRIBE: + handleUnsubscribe(ctx, (MqttUnsubscribeMessage) mqttMsg); + break; + case DISCONNECT: + handleDisconnect(ctx); + break; + case PINGREQ: + handlePingReq(ctx); + break; + default: + log.warn("不支持的MQTT消息类型:{}", msgType); + break; + } + } + } + + // -------------------------- 具体消息处理逻辑 -------------------------- + + /** + * 处理 PINGREQ 消息(客户端心跳请求) + * 响应 PINGRESP 消息以维持连接 + * 更新设备在线状态 + */ + private void handlePingReq(ChannelHandlerContext ctx) { + Channel channel = ctx.channel(); + // 关键:通过 Channel 关联 clientId + String clientId = channelClientIdMap.get(channel); + + if (clientId != null) { + // 更新设备最后 Ping 时间(维持在线) + deviceStatusManager.updateDevicePing(clientId); + log.info("收到设备心跳:clientId={}, 地址={}", clientId, channel.remoteAddress()); + } else { + log.warn("收到未知设备心跳:地址={}(未找到 clientId,可能连接未完成)", channel.remoteAddress()); + } + + // 构造 PINGRESP 消息(固定头:PINGRESP,无可变头和负载) + MqttFixedHeader pingRespHeader = new MqttFixedHeader( + // 消息类型:PINGRESP + MqttMessageType.PINGRESP, + // 是否为重复消息 + false, + // QoS + MqttQoS.AT_MOST_ONCE, + // 是否保留 + false, + // 剩余长度(PINGRESP 无可变头和负载,剩余长度为0) + 0 + ); + + MqttMessage pingRespMsg = new MqttMessage(pingRespHeader); + + // 回复 PINGRESP + ctx.writeAndFlush(pingRespMsg); + log.info("已向客户端 {} 发送 PINGRESP 心跳响应", ctx.channel().remoteAddress()); + } + + /** + * 处理 CONNECT 消息(设备连接请求):缓存 clientId ↔ Channel 映射,注册设备状态 + */ + private void handleConnect(ChannelHandlerContext ctx, MqttConnectMessage msg) { + MqttConnectPayload payload = msg.payload(); + // 设备唯一标识(客户端必须携带) + String clientId = payload.clientIdentifier(); + // 设备地址 + String remoteAddress = ctx.channel().remoteAddress().toString(); + + // 1. 校验 clientId(非空,MQTT3.1.1 要求) + if (clientId == null || clientId.isEmpty()) { + sendConnectAck(ctx, MqttConnectReturnCode.CONNECTION_REFUSED_CLIENT_IDENTIFIER_NOT_VALID); + log.error("设备连接拒绝:clientId 为空,地址={}", remoteAddress); + return; + } + + // 2. 缓存 Channel ↔ clientId 映射(核心:后续通过 Channel 找设备) + Channel channel = ctx.channel(); + channelClientIdMap.put(channel, clientId); + + // 3. 注册设备状态(标记为在线) + deviceStatusManager.registerDevice(clientId, remoteAddress); + + // 4. 响应连接成功(原有逻辑) + sendConnectAck(ctx, MqttConnectReturnCode.CONNECTION_ACCEPTED); + log.info("设备连接成功:clientId={}, 地址={}, 协议版本={}", + clientId, remoteAddress, msg.variableHeader().version()); + } + + private void handlePublish(ChannelHandlerContext ctx, MqttPublishMessage msg) { + String topic = msg.variableHeader().topicName(); + ByteBuf payloadBuf = msg.payload(); + byte[] payload = new byte[payloadBuf.readableBytes()]; + payloadBuf.readBytes(payload); + // 释放ByteBuf,避免内存泄漏 + payloadBuf.release(); + + String content = new String(payload, StandardCharsets.UTF_8); + // 获取固定头(存储QoS) + MqttFixedHeader fixedHeader = msg.fixedHeader(); + MqttPublishVariableHeader variableHeader = msg.variableHeader(); + MqttQoS qosLevel = fixedHeader.qosLevel(); + int messageId = variableHeader.packetId(); + + // 日志输出(包含QoS等级和消息ID) + log.info("收到发布消息:topic={}, QoS={}, 消息ID={}, 内容={}, 客户端={}", + topic, qosLevel.value(), messageId, content, ctx.channel().remoteAddress()); + + // -------------------------- 核心新增:收到 con 主题消息,回复 7941610A 主题 -------------------------- + if ("con".equals(topic)) { + // 延迟 2 秒执行 + SCHEDULER.schedule(() -> { + sendResponseToClient(ctx, "微信到账,十二元"); + }, 2, TimeUnit.SECONDS); + } + + // 按QoS等级回复确认(重点实现QoS2) + switch (qosLevel) { + // QoS0:自定义主题回复确认 + case AT_MOST_ONCE: + sendQos0Ack(ctx, topic, messageId, content); + break; + // QoS1:回复 PUBACK(协议要求) + case AT_LEAST_ONCE: + sendPubAck(ctx, messageId); + break; + // QoS2:第一步回复 PUBREC(协议要求) + case EXACTLY_ONCE: + sendPubRec(ctx, messageId); + log.info("已处理QoS2消息,发送PUBREC确认:消息ID={}", messageId); + break; + default: + log.warn("不支持的QoS等级:{}", qosLevel); + } + + // 转发消息逻辑(不变) + Set subscribers = subscriptionManager.getSubscribers(topic); + if (subscribers.isEmpty()) { + log.debug("主题 {} 无订阅者,消息丢弃", topic); + return; + } + + MqttPublishMessage forwardMsg = buildForwardPublishMessage(msg, payload); + for (Channel subscriber : subscribers) { + if (subscriber.isActive() && subscriber != ctx.channel()) { + subscriber.writeAndFlush(forwardMsg.retain()); + log.debug("转发消息到客户端 {}:topic={}, 内容={}", + subscriber.remoteAddress(), topic, content); + } + } + } + + /** + * 向设备指定格式的回复消息 + */ + private void sendResponseToClient(ChannelHandlerContext ctx, String msg) { + try { + Channel channel = ctx.channel(); + // 关键:通过 Channel 关联 clientId + String clientId = channelClientIdMap.get(channel); + + // 1. 构造回复消息内容(按要求格式) + ResponseMessage responseMsg = new ResponseMessage(); + responseMsg.setMsg(msg); + responseMsg.setSn("超管"); + responseMsg.setName("超管"); + responseMsg.setPly(51); + + // 2. 序列化为 JSON 字符串 + String jsonContent = objectMapper.writeValueAsString(responseMsg); + log.info("向主题 {} 发布回复消息:{}", clientId, jsonContent); + + // 3. 构造 MQTT 发布消息(QoS2,与客户端发送的等级一致,确保可靠送达) + MqttFixedHeader fixedHeader = new MqttFixedHeader( + // 消息类型:PUBLISH + MqttMessageType.PUBLISH, + // 不重复 + false, + // QoS2(与客户端一致,确保可靠) + MqttQoS.EXACTLY_ONCE, + // 不保留 + false, + // 剩余长度(Netty自动计算) + 0 + ); + // 消息ID:生成随机不重复的ID(1-65535之间) + int responseMsgId = (int) (Math.random() * 65535) + 1; + MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(clientId, responseMsgId); + ByteBuf payload = Unpooled.wrappedBuffer(jsonContent.getBytes(StandardCharsets.UTF_8)); + + // 4. 发布消息(所有订阅 7941610A 主题的客户端都会收到) + MqttPublishMessage publishMsg = new MqttPublishMessage(fixedHeader, variableHeader, payload); + ctx.writeAndFlush(publishMsg); + + log.info("已向主题 {} 发送回复消息,消息ID:{}", clientId, responseMsgId); + } catch (JsonProcessingException e) { + log.error("构造回复消息失败:", e); + } + } + + /** + * 回复消息实体类(用于构造指定JSON格式) + */ + @Data + private static class ResponseMessage { + private String msg; + private String sn; + private String name; + private int ply; + + } + + /** + * 处理 QoS2 第二步:客户端发送的 PUBREL 消息,回复 PUBCOMP 完成最终确认 + * 用 MqttMessage 构造,避免依赖 MqttPubCompMessage + */ + private void handlePubRel(ChannelHandlerContext ctx, MqttMessageIdVariableHeader variableHeader) { + int messageId = variableHeader.messageId(); + log.info("收到客户端 PUBREL 消息(QoS2第二步):客户端={}, 消息ID={}", + ctx.channel().remoteAddress(), messageId); + + // 构造 PUBCOMP 固定头(消息类型为 PUBCOMP) + MqttFixedHeader pubCompFixedHeader = new MqttFixedHeader( + // 关键:消息类型指定为 PUBCOMP + MqttMessageType.PUBCOMP, + // 不重复 + false, + // QoS0 + MqttQoS.AT_MOST_ONCE, + // 不保留 + false, + // 可变头长度(消息ID占2字节,Netty可自动计算,填2更明确) + 2 + ); + + // 构造 PUBCOMP 消息(固定头 + 消息ID可变头) + MqttMessage pubCompMsg = new MqttMessage(pubCompFixedHeader, variableHeader); + + ctx.writeAndFlush(pubCompMsg); + log.info("发送PUBCOMP(QoS2最终确认)到客户端 {}:消息ID={}", + ctx.channel().remoteAddress(), messageId); + } + + private void handleSubscribe(ChannelHandlerContext ctx, MqttSubscribeMessage msg) { + Channel clientChannel = ctx.channel(); + msg.payload().topicSubscriptions().forEach(sub -> { + String topic = sub.topicFilter(); + log.info("客户端订阅主题:client={}, topic={}, 请求QoS={}", + clientChannel.remoteAddress(), topic, sub.qualityOfService().value()); + + // 调用共享管理器添加订阅 + subscriptionManager.addSubscription(topic, clientChannel); + }); + + // 响应订阅成功(SUBACK) + MqttFixedHeader fixedHeader = new MqttFixedHeader( + MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0 + ); + MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(msg.variableHeader().messageId()); + MqttSubAckPayload payload = new MqttSubAckPayload( + msg.payload().topicSubscriptions().stream() + // 按客户端请求QoS返回 + .map(sub -> sub.qualityOfService().value()) + .toList() + ); + + MqttSubAckMessage subAckMsg = new MqttSubAckMessage(fixedHeader, variableHeader, payload); + ctx.writeAndFlush(subAckMsg); + log.info("订阅响应已发送:client={}, 消息ID={}", clientChannel.remoteAddress(), variableHeader.messageId()); + } + + private void handleUnsubscribe(ChannelHandlerContext ctx, MqttUnsubscribeMessage msg) { + Channel clientChannel = ctx.channel(); + msg.payload().topics().forEach(topic -> { + log.info("客户端取消订阅:client={}, topic={}", clientChannel.remoteAddress(), topic); + // 调用共享管理器移除订阅 + subscriptionManager.removeSubscription(topic, clientChannel); + }); + + // 响应取消订阅成功(UNSUBACK) + MqttFixedHeader fixedHeader = new MqttFixedHeader( + MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0 + ); + MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(msg.variableHeader().messageId()); + MqttUnsubAckMessage unsubAckMsg = new MqttUnsubAckMessage(fixedHeader, variableHeader); + ctx.writeAndFlush(unsubAckMsg); + } + + private void handleDisconnect(ChannelHandlerContext ctx) { + log.info("客户端主动断开连接:{}", ctx.channel().remoteAddress()); + ctx.close(); + } + + // -------------------------- 工具方法 -------------------------- + private void sendConnectAck(ChannelHandlerContext ctx, MqttConnectReturnCode returnCode) { + MqttFixedHeader fixedHeader = new MqttFixedHeader( + MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0 + ); + MqttConnAckVariableHeader variableHeader = new MqttConnAckVariableHeader(returnCode, false); + ctx.writeAndFlush(new MqttConnAckMessage(fixedHeader, variableHeader)); + if (returnCode != MqttConnectReturnCode.CONNECTION_ACCEPTED) { + ctx.close(); + } + } + + private MqttPublishMessage buildForwardPublishMessage(MqttPublishMessage originalMsg, byte[] payload) { + MqttFixedHeader fixedHeader = originalMsg.fixedHeader(); + MqttPublishVariableHeader variableHeader = originalMsg.variableHeader(); + ByteBuf forwardPayload = Unpooled.wrappedBuffer(payload); + return new MqttPublishMessage(fixedHeader, variableHeader, forwardPayload); + } + + // -------------------------- 异常处理 -------------------------- + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + log.error("客户端处理异常:{},原因:{}", ctx.channel().remoteAddress(), cause.getMessage(), cause); + ctx.close(); + } + + // -------------------------- 不同QoS的回复实现 -------------------------- + + /** + * QoS0 自定义确认:发布一个「原主题/ack」的消息,告知客户端已收到 + */ + private void sendQos0Ack(ChannelHandlerContext ctx, String originalTopic, int messageId, String content) { + String ackTopic = originalTopic + "/ack"; + String ackContent = String.format("服务端已收到消息:ID=%d, 内容=%s", messageId, content); + + MqttFixedHeader fixedHeader = new MqttFixedHeader( + MqttMessageType.PUBLISH, false, MqttQoS.AT_MOST_ONCE, false, 0 + ); + // QoS0 消息ID为0 + MqttPublishVariableHeader variableHeader = new MqttPublishVariableHeader(ackTopic, 0); + ByteBuf ackPayload = Unpooled.wrappedBuffer(ackContent.getBytes(StandardCharsets.UTF_8)); + + MqttPublishMessage ackMsg = new MqttPublishMessage(fixedHeader, variableHeader, ackPayload); + ctx.writeAndFlush(ackMsg); + + log.debug("发送QoS0确认消息到客户端 {}:topic={}, 内容={}", + ctx.channel().remoteAddress(), ackTopic, ackContent); + } + + /** + * QoS1 协议确认:回复 PUBACK 消息(带原消息ID) + */ + private void sendPubAck(ChannelHandlerContext ctx, int messageId) { + MqttFixedHeader fixedHeader = new MqttFixedHeader( + MqttMessageType.PUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2 + ); + MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId); + MqttPubAckMessage pubAckMsg = new MqttPubAckMessage(fixedHeader, variableHeader); + + ctx.writeAndFlush(pubAckMsg); + log.debug("发送PUBACK确认(QoS1)到客户端 {}:消息ID={}", + ctx.channel().remoteAddress(), messageId); + } + + /** + * QoS2 第一步确认:回复 PUBREC 消息(带原消息ID) + * 用 MqttMessage 构造,避免依赖 MqttPubRecMessage + */ + private void sendPubRec(ChannelHandlerContext ctx, int messageId) { + // 构造 PUBREC 固定头(消息类型为 PUBREC) + MqttFixedHeader pubRecFixedHeader = new MqttFixedHeader( + // 关键:消息类型指定为 PUBREC + MqttMessageType.PUBREC, + // 不重复 + false, + // QoS0 + MqttQoS.AT_MOST_ONCE, + // 不保留 + false, + // 可变头长度(消息ID占2字节) + 2 + ); + + // 构造 PUBREC 消息(固定头 + 消息ID可变头) + MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId); + MqttMessage pubRecMsg = new MqttMessage(pubRecFixedHeader, variableHeader); + + ctx.writeAndFlush(pubRecMsg); + log.info("发送PUBREC确认(QoS2第一步)到客户端 {}:消息ID={}", + ctx.channel().remoteAddress(), messageId); + } +} \ No newline at end of file diff --git a/cash-api/system-server/src/main/java/com/czg/mqtt/manager/DeviceStatusManager.java b/cash-api/system-server/src/main/java/com/czg/mqtt/manager/DeviceStatusManager.java new file mode 100644 index 000000000..2f254280e --- /dev/null +++ b/cash-api/system-server/src/main/java/com/czg/mqtt/manager/DeviceStatusManager.java @@ -0,0 +1,111 @@ +package com.czg.mqtt.manager; + +import lombok.Data; +import lombok.extern.slf4j.Slf4j; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * 设备在线状态管理器(单例) + * 维护 clientId → 设备状态的映射,线程安全 + * @author yjjie + * @date 2025/11/18 15:33 + */ +@Slf4j +@Component +public class DeviceStatusManager { + // 并发安全的 Map:key=clientId(设备唯一标识),value=设备状态 + private final Map deviceStatusMap = new ConcurrentHashMap<>(); + + /** + * 设备状态实体 + */ + @Data + public static class DeviceStatus { + private String clientId; // 设备唯一标识 + private boolean online; // 在线状态(true=在线,false=离线) + private long lastPingTime; // 最后一次 Ping 时间(时间戳,毫秒) + private long connectTime; // 连接建立时间(时间戳,毫秒) + private String remoteAddress; // 设备IP:端口 + + public DeviceStatus(String clientId, String remoteAddress) { + this.clientId = clientId; + this.online = true; + this.connectTime = System.currentTimeMillis(); + this.lastPingTime = this.connectTime; // 初始 Ping 时间=连接时间 + this.remoteAddress = remoteAddress; + } + } + + /** + * 设备连接成功时:注册设备状态 + */ + public void registerDevice(String clientId, String remoteAddress) { + DeviceStatus status = new DeviceStatus(clientId, remoteAddress); + deviceStatusMap.put(clientId, status); + System.out.printf("设备注册上线:clientId=%s, 地址=%s%n", clientId, remoteAddress); + } + + /** + * 收到 Ping 消息时:更新设备最后活跃时间(维持在线状态) + */ + public void updateDevicePing(String clientId) { + DeviceStatus status = deviceStatusMap.get(clientId); + if (status != null) { + status.setLastPingTime(System.currentTimeMillis()); + // 刷新为在线状态 + status.setOnline(true); + System.out.printf("设备 Ping 刷新:clientId=%s, 最后活跃时间=%d%n", + clientId, status.getLastPingTime()); + } + } + + /** + * 设备断开连接时:标记设备离线 + */ + public void markDeviceOffline(String clientId) { + DeviceStatus status = deviceStatusMap.get(clientId); + if (status != null) { + status.setOnline(false); + System.out.printf("设备离线:clientId=%s%n", clientId); + // 可选:保留离线状态一段时间,或直接移除(根据业务需求) + // deviceStatusMap.remove(clientId); + } + } + + /** + * 查询设备状态 + */ + public DeviceStatus getDeviceStatus(String clientId) { + return deviceStatusMap.get(clientId); + } + + /** + * 定时清理长时间离线的设备 + * offlineThresholdMs = 4 * 60 * 1000 = 14400000 毫秒 = 4分钟 + */ + @Scheduled(cron = "0 * * * * ?") // 每分钟第0秒执行(如 10:01:00、10:02:00) + public void cleanOfflineDevices() { + // 4分钟无Ping则清理 + long offlineThresholdMs = 4 * 60 * 1000; + long now = System.currentTimeMillis(); + + deviceStatusMap.entrySet().removeIf(entry -> { + DeviceStatus status = entry.getValue(); + // 最后一次Ping距今超过4分钟 + boolean needRemove = (now - status.getLastPingTime()) > offlineThresholdMs; + + if (needRemove) { + markDeviceOffline(entry.getKey()); + log.info("清理离线设备:clientId={}, 最后Ping时间={}, 离线时长={}", + entry.getKey(), + status.getLastPingTime(), + (now - status.getLastPingTime()) / 1000); + } + return needRemove; + }); + } +} diff --git a/cash-api/system-server/src/main/java/com/czg/mqtt/manager/MqttSubscriptionManager.java b/cash-api/system-server/src/main/java/com/czg/mqtt/manager/MqttSubscriptionManager.java new file mode 100644 index 000000000..34a6f38ae --- /dev/null +++ b/cash-api/system-server/src/main/java/com/czg/mqtt/manager/MqttSubscriptionManager.java @@ -0,0 +1,46 @@ +package com.czg.mqtt.manager; + +import io.netty.channel.Channel; +import org.springframework.stereotype.Component; + +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * MQTT 订阅关系管理器(单例,所有 Handler 共享) + * + * @author yjjie + */ +@Component +public class MqttSubscriptionManager { + // 主题 -> 订阅的客户端 Channel(线程安全) + private final ConcurrentMap> topicSubscribers = new ConcurrentHashMap<>(); + + // 添加订阅(主题-客户端绑定) + public void addSubscription(String topic, Channel channel) { + topicSubscribers.computeIfAbsent(topic, k -> ConcurrentHashMap.newKeySet()) + .add(channel); + } + + // 移除订阅(客户端断开时调用) + public void removeSubscription(String topic, Channel channel) { + Set channels = topicSubscribers.get(topic); + if (channels != null) { + channels.remove(channel); + if (channels.isEmpty()) { + topicSubscribers.remove(topic); + } + } + } + + // 移除客户端的所有订阅(客户端断开时调用) + public void removeAllSubscriptions(Channel channel) { + topicSubscribers.forEach((topic, channels) -> channels.remove(channel)); + } + + // 获取主题的所有订阅者 + public Set getSubscribers(String topic) { + return topicSubscribers.getOrDefault(topic, Set.of()); + } +} diff --git a/cash-api/system-server/src/main/resources/application-test.yml b/cash-api/system-server/src/main/resources/application-test.yml index ef0ddeeca..ac4eb853b 100644 --- a/cash-api/system-server/src/main/resources/application-test.yml +++ b/cash-api/system-server/src/main/resources/application-test.yml @@ -2,17 +2,17 @@ spring: datasource: driver-class-name: com.mysql.cj.jdbc.Driver - url: jdbc:mysql://rm-bp1kn7h89nz62cno1ro.mysql.rds.aliyuncs.com:3306/czg_cashier?useUnicode=true&characterEncoding=utf-8 - username: cashier - password: Cashier@1@ + url: jdbc:mysql://192.168.1.42:3306/czg_cashier?useUnicode=true&characterEncoding=utf-8 + username: root + password: Chaozg123. data: redis: - host: 121.40.109.122 + host: 192.168.1.42 port: 6379 - password: chaozg123 + password: Chaozg123. timeout: 1000 - database: 2 + database: 0 lettuce: pool: min-idle: 0 diff --git a/cash-api/system-server/src/main/resources/application.yml b/cash-api/system-server/src/main/resources/application.yml index 6b3054e57..3b3e1fd01 100644 --- a/cash-api/system-server/src/main/resources/application.yml +++ b/cash-api/system-server/src/main/resources/application.yml @@ -26,4 +26,9 @@ dubbo: filter: traceProviderFilter consumer: filter: traceConsumerFilter - check: false \ No newline at end of file + check: false + +mqtt: + server: + port: 1883 + diff --git a/cash-dependencies/pom.xml b/cash-dependencies/pom.xml index 818ad28aa..a844e7a5d 100644 --- a/cash-dependencies/pom.xml +++ b/cash-dependencies/pom.xml @@ -41,6 +41,7 @@ 3.8.0 2.5.1 2.9.10 + 4.1.128.Final @@ -242,6 +243,20 @@ excel-spring-boot-starter 3.4.0 + + + + io.netty + netty-all + ${netty.version} + + + + + io.netty + netty-codec-mqtt + ${netty.version} + diff --git a/cash-service/system-service/pom.xml b/cash-service/system-service/pom.xml index ff1c05dd7..8ed8ce096 100644 --- a/cash-service/system-service/pom.xml +++ b/cash-service/system-service/pom.xml @@ -17,6 +17,21 @@ UTF-8 + + + org.springframework.integration + spring-integration-mqtt + + + + io.netty + netty-all + + + + io.netty + netty-codec-mqtt +