打印增加等待锁,防止重复打印
This commit is contained in:
@@ -6,11 +6,13 @@ import com.alibaba.fastjson2.JSONObject;
|
|||||||
import com.czg.account.entity.PrintMachine;
|
import com.czg.account.entity.PrintMachine;
|
||||||
import com.czg.account.service.PrintMachineService;
|
import com.czg.account.service.PrintMachineService;
|
||||||
import com.czg.config.RabbitConstants;
|
import com.czg.config.RabbitConstants;
|
||||||
|
import com.czg.config.RedisCst;
|
||||||
import com.czg.order.entity.MqLog;
|
import com.czg.order.entity.MqLog;
|
||||||
import com.czg.order.entity.OrderInfo;
|
import com.czg.order.entity.OrderInfo;
|
||||||
import com.czg.order.service.MqLogService;
|
import com.czg.order.service.MqLogService;
|
||||||
import com.czg.order.service.OrderInfoService;
|
import com.czg.order.service.OrderInfoService;
|
||||||
import com.czg.service.order.print.PrinterHandler;
|
import com.czg.service.order.print.PrinterHandler;
|
||||||
|
import com.czg.service.order.utils.FunUtil;
|
||||||
import com.mybatisflex.core.query.QueryWrapper;
|
import com.mybatisflex.core.query.QueryWrapper;
|
||||||
import jakarta.annotation.Resource;
|
import jakarta.annotation.Resource;
|
||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
@@ -33,25 +35,27 @@ import java.util.function.Consumer;
|
|||||||
public class PrintMqListener {
|
public class PrintMqListener {
|
||||||
@Resource
|
@Resource
|
||||||
private MqLogService mqLogService;
|
private MqLogService mqLogService;
|
||||||
|
@Resource
|
||||||
|
private FunUtil funUtil;
|
||||||
@Lazy
|
@Lazy
|
||||||
@Resource
|
@Resource
|
||||||
private PrinterHandler printerHandler;
|
private PrinterHandler printerHandler;
|
||||||
|
|
||||||
private <T> void invokeFun(String type, String plat, T data, Consumer<T> consumer) {
|
private <T> void invokeFun(String type, String plat, T data, Consumer<T> consumer) {
|
||||||
long startTime = DateUtil.date().getTime();
|
long startTime = DateUtil.date().getTime();
|
||||||
log.info("接收到{}打印消息:{}", type, data);
|
log.info("接收到{}打印消息:{}", type, data);
|
||||||
MqLog mqLog = new MqLog().setQueue(RabbitConstants.Queue.ORDER_MACHINE_PRINT_QUEUE).setMsg(data.toString())
|
MqLog mqLog = new MqLog().setQueue(RabbitConstants.Queue.ORDER_MACHINE_PRINT_QUEUE).setMsg(data.toString())
|
||||||
.setType(type).setPlat(plat).setCreateTime(DateUtil.date().toLocalDateTime());
|
.setType(type).setPlat(plat).setCreateTime(DateUtil.date().toLocalDateTime());
|
||||||
try {
|
try {
|
||||||
consumer.accept(data);
|
consumer.accept(data);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("订单打印失败", e);
|
log.error("订单打印失败", e);
|
||||||
mqLog.setErrInfo(JSONObject.toJSONString(e));
|
mqLog.setErrInfo(JSONObject.toJSONString(e));
|
||||||
mqLog.setDuration(DateUtil.date().getTime() - startTime);
|
mqLog.setDuration(DateUtil.date().getTime() - startTime);
|
||||||
mqLog.setFailTime(DateUtil.date().toLocalDateTime());
|
mqLog.setFailTime(DateUtil.date().toLocalDateTime());
|
||||||
mqLogService.save(mqLog);
|
mqLogService.save(mqLog);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
@RabbitListener(queues = {"${spring.profiles.active}-" + RabbitConstants.Queue.ORDER_MACHINE_PRINT_QUEUE})
|
@RabbitListener(queues = {"${spring.profiles.active}-" + RabbitConstants.Queue.ORDER_MACHINE_PRINT_QUEUE})
|
||||||
public void orderPrint(String req) {
|
public void orderPrint(String req) {
|
||||||
@@ -62,7 +66,10 @@ public class PrintMqListener {
|
|||||||
throw new RuntimeException("订单打印失败,未传递orderId");
|
throw new RuntimeException("订单打印失败,未传递orderId");
|
||||||
}
|
}
|
||||||
Boolean printOrder = jsonObject.getBoolean("printOrder");
|
Boolean printOrder = jsonObject.getBoolean("printOrder");
|
||||||
printerHandler.handler(orderId, printOrder != null && !printOrder ? PrinterHandler.PrintTypeEnum.ONE : PrinterHandler.PrintTypeEnum.ONE_AND_ORDER);
|
funUtil.runFunAndCheckKey(() -> {
|
||||||
|
printerHandler.handler(orderId, printOrder != null && !printOrder ? PrinterHandler.PrintTypeEnum.ONE : PrinterHandler.PrintTypeEnum.ONE_AND_ORDER);
|
||||||
|
return null;
|
||||||
|
}, RedisCst.getLockKey("orderPrint", orderId));
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -6,8 +6,10 @@ import jakarta.annotation.Resource;
|
|||||||
import lombok.extern.slf4j.Slf4j;
|
import lombok.extern.slf4j.Slf4j;
|
||||||
import org.springframework.data.redis.core.RedisTemplate;
|
import org.springframework.data.redis.core.RedisTemplate;
|
||||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||||
|
import org.springframework.data.redis.core.script.DefaultRedisScript;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
@@ -24,29 +26,60 @@ public class FunUtil {
|
|||||||
private RedisTemplate<String, Object> redisTemplate;
|
private RedisTemplate<String, Object> redisTemplate;
|
||||||
public static int retryCount = 5;
|
public static int retryCount = 5;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 执行任务并保证锁唯一
|
||||||
|
* @param supplier 业务逻辑
|
||||||
|
* @param lockKey Redis锁的Key
|
||||||
|
* @return 业务逻辑返回值
|
||||||
|
*/
|
||||||
public <T> T runFunAndCheckKey(Supplier<T> supplier, String lockKey) {
|
public <T> T runFunAndCheckKey(Supplier<T> supplier, String lockKey) {
|
||||||
try{
|
String lockValue = String.valueOf(System.nanoTime() + Thread.currentThread().threadId());
|
||||||
// 设置分布式锁
|
try {
|
||||||
boolean lock = Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(lockKey, "1", 30, TimeUnit.MILLISECONDS));
|
// 尝试获取锁,超时时间 5 秒,防止死锁
|
||||||
|
boolean lock = Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, 5, TimeUnit.SECONDS));
|
||||||
int count = 0;
|
int count = 0;
|
||||||
|
// 初始等待 10ms
|
||||||
|
int retryDelay = 10;
|
||||||
|
|
||||||
while (!lock) {
|
while (!lock) {
|
||||||
if (count++ > 100) {
|
// 最多重试 10 次,大约 10 秒
|
||||||
throw new ApiNotPrintException("系统繁忙, 稍后再试");
|
if (count++ > 50) {
|
||||||
|
throw new RuntimeException("系统繁忙, 稍后再试");
|
||||||
}
|
}
|
||||||
Thread.sleep(20);
|
Thread.sleep(retryDelay);
|
||||||
lock = Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(lockKey, "1", 30, TimeUnit.MILLISECONDS));
|
// 指数退避,最大等待 200ms
|
||||||
|
retryDelay = Math.min(retryDelay * 2, 200);
|
||||||
|
lock = Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, 5, TimeUnit.SECONDS));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 执行任务
|
||||||
return supplier.get();
|
return supplier.get();
|
||||||
} catch (RuntimeException e){
|
|
||||||
log.info("执行出错:{}", e.getMessage());
|
|
||||||
throw e;
|
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
throw new RuntimeException(e);
|
Thread.currentThread().interrupt();
|
||||||
} finally{
|
throw new RuntimeException("线程被中断", e);
|
||||||
redisTemplate.delete(lockKey);
|
} catch (Exception e) {
|
||||||
|
log.error("执行出错:{}", e.getMessage(), e);
|
||||||
|
throw e;
|
||||||
|
} finally {
|
||||||
|
// 释放锁(使用 Lua 脚本确保原子性)
|
||||||
|
unlock(lockKey, lockValue);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 使用 Lua 脚本确保释放锁的原子性
|
||||||
|
* @param lockKey 锁的 Key
|
||||||
|
* @param lockValue 当前线程的锁值
|
||||||
|
*/
|
||||||
|
private void unlock(String lockKey, String lockValue) {
|
||||||
|
String luaScript =
|
||||||
|
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
|
||||||
|
"return redis.call('del', KEYS[1]) " +
|
||||||
|
"else return 0 end";
|
||||||
|
redisTemplate.execute(new DefaultRedisScript<>(luaScript, Long.class),
|
||||||
|
Collections.singletonList(lockKey), lockValue);
|
||||||
|
}
|
||||||
|
|
||||||
public static <T, R> R runFunAndRetry(
|
public static <T, R> R runFunAndRetry(
|
||||||
Supplier<R> function,
|
Supplier<R> function,
|
||||||
Function<R, Boolean> check, Consumer<R> errFun) {
|
Function<R, Boolean> check, Consumer<R> errFun) {
|
||||||
|
|||||||
@@ -0,0 +1,100 @@
|
|||||||
|
package com.czg.service.order.utils;
|
||||||
|
|
||||||
|
import jakarta.annotation.Resource;
|
||||||
|
import lombok.extern.slf4j.Slf4j;
|
||||||
|
import org.springframework.data.redis.core.RedisTemplate;
|
||||||
|
import org.springframework.data.redis.core.script.DefaultRedisScript;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.function.Consumer;
|
||||||
|
import java.util.function.Function;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author Administrator
|
||||||
|
*/
|
||||||
|
@Slf4j
|
||||||
|
@Component
|
||||||
|
public class FunUtil {
|
||||||
|
@Resource
|
||||||
|
private RedisTemplate<String, Object> redisTemplate;
|
||||||
|
public static int retryCount = 5;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 执行任务并保证锁唯一
|
||||||
|
* @param supplier 业务逻辑
|
||||||
|
* @param lockKey Redis锁的Key
|
||||||
|
* @return 业务逻辑返回值
|
||||||
|
*/
|
||||||
|
public <T> T runFunAndCheckKey(Supplier<T> supplier, String lockKey) {
|
||||||
|
String lockValue = String.valueOf(System.nanoTime() + Thread.currentThread().threadId());
|
||||||
|
try {
|
||||||
|
// 尝试获取锁,超时时间 5 秒,防止死锁
|
||||||
|
boolean lock = Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, 5, TimeUnit.SECONDS));
|
||||||
|
int count = 0;
|
||||||
|
// 初始等待 10ms
|
||||||
|
int retryDelay = 10;
|
||||||
|
|
||||||
|
while (!lock) {
|
||||||
|
// 最多重试 10 次,大约 10 秒
|
||||||
|
if (count++ > 50) {
|
||||||
|
throw new RuntimeException("系统繁忙, 稍后再试");
|
||||||
|
}
|
||||||
|
Thread.sleep(retryDelay);
|
||||||
|
// 指数退避,最大等待 200ms
|
||||||
|
retryDelay = Math.min(retryDelay * 2, 200);
|
||||||
|
lock = Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, 5, TimeUnit.SECONDS));
|
||||||
|
}
|
||||||
|
|
||||||
|
// 执行任务
|
||||||
|
return supplier.get();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
throw new RuntimeException("线程被中断", e);
|
||||||
|
} catch (Exception e) {
|
||||||
|
log.error("执行出错:{}", e.getMessage(), e);
|
||||||
|
throw e;
|
||||||
|
} finally {
|
||||||
|
// 释放锁(使用 Lua 脚本确保原子性)
|
||||||
|
unlock(lockKey, lockValue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 使用 Lua 脚本确保释放锁的原子性
|
||||||
|
* @param lockKey 锁的 Key
|
||||||
|
* @param lockValue 当前线程的锁值
|
||||||
|
*/
|
||||||
|
private void unlock(String lockKey, String lockValue) {
|
||||||
|
String luaScript =
|
||||||
|
"if redis.call('get', KEYS[1]) == ARGV[1] then " +
|
||||||
|
"return redis.call('del', KEYS[1]) " +
|
||||||
|
"else return 0 end";
|
||||||
|
redisTemplate.execute(new DefaultRedisScript<>(luaScript, Long.class),
|
||||||
|
Collections.singletonList(lockKey), lockValue);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static <T, R> R runFunAndRetry(
|
||||||
|
Supplier<R> function,
|
||||||
|
Function<R, Boolean> check, Consumer<R> errFun) {
|
||||||
|
log.info("工具类开始执行函数");
|
||||||
|
R result = function.get();
|
||||||
|
boolean flag = check.apply(result);
|
||||||
|
|
||||||
|
log.info("执行结果: {}", result);
|
||||||
|
|
||||||
|
while (flag && retryCount-- > 0) {
|
||||||
|
log.info("执行函数失败, 剩余尝试次数{}", retryCount);
|
||||||
|
result = function.get();
|
||||||
|
log.info("执行结果: {}", result);
|
||||||
|
flag = check.apply(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (flag) {
|
||||||
|
errFun.accept(result);
|
||||||
|
}
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user