From b1c6f85e461cdfade18d7e04b477f002a1e02616 Mon Sep 17 00:00:00 2001 From: wangw <1594593906@qq.com> Date: Tue, 30 Sep 2025 14:43:49 +0800 Subject: [PATCH] =?UTF-8?q?=E5=BC=82=E6=AD=A5=E7=BA=BF=E7=A8=8B=20?= =?UTF-8?q?=E5=BB=B6=E8=BF=9F=E6=89=93=E5=8D=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../main/java/com/czg/mq/PrintMqListener.java | 72 ++++++++++++------- .../main/java/com/czg/config/AsyncConfig.java | 35 +++++++++ 2 files changed, 82 insertions(+), 25 deletions(-) create mode 100644 cash-common/cash-common-api-config/src/main/java/com/czg/config/AsyncConfig.java diff --git a/cash-api/order-server/src/main/java/com/czg/mq/PrintMqListener.java b/cash-api/order-server/src/main/java/com/czg/mq/PrintMqListener.java index 9d8304f5..9e26bd59 100644 --- a/cash-api/order-server/src/main/java/com/czg/mq/PrintMqListener.java +++ b/cash-api/order-server/src/main/java/com/czg/mq/PrintMqListener.java @@ -19,10 +19,13 @@ import lombok.extern.slf4j.Slf4j; import org.apache.dubbo.config.annotation.DubboReference; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.context.annotation.Lazy; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import java.util.Arrays; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; /** @@ -37,40 +40,59 @@ public class PrintMqListener { private MqLogService mqLogService; @Resource private FunUtil funUtil; + + // 注入自定义线程池(建议单独配置,避免使用默认线程池) + @Resource + private ThreadPoolTaskExecutor asyncExecutor; @Lazy @Resource private PrinterHandler printerHandler; - private void invokeFun(String type, String plat, T data, Consumer consumer) { - long startTime = DateUtil.date().getTime(); - log.info("接收到{}打印消息:{}", type, data); - MqLog mqLog = new MqLog().setQueue(RabbitConstants.Queue.ORDER_MACHINE_PRINT_QUEUE).setMsg(data.toString()) - .setType(type).setPlat(plat).setCreateTime(DateUtil.date().toLocalDateTime()); - try { - consumer.accept(data); - } catch (Exception e) { - log.error("订单打印失败", e); - mqLog.setErrInfo(JSONObject.toJSONString(e)); - mqLog.setDuration(DateUtil.date().getTime() - startTime); - mqLog.setFailTime(DateUtil.date().toLocalDateTime()); - mqLogService.save(mqLog); - } + private void invokeFun(String type, String plat, T data, Consumer consumer) { + long startTime = DateUtil.date().getTime(); + log.info("接收到{}打印消息:{}", type, data); + MqLog mqLog = new MqLog().setQueue(RabbitConstants.Queue.ORDER_MACHINE_PRINT_QUEUE).setMsg(data.toString()) + .setType(type).setPlat(plat).setCreateTime(DateUtil.date().toLocalDateTime()); + try { + consumer.accept(data); + } catch (Exception e) { + log.error("订单打印失败", e); + mqLog.setErrInfo(JSONObject.toJSONString(e)); + mqLog.setDuration(DateUtil.date().getTime() - startTime); + mqLog.setFailTime(DateUtil.date().toLocalDateTime()); + mqLogService.save(mqLog); } + } @RabbitListener(queues = {"${spring.profiles.active}-" + RabbitConstants.Queue.ORDER_MACHINE_PRINT_QUEUE}) public void orderPrint(String req) { - invokeFun("orderPrint", "java.order", req, (data) -> { - JSONObject jsonObject = JSONObject.parseObject(data); - String orderId = jsonObject.getString("orderId"); - if (orderId == null) { - throw new RuntimeException("订单打印失败,未传递orderId"); + // 使用异步线程池执行延迟任务,不阻塞当前消费者线程 + CompletableFuture.runAsync(() -> { + try { + // 延迟3秒处理 + TimeUnit.SECONDS.sleep(3); + // 执行核心打印逻辑 + invokeFun("orderPrint", "java.order", req, (data) -> { + JSONObject jsonObject = JSONObject.parseObject(data); + String orderId = jsonObject.getString("orderId"); + if (orderId == null) { + throw new RuntimeException("订单打印失败,未传递orderId"); + } + Boolean printOrder = jsonObject.getBoolean("printOrder"); + funUtil.runFunAndCheckKey(() -> { + printerHandler.handler(orderId, printOrder != null && !printOrder ? PrinterHandler.PrintTypeEnum.ONE : PrinterHandler.PrintTypeEnum.ONE_AND_ORDER); + return null; + }, RedisCst.getLockKey("orderPrint", orderId)); + }); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + // 记录中断日志 + log.warn("打印任务被中断,req:{}", req, e); + } catch (Exception e) { + // 记录业务异常日志 + log.error("打印任务处理失败,req:{}", req, e); } - Boolean printOrder = jsonObject.getBoolean("printOrder"); - funUtil.runFunAndCheckKey(() -> { - printerHandler.handler(orderId, printOrder != null && !printOrder ? PrinterHandler.PrintTypeEnum.ONE : PrinterHandler.PrintTypeEnum.ONE_AND_ORDER); - return null; - }, RedisCst.getLockKey("orderPrint", orderId)); - }); + }, asyncExecutor); } /** diff --git a/cash-common/cash-common-api-config/src/main/java/com/czg/config/AsyncConfig.java b/cash-common/cash-common-api-config/src/main/java/com/czg/config/AsyncConfig.java new file mode 100644 index 00000000..757a483b --- /dev/null +++ b/cash-common/cash-common-api-config/src/main/java/com/czg/config/AsyncConfig.java @@ -0,0 +1,35 @@ +package com.czg.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.ThreadPoolExecutor; + +/** + * @author ww + * @description + */ +@Configuration +public class AsyncConfig { + + @Bean + public ThreadPoolTaskExecutor asyncExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + // 核心线程数(根据CPU核心数调整,一般为CPU核心数 * 2) + executor.setCorePoolSize(8); + // 最大线程数 + executor.setMaxPoolSize(16); + // 队列容量 + executor.setQueueCapacity(1000); + // 线程空闲时间(秒) + executor.setKeepAliveSeconds(60); + // 线程名称前缀 + executor.setThreadNamePrefix("print-delay-"); + // 拒绝策略(当任务满时,由提交任务的线程执行) + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); + // 初始化线程池 + executor.initialize(); + return executor; + } +}