netty
This commit is contained in:
@@ -0,0 +1,38 @@
|
||||
package com.chaozhanggui.system.cashierservice.netty;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.ChannelDuplexHandler;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
|
||||
import io.netty.handler.timeout.IdleState;
|
||||
import io.netty.handler.timeout.IdleStateEvent;
|
||||
import io.netty.util.CharsetUtil;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
@Slf4j
|
||||
public class HeartbeatHandler extends ChannelDuplexHandler {
|
||||
private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.UTF_8));
|
||||
|
||||
|
||||
@Override
|
||||
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
|
||||
if (evt instanceof IdleStateEvent) {
|
||||
IdleStateEvent event = (IdleStateEvent) evt;
|
||||
if (event.state() == IdleState.READER_IDLE) {
|
||||
ctx.close();
|
||||
super.userEventTriggered(ctx, evt);
|
||||
} else if (event.state() == IdleState.WRITER_IDLE) {
|
||||
// log.info("发送心跳");
|
||||
ctx.channel().writeAndFlush(new TextWebSocketFrame("Heartbeat")).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
|
||||
// ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
|
||||
}
|
||||
} else {
|
||||
super.userEventTriggered(ctx, evt);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,233 @@
|
||||
package com.chaozhanggui.system.cashierservice.netty;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.JSONArray;
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.chaozhanggui.system.cashierservice.netty.config.NettyChannelHandlerAdapter;
|
||||
import com.chaozhanggui.system.cashierservice.rabbit.RabbitProducer;
|
||||
import com.chaozhanggui.system.cashierservice.redis.RedisCst;
|
||||
import com.chaozhanggui.system.cashierservice.redis.RedisUtil;
|
||||
import com.chaozhanggui.system.cashierservice.util.JSONUtil;
|
||||
import com.chaozhanggui.system.cashierservice.util.SpringUtils;
|
||||
import io.netty.channel.ChannelFutureListener;
|
||||
import io.netty.channel.ChannelHandler.Sharable;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
|
||||
import io.netty.handler.timeout.IdleState;
|
||||
import io.netty.handler.timeout.IdleStateEvent;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.Resource;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
|
||||
/**
|
||||
* 摄像机TCP通讯的信息存储处理
|
||||
*/
|
||||
@Slf4j
|
||||
@Sharable
|
||||
@Component
|
||||
public class PushToAppChannelHandlerAdapter extends NettyChannelHandlerAdapter {
|
||||
|
||||
/**
|
||||
* [tableID-shopId, [userId, ctx]]
|
||||
*/
|
||||
private static Map<String, ConcurrentHashMap<String, ChannelHandlerContext>> webSocketMap = new HashMap<>();
|
||||
|
||||
/**
|
||||
* [ctx, tableID-shopId:userId]
|
||||
*/
|
||||
private static Map<ChannelHandlerContext, String> ctxToUserIdMap = new ConcurrentHashMap<>();
|
||||
|
||||
private RedisUtil redisUtils = SpringUtils.getBean(RedisUtil.class);
|
||||
private String tableId = "";
|
||||
private String shopId = "";
|
||||
@Resource
|
||||
private RabbitProducer a;
|
||||
//注入为空
|
||||
public static RabbitProducer rabbitProducer;
|
||||
@PostConstruct
|
||||
public void b() {
|
||||
rabbitProducer = this.a;
|
||||
}
|
||||
|
||||
public PushToAppChannelHandlerAdapter() {
|
||||
}
|
||||
|
||||
public static PushToAppChannelHandlerAdapter getInstance() {
|
||||
return new PushToAppChannelHandlerAdapter();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
||||
super.channelActive(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelInactive(ChannelHandlerContext ctx) {
|
||||
log.info("netty连接 长连接关闭:{}, {}",tableId,shopId);
|
||||
ctx.close();
|
||||
removeCtx(ctx);
|
||||
}
|
||||
|
||||
public void close(ChannelHandlerContext ctx) {
|
||||
log.info("netty连接 长连接关闭:{}, {}",tableId,shopId);
|
||||
ctx.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* 移除ctx
|
||||
*/
|
||||
private void removeCtx(ChannelHandlerContext ctx) {
|
||||
// 遍历webSocketMap,查找并移除对应的ChannelHandlerContext
|
||||
String key = ctxToUserIdMap.get(ctx);
|
||||
if (StringUtils.isNotBlank(key)) {
|
||||
String[] split = key.split(":");
|
||||
ConcurrentHashMap<String, ChannelHandlerContext> tableMap = webSocketMap.get(split[0]);
|
||||
if (tableMap != null && !tableMap.isEmpty() && tableMap.size() > 0) {
|
||||
tableMap.remove(split[1]);
|
||||
if (tableMap.isEmpty() || tableMap.size() == 0) {
|
||||
webSocketMap.remove(split[0]);
|
||||
redisUtils.deleteByKey(RedisCst.TABLE_CART.concat(split[0]));
|
||||
}
|
||||
}
|
||||
}
|
||||
ctxToUserIdMap.remove(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelReadComplete(ChannelHandlerContext ctx) {
|
||||
super.channelReadComplete(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
|
||||
super.exceptionCaught(ctx, cause);
|
||||
removeCtx(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, String msg) {
|
||||
log.info("netty连接 接收到数据:{}",msg);
|
||||
JSONObject jsonObject = new JSONObject();
|
||||
if (StringUtils.isNotEmpty(msg)) {
|
||||
jsonObject = JSONObject.parseObject(msg);
|
||||
}else {
|
||||
log.info("netty连接 接收到空数据:{}",msg);
|
||||
}
|
||||
String type = jsonObject.getString("type");
|
||||
if(type.equals("heartbeat")){//心跳
|
||||
log.info("netty连接 接收到数据:tableId:{} meg:{}",tableId,msg);
|
||||
}else {
|
||||
if (type.equals("connect")) {
|
||||
String tableId = jsonObject.getString("tableId");
|
||||
String shopId = jsonObject.getString("shopId");
|
||||
String userId = jsonObject.getString("userId");
|
||||
if (StringUtils.isBlank(type) || StringUtils.isBlank(shopId) || StringUtils.isBlank(userId)) {
|
||||
log.info("netty连接 建立连接请求失败:{}",jsonObject);
|
||||
channelInactive(ctx);
|
||||
return;
|
||||
}
|
||||
String key = tableId + "-" + shopId;
|
||||
log.info("netty连接 接收到数据 建立连接参数 param:{}",jsonObject);
|
||||
this.tableId=tableId;
|
||||
this.shopId=shopId;
|
||||
if (webSocketMap.containsKey(key)) {
|
||||
ConcurrentHashMap<String, ChannelHandlerContext> userSocketMap = webSocketMap.get(key);
|
||||
// ChannelHandlerContext channelHandlerContext = userSocketMap.get(userId);
|
||||
// if (channelHandlerContext != null) {
|
||||
// channelHandlerContext.close();
|
||||
// }
|
||||
userSocketMap.put(userId, ctx);
|
||||
} else {
|
||||
ConcurrentHashMap<String, ChannelHandlerContext> userSocketMap = new ConcurrentHashMap<>();
|
||||
userSocketMap.put(userId, ctx);
|
||||
webSocketMap.put(key,userSocketMap);
|
||||
}
|
||||
ctxToUserIdMap.put(ctx, key + ":" + userId);
|
||||
JSONObject jsonObject1 = new JSONObject();
|
||||
jsonObject1.put("status", "success");
|
||||
jsonObject1.put("msg", "连接成功");
|
||||
jsonObject1.put("type", "connect");
|
||||
sendMesToApp(jsonObject1.toString(), ctx);
|
||||
|
||||
jsonObject.put("type","initCart");
|
||||
rabbitProducer.putCart(jsonObject.toJSONString());
|
||||
}
|
||||
else{
|
||||
jsonObject.put("tableId", this.tableId);
|
||||
jsonObject.put("shopId", this.shopId);
|
||||
if("sku".equals(type)){
|
||||
boolean exist = redisUtils.exists(RedisCst.TABLE_CART.concat(jsonObject.getString("tableId").concat("-").concat(shopId)));
|
||||
Integer num = 0;
|
||||
if (exist){
|
||||
String message = redisUtils.getMessage(RedisCst.TABLE_CART.concat(jsonObject.getString("tableId").concat("-").concat(shopId)));
|
||||
JSONArray array = JSON.parseArray(message);
|
||||
for (int i = 0; i < array.size(); i++) {
|
||||
JSONObject object = array.getJSONObject(i);
|
||||
if (object.getString("skuId").equals(jsonObject.getString("skuId"))) {
|
||||
num = object.getIntValue("totalNumber");
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
JSONObject jsonObject1 = new JSONObject();
|
||||
jsonObject1.put("status", "success");
|
||||
jsonObject1.put("msg", "成功");
|
||||
jsonObject1.put("type", "sku");
|
||||
jsonObject1.put("data", new ArrayList<>());
|
||||
jsonObject1.put("amount", num);
|
||||
sendMesToApp(jsonObject1.toString(),ctx);
|
||||
}else {
|
||||
rabbitProducer.putCart(jsonObject.toJSONString());
|
||||
}
|
||||
}
|
||||
}
|
||||
//业务逻辑代码处理框架。。。
|
||||
ctx.flush();
|
||||
}
|
||||
|
||||
public void sendMesToApp(String str, ChannelHandlerContext ctx) {
|
||||
sendMessage(ctx, str);
|
||||
}
|
||||
|
||||
@Async
|
||||
public void AppSendInfo(String message, String tableId,String userId, boolean userFlag) {
|
||||
log.info("netty连接 发送消息 tableId:{} userId:{} userFlag:{} message:{}",tableId,userId,userFlag, JSONUtil.toJSONString(message));
|
||||
if (userFlag) {
|
||||
if (webSocketMap.containsKey(tableId)) {
|
||||
ConcurrentHashMap<String, ChannelHandlerContext> webSockets = webSocketMap.get(tableId);
|
||||
if(!webSockets.isEmpty()){
|
||||
if (StringUtils.isNotBlank(userId)) {
|
||||
ChannelHandlerContext ctx = webSockets.get(userId);
|
||||
if (ctx != null) {
|
||||
sendMesToApp(message,ctx);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if (StringUtils.isEmpty(tableId)) {
|
||||
// 向所有用户发送信息
|
||||
for (ConcurrentHashMap<String, ChannelHandlerContext> value : webSocketMap.values()) {
|
||||
for (ChannelHandlerContext ctx : value.values()) {
|
||||
sendMesToApp(message,ctx);
|
||||
}
|
||||
}
|
||||
} else if (webSocketMap.containsKey(tableId)) {
|
||||
ConcurrentHashMap<String, ChannelHandlerContext> webSockets = webSocketMap.get(tableId);
|
||||
if(!webSockets.isEmpty()) {
|
||||
for (ChannelHandlerContext ctx : webSockets.values()) {
|
||||
sendMesToApp(message,ctx);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,55 @@
|
||||
package com.chaozhanggui.system.cashierservice.netty;
|
||||
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.socket.nio.NioSocketChannel;
|
||||
import io.netty.handler.codec.http.HttpObjectAggregator;
|
||||
import io.netty.handler.codec.http.HttpServerCodec;
|
||||
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
|
||||
import io.netty.handler.ssl.SslContext;
|
||||
import io.netty.handler.ssl.SslContextBuilder;
|
||||
import io.netty.handler.stream.ChunkedWriteHandler;
|
||||
import io.netty.handler.timeout.IdleStateHandler;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.core.io.ClassPathResource;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import javax.net.ssl.SSLException;
|
||||
import java.io.*;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Component
|
||||
@Slf4j
|
||||
public class PushToAppChannelInitializer extends ChannelInitializer<NioSocketChannel> {
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
protected void initChannel(NioSocketChannel ch) throws SSLException {
|
||||
ChannelPipeline pipeline = ch.pipeline();
|
||||
|
||||
// 添加心跳处理器 多久没收到消息 断开 心跳时间(秒) 读写空闲时间(秒)
|
||||
pipeline.addLast(new IdleStateHandler(30, 10, 120, TimeUnit.SECONDS));
|
||||
pipeline.addLast(new HeartbeatHandler());
|
||||
|
||||
//本地试调时使用
|
||||
// SslContext sslContext = SslContextBuilder.forServer(loadResourceAsByteArrayInputStream("\\pem\\fullchain.pem"), loadResourceAsByteArrayInputStream("\\pem\\privkey.key")).build();
|
||||
// pipeline.addLast(sslContext.newHandler(ch.alloc()));
|
||||
|
||||
// 添加HttpServerCodec用于处理HTTP编解码
|
||||
pipeline.addLast(new HttpServerCodec());
|
||||
pipeline.addLast(new ChunkedWriteHandler());
|
||||
pipeline.addLast(new HttpObjectAggregator(65536));
|
||||
pipeline.addLast(new WebSocketServerProtocolHandler("/netty"));
|
||||
ch.pipeline().addLast(new PushToAppChannelHandlerAdapter());
|
||||
}
|
||||
|
||||
|
||||
public static InputStream loadResourceAsByteArrayInputStream(String path) {
|
||||
InputStream inputStream = PushToAppChannelInitializer.class.getClassLoader().getResourceAsStream(path);
|
||||
return inputStream;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,262 @@
|
||||
package com.chaozhanggui.system.cashierservice.netty.config;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||
import io.netty.handler.codec.http.FullHttpRequest;
|
||||
import io.netty.handler.codec.http.HttpHeaders;
|
||||
import io.netty.handler.codec.http.QueryStringDecoder;
|
||||
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
|
||||
import io.netty.handler.codec.mqtt.*;
|
||||
import io.netty.util.CharsetUtil;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.io.Charsets;
|
||||
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* netty核心处理类
|
||||
* 通道的信息需要自己存储,一般实现存储在一个ConcurrentHashMap中
|
||||
*/
|
||||
@Slf4j
|
||||
public abstract class NettyChannelHandlerAdapter extends ChannelInboundHandlerAdapter {
|
||||
|
||||
/**
|
||||
* 给客户端发送消息
|
||||
*/
|
||||
public static void sendMessage(ChannelHandlerContext ctx, String content) {
|
||||
// ctx.channel().writeAndFlush(content);
|
||||
ctx.channel().writeAndFlush(new TextWebSocketFrame(content));
|
||||
}
|
||||
|
||||
/**
|
||||
* 群发消息
|
||||
*/
|
||||
public static void sendAllMessage(Collection<ChannelHandlerContext> c, String content) {
|
||||
for (ChannelHandlerContext ctx : c) {
|
||||
ctx.channel().writeAndFlush(content);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 客户端与服务器建立连接的时候触发
|
||||
*
|
||||
* @param ctx 连接信息
|
||||
*/
|
||||
@Override
|
||||
public void channelActive(ChannelHandlerContext ctx) throws Exception {
|
||||
super.channelActive(ctx);
|
||||
}
|
||||
|
||||
/**
|
||||
* 客户端与服务器关闭连接的时候触发
|
||||
*
|
||||
* @param ctx 连接信息
|
||||
*/
|
||||
@Override
|
||||
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
||||
super.channelInactive(ctx);
|
||||
// ctx.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* 服务端接收客户端发送过来的数据结束之后调用
|
||||
*/
|
||||
@Override
|
||||
public void channelReadComplete(ChannelHandlerContext ctx) {
|
||||
ctx.flush();
|
||||
}
|
||||
|
||||
/**
|
||||
* 工程出现异常的时候调用<br/>
|
||||
* 需要自己remove掉map中的内容
|
||||
*/
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
|
||||
log.info("netty出现异常:", cause);
|
||||
ctx.close();
|
||||
}
|
||||
|
||||
/**
|
||||
* 服务器接受客户端的数据信息时触发
|
||||
*/
|
||||
@Override
|
||||
public final void channelRead(ChannelHandlerContext ctx, Object msg) throws InterruptedException {
|
||||
if (msg instanceof MqttMessage) {
|
||||
MqttMessage mqttMessage = (MqttMessage) msg;
|
||||
|
||||
MqttMessageType messageType = mqttMessage.fixedHeader().messageType();
|
||||
switch (messageType) {
|
||||
case PINGREQ:
|
||||
sendPingResponse(ctx);
|
||||
break;
|
||||
case CONNECT:
|
||||
MqttConnectMessage connectMessage = (MqttConnectMessage) mqttMessage;
|
||||
MqttConnectVariableHeader variableHeader = connectMessage.variableHeader();
|
||||
String id = connectMessage.payload().clientIdentifier();
|
||||
channelRead(ctx, "CONNECT#"+id);
|
||||
// 发送连接回执
|
||||
sendConnectAck(ctx, connectMessage.variableHeader().isCleanSession());
|
||||
break;
|
||||
case SUBSCRIBE:
|
||||
// 处理接收到的 SUBSCRIBE 消息
|
||||
MqttSubscribeMessage subscribeMessage = (MqttSubscribeMessage) mqttMessage;
|
||||
List<MqttTopicSubscription> mqttTopicSubscriptions = subscribeMessage.payload().topicSubscriptions();
|
||||
if (mqttTopicSubscriptions != null && !mqttTopicSubscriptions.isEmpty()) {
|
||||
for (MqttTopicSubscription topicSubscription : mqttTopicSubscriptions) {
|
||||
String topicFilter = topicSubscription.topicName();
|
||||
MqttQoS qos = topicSubscription.qualityOfService();
|
||||
// log.info("订阅主题:" + topicFilter + ",服务质量等级:" + qos.name());
|
||||
}
|
||||
} else {
|
||||
// log.info("没有订阅主题。");
|
||||
}
|
||||
// 发送订阅回执
|
||||
sendSubscribeAck(ctx, subscribeMessage.variableHeader().messageId(), subscribeMessage.payload().topicSubscriptions());
|
||||
break;
|
||||
case PUBLISH:
|
||||
// 处理接收到的消息
|
||||
MqttPublishMessage publishMessage = (MqttPublishMessage) mqttMessage;
|
||||
int i = publishMessage.variableHeader().packetId();
|
||||
String topic = publishMessage.variableHeader().topicName();
|
||||
// 提取发布消息的数据和主题等信息
|
||||
ByteBuf publishPayload = publishMessage.payload();
|
||||
String payloadContent = publishPayload.toString(CharsetUtil.UTF_8); // 将数据转换为字符串
|
||||
log.info("主题:"+topic+"数据:"+payloadContent);
|
||||
if (i >= 1 && i <= 65535) {
|
||||
sendPublishAck(ctx, publishMessage);
|
||||
} else {
|
||||
// log.info("无效消息id");
|
||||
}
|
||||
channelRead(ctx, topic+"#"+payloadContent);
|
||||
break;
|
||||
case DISCONNECT:
|
||||
ctx.close();
|
||||
break;
|
||||
case PUBACK:
|
||||
MqttPubAckMessage pubAckMessage = (MqttPubAckMessage) mqttMessage;
|
||||
int messageId = pubAckMessage.variableHeader().messageId();
|
||||
break;
|
||||
default:
|
||||
log.info("收到其他类型的消息:" + messageType);
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (msg instanceof TextWebSocketFrame) {
|
||||
String text =((TextWebSocketFrame) msg).text(); // 获取消息内容
|
||||
channelRead(ctx, text);
|
||||
}
|
||||
else if (msg instanceof FullHttpRequest) {
|
||||
FullHttpRequest httpRequest =(FullHttpRequest) msg; // 获取消息内容
|
||||
String uri = httpRequest.uri();
|
||||
// 使用QueryStringDecoder来解析查询参数
|
||||
QueryStringDecoder decoder = new QueryStringDecoder(uri);
|
||||
// 获取所有的参数
|
||||
Map<String, List<String>> params = decoder.parameters();
|
||||
// 如果是普通的 HTTP 请求,进行相应的处理
|
||||
channelRead(ctx, uri);
|
||||
}
|
||||
else{
|
||||
channelRead(ctx, msg.toString());
|
||||
}
|
||||
}
|
||||
|
||||
private void sendConnectAck(ChannelHandlerContext ctx, boolean cleanSession) {
|
||||
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
|
||||
MqttConnAckVariableHeader variableHeader = new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, cleanSession);
|
||||
MqttConnAckMessage connAckMessage = new MqttConnAckMessage(fixedHeader, variableHeader);
|
||||
|
||||
ctx.writeAndFlush(connAckMessage);
|
||||
}
|
||||
|
||||
private void sendPingResponse(ChannelHandlerContext ctx) {
|
||||
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0);
|
||||
MqttMessage pingRespMessage = new MqttMessage(fixedHeader);
|
||||
ctx.writeAndFlush(pingRespMessage);
|
||||
}
|
||||
|
||||
private void sendPublishAck(ChannelHandlerContext ctx, MqttPublishMessage publishMessage) {
|
||||
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, MqttQoS.AT_LEAST_ONCE, false, 0);
|
||||
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(publishMessage.variableHeader().packetId());
|
||||
MqttPubAckMessage pubAckMessage = new MqttPubAckMessage(fixedHeader, variableHeader);
|
||||
ctx.writeAndFlush(pubAckMessage);
|
||||
}
|
||||
|
||||
private void sendSubscribeAck(ChannelHandlerContext ctx, int messageId, List<MqttTopicSubscription> subscriptions) {
|
||||
MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_LEAST_ONCE, false, 0);
|
||||
MqttMessageIdVariableHeader variableHeader = MqttMessageIdVariableHeader.from(messageId);
|
||||
MqttSubAckPayload payload = createSubAckPayload(subscriptions); // 自定义方法,创建订阅回执的有效载荷
|
||||
|
||||
MqttSubAckMessage subAckMessage = new MqttSubAckMessage(fixedHeader, variableHeader, payload);
|
||||
|
||||
ctx.writeAndFlush(subAckMessage);
|
||||
}
|
||||
|
||||
private MqttSubAckPayload createSubAckPayload(List<MqttTopicSubscription> subscriptions) {
|
||||
List<Integer> grantedQosLevels = new ArrayList<>();
|
||||
for (MqttTopicSubscription subscription : subscriptions) {
|
||||
int qosLevel = getGrantedQosLevel(subscription); // 自定义方法,根据订阅主题获取授予的 QoS 等级
|
||||
grantedQosLevels.add(qosLevel);
|
||||
}
|
||||
return new MqttSubAckPayload(grantedQosLevels);
|
||||
}
|
||||
private int getGrantedQosLevel(MqttTopicSubscription subscription) {
|
||||
// 根据实际情况对订阅主题进行处理,并返回授予的 QoS 等级
|
||||
|
||||
// 这里我们简单地返回 1,即“至少一次”QoS 等级
|
||||
return 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* 服务器接受客户端的数据信息时触发
|
||||
*
|
||||
* @param ctx 连接信息
|
||||
* @param msg 消息内容
|
||||
*/
|
||||
public abstract void channelRead(ChannelHandlerContext ctx, String msg);
|
||||
|
||||
@Override
|
||||
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
|
||||
super.channelRegistered(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
|
||||
super.channelUnregistered(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
|
||||
super.userEventTriggered(ctx, evt);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
|
||||
super.channelWritabilityChanged(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected final void ensureNotSharable() {
|
||||
super.ensureNotSharable();
|
||||
}
|
||||
|
||||
@Override
|
||||
public final boolean isSharable() {
|
||||
return super.isSharable();
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void handlerAdded(ChannelHandlerContext ctx) throws Exception {
|
||||
super.handlerAdded(ctx);
|
||||
}
|
||||
|
||||
@Override
|
||||
public final void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
|
||||
super.handlerRemoved(ctx);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,38 @@
|
||||
package com.chaozhanggui.system.cashierservice.netty.config;
|
||||
|
||||
import com.chaozhanggui.system.cashierservice.netty.PushToAppChannelInitializer;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import javax.annotation.Resource;
|
||||
|
||||
@Service
|
||||
@Slf4j
|
||||
public class NettyConfig {
|
||||
|
||||
/**
|
||||
* 端口号
|
||||
*/
|
||||
@Value("${netty.server.port}")
|
||||
private int port;
|
||||
|
||||
/**
|
||||
* 接收者的线程数
|
||||
*/
|
||||
@Value("${netty.server.parent-group-threads}")
|
||||
private int parentGroupThreads;
|
||||
/**
|
||||
* 客户端的线程数
|
||||
*/
|
||||
@Value("${netty.server.child-group-threads}")
|
||||
private int childGroupThreads;
|
||||
|
||||
@Resource
|
||||
private PushToAppChannelInitializer pushToAppChannelInitializer;
|
||||
@PostConstruct
|
||||
public void startCameraNetty() {
|
||||
NettyUtils.getInstance().initNetty(port, parentGroupThreads, childGroupThreads, pushToAppChannelInitializer);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,73 @@
|
||||
package com.chaozhanggui.system.cashierservice.netty.config;
|
||||
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.ChannelOption;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
||||
import io.netty.channel.socket.nio.NioSocketChannel;
|
||||
import io.netty.handler.logging.LogLevel;
|
||||
import io.netty.handler.logging.LoggingHandler;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
@Slf4j
|
||||
public class NettyUtils {
|
||||
|
||||
private ChannelFuture channelFuture;
|
||||
private EventLoopGroup parentGroup;
|
||||
private EventLoopGroup childGroup;
|
||||
private int port;
|
||||
|
||||
private NettyUtils() {
|
||||
}
|
||||
|
||||
public static NettyUtils getInstance() {
|
||||
return new NettyUtils();
|
||||
}
|
||||
|
||||
/**
|
||||
* netty 工具方法
|
||||
*
|
||||
* @param port 端口号
|
||||
* @param parentGroupThreads 接收者的线程数
|
||||
* @param childGroupThreads 客户端的线程数
|
||||
* @param channelInitializer 客户端成功connect后执行的handler
|
||||
*/
|
||||
public void initNetty(int port, int parentGroupThreads, int childGroupThreads, ChannelInitializer<NioSocketChannel> channelInitializer) {
|
||||
log.info("netty初始化 ");
|
||||
// 接收者 线程组用于处理链接工作
|
||||
parentGroup = new NioEventLoopGroup(parentGroupThreads);
|
||||
// 客户端 线程组用户处理数据工作
|
||||
childGroup = new NioEventLoopGroup(childGroupThreads);
|
||||
this.port = port;
|
||||
|
||||
ServerBootstrap bootstrap = new ServerBootstrap();
|
||||
bootstrap.group(parentGroup, childGroup)
|
||||
// 指定channel
|
||||
.channel(NioServerSocketChannel.class)
|
||||
// 定义日志等级
|
||||
.handler(new LoggingHandler(LogLevel.INFO))
|
||||
// 指定端口
|
||||
.localAddress(new InetSocketAddress(port))
|
||||
//服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数
|
||||
.option(ChannelOption.SO_BACKLOG, 1024)
|
||||
//设置TCP长连接,一般如果两个小时内没有数据通信时,tcp会自动发一个活动探测数据报文
|
||||
.childOption(ChannelOption.SO_KEEPALIVE, true)
|
||||
//将小的数据包包装成跟更大的帧进行传送,提高网络负载
|
||||
.childOption(ChannelOption.TCP_NODELAY, true)
|
||||
//设置超时时间
|
||||
.childOption(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000 * 60 * 10)
|
||||
// handler 在初始化时就会执行,而 childHandler 会在客户端成功connect后才执行,这是两者的区别。
|
||||
// .handler
|
||||
.childHandler(channelInitializer);
|
||||
try {
|
||||
channelFuture = bootstrap.bind(port).sync();
|
||||
} catch (Exception e) {
|
||||
log.error("netty 启动时出错:" + e);
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user