异步线程 延迟打印
This commit is contained in:
parent
6f0601c165
commit
b1c6f85e46
|
|
@ -19,10 +19,13 @@ import lombok.extern.slf4j.Slf4j;
|
||||||
import org.apache.dubbo.config.annotation.DubboReference;
|
import org.apache.dubbo.config.annotation.DubboReference;
|
||||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||||
import org.springframework.context.annotation.Lazy;
|
import org.springframework.context.annotation.Lazy;
|
||||||
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -37,40 +40,59 @@ public class PrintMqListener {
|
||||||
private MqLogService mqLogService;
|
private MqLogService mqLogService;
|
||||||
@Resource
|
@Resource
|
||||||
private FunUtil funUtil;
|
private FunUtil funUtil;
|
||||||
|
|
||||||
|
// 注入自定义线程池(建议单独配置,避免使用默认线程池)
|
||||||
|
@Resource
|
||||||
|
private ThreadPoolTaskExecutor asyncExecutor;
|
||||||
@Lazy
|
@Lazy
|
||||||
@Resource
|
@Resource
|
||||||
private PrinterHandler printerHandler;
|
private PrinterHandler printerHandler;
|
||||||
|
|
||||||
private <T> void invokeFun(String type, String plat, T data, Consumer<T> consumer) {
|
private <T> void invokeFun(String type, String plat, T data, Consumer<T> consumer) {
|
||||||
long startTime = DateUtil.date().getTime();
|
long startTime = DateUtil.date().getTime();
|
||||||
log.info("接收到{}打印消息:{}", type, data);
|
log.info("接收到{}打印消息:{}", type, data);
|
||||||
MqLog mqLog = new MqLog().setQueue(RabbitConstants.Queue.ORDER_MACHINE_PRINT_QUEUE).setMsg(data.toString())
|
MqLog mqLog = new MqLog().setQueue(RabbitConstants.Queue.ORDER_MACHINE_PRINT_QUEUE).setMsg(data.toString())
|
||||||
.setType(type).setPlat(plat).setCreateTime(DateUtil.date().toLocalDateTime());
|
.setType(type).setPlat(plat).setCreateTime(DateUtil.date().toLocalDateTime());
|
||||||
try {
|
try {
|
||||||
consumer.accept(data);
|
consumer.accept(data);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("订单打印失败", e);
|
log.error("订单打印失败", e);
|
||||||
mqLog.setErrInfo(JSONObject.toJSONString(e));
|
mqLog.setErrInfo(JSONObject.toJSONString(e));
|
||||||
mqLog.setDuration(DateUtil.date().getTime() - startTime);
|
mqLog.setDuration(DateUtil.date().getTime() - startTime);
|
||||||
mqLog.setFailTime(DateUtil.date().toLocalDateTime());
|
mqLog.setFailTime(DateUtil.date().toLocalDateTime());
|
||||||
mqLogService.save(mqLog);
|
mqLogService.save(mqLog);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@RabbitListener(queues = {"${spring.profiles.active}-" + RabbitConstants.Queue.ORDER_MACHINE_PRINT_QUEUE})
|
@RabbitListener(queues = {"${spring.profiles.active}-" + RabbitConstants.Queue.ORDER_MACHINE_PRINT_QUEUE})
|
||||||
public void orderPrint(String req) {
|
public void orderPrint(String req) {
|
||||||
invokeFun("orderPrint", "java.order", req, (data) -> {
|
// 使用异步线程池执行延迟任务,不阻塞当前消费者线程
|
||||||
JSONObject jsonObject = JSONObject.parseObject(data);
|
CompletableFuture.runAsync(() -> {
|
||||||
String orderId = jsonObject.getString("orderId");
|
try {
|
||||||
if (orderId == null) {
|
// 延迟3秒处理
|
||||||
throw new RuntimeException("订单打印失败,未传递orderId");
|
TimeUnit.SECONDS.sleep(3);
|
||||||
|
// 执行核心打印逻辑
|
||||||
|
invokeFun("orderPrint", "java.order", req, (data) -> {
|
||||||
|
JSONObject jsonObject = JSONObject.parseObject(data);
|
||||||
|
String orderId = jsonObject.getString("orderId");
|
||||||
|
if (orderId == null) {
|
||||||
|
throw new RuntimeException("订单打印失败,未传递orderId");
|
||||||
|
}
|
||||||
|
Boolean printOrder = jsonObject.getBoolean("printOrder");
|
||||||
|
funUtil.runFunAndCheckKey(() -> {
|
||||||
|
printerHandler.handler(orderId, printOrder != null && !printOrder ? PrinterHandler.PrintTypeEnum.ONE : PrinterHandler.PrintTypeEnum.ONE_AND_ORDER);
|
||||||
|
return null;
|
||||||
|
}, RedisCst.getLockKey("orderPrint", orderId));
|
||||||
|
});
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
// 记录中断日志
|
||||||
|
log.warn("打印任务被中断,req:{}", req, e);
|
||||||
|
} catch (Exception e) {
|
||||||
|
// 记录业务异常日志
|
||||||
|
log.error("打印任务处理失败,req:{}", req, e);
|
||||||
}
|
}
|
||||||
Boolean printOrder = jsonObject.getBoolean("printOrder");
|
}, asyncExecutor);
|
||||||
funUtil.runFunAndCheckKey(() -> {
|
|
||||||
printerHandler.handler(orderId, printOrder != null && !printOrder ? PrinterHandler.PrintTypeEnum.ONE : PrinterHandler.PrintTypeEnum.ONE_AND_ORDER);
|
|
||||||
return null;
|
|
||||||
}, RedisCst.getLockKey("orderPrint", orderId));
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,35 @@
|
||||||
|
package com.czg.config;
|
||||||
|
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||||
|
|
||||||
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author ww
|
||||||
|
* @description
|
||||||
|
*/
|
||||||
|
@Configuration
|
||||||
|
public class AsyncConfig {
|
||||||
|
|
||||||
|
@Bean
|
||||||
|
public ThreadPoolTaskExecutor asyncExecutor() {
|
||||||
|
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
|
||||||
|
// 核心线程数(根据CPU核心数调整,一般为CPU核心数 * 2)
|
||||||
|
executor.setCorePoolSize(8);
|
||||||
|
// 最大线程数
|
||||||
|
executor.setMaxPoolSize(16);
|
||||||
|
// 队列容量
|
||||||
|
executor.setQueueCapacity(1000);
|
||||||
|
// 线程空闲时间(秒)
|
||||||
|
executor.setKeepAliveSeconds(60);
|
||||||
|
// 线程名称前缀
|
||||||
|
executor.setThreadNamePrefix("print-delay-");
|
||||||
|
// 拒绝策略(当任务满时,由提交任务的线程执行)
|
||||||
|
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
|
||||||
|
// 初始化线程池
|
||||||
|
executor.initialize();
|
||||||
|
return executor;
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue