无效代码移除
购物车 规格字段
This commit is contained in:
@@ -1,20 +1,12 @@
|
||||
package com.chaozhanggui.system.cashierservice.netty.config;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||
import io.netty.handler.codec.http.FullHttpRequest;
|
||||
import io.netty.handler.codec.http.HttpHeaders;
|
||||
import io.netty.handler.codec.http.QueryStringDecoder;
|
||||
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
|
||||
import io.netty.handler.codec.mqtt.*;
|
||||
import io.netty.util.CharsetUtil;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.io.Charsets;
|
||||
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@@ -87,66 +79,6 @@ public abstract class NettyChannelHandlerAdapter extends ChannelInboundHandlerAd
|
||||
*/
|
||||
@Override
|
||||
public final void channelRead(ChannelHandlerContext ctx, Object msg) throws InterruptedException {
|
||||
if (msg instanceof MqttMessage) {
|
||||
MqttMessage mqttMessage = (MqttMessage) msg;
|
||||
|
||||
MqttMessageType messageType = mqttMessage.fixedHeader().messageType();
|
||||
switch (messageType) {
|
||||
case PINGREQ:
|
||||
sendPingResponse(ctx);
|
||||
break;
|
||||
case CONNECT:
|
||||
MqttConnectMessage connectMessage = (MqttConnectMessage) mqttMessage;
|
||||
MqttConnectVariableHeader variableHeader = connectMessage.variableHeader();
|
||||
String id = connectMessage.payload().clientIdentifier();
|
||||
channelRead(ctx, "CONNECT#"+id);
|
||||
// 发送连接回执
|
||||
sendConnectAck(ctx, connectMessage.variableHeader().isCleanSession());
|
||||
break;
|
||||
case SUBSCRIBE:
|
||||
// 处理接收到的 SUBSCRIBE 消息
|
||||
MqttSubscribeMessage subscribeMessage = (MqttSubscribeMessage) mqttMessage;
|
||||
List<MqttTopicSubscription> mqttTopicSubscriptions = subscribeMessage.payload().topicSubscriptions();
|
||||
if (mqttTopicSubscriptions != null && !mqttTopicSubscriptions.isEmpty()) {
|
||||
for (MqttTopicSubscription topicSubscription : mqttTopicSubscriptions) {
|
||||
String topicFilter = topicSubscription.topicName();
|
||||
MqttQoS qos = topicSubscription.qualityOfService();
|
||||
// log.info("订阅主题:" + topicFilter + ",服务质量等级:" + qos.name());
|
||||
}
|
||||
} else {
|
||||
// log.info("没有订阅主题。");
|
||||
}
|
||||
// 发送订阅回执
|
||||
sendSubscribeAck(ctx, subscribeMessage.variableHeader().messageId(), subscribeMessage.payload().topicSubscriptions());
|
||||
break;
|
||||
case PUBLISH:
|
||||
// 处理接收到的消息
|
||||
MqttPublishMessage publishMessage = (MqttPublishMessage) mqttMessage;
|
||||
int i = publishMessage.variableHeader().packetId();
|
||||
String topic = publishMessage.variableHeader().topicName();
|
||||
// 提取发布消息的数据和主题等信息
|
||||
ByteBuf publishPayload = publishMessage.payload();
|
||||
String payloadContent = publishPayload.toString(CharsetUtil.UTF_8); // 将数据转换为字符串
|
||||
log.info("主题:"+topic+"数据:"+payloadContent);
|
||||
if (i >= 1 && i <= 65535) {
|
||||
sendPublishAck(ctx, publishMessage);
|
||||
} else {
|
||||
// log.info("无效消息id");
|
||||
}
|
||||
channelRead(ctx, topic+"#"+payloadContent);
|
||||
break;
|
||||
case DISCONNECT:
|
||||
ctx.close();
|
||||
break;
|
||||
case PUBACK:
|
||||
MqttPubAckMessage pubAckMessage = (MqttPubAckMessage) mqttMessage;
|
||||
int messageId = pubAckMessage.variableHeader().messageId();
|
||||
break;
|
||||
default:
|
||||
log.info("收到其他类型的消息:" + messageType);
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (msg instanceof TextWebSocketFrame) {
|
||||
String text =((TextWebSocketFrame) msg).text(); // 获取消息内容
|
||||
channelRead(ctx, text);
|
||||
@@ -166,52 +98,6 @@ public abstract class NettyChannelHandlerAdapter extends ChannelInboundHandlerAd
|
||||
}
|
||||
}
|
||||
|
||||
private void sendConnectAck(ChannelHandlerContext ctx, boolean cleanSession) {
|
||||
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
|
||||
MqttConnAckVariableHeader variableHeader = new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, cleanSession);
|
||||
MqttConnAckMessage connAckMessage = new MqttConnAckMessage(fixedHeader, variableHeader);
|
||||
|
||||
ctx.writeAndFlush(connAckMessage);
|
||||
}
|
||||
|
||||
private void sendPingResponse(ChannelHandlerContext ctx) {
|
||||
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0);
|
||||
MqttMessage pingRespMessage = new MqttMessage(fixedHeader);
|
||||
ctx.writeAndFlush(pingRespMessage);
|
||||
}
|
||||
|
||||
private void sendPublishAck(ChannelHandlerContext ctx, MqttPublishMessage publishMessage) {
|
||||
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_LEAST_ONCE, false, 0);
|
||||
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(publishMessage.variableHeader().packetId());
|
||||
MqttPubAckMessage pubAckMessage = new MqttPubAckMessage(fixedHeader, variableHeader);
|
||||
ctx.writeAndFlush(pubAckMessage);
|
||||
}
|
||||
|
||||
private void sendSubscribeAck(ChannelHandlerContext ctx, int messageId, List<MqttTopicSubscription> subscriptions) {
|
||||
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_LEAST_ONCE, false, 0);
|
||||
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
|
||||
MqttSubAckPayload payload = createSubAckPayload(subscriptions); // 自定义方法,创建订阅回执的有效载荷
|
||||
|
||||
MqttSubAckMessage subAckMessage = new MqttSubAckMessage(fixedHeader, variableHeader, payload);
|
||||
|
||||
ctx.writeAndFlush(subAckMessage);
|
||||
}
|
||||
|
||||
private MqttSubAckPayload createSubAckPayload(List<MqttTopicSubscription> subscriptions) {
|
||||
List<Integer> grantedQosLevels = new ArrayList<>();
|
||||
for (MqttTopicSubscription subscription : subscriptions) {
|
||||
int qosLevel = getGrantedQosLevel(subscription); // 自定义方法,根据订阅主题获取授予的 QoS 等级
|
||||
grantedQosLevels.add(qosLevel);
|
||||
}
|
||||
return new MqttSubAckPayload(grantedQosLevels);
|
||||
}
|
||||
private int getGrantedQosLevel(MqttTopicSubscription subscription) {
|
||||
// 根据实际情况对订阅主题进行处理,并返回授予的 QoS 等级
|
||||
|
||||
// 这里我们简单地返回 1,即“至少一次”QoS 等级
|
||||
return 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* 服务器接受客户端的数据信息时触发
|
||||
*
|
||||
|
||||
Reference in New Issue
Block a user