ES全量同步数据

This commit is contained in:
谭凯凯 2025-01-08 19:26:41 +08:00 committed by Tankaikai
parent 1d5899cf46
commit 7186e2c989
1 changed files with 82 additions and 34 deletions

View File

@ -3,10 +3,11 @@ package com.sqx.modules.es.service.impl;
import cn.hutool.core.lang.Dict; import cn.hutool.core.lang.Dict;
import cn.hutool.core.util.PageUtil; import cn.hutool.core.util.PageUtil;
import cn.hutool.core.util.StrUtil; import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.sqx.modules.app.dao.UserMoneyDetailsDao; import com.sqx.modules.app.dao.UserMoneyDetailsDao;
import com.sqx.modules.app.entity.UserEntity;
import com.sqx.modules.app.entity.UserMoneyDetails; import com.sqx.modules.app.entity.UserMoneyDetails;
import com.sqx.modules.app.service.UserService;
import com.sqx.modules.course.dao.CourseCollectDao; import com.sqx.modules.course.dao.CourseCollectDao;
import com.sqx.modules.course.dao.CourseUserDao; import com.sqx.modules.course.dao.CourseUserDao;
import com.sqx.modules.course.entity.CourseCollect; import com.sqx.modules.course.entity.CourseCollect;
@ -21,6 +22,8 @@ import com.sqx.modules.pay.dao.CashOutDao;
import com.sqx.modules.pay.dao.PayDetailsDao; import com.sqx.modules.pay.dao.PayDetailsDao;
import com.sqx.modules.pay.entity.CashOut; import com.sqx.modules.pay.entity.CashOut;
import com.sqx.modules.pay.entity.PayDetails; import com.sqx.modules.pay.entity.PayDetails;
import com.sqx.modules.taskCenter.dao.TaskCenterRecordDao;
import com.sqx.modules.taskCenter.entity.TaskCenterRecord;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
@ -50,6 +53,8 @@ public class EsCoreServiceImpl implements EsCoreService {
private PayDetailsDao payDetailsDao; private PayDetailsDao payDetailsDao;
@Resource @Resource
private UserMoneyDetailsDao userMoneyDetailsDao; private UserMoneyDetailsDao userMoneyDetailsDao;
@Resource
private TaskCenterRecordDao taskCenterRecordDao;
@Resource @Resource
private EsOrdersMapper esOrdersMapper; private EsOrdersMapper esOrdersMapper;
@ -69,16 +74,21 @@ public class EsCoreServiceImpl implements EsCoreService {
@Resource @Resource
private EsUserMoneyDetailsMapper esUserMoneyDetailsMapper; private EsUserMoneyDetailsMapper esUserMoneyDetailsMapper;
@Resource
private EsTaskCenterRecordMapper esTaskCenterRecordMapper;
@Resource
private UserService userService;
@Override @Override
public void sync() { public void sync() {
syncUserMoneyDetails();
syncOrders(); syncOrders();
//syncCashOut(); syncCourseCollect();
//syncCourseCollect(); syncPayDetails();
//syncCourseUser(); syncDiscSpinningRecord();
//syncDiscSpinningRecord(); syncCashOut();
//syncPayDetails(); syncCourseUser();
//syncUserMoneyDetails(); syncTaskCenterRecord();
} }
private List<Dict> buildPageList(int total) { private List<Dict> buildPageList(int total) {
@ -95,85 +105,123 @@ public class EsCoreServiceImpl implements EsCoreService {
} }
private void syncOrders() { private void syncOrders() {
Integer total = ordersDao.selectCount(Wrappers.<Orders>lambdaQuery().orderByAsc(Orders::getOrdersId)); int total = userService.count(Wrappers.emptyWrapper());
List<Dict> pageList = buildPageList(total); List<Dict> pageList = buildPageList(total);
pageList.parallelStream().forEach(param -> { pageList.parallelStream().forEach(param -> {
Integer p = param.getInt("page"); Integer p = param.getInt("page");
Integer s = param.getInt("size"); Integer s = param.getInt("size");
List<Orders> list = ordersDao.selectList(Wrappers.<Orders>lambdaQuery().orderByAsc(Orders::getOrdersId).last(StrUtil.format("limit {},{}", p, s))); List<UserEntity> list = userService.list(Wrappers.<UserEntity>lambdaQuery().orderByAsc(UserEntity::getUserId).last(StrUtil.format("limit {},{}", p, s)));
esOrdersMapper.insertBatch(list); list.parallelStream().forEach(user -> {
System.out.println("同步订单数据:" + user.getUserId());
List<Orders> batchList = ordersDao.selectList(Wrappers.<Orders>lambdaQuery().eq(Orders::getUserId, user.getUserId()));
esOrdersMapper.insertBatch(batchList);
});
}); });
} }
private void syncCashOut() { private void syncCashOut() {
LambdaQueryWrapper<CashOut> queryWrapper = Wrappers.<CashOut>lambdaQuery().orderByAsc(CashOut::getId); int total = userService.count(Wrappers.emptyWrapper());
Integer total = cashOutDao.selectCount(queryWrapper);
List<Dict> pageList = buildPageList(total); List<Dict> pageList = buildPageList(total);
pageList.parallelStream().forEach(param -> { pageList.parallelStream().forEach(param -> {
Integer p = param.getInt("page"); Integer p = param.getInt("page");
Integer s = param.getInt("size"); Integer s = param.getInt("size");
List<CashOut> list = cashOutDao.selectList(queryWrapper.last(StrUtil.format("limit {},{}", p, s))); List<UserEntity> list = userService.list(Wrappers.<UserEntity>lambdaQuery().orderByAsc(UserEntity::getUserId).last(StrUtil.format("limit {},{}", p, s)));
esCashOutMapper.insertBatch(list); list.parallelStream().forEach(user -> {
System.out.println("同步提现数据:" + user.getUserId());
List<CashOut> batchList = cashOutDao.selectList(Wrappers.<CashOut>lambdaQuery().eq(CashOut::getUserId, user.getUserId()));
esCashOutMapper.insertBatch(batchList);
});
}); });
} }
private void syncCourseCollect() { private void syncCourseCollect() {
LambdaQueryWrapper<CourseCollect> queryWrapper = Wrappers.<CourseCollect>lambdaQuery().orderByAsc(CourseCollect::getCourseId); int total = userService.count(Wrappers.emptyWrapper());
Integer total = courseCollectDao.selectCount(queryWrapper);
List<Dict> pageList = buildPageList(total); List<Dict> pageList = buildPageList(total);
pageList.parallelStream().forEach(param -> { pageList.parallelStream().forEach(param -> {
Integer p = param.getInt("page"); Integer p = param.getInt("page");
Integer s = param.getInt("size"); Integer s = param.getInt("size");
List<CourseCollect> list = courseCollectDao.selectList(queryWrapper.last(StrUtil.format("limit {},{}", p, s))); List<UserEntity> list = userService.list(Wrappers.<UserEntity>lambdaQuery().orderByAsc(UserEntity::getUserId).last(StrUtil.format("limit {},{}", p, s)));
esCourseCollectMapper.insertBatch(list); list.parallelStream().forEach(user -> {
System.out.println("同步短剧收藏数据:" + user.getUserId());
List<CourseCollect> batchList = courseCollectDao.selectList(Wrappers.<CourseCollect>lambdaQuery().eq(CourseCollect::getUserId, user.getUserId()));
esCourseCollectMapper.insertBatch(batchList);
});
}); });
} }
private void syncCourseUser() { private void syncCourseUser() {
LambdaQueryWrapper<CourseUser> queryWrapper = Wrappers.<CourseUser>lambdaQuery().orderByAsc(CourseUser::getCourseUserId); int total = userService.count(Wrappers.emptyWrapper());
Integer total = courseUserDao.selectCount(queryWrapper);
List<Dict> pageList = buildPageList(total); List<Dict> pageList = buildPageList(total);
pageList.parallelStream().forEach(param -> { pageList.parallelStream().forEach(param -> {
Integer p = param.getInt("page"); Integer p = param.getInt("page");
Integer s = param.getInt("size"); Integer s = param.getInt("size");
List<CourseUser> list = courseUserDao.selectList(queryWrapper.last(StrUtil.format("limit {},{}", p, s))); List<UserEntity> list = userService.list(Wrappers.<UserEntity>lambdaQuery().orderByAsc(UserEntity::getUserId).last(StrUtil.format("limit {},{}", p, s)));
esCourseUserMapper.insertBatch(list); list.parallelStream().forEach(user -> {
System.out.println("同步短剧用户数据:" + user.getUserId());
List<CourseUser> batchList = courseUserDao.selectList(Wrappers.<CourseUser>lambdaQuery().eq(CourseUser::getUserId, user.getUserId()));
esCourseUserMapper.insertBatch(batchList);
});
});
}
private void syncTaskCenterRecord() {
int total = userService.count(Wrappers.emptyWrapper());
List<Dict> pageList = buildPageList(total);
pageList.parallelStream().forEach(param -> {
Integer p = param.getInt("page");
Integer s = param.getInt("size");
List<UserEntity> list = userService.list(Wrappers.<UserEntity>lambdaQuery().orderByAsc(UserEntity::getUserId).last(StrUtil.format("limit {},{}", p, s)));
list.parallelStream().forEach(user -> {
System.out.println("同步任务中心数据:" + user.getUserId());
List<TaskCenterRecord> batchList = taskCenterRecordDao.selectList(Wrappers.<TaskCenterRecord>lambdaQuery().eq(TaskCenterRecord::getUserId, user.getUserId()));
esTaskCenterRecordMapper.insertBatch(batchList);
});
}); });
} }
private void syncDiscSpinningRecord() { private void syncDiscSpinningRecord() {
LambdaQueryWrapper<DiscSpinningRecord> queryWrapper = Wrappers.<DiscSpinningRecord>lambdaQuery().orderByAsc(DiscSpinningRecord::getId); int total = userService.count(Wrappers.emptyWrapper());
Integer total = discSpinningRecordDao.selectCount(queryWrapper);
List<Dict> pageList = buildPageList(total); List<Dict> pageList = buildPageList(total);
pageList.parallelStream().forEach(param -> { pageList.parallelStream().forEach(param -> {
Integer p = param.getInt("page"); Integer p = param.getInt("page");
Integer s = param.getInt("size"); Integer s = param.getInt("size");
List<DiscSpinningRecord> list = discSpinningRecordDao.selectList(queryWrapper.last(StrUtil.format("limit {},{}", p, s))); List<UserEntity> list = userService.list(Wrappers.<UserEntity>lambdaQuery().orderByAsc(UserEntity::getUserId).last(StrUtil.format("limit {},{}", p, s)));
esDiscSpinningRecordMapper.insertBatch(list); list.parallelStream().forEach(user -> {
System.out.println("同步抽奖记录数据:" + user.getUserId());
List<DiscSpinningRecord> batchList = discSpinningRecordDao.selectList(Wrappers.<DiscSpinningRecord>lambdaQuery().eq(DiscSpinningRecord::getUserId, user.getUserId()));
esDiscSpinningRecordMapper.insertBatch(batchList);
});
}); });
} }
private void syncPayDetails() { private void syncPayDetails() {
LambdaQueryWrapper<PayDetails> queryWrapper = Wrappers.<PayDetails>lambdaQuery().orderByAsc(PayDetails::getId); int total = userService.count(Wrappers.emptyWrapper());
Integer total = payDetailsDao.selectCount(queryWrapper);
List<Dict> pageList = buildPageList(total); List<Dict> pageList = buildPageList(total);
pageList.parallelStream().forEach(param -> { pageList.parallelStream().forEach(param -> {
Integer p = param.getInt("page"); Integer p = param.getInt("page");
Integer s = param.getInt("size"); Integer s = param.getInt("size");
List<PayDetails> list = payDetailsDao.selectList(queryWrapper.last(StrUtil.format("limit {},{}", p, s))); List<UserEntity> list = userService.list(Wrappers.<UserEntity>lambdaQuery().orderByAsc(UserEntity::getUserId).last(StrUtil.format("limit {},{}", p, s)));
esPayDetailsMapper.insertBatch(list); list.parallelStream().forEach(user -> {
System.out.println("同步支付详情数据:" + user.getUserId());
List<PayDetails> batchList = payDetailsDao.selectList(Wrappers.<PayDetails>lambdaQuery().eq(PayDetails::getUserId, user.getUserId()));
esPayDetailsMapper.insertBatch(batchList);
});
}); });
} }
private void syncUserMoneyDetails() { private void syncUserMoneyDetails() {
LambdaQueryWrapper<UserMoneyDetails> queryWrapper = Wrappers.<UserMoneyDetails>lambdaQuery().orderByAsc(UserMoneyDetails::getId); int total = userService.count(Wrappers.emptyWrapper());
Integer total = userMoneyDetailsDao.selectCount(queryWrapper);
List<Dict> pageList = buildPageList(total); List<Dict> pageList = buildPageList(total);
pageList.parallelStream().forEach(param -> { pageList.parallelStream().forEach(param -> {
Integer p = param.getInt("page"); Integer p = param.getInt("page");
Integer s = param.getInt("size"); Integer s = param.getInt("size");
List<UserMoneyDetails> list = userMoneyDetailsDao.selectList(queryWrapper.last(StrUtil.format("limit {},{}", p, s))); List<UserEntity> list = userService.list(Wrappers.<UserEntity>lambdaQuery().orderByAsc(UserEntity::getUserId).last(StrUtil.format("limit {},{}", p, s)));
esUserMoneyDetailsMapper.insertBatch(list); list.parallelStream().forEach(user -> {
System.out.println("同步用户资金明细数据:" + user.getUserId());
List<UserMoneyDetails> batchList = userMoneyDetailsDao.selectList(Wrappers.<UserMoneyDetails>lambdaQuery().eq(UserMoneyDetails::getUserId, user.getUserId()));
esUserMoneyDetailsMapper.insertBatch(batchList);
});
}); });
} }
} }