diff --git a/cash-api/order-server/src/main/java/com/czg/mq/PrintMqListener.java b/cash-api/order-server/src/main/java/com/czg/mq/PrintMqListener.java index af0742f8..7a66a4c9 100644 --- a/cash-api/order-server/src/main/java/com/czg/mq/PrintMqListener.java +++ b/cash-api/order-server/src/main/java/com/czg/mq/PrintMqListener.java @@ -6,11 +6,13 @@ import com.alibaba.fastjson2.JSONObject; import com.czg.account.entity.PrintMachine; import com.czg.account.service.PrintMachineService; import com.czg.config.RabbitConstants; +import com.czg.config.RedisCst; import com.czg.order.entity.MqLog; import com.czg.order.entity.OrderInfo; import com.czg.order.service.MqLogService; import com.czg.order.service.OrderInfoService; import com.czg.service.order.print.PrinterHandler; +import com.czg.service.order.utils.FunUtil; import com.mybatisflex.core.query.QueryWrapper; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; @@ -33,25 +35,27 @@ import java.util.function.Consumer; public class PrintMqListener { @Resource private MqLogService mqLogService; + @Resource + private FunUtil funUtil; @Lazy @Resource private PrinterHandler printerHandler; - private void invokeFun(String type, String plat, T data, Consumer consumer) { - long startTime = DateUtil.date().getTime(); - log.info("接收到{}打印消息:{}", type, data); - MqLog mqLog = new MqLog().setQueue(RabbitConstants.Queue.ORDER_MACHINE_PRINT_QUEUE).setMsg(data.toString()) - .setType(type).setPlat(plat).setCreateTime(DateUtil.date().toLocalDateTime()); - try { - consumer.accept(data); - } catch (Exception e) { - log.error("订单打印失败", e); - mqLog.setErrInfo(JSONObject.toJSONString(e)); - mqLog.setDuration(DateUtil.date().getTime() - startTime); - mqLog.setFailTime(DateUtil.date().toLocalDateTime()); - mqLogService.save(mqLog); + private void invokeFun(String type, String plat, T data, Consumer consumer) { + long startTime = DateUtil.date().getTime(); + log.info("接收到{}打印消息:{}", type, data); + MqLog mqLog = new MqLog().setQueue(RabbitConstants.Queue.ORDER_MACHINE_PRINT_QUEUE).setMsg(data.toString()) + .setType(type).setPlat(plat).setCreateTime(DateUtil.date().toLocalDateTime()); + try { + consumer.accept(data); + } catch (Exception e) { + log.error("订单打印失败", e); + mqLog.setErrInfo(JSONObject.toJSONString(e)); + mqLog.setDuration(DateUtil.date().getTime() - startTime); + mqLog.setFailTime(DateUtil.date().toLocalDateTime()); + mqLogService.save(mqLog); + } } - } @RabbitListener(queues = {"${spring.profiles.active}-" + RabbitConstants.Queue.ORDER_MACHINE_PRINT_QUEUE}) public void orderPrint(String req) { @@ -62,7 +66,10 @@ public class PrintMqListener { throw new RuntimeException("订单打印失败,未传递orderId"); } Boolean printOrder = jsonObject.getBoolean("printOrder"); - printerHandler.handler(orderId, printOrder != null && !printOrder ? PrinterHandler.PrintTypeEnum.ONE : PrinterHandler.PrintTypeEnum.ONE_AND_ORDER); + funUtil.runFunAndCheckKey(() -> { + printerHandler.handler(orderId, printOrder != null && !printOrder ? PrinterHandler.PrintTypeEnum.ONE : PrinterHandler.PrintTypeEnum.ONE_AND_ORDER); + return null; + }, RedisCst.getLockKey("orderPrint", orderId)); }); } diff --git a/cash-service/account-service/src/main/java/com/czg/service/account/util/FunUtil.java b/cash-service/account-service/src/main/java/com/czg/service/account/util/FunUtil.java index 20cb8132..dd069022 100644 --- a/cash-service/account-service/src/main/java/com/czg/service/account/util/FunUtil.java +++ b/cash-service/account-service/src/main/java/com/czg/service/account/util/FunUtil.java @@ -6,8 +6,10 @@ import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate; +import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.stereotype.Component; +import java.util.Collections; import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -24,29 +26,60 @@ public class FunUtil { private RedisTemplate redisTemplate; public static int retryCount = 5; + /** + * 执行任务并保证锁唯一 + * @param supplier 业务逻辑 + * @param lockKey Redis锁的Key + * @return 业务逻辑返回值 + */ public T runFunAndCheckKey(Supplier supplier, String lockKey) { - try{ - // 设置分布式锁 - boolean lock = Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(lockKey, "1", 30, TimeUnit.MILLISECONDS)); + String lockValue = String.valueOf(System.nanoTime() + Thread.currentThread().threadId()); + try { + // 尝试获取锁,超时时间 5 秒,防止死锁 + boolean lock = Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, 5, TimeUnit.SECONDS)); int count = 0; + // 初始等待 10ms + int retryDelay = 10; + while (!lock) { - if (count++ > 100) { - throw new ApiNotPrintException("系统繁忙, 稍后再试"); + // 最多重试 10 次,大约 10 秒 + if (count++ > 50) { + throw new RuntimeException("系统繁忙, 稍后再试"); } - Thread.sleep(20); - lock = Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(lockKey, "1", 30, TimeUnit.MILLISECONDS)); + Thread.sleep(retryDelay); + // 指数退避,最大等待 200ms + retryDelay = Math.min(retryDelay * 2, 200); + lock = Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, 5, TimeUnit.SECONDS)); } + + // 执行任务 return supplier.get(); - } catch (RuntimeException e){ - log.info("执行出错:{}", e.getMessage()); - throw e; } catch (InterruptedException e) { - throw new RuntimeException(e); - } finally{ - redisTemplate.delete(lockKey); + Thread.currentThread().interrupt(); + throw new RuntimeException("线程被中断", e); + } catch (Exception e) { + log.error("执行出错:{}", e.getMessage(), e); + throw e; + } finally { + // 释放锁(使用 Lua 脚本确保原子性) + unlock(lockKey, lockValue); } } + /** + * 使用 Lua 脚本确保释放锁的原子性 + * @param lockKey 锁的 Key + * @param lockValue 当前线程的锁值 + */ + private void unlock(String lockKey, String lockValue) { + String luaScript = + "if redis.call('get', KEYS[1]) == ARGV[1] then " + + "return redis.call('del', KEYS[1]) " + + "else return 0 end"; + redisTemplate.execute(new DefaultRedisScript<>(luaScript, Long.class), + Collections.singletonList(lockKey), lockValue); + } + public static R runFunAndRetry( Supplier function, Function check, Consumer errFun) { diff --git a/cash-service/order-service/src/main/java/com/czg/service/order/utils/FunUtil.java b/cash-service/order-service/src/main/java/com/czg/service/order/utils/FunUtil.java new file mode 100644 index 00000000..feaf2af3 --- /dev/null +++ b/cash-service/order-service/src/main/java/com/czg/service/order/utils/FunUtil.java @@ -0,0 +1,100 @@ +package com.czg.service.order.utils; + +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.core.script.DefaultRedisScript; +import org.springframework.stereotype.Component; + +import java.util.Collections; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Supplier; + +/** + * @author Administrator + */ +@Slf4j +@Component +public class FunUtil { + @Resource + private RedisTemplate redisTemplate; + public static int retryCount = 5; + + /** + * 执行任务并保证锁唯一 + * @param supplier 业务逻辑 + * @param lockKey Redis锁的Key + * @return 业务逻辑返回值 + */ + public T runFunAndCheckKey(Supplier supplier, String lockKey) { + String lockValue = String.valueOf(System.nanoTime() + Thread.currentThread().threadId()); + try { + // 尝试获取锁,超时时间 5 秒,防止死锁 + boolean lock = Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, 5, TimeUnit.SECONDS)); + int count = 0; + // 初始等待 10ms + int retryDelay = 10; + + while (!lock) { + // 最多重试 10 次,大约 10 秒 + if (count++ > 50) { + throw new RuntimeException("系统繁忙, 稍后再试"); + } + Thread.sleep(retryDelay); + // 指数退避,最大等待 200ms + retryDelay = Math.min(retryDelay * 2, 200); + lock = Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, 5, TimeUnit.SECONDS)); + } + + // 执行任务 + return supplier.get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("线程被中断", e); + } catch (Exception e) { + log.error("执行出错:{}", e.getMessage(), e); + throw e; + } finally { + // 释放锁(使用 Lua 脚本确保原子性) + unlock(lockKey, lockValue); + } + } + + /** + * 使用 Lua 脚本确保释放锁的原子性 + * @param lockKey 锁的 Key + * @param lockValue 当前线程的锁值 + */ + private void unlock(String lockKey, String lockValue) { + String luaScript = + "if redis.call('get', KEYS[1]) == ARGV[1] then " + + "return redis.call('del', KEYS[1]) " + + "else return 0 end"; + redisTemplate.execute(new DefaultRedisScript<>(luaScript, Long.class), + Collections.singletonList(lockKey), lockValue); + } + + public static R runFunAndRetry( + Supplier function, + Function check, Consumer errFun) { + log.info("工具类开始执行函数"); + R result = function.get(); + boolean flag = check.apply(result); + + log.info("执行结果: {}", result); + + while (flag && retryCount-- > 0) { + log.info("执行函数失败, 剩余尝试次数{}", retryCount); + result = function.get(); + log.info("执行结果: {}", result); + flag = check.apply(result); + } + + if (flag) { + errFun.accept(result); + } + return result; + } +}