This commit is contained in:
2025-08-13 16:27:39 +08:00
parent 0d93c2570a
commit 17ad88608b
8 changed files with 1206 additions and 68 deletions

View File

@@ -0,0 +1,533 @@
<?php
namespace app\common\library;
use app\DbCoroutineContext;
use think\Collection;
use think\db\Connection;
use think\facade\Db;
use support\Log;
class DatabaseRoute
{
/**
* 跨所有分库分页(自动 count自动追加 LIMIT 和 ORDER BY
*
* @param \Closure $sqlBuilder 生成基础 SQL 的闭包函数(不包含 LIMIT
* @param int $page 当前页码
* @param int $pageSize 每页条数
* @param string|null $orderCol 排序字段(全局排序使用)
* @return array
*/
public static function paginateAllDbBySqlAutoCount(
\Closure $sqlBuilder,
int $page = 1,
int $pageSize = 20,
string $orderCol = null,
int $count = null,
): array
{
$dbMap = config('think-orm.db_map');
$counts = [];
$total = 0;
if ($count === null) {
// 1. 自动 count 分库总数
$dbList = DatabaseRoute::getAllSelectDb();
foreach ($dbMap as $connName) {
if (in_array($connName, $dbList) ) {
$baseSql = call_user_func($sqlBuilder, $connName);
$countSql = self::buildCountSql($baseSql);
$count = Db::connect($connName)->query($countSql)[0]['count'] ?? 0;
$counts[$connName] = $count;
$total += $count;
}
}
}
if ($total === 0) {
return [
'list' => [],
'totalCount' => 0,
'totalPage' => 0,
'currPage' => $page,
'pageSize' => $pageSize,
];
}
$offset = ($page - 1) * $pageSize;
if ($offset >= $total) {
return [
'list' => [],
'totalCount' => $total,
'totalPage' => (int)ceil($total / $pageSize),
'currPage' => $page,
'pageSize' => $pageSize,
];
}
$limit = $pageSize;
$allRows = [];
$skip = $offset;
// 2. 分库分页查询
foreach ($counts as $connName => $count) {
if ($count <= 0) continue;
if ($skip >= $count) {
$skip -= $count;
continue;
}
$localOffset = $skip;
$localLimit = min($count - $localOffset, $limit);
$baseSql = call_user_func($sqlBuilder, $connName);
$sql = rtrim($baseSql, ';');
if ($orderCol) {
$sql .= " ORDER BY {$orderCol} DESC";
}
$sql .= " LIMIT {$localOffset}, {$localLimit}";
$rows = Db::connect($connName)->query($sql);
$allRows = array_merge($allRows, $rows);
$limit -= $localLimit;
$skip = 0;
if ($limit <= 0) break;
}
// 3. 全局排序(若指定排序字段)
if ($orderCol) {
usort($allRows, function ($a, $b) use ($orderCol) {
return strtotime($b[$orderCol]) <=> strtotime($a[$orderCol]);
});
}
// 4. 返回结构
return [
'list' => $allRows,
'totalCount' => $total,
'totalPage' => (int)ceil($total / $pageSize),
'currPage' => $page,
'pageSize' => $pageSize,
];
}
/**
* 从 SELECT SQL 自动构建对应的 COUNT SQL
*
* @param string $sql
* @return string
*/
private static function buildCountSql(string $sql): string
{
$sql = preg_replace('/\s+ORDER\s+BY\s+.+?$/i', '', $sql);
$sql = preg_replace('/\s+LIMIT\s+\d+(\s*,\s*\d+)?$/i', '', $sql);
if (preg_match('/FROM\s+(.*)/i', $sql, $matches)) {
return 'SELECT COUNT(*) AS count FROM ' . $matches[1];
}
throw new \RuntimeException("无法从 SQL 自动构建 COUNT 查询,请检查 SQL 是否规范");
}
public static function paginateDb(
string $table,
\Closure $builder,
int $page = 1,
int $pageSize = 20,
string|array $keyInfo = null,
$isRecords = false
) {
// 构建基础查询
if ($keyInfo) {
$query = self::getDb($table, $keyInfo);
}else{
$query = Db::table($table);
}
$builder($query); // 应用外部闭包设置查询条件,如 where/order 等
// 获取总数
$totalCount = (clone $query)->count();
// 计算总页数
$totalPage = (int) ceil($totalCount / $pageSize);
$currPage = max(1, min($page, $totalPage));
// 获取分页数据
$list = $query
->page($currPage, $pageSize)
->select()
->toArray();
$query->getConnection()->close();
return [
'totalCount' => $totalCount,
'pageSize' => $pageSize,
'totalPage' => $totalPage,
'currPage' => $currPage,
$isRecords ? 'records' : 'list' => convertToCamelCase($list),
];
}
/**
* 跨所有从库分页(支持游标分页和传统页码分页)
* @param string $table 表名
* @param \Closure $builder 查询构造器
* @param int|null $page 当前页null 表示使用游标分页)
* @param int $pageSize 每页条数
* @param string|null $lastCreateTime 游标分页的游标值create_time
* @return array [list, total|nextCursor, hasMore]
*/
public static function paginateAllDb(
string $table,
\Closure $builder,
int $page = 1,
int $pageSize = 20,
string $orderCol = 'create_time',
string $modelOrderCol = null,
bool $isRecords = false
): array
{
$dbMap = config('think-orm.db_map');
$counts = [];
$total = 0;
// 1. 统计每个分库总数
$dbList = DatabaseRoute::getAllSelectDb();
foreach ($dbMap as $connName) {
if (in_array($connName, $dbList) ) {
$query = Db::connect($connName)->name($table);
$query = call_user_func($builder, $query);
// 不要order避免报错
$query->removeOption('order');
$count = $query->count();
$query->getConnection()->close();
$counts[$connName] = $count;
$total += $count;
}
}
if ($total === 0) {
return [
$isRecords ? 'records' : 'list' => [],
'totalCount' => 0,
'totalPage' => 0,
'currPage' => $page,
'pageSize' => $pageSize,
];
}
$offset = ($page - 1) * $pageSize;
if ($offset >= $total) {
// 超出总数,返回空数据
return [
$isRecords ? 'records' : 'list' => [],
'totalCount' => $total,
'totalPage' => (int)ceil($total / $pageSize),
'currPage' => $page,
'pageSize' => $pageSize,
];
}
$limit = $pageSize;
$allRows = [];
// 2. 计算每个库需要查询的偏移和条数
$skip = $offset; // 还需要跳过多少条
foreach ($counts as $connName => $count) {
if ($count <= 0) continue;
if ($skip >= $count) {
// 跳过整个分库
$skip -= $count;
continue;
}
// 本库要查询的起始 offset
$localOffset = $skip;
// 本库剩余条数
$localLimit = min($count - $localOffset, $limit);
// 查询数据
$query = Db::connect($connName)->name($table);
$query = call_user_func($builder, $query);
$rows = $query->order($orderCol, 'desc')
->limit($localOffset, $localLimit)
->select()
->toArray();
$query->getConnection()->close();
$allRows = array_merge($allRows, $rows);
$limit -= $localLimit; // 剩余要查询的条数
$skip = 0; // 之后库不再跳过
if ($limit <= 0) break; // 已查够
}
// 3. 全局排序(降序)
if ($modelOrderCol) {
usort($allRows, function ($a, $b) use ($modelOrderCol) {
return strtotime($b[$modelOrderCol]) <=> strtotime($a[$modelOrderCol]);
});
}
// 4. 返回分页数据
return [
$isRecords ? 'records' : 'list' => $allRows,
'totalCount' => $total,
'totalPage' => (int)ceil($total / $pageSize),
'currPage' => $page,
'pageSize' => $pageSize,
];
}
/**
* 对所有从库执行事务XA 分布式模拟,非真实两阶段提交)
* @param \Closure $callback
* @return mixed
* @throws \Throwable
*/
public static function transactionXa(\Closure $callback)
{
DbCoroutineContext::set('startTrans', true);
try {
// 执行用户操作(传入事务闭包)
$result = call_user_func($callback);
// 提交所有连接
DbCoroutineContext::commit();
return $result;
} catch (\Throwable $e) {
// 回滚所有连接
DbCoroutineContext::rollback();
Log::error("transactionXa 全部回滚:" . $e->getMessage());
Log::info('错误信息'.$e->getMessage().'具体信息'.$e->getTraceAsString());
throw $e;
}finally {
DbCoroutineContext::clearList();
DbCoroutineContext::set('startTrans', false);
}
}
public static function getMasterDb($table, $isWrite = false)
{
$conn = Db::connect();
if ($isWrite) {
DbCoroutineContext::put($conn);
}
return $conn->name($table);
}
/**
* 根据表和分库值获取db
* @param $table string 表名
* @param $keyInfo string | array
*/
public static function getDb($table, $keyInfo, $isWrite = false, $isUpdate = false, $addWhere = true)
{
if (is_array($keyInfo)) {
$con = Db::connect(DatabaseRoute::getConnection($table, $keyInfo, $isWrite));
if ($isWrite) {
DbCoroutineContext::put($con);
}
$model = $con->name($table);
if ($addWhere && (!$isWrite || $isUpdate)) {
$model->where($keyInfo);
}
} else {
$con = Db::connect(DatabaseRoute::getConnection($table, ['user_id' => $keyInfo], $isWrite));
if ($isWrite) {
DbCoroutineContext::put($con);
}
$model = $con->name($table);
if ($addWhere && (!$isWrite || $isUpdate)) {
$model->where([
'user_id' => $keyInfo
]);
}
}
return $model;
}
/**
* 跨所有分库直接删除(危险操作,务必确认条件准确)
* @param string $table 表名
* @param \Closure $builder 条件构造器,如 function($query) { return $query->where('xxx', xxx); }
* @return int 删除的总记录数
*/
public static function deleteAllDbDirect(string $table, \Closure $builder): int
{
$dbMap = config('think-orm.db_master_map');
$totalDeleted = 0;
foreach ($dbMap as $connName) {
if ($connName === 'duanju_master') continue;
$query = Db::connect($connName)->name($table);
$query = call_user_func($builder, $query);
$deleted = $query->delete();
$query->getConnection()->close();
$totalDeleted += $deleted;
}
return $totalDeleted;
}
public static function getAllDbData($table, \Closure $builder)
{
return new class($table, $builder) {
protected $table;
protected $builder;
protected $dbMap;
protected $masterDbMap;
public function __construct($table, $builder)
{
$this->table = $table;
$this->builder = $builder;
$this->dbMap = config('think-orm.db_map');
$this->masterDbMap = config('think-orm.db_master_map');
}
public function __call($method, $args)
{
$finalResult = null;
try {
if ($method == 'insert') {
throw new \RuntimeException("不支持insert");
}
// insert 和 update 使用主库
if ($method == 'update' || $method == 'delete') {
foreach ($this->masterDbMap as $dbname) {
if ($dbname !== 'duanju_master') {
$query = Db::connect($dbname)->name($this->table);
$query = call_user_func($this->builder, $query);
if (method_exists($query, $method)) {
$result = call_user_func_array([$query, $method], $args);
$finalResult = $result;
$query->getConnection()->close();
// find 返回 nullselect 返回空数组count 返回数字
if ($result || $result === 0) {
return $result;
}
}else {
$query->getConnection()->close();
}
}
}
return $finalResult;
}else{
$dbList = DatabaseRoute::getAllSelectDb();
foreach ($this->dbMap as $dbname) {
if (in_array($dbname, $dbList) ) {
$query = Db::connect($dbname)->name($this->table);
$query = call_user_func($this->builder, $query);
if (method_exists($query, $method)) {
$result = call_user_func_array([$query, $method], $args);
// find 返回 nullselect 返回空数组count 返回数字
$finalResult = $result;
$query->getConnection()->close();
if ($result instanceof Collection) {
if (!$result->isEmpty()) {
return $result;
}
}elseif (is_int($result) && $result != 0) {
return $result;
} else if ($result) {
return $result;
}
}else {
$query->getConnection()->close();
}
}
}
}
}catch (\Throwable $e) {
Log::error("错误信息".$e->getMessage());
throw $e;
}
return $finalResult;
}
};
}
public static function getAllSelectDb()
{
$index = self::getIndex();
return ["duanju$index-0_slave", "duanju$index-1_slave", "duanju$index-2_slave", "duanju$index-3_slave", "duanju$index-4_slave"];
}
private static function getIndex()
{
$INDEX = cache('db_index');
if ($INDEX == null) {
$INDEX = 0;
}
$INDEX = ++$INDEX % 4;
cache('db_index', $INDEX);
return $INDEX;
}
/**
* 获取数据库连接
* @param string $tableName 表名
* @param array $data 数据
* @param bool $isWrite 是否为写操作
* @return Connection
*/
public static function getConnection($table, $data = [], $isWrite = false)
{
$routeConfig = config('think-orm.route');
if (!isset($routeConfig[$table])) {
return $isWrite ? 'duanju-master' : 'duanju-slave';
}
$keyField = strpos($table, 'user') !== false || in_array($table, [
'orders', 'course_collect', 'pay_details', 'disc_spinning_record',
'cash_out', 'course_user', 'tb_user', 'task_center_record',
'user_money', 'user_sign_record', 'invite_achievement',
'invite_money', 'user_info', 'sys_user', 'user_money_details', 'sys_user_money_details'
]) ? 'user_id' : 'course_id';
if (!isset($data[$keyField])) {
Log::warning("分库警告: 表={$table}, 数据中缺少 {$keyField} 字段");
return $isWrite ? 'duanju-master' : 'duanju-slave';
}
$index = abs($data[$keyField] % 5);
$connectionTemplate = $isWrite
? $routeConfig[$table]['master']
: $routeConfig[$table]['slave'];
$connectionName = str_replace("{\${$keyField}%5}", $index, $connectionTemplate);
if (strpos($connectionName, 'slave')) {
$INDEX = self::getIndex();
$connectionName = str_replace("duanju", 'duanju'.$INDEX, $connectionName);
}
// 验证连接是否存在
$connections = config('think-orm.connections');
if (!isset($connections[$connectionName])) {
Log::error("分库错误: 表={$table}, 连接={$connectionName} 不存在");
return $isWrite ? 'duanju-master' : 'duanju-slave';
}
// Log::info("分库成功: 表={$table}, 键={$keyField}, 值={$data[$keyField]}, 索引={$index}, 连接={$connectionName}");
return $connectionName;
}
}