From c564266576025afc5f82300acca2a49fae0ec911 Mon Sep 17 00:00:00 2001 From: gong <1157756119@qq.com> Date: Wed, 10 Dec 2025 15:59:10 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B0=86=20mqtt=20=E5=90=AF=E5=8A=A8=E6=94=B9?= =?UTF-8?q?=E4=B8=BA=E5=BC=82=E6=AD=A5=E5=90=AF=E5=8A=A8=E6=9C=8D=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/czg/mqtt/MqttNettyServer.java | 47 +++++++++++++++---- 1 file changed, 37 insertions(+), 10 deletions(-) diff --git a/cash-api/system-server/src/main/java/com/czg/mqtt/MqttNettyServer.java b/cash-api/system-server/src/main/java/com/czg/mqtt/MqttNettyServer.java index 349c32adb..20502a12b 100644 --- a/cash-api/system-server/src/main/java/com/czg/mqtt/MqttNettyServer.java +++ b/cash-api/system-server/src/main/java/com/czg/mqtt/MqttNettyServer.java @@ -33,16 +33,23 @@ import javax.annotation.PreDestroy; @RequiredArgsConstructor public class MqttNettyServer implements CommandLineRunner { private final MqttServerProperties properties; - // 注入共享组件 private final MqttSubscriptionManager subscriptionManager; - private final DeviceStatusManager deviceStatusManager; private EventLoopGroup bossGroup; private EventLoopGroup workerGroup; + private ChannelFuture channelFuture; @Override - public void run(String... args) throws Exception { + public void run(String... args) { + // 在单独的线程中启动 Netty 服务器 + Thread nettyThread = new Thread(this::startServer, "netty-server"); + // 设置为非守护线程,确保应用关闭时服务器也能正常关闭 + nettyThread.setDaemon(false); + nettyThread.start(); + } + + private void startServer() { bossGroup = new NioEventLoopGroup(properties.getBossGroupThreads()); workerGroup = new NioEventLoopGroup(properties.getWorkerGroupThreads()); @@ -55,27 +62,47 @@ public class MqttNettyServer implements CommandLineRunner { .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer() { @Override - protected void initChannel(SocketChannel ch) throws Exception { - // 关键修改:每次连接创建新的 MqttServerHandler 实例,注入共享组件 + protected void initChannel(SocketChannel ch) { ch.pipeline() .addLast("mqttDecoder", new MqttDecoder()) .addLast("mqttEncoder", MqttEncoder.INSTANCE) - .addLast("mqttServerHandler", new MqttServerHandler(subscriptionManager, deviceStatusManager)); // 新实例 + .addLast("mqttServerHandler", new MqttServerHandler(subscriptionManager, deviceStatusManager)); } }); - ChannelFuture future = bootstrap.bind(properties.getPort()).sync(); + channelFuture = bootstrap.bind(properties.getPort()).sync(); log.info("MQTT服务启动成功,端口:{}", properties.getPort()); - future.channel().closeFuture().sync(); + + // 添加关闭钩子 + Runtime.getRuntime().addShutdownHook(new Thread(this::stop)); + + // 阻塞等待服务器关闭 + channelFuture.channel().closeFuture().sync(); + } catch (InterruptedException e) { + log.error("MQTT服务被中断", e); + Thread.currentThread().interrupt(); + } catch (Exception e) { + log.error("MQTT服务启动失败", e); } finally { - workerGroup.shutdownGracefully(); - bossGroup.shutdownGracefully(); + stop(); } } @PreDestroy public void stop() { log.info("MQTT服务开始关闭..."); + + // 先关闭服务器通道 + if (channelFuture != null && channelFuture.channel() != null) { + try { + channelFuture.channel().close().sync(); + } catch (InterruptedException e) { + log.warn("关闭服务器通道时被中断", e); + Thread.currentThread().interrupt(); + } + } + + // 再关闭事件循环组 if (workerGroup != null) { workerGroup.shutdownGracefully(); }