将 mqtt 启动改为异步启动服务

This commit is contained in:
gong
2025-12-10 15:59:10 +08:00
parent 6ff5436128
commit c564266576

View File

@@ -33,16 +33,23 @@ import javax.annotation.PreDestroy;
@RequiredArgsConstructor @RequiredArgsConstructor
public class MqttNettyServer implements CommandLineRunner { public class MqttNettyServer implements CommandLineRunner {
private final MqttServerProperties properties; private final MqttServerProperties properties;
// 注入共享组件
private final MqttSubscriptionManager subscriptionManager; private final MqttSubscriptionManager subscriptionManager;
private final DeviceStatusManager deviceStatusManager; private final DeviceStatusManager deviceStatusManager;
private EventLoopGroup bossGroup; private EventLoopGroup bossGroup;
private EventLoopGroup workerGroup; private EventLoopGroup workerGroup;
private ChannelFuture channelFuture;
@Override @Override
public void run(String... args) throws Exception { public void run(String... args) {
// 在单独的线程中启动 Netty 服务器
Thread nettyThread = new Thread(this::startServer, "netty-server");
// 设置为非守护线程,确保应用关闭时服务器也能正常关闭
nettyThread.setDaemon(false);
nettyThread.start();
}
private void startServer() {
bossGroup = new NioEventLoopGroup(properties.getBossGroupThreads()); bossGroup = new NioEventLoopGroup(properties.getBossGroupThreads());
workerGroup = new NioEventLoopGroup(properties.getWorkerGroupThreads()); workerGroup = new NioEventLoopGroup(properties.getWorkerGroupThreads());
@@ -55,27 +62,47 @@ public class MqttNettyServer implements CommandLineRunner {
.handler(new LoggingHandler(LogLevel.INFO)) .handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() { .childHandler(new ChannelInitializer<SocketChannel>() {
@Override @Override
protected void initChannel(SocketChannel ch) throws Exception { protected void initChannel(SocketChannel ch) {
// 关键修改:每次连接创建新的 MqttServerHandler 实例,注入共享组件
ch.pipeline() ch.pipeline()
.addLast("mqttDecoder", new MqttDecoder()) .addLast("mqttDecoder", new MqttDecoder())
.addLast("mqttEncoder", MqttEncoder.INSTANCE) .addLast("mqttEncoder", MqttEncoder.INSTANCE)
.addLast("mqttServerHandler", new MqttServerHandler(subscriptionManager, deviceStatusManager)); // 新实例 .addLast("mqttServerHandler", new MqttServerHandler(subscriptionManager, deviceStatusManager));
} }
}); });
ChannelFuture future = bootstrap.bind(properties.getPort()).sync(); channelFuture = bootstrap.bind(properties.getPort()).sync();
log.info("MQTT服务启动成功端口{}", properties.getPort()); log.info("MQTT服务启动成功端口{}", properties.getPort());
future.channel().closeFuture().sync();
// 添加关闭钩子
Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
// 阻塞等待服务器关闭
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("MQTT服务被中断", e);
Thread.currentThread().interrupt();
} catch (Exception e) {
log.error("MQTT服务启动失败", e);
} finally { } finally {
workerGroup.shutdownGracefully(); stop();
bossGroup.shutdownGracefully();
} }
} }
@PreDestroy @PreDestroy
public void stop() { public void stop() {
log.info("MQTT服务开始关闭..."); log.info("MQTT服务开始关闭...");
// 先关闭服务器通道
if (channelFuture != null && channelFuture.channel() != null) {
try {
channelFuture.channel().close().sync();
} catch (InterruptedException e) {
log.warn("关闭服务器通道时被中断", e);
Thread.currentThread().interrupt();
}
}
// 再关闭事件循环组
if (workerGroup != null) { if (workerGroup != null) {
workerGroup.shutdownGracefully(); workerGroup.shutdownGracefully();
} }