query($countSql)[0]['count'] ?? 0;
                    $counts[$connName] = $count;
                    $total += $count;
                }
            }
        }

        if ($total === 0) {
            return [
                'list' => [],
                'totalCount' => 0,
                'totalPage' => 0,
                'currPage' => $page,
                'pageSize' => $pageSize,
            ];
        }

        $offset = ($page - 1) * $pageSize;
        if ($offset >= $total) {
            return [
                'list' => [],
                'totalCount' => $total,
                'totalPage' => (int)ceil($total / $pageSize),
                'currPage' => $page,
                'pageSize' => $pageSize,
            ];
        }

        $limit = $pageSize;
        $allRows = [];
        $skip = $offset;

        // 2. Paged query against each shard
        foreach ($counts as $connName => $count) {
            if ($count <= 0) continue;
            if ($skip >= $count) {
                $skip -= $count;
                continue;
            }
            $localOffset = $skip;
            $localLimit = min($count - $localOffset, $limit);
            $baseSql = call_user_func($sqlBuilder, $connName);
            $sql = rtrim($baseSql, ';');
            if ($orderCol) {
                $sql .= " ORDER BY {$orderCol} DESC";
            }
            $sql .= " LIMIT {$localOffset}, {$localLimit}";
            $rows = Db::connect($connName)->query($sql);
            $allRows = array_merge($allRows, $rows);
            $limit -= $localLimit;
            $skip = 0;
            if ($limit <= 0) break;
        }

        // 3. Global sort (only when an order column was given)
        if ($orderCol) {
            usort($allRows, function ($a, $b) use ($orderCol) {
                return strtotime($b[$orderCol]) <=> strtotime($a[$orderCol]);
            });
        }

        // 4. Result structure
        return [
            'list' => $allRows,
            'totalCount' => $total,
            'totalPage' => (int)ceil($total / $pageSize),
            'currPage' => $page,
            'pageSize' => $pageSize,
        ];
    }

    /**
     * Derive a "SELECT COUNT(*) AS count FROM ..." statement from a SELECT statement.
     *
     * A trailing ORDER BY / LIMIT clause is removed, then everything after the first
     * standalone FROM keyword is appended to the COUNT prefix. The patterns use the
     * `s` (dot-all) modifier so that multi-line SQL keeps its WHERE/JOIN clauses, and
     * `\b` so that identifiers ending in "from" (e.g. `valid_from`) are not mistaken
     * for the keyword.
     *
     * NOTE(review): this is a purely textual rewrite; a FROM or ORDER BY inside a
     * sub-query placed before the main FROM will still confuse it.
     *
     * @param string $sql SELECT statement
     * @return string COUNT statement
     * @throws \RuntimeException when no FROM clause can be found
     */
    private static function buildCountSql(string $sql): string
    {
        // A trailing ";" or whitespace would otherwise defeat the "$"-anchored patterns below.
        $sql = rtrim($sql, "; \t\n\r\0\x0B");

        $sql = preg_replace('/\s+ORDER\s+BY\s+.+?$/is', '', $sql) ?? $sql;
        $sql = preg_replace('/\s+LIMIT\s+\d+(\s*,\s*\d+)?$/i', '', $sql) ?? $sql;

        if (preg_match('/\bFROM\s+(.*)/is', $sql, $matches)) {
            return 'SELECT COUNT(*) AS count FROM ' . $matches[1];
        }

        throw new \RuntimeException("无法从 SQL 自动构建 COUNT 查询,请检查 SQL 是否规范");
    }

    /**
     * Page-number pagination against a single database.
     *
     * When $keyInfo is given the query is obtained through getDb() (shard routing plus
     * the key condition); otherwise Db::table($table) is used. The requested page is
     * clamped into [1, totalPage] before the data query is issued.
     *
     * @param string            $table     table name
     * @param \Closure          $builder   receives the query object and applies where/order etc. in place;
     *                                     its return value is ignored
     * @param int               $page      requested page (1-based)
     * @param int               $pageSize  rows per page; values below 1 are treated as 1
     * @param string|array|null $keyInfo   shard key (scalar user id or key => value array), or null
     * @param bool              $isRecords when truthy the rows are returned under 'records' instead of 'list'
     * @return array{totalCount:int,pageSize:int,totalPage:int,currPage:int}&array<string,mixed>
     */
    public static function paginateDb(
        string $table,
        \Closure $builder,
        int $page = 1,
        int $pageSize = 20,
        string|array|null $keyInfo = null,
        $isRecords = false
    ) {
        // Guard against division by zero in the page-count computation.
        $pageSize = max(1, $pageSize);

        $query = $keyInfo ? self::getDb($table, $keyInfo) : Db::table($table);
        $builder($query);

        try {
            // Count on a clone so the paging options below do not leak into the count query.
            $totalCount = (clone $query)->count();
            $totalPage = (int) ceil($totalCount / $pageSize);
            $currPage = max(1, min($page, $totalPage));

            $list = $query
                ->page($currPage, $pageSize)
                ->select()
                ->toArray();
        } finally {
            // Release the connection even when one of the queries throws.
            $query->getConnection()->close();
        }

        $listKey = $isRecords ? 'records' : 'list';

        return [
            'totalCount' => $totalCount,
            'pageSize' => $pageSize,
            'totalPage' => $totalPage,
            'currPage' => $currPage,
            $listKey => convertToCamelCase($list),
        ];
    }

    /**
     * Page-number pagination across all selectable slave shards.
     *
     * The shards listed in config('database.db_map') that are also returned by
     * DatabaseRoute::getAllSelectDb() are counted first; the global offset is then
     * mapped onto the shards in configuration order, so a page is filled from one
     * shard after another. Within each shard rows are ordered by $orderCol DESC.
     * Because shards are concatenated rather than merged, ordering is only
     * guaranteed inside a shard; $modelOrderCol merely re-sorts the rows of the
     * returned page.
     *
     * @param string      $table         table name (passed to ->name())
     * @param \Closure    $builder       receives a fresh query per shard; may return the query or
     *                                   modify it in place
     * @param int         $page          requested page (1-based); values below 1 are treated as 1
     * @param int         $pageSize      rows per page; values below 1 are treated as 1
     * @param string      $orderCol      column used for the per-shard ORDER BY ... DESC
     * @param string|null $modelOrderCol row key whose strtotime() value is used to sort the merged
     *                                   page in descending order; null skips the sort
     * @param bool        $isRecords     when true the rows are returned under 'records' instead of 'list'
     * @return array{totalCount:int,totalPage:int,currPage:int,pageSize:int}&array<string,mixed>
     */
    public static function paginateAllDb(
        string $table,
        \Closure $builder,
        int $page = 1,
        int $pageSize = 20,
        string $orderCol = 'create_time',
        ?string $modelOrderCol = null,
        bool $isRecords = false
    ): array {
        // A page below 1 would produce a negative SQL offset; a page size below 1
        // would divide by zero when computing the page count.
        $page = max(1, $page);
        $pageSize = max(1, $pageSize);
        $listKey = $isRecords ? 'records' : 'list';

        $dbMap = config('database.db_map');
        $counts = [];
        $total = 0;

        // 1. Count the matching rows on every selectable shard.
        $dbList = DatabaseRoute::getAllSelectDb();
        foreach ($dbMap as $connName) {
            if (!in_array($connName, $dbList, true)) {
                continue;
            }

            $query = Db::connect($connName)->name($table);
            // Accept builders that modify the query in place without returning it,
            // as paginateDb() does.
            $query = $builder($query) ?? $query;
            try {
                // Drop any ORDER BY the builder added; it is not wanted on the count query.
                $query->removeOption('order');
                $count = $query->count();
            } finally {
                $query->getConnection()->close();
            }

            $counts[$connName] = $count;
            $total += $count;
        }

        if ($total === 0) {
            return [
                $listKey => [],
                'totalCount' => 0,
                'totalPage' => 0,
                'currPage' => $page,
                'pageSize' => $pageSize,
            ];
        }

        $totalPage = (int) ceil($total / $pageSize);
        $offset = ($page - 1) * $pageSize;
        if ($offset >= $total) {
            // Requested page lies beyond the last row: return an empty page.
            return [
                $listKey => [],
                'totalCount' => $total,
                'totalPage' => $totalPage,
                'currPage' => $page,
                'pageSize' => $pageSize,
            ];
        }

        // 2. Translate the global offset into a per-shard offset/limit and fetch.
        $limit = $pageSize; // rows still needed to fill the page
        $skip = $offset;    // rows still to be skipped before the page starts
        $allRows = [];

        foreach ($counts as $connName => $count) {
            if ($count <= 0) {
                continue;
            }
            if ($skip >= $count) {
                // The page starts after this shard: skip it entirely.
                $skip -= $count;
                continue;
            }

            $localOffset = $skip;
            $localLimit = min($count - $localOffset, $limit);

            $query = Db::connect($connName)->name($table);
            $query = $builder($query) ?? $query;
            try {
                $rows = $query->order($orderCol, 'desc')
                    ->limit($localOffset, $localLimit)
                    ->select()
                    ->toArray();
            } finally {
                $query->getConnection()->close();
            }

            $allRows = array_merge($allRows, $rows);
            $limit -= $localLimit;
            $skip = 0; // later shards are read from their first row

            if ($limit <= 0) {
                break; // page is full
            }
        }

        // 3. Optional in-memory sort of the merged page (descending by timestamp).
        if ($modelOrderCol) {
            usort(
                $allRows,
                static fn ($a, $b) => strtotime($b[$modelOrderCol]) <=> strtotime($a[$modelOrderCol])
            );
        }

        // 4. Result structure
        return [
            $listKey => $allRows,
            'totalCount' => $total,
            'totalPage' => $totalPage,
            'currPage' => $page,
            'pageSize' => $pageSize,
        ];
    }

    /**
     * Run a callback as a multi-connection transaction (simulated XA, not a real
     * two-phase commit).
     *
     * The 'startTrans' flag is set in DbCoroutineContext before the callback runs.
     * On success DbCoroutineContext::commit() is called; on any Throwable
     * DbCoroutineContext::rollback() is called, the error is logged and rethrown.
     * The connection list and the flag are always reset afterwards.
     *
     * @param \Closure $callback work to execute inside the transaction
     * @return mixed the callback's return value
     * @throws \Throwable whatever the callback (or the commit) throws
     */
    public static function transactionXa(\Closure $callback)
    {
        DbCoroutineContext::set('startTrans', true);

        try {
            $result = $callback();
            // Commit every registered connection.
            DbCoroutineContext::commit();

            return $result;
        } catch (\Throwable $e) {
            // Roll back every registered connection.
            DbCoroutineContext::rollback();
            Log::error("transactionXa 全部回滚:" . $e->getMessage());
            Log::info('错误信息'.$e->getMessage().'具体信息'.$e->getTraceAsString());

            throw $e;
        } finally {
            DbCoroutineContext::clearList();
            DbCoroutineContext::set('startTrans', false);
        }
    }

    /**
     * Get a query on the default connection for the given table.
     *
     * @param string $table   table name
     * @param bool   $isWrite when true the connection is registered with DbCoroutineContext
     * @return mixed query object returned by ->name($table)
     */
    public static function getMasterDb($table, $isWrite = false)
    {
        $conn = Db::connect();
        if ($isWrite) {
            DbCoroutineContext::put($conn);
        }

        return $conn->name($table);
    }

    /**
     * Get a query on the shard selected by the table name and shard key.
     *
     * A scalar $keyInfo is treated as ['user_id' => $keyInfo]. The key condition is
     * added as a WHERE clause for reads and for updates, unless $addWhere is false.
     *
     * @param string       $table    table name
     * @param string|array $keyInfo  shard key: a user id, or a key => value array
     * @param bool         $isWrite  route to the write connection and register it with DbCoroutineContext
     * @param bool         $isUpdate for writes: still add the key condition as WHERE
     * @param bool         $addWhere set to false to never add the key condition
     * @return mixed query object returned by ->name($table)
     */
    public static function getDb($table, $keyInfo, $isWrite = false, $isUpdate = false, $addWhere = true)
    {
        $where = is_array($keyInfo) ? $keyInfo : ['user_id' => $keyInfo];

        $con = Db::connect(DatabaseRoute::getConnection($table, $where, $isWrite));
        if ($isWrite) {
            DbCoroutineContext::put($con);
        }

        $model = $con->name($table);
        if ($addWhere && (!$isWrite || $isUpdate)) {
            $model->where($where);
        }

        return $model;
    }

    /**
     * Delete directly on every master shard (dangerous: make sure the condition is exact).
     *
     * Iterates config('database.db_master_map'), skipping 'duanju_master', and sums
     * the values returned by delete().
     *
     * @param string   $table   table name
     * @param \Closure $builder condition builder, e.g. function ($query) { return $query->where('xxx', xxx); };
     *                          it must return the query
     * @return int total number of deleted rows
     */
    public static function deleteAllDbDirect(string $table, \Closure $builder): int
    {
        $dbMap = config('database.db_master_map');
        $totalDeleted = 0;

        foreach ($dbMap as $connName) {
            if ($connName === 'duanju_master') {
                continue;
            }

            $query = Db::connect($connName)->name($table);
            $query = $builder($query);
            try {
                $totalDeleted += $query->delete();
            } finally {
                // Release the connection even when the delete throws.
                $query->getConnection()->close();
            }
        }

        return $totalDeleted;
    }

    /**
     * Build a proxy that forwards a query method to the shards one by one and
     * returns the first non-empty result.
     *
     * - insert is rejected with a RuntimeException.
     * - update / delete run against config('database.db_master_map') except 'duanju_master'.
     * - every other method runs against the entries of config('database.db_map') that are
     *   also returned by DatabaseRoute::getAllSelectDb().
     *
     * A result counts as non-empty when it is a non-empty Collection or any other
     * truthy value. When no shard yields one, the result of the last shard is returned.
     *
     * @param string   $table   table name
     * @param \Closure $builder receives a fresh query per shard and must return the query
     * @return object proxy exposing the query methods through __call
     */
    public static function getAllDbData($table, \Closure $builder)
    {
        return new class($table, $builder) {
            protected $table;
            protected $builder;
            protected $dbMap;
            protected $masterDbMap;

            public function __construct($table, $builder)
            {
                $this->table = $table;
                $this->builder = $builder;
                $this->dbMap = config('database.db_map');
                $this->masterDbMap = config('database.db_master_map');
            }

            public function __call($method, $args)
            {
                try {
                    if ($method == 'insert') {
                        throw new \RuntimeException("不支持insert");
                    }

                    if ($method == 'update' || $method == 'delete') {
                        // Writes go to the master shards.
                        $connections = array_filter(
                            $this->masterDbMap,
                            static fn ($dbname) => $dbname !== 'duanju_master'
                        );
                    } else {
                        $dbList = DatabaseRoute::getAllSelectDb();
                        $connections = array_filter(
                            $this->dbMap,
                            static fn ($dbname) => in_array($dbname, $dbList, true)
                        );
                    }

                    return $this->firstNonEmpty($connections, $method, $args);
                } catch (\Throwable $e) {
                    Log::error("错误信息".$e->getMessage());
                    throw $e;
                }
            }

            /**
             * Call $method on each connection in turn and stop at the first non-empty result.
             *
             * A result of 0 (no rows affected / counted) does NOT stop the loop. The
             * earlier write path returned on 0 as well, which meant update/delete only
             * ever reached the first shard.
             */
            private function firstNonEmpty(array $connections, $method, $args)
            {
                $finalResult = null;

                foreach ($connections as $dbname) {
                    $query = Db::connect($dbname)->name($this->table);
                    $query = ($this->builder)($query);

                    try {
                        if (!method_exists($query, $method)) {
                            continue;
                        }
                        $result = $query->$method(...$args);
                    } finally {
                        $query->getConnection()->close();
                    }

                    // find returns null, select returns an empty collection, count returns a number.
                    $finalResult = $result;
                    $hasData = $result instanceof Collection ? !$result->isEmpty() : (bool) $result;
                    if ($hasData) {
                        return $result;
                    }
                }

                return $finalResult;
            }
        };
    }

    /**
     * List the five slave connection names ("duanju{index}-0_slave" ... "-4_slave")
     * for the replica group chosen by getIndex().
     *
     * @return string[]
     */
    public static function getAllSelectDb()
    {
        $index = self::getIndex();

        return array_map(
            static fn (int $shard): string => "duanju{$index}-{$shard}_slave",
            range(0, 4)
        );
    }

    /**
     * Advance and return the rotating index stored in the cache under 'db_index'.
     *
     * The value cycles through 0..3. The read-increment-write sequence is not atomic.
     *
     * @return int
     */
    private static function getIndex()
    {
        $current = \cache('db_index') ?: 0;
        $next = ($current + 1) % 4;
        \cache('db_index', $next);

        return $next;
    }

    /**
     * Resolve the connection name for a table and a row of data.
     *
     * Tables whose name contains "user", or that appear in the list below, are
     * sharded by user_id; all others by course_id. The shard number is
     * abs(key % 5) and is substituted into the master/slave template taken from
     * config('think-orm.route')[$table]. When the key is missing from $data a
     * warning is logged and the non-sharded default connection name is returned.
     *
     * @param string $table   table name
     * @param array  $data    row data / condition holding the shard key
     * @param bool   $isWrite whether the connection is needed for a write
     * @return string connection name
     */
    public static function getConnection($table, $data = [], $isWrite = false)
    {
        $routeConfig = config('think-orm.route');

        $userKeyedTables = [
            'orders', 'course_collect', 'pay_details', 'disc_spinning_record', 'cash_out',
            'course_user', 'tb_user', 'task_center_record', 'user_money', 'user_sign_record',
            'invite_achievement', 'invite_money', 'user_info', 'sys_user', 'user_money_details',
            'sys_user_money_details',
        ];
        $keyField = (str_contains($table, 'user') || in_array($table, $userKeyedTables, true))
            ? 'user_id'
            : 'course_id';

        if (!isset($data[$keyField])) {
            Log::warning("分库警告: 表={$table}, 数据中缺少 {$keyField} 字段");
            return $isWrite ? 'duanju_master' : 'duanju_slave';
        }

        $index = abs($data[$keyField] % 5);
        $connectionTemplate = $isWrite ? $routeConfig[$table]['master'] : $routeConfig[$table]['slave'];

        // The template contains a placeholder such as "{$user_id%5}" that is replaced by the shard number.
        return str_replace("{\${$keyField}%5}", $index, $connectionTemplate);
    }
}