From ef8eabb06c6fac7b7f24fbb7a63b2df4ba1476dc Mon Sep 17 00:00:00 2001 From: wangw <1594593906@qq.com> Date: Fri, 9 Jan 2026 16:49:22 +0800 Subject: [PATCH] =?UTF-8?q?mq=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../admin/EntryManagerController.java | 9 ++-- .../com/czg/mq/EntryManagerMqListener.java | 46 +++++++++++-------- .../java/com/czg/task/EntryManagerTask.java | 12 ++--- .../service/ShopDirectMerchantService.java | 2 +- .../impl/ShopDirectMerchantServiceImpl.java | 8 ++-- 5 files changed, 41 insertions(+), 36 deletions(-) diff --git a/cash-api/order-server/src/main/java/com/czg/controller/admin/EntryManagerController.java b/cash-api/order-server/src/main/java/com/czg/controller/admin/EntryManagerController.java index 53fd4b3ec..9610173b8 100644 --- a/cash-api/order-server/src/main/java/com/czg/controller/admin/EntryManagerController.java +++ b/cash-api/order-server/src/main/java/com/czg/controller/admin/EntryManagerController.java @@ -36,6 +36,7 @@ public class EntryManagerController { @Resource private RabbitPublisher rabbitPublisher; + /** * ocr识别填充 * 阿里 ocr识别图片 @@ -86,8 +87,8 @@ public class EntryManagerController { * 获取进件信息 */ @GetMapping - public CzgResult getEntry(Long shopId) { - return CzgResult.success(shopDirectMerchantService.getEntry(shopId)); + public CzgResult getEntry(Long shopId, String licenceNo) { + return CzgResult.success(shopDirectMerchantService.getEntry(shopId, licenceNo)); } /** @@ -97,8 +98,8 @@ public class EntryManagerController { */ @GetMapping("queryEntry") @Debounce(value = "#shopId", interval = 1000 * 60 * 3) - public CzgResult queryEntry(Long shopId) { - entryManagerTask.entryManager(shopId); + public CzgResult queryEntry(Long shopId, String licenceNo) { + entryManagerTask.entryManager(shopId, licenceNo); return CzgResult.success(); } diff --git a/cash-api/order-server/src/main/java/com/czg/mq/EntryManagerMqListener.java b/cash-api/order-server/src/main/java/com/czg/mq/EntryManagerMqListener.java index c7b85df9b..9abd57dcf 100644 --- a/cash-api/order-server/src/main/java/com/czg/mq/EntryManagerMqListener.java +++ b/cash-api/order-server/src/main/java/com/czg/mq/EntryManagerMqListener.java @@ -48,23 +48,32 @@ public class EntryManagerMqListener { ) @RabbitHandler public void handle(Message message, Channel channel, String msg) throws IOException { - log.info("进件1MQ对接开始shopId:{}", msg); - String messageId = message.getMessageProperties().getMessageId(); - log.info("进件0MQ对接开始shopId:{}messageId:{}", msg, messageId); -// if (hasMessageId(messageId)) { -// return; -// } + log.info("进件1MQ对接开始 店铺标识:{}", msg); + long deliveryTag = message.getMessageProperties().getDeliveryTag(); + if (StrUtil.isBlank(msg)) { + channel.basicNack(deliveryTag, false, false); + return; + } + String[] split = msg.split(":"); + if (split.length != 2) { + log.error("进件MQ对接参数异常 店铺标识:{}", msg); + channel.basicNack(deliveryTag, false, false); + return; + } + Long shopId = Long.valueOf(split[0]); + if (shopId == null) { + channel.basicNack(deliveryTag, false, false); + return; + } + if (hasMessageId(String.valueOf(shopId))) { + return; + } try { - Long shopId = Long.valueOf(msg); // 将唯一标识添加到日志上下文 -// ThreadContext.put("traceId", messageId); + ThreadContext.put("traceId", String.valueOf(shopId)); log.info("进件2MQ对接开始shopId:{}", msg); // 安全转换shopId - if (shopId == null) { - channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); - return; - } - AggregateMerchantVO entry = shopDirectMerchantService.getEntry(Long.valueOf(msg)); + AggregateMerchantVO entry = shopDirectMerchantService.getEntry(shopId, split[1]); log.info("进件3MQ对接开始shopId:{}", msg); if (entry != null) { EntryManager.uploadParamImage(entry); @@ -89,7 +98,7 @@ public class EntryManagerMqListener { merchant.setAlipayErrorMsg(resp.getAlipayErrorMsg()); shopDirectMerchantService.updateById(merchant); } - channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); + channel.basicAck(deliveryTag, false); } catch (Exception e) { log.error("进件MQ对接业务异常shopId:{}", msg, e); ShopDirectMerchant merchant = new ShopDirectMerchant(); @@ -98,12 +107,11 @@ public class EntryManagerMqListener { merchant.setAlipayStatus(PayCst.EntryStatus.REJECTED); merchant.setErrorMsg("系统错误,请联系管理员后重试。"); shopDirectMerchantService.updateById(merchant); - - channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); + channel.basicNack(deliveryTag, false, false); } finally { -// delMessageId(messageId); - // 清除日志上下文信息 -// ThreadContext.remove("messageId"); + delMessageId(String.valueOf(shopId)); +// 清除日志上下文信息 + ThreadContext.remove("messageId"); } } diff --git a/cash-api/order-server/src/main/java/com/czg/task/EntryManagerTask.java b/cash-api/order-server/src/main/java/com/czg/task/EntryManagerTask.java index 39f505b53..69850f661 100644 --- a/cash-api/order-server/src/main/java/com/czg/task/EntryManagerTask.java +++ b/cash-api/order-server/src/main/java/com/czg/task/EntryManagerTask.java @@ -4,14 +4,9 @@ import cn.hutool.core.collection.CollUtil; import cn.hutool.core.util.StrUtil; import com.czg.EntryManager; import com.czg.PayCst; -import com.czg.account.entity.ShopInfo; import com.czg.account.service.ShopInfoService; import com.czg.dto.resp.QueryStatusResp; import com.czg.order.entity.ShopDirectMerchant; -import com.czg.order.service.ShopOrderStatisticService; -import com.czg.order.service.ShopProdStatisticService; -import com.czg.order.service.ShopTableOrderStatisticService; -import com.czg.service.RedisService; import com.czg.service.order.service.ShopDirectMerchantService; import com.mybatisflex.core.query.QueryWrapper; import jakarta.annotation.Resource; @@ -20,8 +15,6 @@ import org.apache.dubbo.config.annotation.DubboReference; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; -import java.time.LocalDate; -import java.time.LocalDateTime; import java.util.List; /** @@ -42,16 +35,17 @@ public class EntryManagerTask { public void run() { log.info("进件查询,定时任务执行"); long start = System.currentTimeMillis(); - entryManager(null); + entryManager(null, null); log.info("进件查询,定时任务执行完毕,耗时:{}ms", start - System.currentTimeMillis()); } /** * 查询状态为待处理、待签约、待审核的进件 */ - public void entryManager(Long shopId) { + public void entryManager(Long shopId, String licenceNo) { List list = shopDirectMerchantService.list(QueryWrapper.create() .eq(ShopDirectMerchant::getShopId, shopId) + .eq(ShopDirectMerchant::getLicenceNo, licenceNo) .in(ShopDirectMerchant::getWechatStatus, PayCst.EntryStatus.NEED_QUERY_LIST) .or(ShopDirectMerchant::getAlipayStatus).in(PayCst.EntryStatus.NEED_QUERY_LIST)); if (CollUtil.isEmpty(list)) { diff --git a/cash-service/order-service/src/main/java/com/czg/service/order/service/ShopDirectMerchantService.java b/cash-service/order-service/src/main/java/com/czg/service/order/service/ShopDirectMerchantService.java index 6a9bc72ed..07999fb2e 100644 --- a/cash-service/order-service/src/main/java/com/czg/service/order/service/ShopDirectMerchantService.java +++ b/cash-service/order-service/src/main/java/com/czg/service/order/service/ShopDirectMerchantService.java @@ -29,7 +29,7 @@ public interface ShopDirectMerchantService extends IService /** * 获取进件信息 */ - AggregateMerchantVO getEntry(Long shopId); + AggregateMerchantVO getEntry(Long shopId, String licenceNo); /** * 申请进件 diff --git a/cash-service/order-service/src/main/java/com/czg/service/order/service/impl/ShopDirectMerchantServiceImpl.java b/cash-service/order-service/src/main/java/com/czg/service/order/service/impl/ShopDirectMerchantServiceImpl.java index ac10c3a7f..542864737 100644 --- a/cash-service/order-service/src/main/java/com/czg/service/order/service/impl/ShopDirectMerchantServiceImpl.java +++ b/cash-service/order-service/src/main/java/com/czg/service/order/service/impl/ShopDirectMerchantServiceImpl.java @@ -64,8 +64,10 @@ public class ShopDirectMerchantServiceImpl extends ServiceImpl rabbitPublisher.sendEntryManagerMsg(reqDto.getShopId().toString())); + FunUtils.transactionSafeRun(() -> rabbitPublisher.sendEntryManagerMsg(reqDto.getShopId() + ":" + merchant.getLicenceNo())); return result; }