mq问题
This commit is contained in:
@@ -36,6 +36,7 @@ public class EntryManagerController {
|
|||||||
|
|
||||||
@Resource
|
@Resource
|
||||||
private RabbitPublisher rabbitPublisher;
|
private RabbitPublisher rabbitPublisher;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* ocr识别填充
|
* ocr识别填充
|
||||||
* 阿里 ocr识别图片
|
* 阿里 ocr识别图片
|
||||||
@@ -86,8 +87,8 @@ public class EntryManagerController {
|
|||||||
* 获取进件信息
|
* 获取进件信息
|
||||||
*/
|
*/
|
||||||
@GetMapping
|
@GetMapping
|
||||||
public CzgResult<AggregateMerchantVO> getEntry(Long shopId) {
|
public CzgResult<AggregateMerchantVO> getEntry(Long shopId, String licenceNo) {
|
||||||
return CzgResult.success(shopDirectMerchantService.getEntry(shopId));
|
return CzgResult.success(shopDirectMerchantService.getEntry(shopId, licenceNo));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -97,8 +98,8 @@ public class EntryManagerController {
|
|||||||
*/
|
*/
|
||||||
@GetMapping("queryEntry")
|
@GetMapping("queryEntry")
|
||||||
@Debounce(value = "#shopId", interval = 1000 * 60 * 3)
|
@Debounce(value = "#shopId", interval = 1000 * 60 * 3)
|
||||||
public CzgResult<Boolean> queryEntry(Long shopId) {
|
public CzgResult<Boolean> queryEntry(Long shopId, String licenceNo) {
|
||||||
entryManagerTask.entryManager(shopId);
|
entryManagerTask.entryManager(shopId, licenceNo);
|
||||||
return CzgResult.success();
|
return CzgResult.success();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -48,23 +48,32 @@ public class EntryManagerMqListener {
|
|||||||
)
|
)
|
||||||
@RabbitHandler
|
@RabbitHandler
|
||||||
public void handle(Message message, Channel channel, String msg) throws IOException {
|
public void handle(Message message, Channel channel, String msg) throws IOException {
|
||||||
log.info("进件1MQ对接开始shopId:{}", msg);
|
log.info("进件1MQ对接开始 店铺标识:{}", msg);
|
||||||
String messageId = message.getMessageProperties().getMessageId();
|
long deliveryTag = message.getMessageProperties().getDeliveryTag();
|
||||||
log.info("进件0MQ对接开始shopId:{}messageId:{}", msg, messageId);
|
if (StrUtil.isBlank(msg)) {
|
||||||
// if (hasMessageId(messageId)) {
|
channel.basicNack(deliveryTag, false, false);
|
||||||
// return;
|
return;
|
||||||
// }
|
}
|
||||||
|
String[] split = msg.split(":");
|
||||||
|
if (split.length != 2) {
|
||||||
|
log.error("进件MQ对接参数异常 店铺标识:{}", msg);
|
||||||
|
channel.basicNack(deliveryTag, false, false);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
Long shopId = Long.valueOf(split[0]);
|
||||||
|
if (shopId == null) {
|
||||||
|
channel.basicNack(deliveryTag, false, false);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (hasMessageId(String.valueOf(shopId))) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
try {
|
try {
|
||||||
Long shopId = Long.valueOf(msg);
|
|
||||||
// 将唯一标识添加到日志上下文
|
// 将唯一标识添加到日志上下文
|
||||||
// ThreadContext.put("traceId", messageId);
|
ThreadContext.put("traceId", String.valueOf(shopId));
|
||||||
log.info("进件2MQ对接开始shopId:{}", msg);
|
log.info("进件2MQ对接开始shopId:{}", msg);
|
||||||
// 安全转换shopId
|
// 安全转换shopId
|
||||||
if (shopId == null) {
|
AggregateMerchantVO entry = shopDirectMerchantService.getEntry(shopId, split[1]);
|
||||||
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
AggregateMerchantVO entry = shopDirectMerchantService.getEntry(Long.valueOf(msg));
|
|
||||||
log.info("进件3MQ对接开始shopId:{}", msg);
|
log.info("进件3MQ对接开始shopId:{}", msg);
|
||||||
if (entry != null) {
|
if (entry != null) {
|
||||||
EntryManager.uploadParamImage(entry);
|
EntryManager.uploadParamImage(entry);
|
||||||
@@ -89,7 +98,7 @@ public class EntryManagerMqListener {
|
|||||||
merchant.setAlipayErrorMsg(resp.getAlipayErrorMsg());
|
merchant.setAlipayErrorMsg(resp.getAlipayErrorMsg());
|
||||||
shopDirectMerchantService.updateById(merchant);
|
shopDirectMerchantService.updateById(merchant);
|
||||||
}
|
}
|
||||||
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
|
channel.basicAck(deliveryTag, false);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("进件MQ对接业务异常shopId:{}", msg, e);
|
log.error("进件MQ对接业务异常shopId:{}", msg, e);
|
||||||
ShopDirectMerchant merchant = new ShopDirectMerchant();
|
ShopDirectMerchant merchant = new ShopDirectMerchant();
|
||||||
@@ -98,12 +107,11 @@ public class EntryManagerMqListener {
|
|||||||
merchant.setAlipayStatus(PayCst.EntryStatus.REJECTED);
|
merchant.setAlipayStatus(PayCst.EntryStatus.REJECTED);
|
||||||
merchant.setErrorMsg("系统错误,请联系管理员后重试。");
|
merchant.setErrorMsg("系统错误,请联系管理员后重试。");
|
||||||
shopDirectMerchantService.updateById(merchant);
|
shopDirectMerchantService.updateById(merchant);
|
||||||
|
channel.basicNack(deliveryTag, false, false);
|
||||||
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
|
|
||||||
} finally {
|
} finally {
|
||||||
// delMessageId(messageId);
|
delMessageId(String.valueOf(shopId));
|
||||||
// 清除日志上下文信息
|
// 清除日志上下文信息
|
||||||
// ThreadContext.remove("messageId");
|
ThreadContext.remove("messageId");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -4,14 +4,9 @@ import cn.hutool.core.collection.CollUtil;
|
|||||||
import cn.hutool.core.util.StrUtil;
|
import cn.hutool.core.util.StrUtil;
|
||||||
import com.czg.EntryManager;
|
import com.czg.EntryManager;
|
||||||
import com.czg.PayCst;
|
import com.czg.PayCst;
|
||||||
import com.czg.account.entity.ShopInfo;
|
|
||||||
import com.czg.account.service.ShopInfoService;
|
import com.czg.account.service.ShopInfoService;
|
||||||
import com.czg.dto.resp.QueryStatusResp;
|
import com.czg.dto.resp.QueryStatusResp;
|
||||||
import com.czg.order.entity.ShopDirectMerchant;
|
import com.czg.order.entity.ShopDirectMerchant;
|
||||||
import com.czg.order.service.ShopOrderStatisticService;
|
|
||||||
import com.czg.order.service.ShopProdStatisticService;
|
|
||||||
import com.czg.order.service.ShopTableOrderStatisticService;
|
|
||||||
import com.czg.service.RedisService;
|
|
||||||
import com.czg.service.order.service.ShopDirectMerchantService;
|
import com.czg.service.order.service.ShopDirectMerchantService;
|
||||||
import com.mybatisflex.core.query.QueryWrapper;
|
import com.mybatisflex.core.query.QueryWrapper;
|
||||||
import jakarta.annotation.Resource;
|
import jakarta.annotation.Resource;
|
||||||
@@ -20,8 +15,6 @@ import org.apache.dubbo.config.annotation.DubboReference;
|
|||||||
import org.springframework.scheduling.annotation.Scheduled;
|
import org.springframework.scheduling.annotation.Scheduled;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
import java.time.LocalDate;
|
|
||||||
import java.time.LocalDateTime;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -42,16 +35,17 @@ public class EntryManagerTask {
|
|||||||
public void run() {
|
public void run() {
|
||||||
log.info("进件查询,定时任务执行");
|
log.info("进件查询,定时任务执行");
|
||||||
long start = System.currentTimeMillis();
|
long start = System.currentTimeMillis();
|
||||||
entryManager(null);
|
entryManager(null, null);
|
||||||
log.info("进件查询,定时任务执行完毕,耗时:{}ms", start - System.currentTimeMillis());
|
log.info("进件查询,定时任务执行完毕,耗时:{}ms", start - System.currentTimeMillis());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 查询状态为待处理、待签约、待审核的进件
|
* 查询状态为待处理、待签约、待审核的进件
|
||||||
*/
|
*/
|
||||||
public void entryManager(Long shopId) {
|
public void entryManager(Long shopId, String licenceNo) {
|
||||||
List<ShopDirectMerchant> list = shopDirectMerchantService.list(QueryWrapper.create()
|
List<ShopDirectMerchant> list = shopDirectMerchantService.list(QueryWrapper.create()
|
||||||
.eq(ShopDirectMerchant::getShopId, shopId)
|
.eq(ShopDirectMerchant::getShopId, shopId)
|
||||||
|
.eq(ShopDirectMerchant::getLicenceNo, licenceNo)
|
||||||
.in(ShopDirectMerchant::getWechatStatus, PayCst.EntryStatus.NEED_QUERY_LIST)
|
.in(ShopDirectMerchant::getWechatStatus, PayCst.EntryStatus.NEED_QUERY_LIST)
|
||||||
.or(ShopDirectMerchant::getAlipayStatus).in(PayCst.EntryStatus.NEED_QUERY_LIST));
|
.or(ShopDirectMerchant::getAlipayStatus).in(PayCst.EntryStatus.NEED_QUERY_LIST));
|
||||||
if (CollUtil.isEmpty(list)) {
|
if (CollUtil.isEmpty(list)) {
|
||||||
|
|||||||
@@ -29,7 +29,7 @@ public interface ShopDirectMerchantService extends IService<ShopDirectMerchant>
|
|||||||
/**
|
/**
|
||||||
* 获取进件信息
|
* 获取进件信息
|
||||||
*/
|
*/
|
||||||
AggregateMerchantVO getEntry(Long shopId);
|
AggregateMerchantVO getEntry(Long shopId, String licenceNo);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 申请进件
|
* 申请进件
|
||||||
|
|||||||
@@ -64,8 +64,10 @@ public class ShopDirectMerchantServiceImpl extends ServiceImpl<ShopDirectMerchan
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public AggregateMerchantVO getEntry(Long shopId) {
|
public AggregateMerchantVO getEntry(Long shopId, String licenceNo) {
|
||||||
ShopDirectMerchant merchant = getById(shopId);
|
ShopDirectMerchant merchant = getOne(query()
|
||||||
|
.eq(ShopDirectMerchant::getLicenceNo, licenceNo)
|
||||||
|
.eq(ShopDirectMerchant::getShopId, shopId));
|
||||||
if (merchant == null) {
|
if (merchant == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
@@ -106,7 +108,7 @@ public class ShopDirectMerchantServiceImpl extends ServiceImpl<ShopDirectMerchan
|
|||||||
result = updateById(merchant);
|
result = updateById(merchant);
|
||||||
}
|
}
|
||||||
//发送进件队列消息
|
//发送进件队列消息
|
||||||
FunUtils.transactionSafeRun(() -> rabbitPublisher.sendEntryManagerMsg(reqDto.getShopId().toString()));
|
FunUtils.transactionSafeRun(() -> rabbitPublisher.sendEntryManagerMsg(reqDto.getShopId() + ":" + merchant.getLicenceNo()));
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user