525 lines
17 KiB
PHP
525 lines
17 KiB
PHP
<?php
|
||
|
||
namespace app\common\library;
|
||
|
||
use app\DbCoroutineContext;
|
||
use think\Collection;
|
||
use think\db\Connection;
|
||
use think\facade\Db;
|
||
use support\Log;
|
||
|
||
class DatabaseRoute
|
||
{
|
||
|
||
/**
 * Paginate the rows of a raw SELECT across every currently readable shard.
 *
 * The shards listed in `think-orm.db_map` (restricted to the names returned by
 * getAllSelectDb()) are treated as one concatenated list, in map order: the
 * global offset is consumed shard by shard, and each shard that contributes
 * rows is queried with its own `LIMIT offset, length`. When `$orderCol` is
 * given, each shard query is ordered by it (DESC) and the merged page is
 * re-sorted in memory by the same column, parsed with strtotime().
 *
 * @param \Closure    $sqlBuilder Called with the connection name; must return the base SELECT
 *                                for that shard, without a LIMIT clause.
 * @param int         $page       1-based page number. Values below 1 are treated as 1.
 * @param int         $pageSize   Rows per page. Values below 1 are treated as 1.
 * @param string|null $orderCol   Column used for `ORDER BY ... DESC` and for the in-memory sort.
 *                                It is interpolated verbatim into the SQL, so pass trusted
 *                                identifiers only.
 * @param int|null    $count      Kept for backward compatibility only. Per-shard counts are
 *                                required to compute the shard offsets, so they are always
 *                                queried and this value is not used.
 * @return array{list: array, totalCount: int, totalPage: int, currPage: int, pageSize: int}
 */
public static function paginateAllDbBySqlAutoCount(
    \Closure $sqlBuilder,
    int $page = 1,
    int $pageSize = 20,
    ?string $orderCol = null,
    ?int $count = null,
): array
{
    // A page below 1 would produce a negative LIMIT offset (invalid SQL) and a
    // page size below 1 would divide by zero when computing the page count.
    $page = max(1, $page);
    $pageSize = max(1, $pageSize);

    $dbMap = config('think-orm.db_map');
    $readable = self::getAllSelectDb();

    // 1. Count the matching rows on every readable shard.
    //    This always runs: previously a caller-supplied $count skipped it, which
    //    left the per-shard table empty and made the method return an empty page.
    $counts = [];
    $total = 0;
    foreach ($dbMap as $connName) {
        if (!in_array($connName, $readable, true)) {
            continue;
        }

        $countSql = self::buildCountSql($sqlBuilder($connName));
        $shardCount = (int) (Db::connect($connName)->query($countSql)[0]['count'] ?? 0);

        $counts[$connName] = $shardCount;
        $total += $shardCount;
    }

    $totalPage = (int) ceil($total / $pageSize);
    $offset = ($page - 1) * $pageSize;

    // Nothing to fetch: no rows at all, or the requested page lies past the end.
    if ($total === 0 || $offset >= $total) {
        return [
            'list' => [],
            'totalCount' => $total,
            'totalPage' => $totalPage,
            'currPage' => $page,
            'pageSize' => $pageSize,
        ];
    }

    // 2. Walk the shards, skipping whole shards until the offset is consumed,
    //    then read from each following shard until the page is full.
    $remaining = $pageSize;
    $skip = $offset;
    $allRows = [];

    foreach ($counts as $connName => $shardCount) {
        if ($shardCount <= 0) {
            continue;
        }
        if ($skip >= $shardCount) {
            $skip -= $shardCount;
            continue;
        }

        $localOffset = $skip;
        $localLimit = min($shardCount - $localOffset, $remaining);

        // Drop a trailing ";" (and surrounding whitespace) so clauses can be appended.
        $sql = rtrim($sqlBuilder($connName), "; \t\n\r\0\x0B");
        if ($orderCol) {
            $sql .= " ORDER BY {$orderCol} DESC";
        }
        $sql .= " LIMIT {$localOffset}, {$localLimit}";

        $rows = Db::connect($connName)->query($sql);
        $allRows = array_merge($allRows, $rows);

        $remaining -= $localLimit;
        $skip = 0; // only the first contributing shard starts mid-way

        if ($remaining <= 0) {
            break;
        }
    }

    // 3. Re-sort the merged page (descending) so rows from different shards interleave.
    if ($orderCol) {
        usort($allRows, function ($a, $b) use ($orderCol) {
            return strtotime($b[$orderCol]) <=> strtotime($a[$orderCol]);
        });
    }

    // 4. Result envelope.
    return [
        'list' => $allRows,
        'totalCount' => $total,
        'totalPage' => $totalPage,
        'currPage' => $page,
        'pageSize' => $pageSize,
    ];
}
|
||
|
||
/**
 * Derive a `SELECT COUNT(*) AS count ...` statement from a SELECT statement.
 *
 * A trailing `;`, a trailing top-level `ORDER BY ...` and a trailing
 * `LIMIT n[, m]` / `LIMIT n OFFSET m` are removed, then everything after the
 * first standalone `FROM` keyword is reused as the body of the count query.
 *
 * Limitations (text-based rewrite, not a SQL parser):
 *  - a scalar subquery in the select list makes the first `FROM` the wrong one;
 *  - `GROUP BY` / `DISTINCT` / `UNION` queries are not counted as a single total;
 *  - parentheses inside string literals may confuse the ORDER BY detection.
 *
 * @param string $sql SELECT statement; may span several lines.
 * @return string The COUNT statement.
 * @throws \RuntimeException When no `FROM` keyword can be found.
 */
private static function buildCountSql(string $sql): string
{
    // Normalise the tail first; the patterns below are anchored to the end of the string.
    $sql = rtrim($sql, "; \t\n\r\0\x0B");

    // Strip a trailing ORDER BY (and whatever follows it, e.g. LIMIT).
    // The tail may only contain non-parenthesis characters or balanced (...) groups,
    // so an ORDER BY inside a subquery (followed by an unmatched ")") is left alone.
    // The `s` modifier lets the match span multiple lines.
    $sql = preg_replace(
        '/\s+ORDER\s+BY\s+(?:[^()]++|(\((?:[^()]++|(?1))*\)))+$/is',
        '',
        $sql
    ) ?? $sql;

    // Strip a trailing LIMIT when there was no ORDER BY in front of it.
    $sql = preg_replace(
        '/\s+LIMIT\s+\d+(?:\s*,\s*\d+|\s+OFFSET\s+\d+)?$/i',
        '',
        $sql
    ) ?? $sql;

    // \b keeps identifiers such as `valid_from` from being mistaken for the keyword;
    // `s` keeps the WHERE/JOIN lines of a multi-line statement in the captured tail.
    if (preg_match('/\bFROM\s+(.*)/is', $sql, $matches)) {
        return 'SELECT COUNT(*) AS count FROM ' . $matches[1];
    }

    throw new \RuntimeException("无法从 SQL 自动构建 COUNT 查询,请检查 SQL 是否规范");
}
|
||
|
||
/**
 * Paginate one table on a single connection.
 *
 * When `$keyInfo` is truthy the query comes from getDb() (which picks the
 * connection from the key); otherwise it is `Db::table($table)` on the default
 * connection. The requested page is clamped into [1, totalPage] (and to 1 when
 * there are no rows) before the rows are fetched.
 *
 * @param string            $table     Table name.
 * @param \Closure          $builder   Receives the query object and applies conditions / ordering
 *                                     to it in place; its return value is ignored.
 * @param int               $page      Requested page number.
 * @param int               $pageSize  Rows per page.
 * @param string|array|null $keyInfo   Routing key passed to getDb(), or null / empty for none.
 * @param bool              $isRecords When truthy the rows are returned under 'records' instead of 'list'.
 * @return array totalCount, pageSize, totalPage, currPage and the rows (passed through convertToCamelCase()).
 */
public static function paginateDb(
    string $table,
    \Closure $builder,
    int $page = 1,
    int $pageSize = 20,
    string|array $keyInfo = null,
    $isRecords = false
) {
    // Base query: routed by key when one is supplied, default connection otherwise.
    $query = $keyInfo ? self::getDb($table, $keyInfo) : Db::table($table);

    // Let the caller add where/order/etc. directly on the query object.
    $builder($query);

    // Count on a copy so the count does not disturb the query used for fetching rows.
    $countQuery = clone $query;
    $totalCount = $countQuery->count();

    $totalPage = (int) ceil($totalCount / $pageSize);
    $currPage = max(1, min($page, $totalPage));

    $rows = $query->page($currPage, $pageSize)->select()->toArray();

    $rowsKey = $isRecords ? 'records' : 'list';

    return [
        'totalCount' => $totalCount,
        'pageSize' => $pageSize,
        'totalPage' => $totalPage,
        'currPage' => $currPage,
        $rowsKey => convertToCamelCase($rows),
    ];
}
|
||
|
||
/**
 * Paginate a table across every currently readable shard using page numbers.
 *
 * The shards listed in `think-orm.db_map` (restricted to the names returned by
 * getAllSelectDb()) are counted first and then treated as one concatenated
 * list, in map order: the global offset is consumed shard by shard and each
 * contributing shard is queried with its own offset / length, ordered by
 * `$orderCol` descending. If `$modelOrderCol` is given, the merged page is
 * additionally re-sorted in memory by that key (descending, via strtotime()).
 *
 * @param string      $table         Table name (resolved with `->name()` on each connection).
 * @param \Closure    $builder       Receives a fresh query object and must RETURN the query
 *                                   with its conditions applied; it is called once per shard
 *                                   for counting and once more per shard that is read.
 * @param int         $page          1-based page number. Values below 1 are treated as 1.
 * @param int         $pageSize      Rows per page. Values below 1 are treated as 1.
 * @param string      $orderCol      Column used for the per-shard `ORDER BY ... DESC`.
 * @param string|null $modelOrderCol Row key used to re-sort the merged page; null disables the re-sort.
 * @param bool        $isRecords     When true the rows are returned under 'records' instead of 'list'.
 * @return array Rows under 'list' or 'records', plus totalCount, totalPage, currPage and pageSize.
 */
public static function paginateAllDb(
    string $table,
    \Closure $builder,
    int $page = 1,
    int $pageSize = 20,
    string $orderCol = 'create_time',
    ?string $modelOrderCol = null,
    bool $isRecords = false
): array
{
    // A page below 1 would produce a negative LIMIT offset and a page size
    // below 1 would divide by zero when computing the page count.
    $page = max(1, $page);
    $pageSize = max(1, $pageSize);

    $rowsKey = $isRecords ? 'records' : 'list';
    $dbMap = config('think-orm.db_map');
    $readable = self::getAllSelectDb();

    // 1. Count the matching rows on every readable shard.
    $counts = [];
    $total = 0;
    foreach ($dbMap as $connName) {
        if (!in_array($connName, $readable, true)) {
            continue;
        }

        $query = $builder(Db::connect($connName)->name($table));
        // Drop any ordering added by the builder; the original code does this to avoid errors in count().
        $query->removeOption('order');
        $shardCount = $query->count();

        $counts[$connName] = $shardCount;
        $total += $shardCount;
    }

    $totalPage = (int) ceil($total / $pageSize);
    $offset = ($page - 1) * $pageSize;

    // Nothing to fetch: no rows at all, or the requested page lies past the end.
    if ($total === 0 || $offset >= $total) {
        return [
            $rowsKey => [],
            'totalCount' => $total,
            'totalPage' => $totalPage,
            'currPage' => $page,
            'pageSize' => $pageSize,
        ];
    }

    // 2. Work out, shard by shard, which slice of each shard belongs to this page.
    $remaining = $pageSize; // rows still needed to fill the page
    $skip = $offset;        // rows still to be skipped before the page starts
    $allRows = [];

    foreach ($counts as $connName => $shardCount) {
        if ($shardCount <= 0) {
            continue;
        }
        if ($skip >= $shardCount) {
            // The page starts after this shard; skip it entirely.
            $skip -= $shardCount;
            continue;
        }

        $localOffset = $skip;
        $localLimit = min($shardCount - $localOffset, $remaining);

        $query = $builder(Db::connect($connName)->name($table));
        $rows = $query->order($orderCol, 'desc')
            ->limit($localOffset, $localLimit)
            ->select()
            ->toArray();

        $allRows = array_merge($allRows, $rows);

        $remaining -= $localLimit;
        $skip = 0; // only the first contributing shard starts mid-way

        if ($remaining <= 0) {
            break;
        }
    }

    // 3. Optional in-memory re-sort of the merged page (descending).
    if ($modelOrderCol) {
        usort($allRows, function ($a, $b) use ($modelOrderCol) {
            return strtotime($b[$modelOrderCol]) <=> strtotime($a[$modelOrderCol]);
        });
    }

    // 4. Result envelope.
    return [
        $rowsKey => $allRows,
        'totalCount' => $total,
        'totalPage' => $totalPage,
        'currPage' => $page,
        'pageSize' => $pageSize,
    ];
}
|
||
|
||
|
||
/**
 * Run a callback as one logical transaction over the connections tracked by
 * DbCoroutineContext (a simulated distributed transaction, not a real
 * two-phase commit).
 *
 * The 'startTrans' flag is set before the callback runs. On success
 * DbCoroutineContext::commit() is called; on any throwable
 * DbCoroutineContext::rollback() is called, the failure is logged and the
 * throwable is re-thrown. In every case the tracked list is cleared and the
 * flag is reset afterwards.
 *
 * @param \Closure $callback Work to perform; invoked without arguments.
 * @return mixed Whatever the callback returned.
 * @throws \Throwable Anything thrown by the callback or by the commit.
 */
public static function transactionXa(\Closure $callback)
{
    DbCoroutineContext::set('startTrans', true);

    try {
        // Run the caller's work, then commit through the context.
        $outcome = $callback();
        DbCoroutineContext::commit();
    } catch (\Throwable $failure) {
        // Undo through the context, log, and let the caller see the failure.
        DbCoroutineContext::rollback();

        $reason = $failure->getMessage();
        Log::error("transactionXa 全部回滚:" . $reason);
        Log::info('错误信息'.$reason.'具体信息'.$failure->getTraceAsString());

        throw $failure;
    } finally {
        DbCoroutineContext::clearList();
        DbCoroutineContext::set('startTrans', false);
    }

    return $outcome;
}
|
||
|
||
/**
 * Return a query for `$table` on the default connection (`Db::connect()` with
 * no name).
 *
 * @param string $table   Table name, resolved with `->name()`.
 * @param bool   $isWrite When truthy the connection is registered with
 *                        DbCoroutineContext::put() before the query is built.
 * @return mixed The query object produced by `->name($table)`.
 */
public static function getMasterDb($table, $isWrite = false)
{
    $connection = Db::connect();

    if ($isWrite) {
        // Register the connection with the context used by transactionXa().
        DbCoroutineContext::put($connection);
    }

    $query = $connection->name($table);

    return $query;
}
|
||
|
||
|
||
/**
 * Return a query for `$table` on the connection selected by a sharding key.
 *
 * A scalar `$keyInfo` is treated as a user id, i.e. as `['user_id' => $keyInfo]`.
 * The resulting condition array is used both to pick the connection (via
 * getConnection()) and, when applicable, as the query's where-clause.
 *
 * @param string       $table    Table name.
 * @param string|array $keyInfo  Sharding key: a condition array, or a bare user id.
 * @param bool         $isWrite  Select the write connection and register it with DbCoroutineContext.
 * @param bool         $isUpdate For writes, whether the key conditions should still be applied.
 * @param bool         $addWhere Set to false to never add the key conditions.
 * @return mixed The query object; the key conditions are applied when `$addWhere` is
 *               truthy and the call is a read or an update.
 */
public static function getDb($table, $keyInfo, $isWrite = false, $isUpdate = false, $addWhere = true)
{
    // Normalise the key so both input shapes share one code path.
    $conditions = is_array($keyInfo) ? $keyInfo : ['user_id' => $keyInfo];

    $connectionName = DatabaseRoute::getConnection($table, $conditions, $isWrite);
    $connection = Db::connect($connectionName);

    if ($isWrite) {
        DbCoroutineContext::put($connection);
    }

    $query = $connection->name($table);

    // Reads and updates are scoped to the key; other writes are left unscoped.
    $scopeToKey = $addWhere && (!$isWrite || $isUpdate);
    if ($scopeToKey) {
        $query->where($conditions);
    }

    return $query;
}
|
||
|
||
/**
 * Delete matching rows on every connection listed in `think-orm.db_master_map`,
 * except the one named 'duanju_master'. Dangerous: make sure the conditions
 * are exact before calling.
 *
 * @param string   $table   Table name.
 * @param \Closure $builder Receives a fresh query and must return it with the
 *                          delete conditions applied, e.g.
 *                          function ($query) { return $query->where('xxx', xxx); }
 * @return int Sum of the values returned by `delete()` on each connection.
 */
public static function deleteAllDbDirect(string $table, \Closure $builder): int
{
    $deletedTotal = 0;

    foreach (config('think-orm.db_master_map') as $connName) {
        if ($connName !== 'duanju_master') {
            $query = $builder(Db::connect($connName)->name($table));
            $deletedTotal += $query->delete();
        }
    }

    return $deletedTotal;
}
|
||
|
||
/**
 * Build a proxy that forwards one query-method call to the shards in turn and
 * returns the first "useful" result.
 *
 * Any method called on the returned object goes through __call():
 *  - 'insert' is rejected with a RuntimeException;
 *  - 'update' / 'delete' iterate `think-orm.db_master_map` (skipping
 *    'duanju_master') and return as soon as a result is truthy or is integer 0;
 *  - every other method iterates `think-orm.db_map`, restricted to the names
 *    from getAllSelectDb(), and returns the first non-empty Collection or the
 *    first truthy non-Collection result.
 * If no shard produced such a result, the last result obtained (or null) is returned.
 *
 * NOTE(review): on the update/delete path an integer 0 also ends the loop, so
 * a call that returns an int from the first shard never reaches the
 * remaining shards — confirm this is intended.
 *
 * @param string   $table   Table name.
 * @param \Closure $builder Receives a fresh query and must return it with conditions applied.
 * @return object Anonymous proxy object.
 */
public static function getAllDbData($table, \Closure $builder)
{
    return new class($table, $builder) {
        protected $table;
        protected $builder;
        protected $dbMap;
        protected $masterDbMap;

        public function __construct($table, $builder)
        {
            $this->table = $table;
            $this->builder = $builder;
            $this->dbMap = config('think-orm.db_map');
            $this->masterDbMap = config('think-orm.db_master_map');
        }

        public function __call($method, $args)
        {
            $lastResult = null;

            try {
                if ($method == 'insert') {
                    throw new \RuntimeException("不支持insert");
                }

                $isWrite = $method == 'update' || $method == 'delete';

                if ($isWrite) {
                    // Writes go to the master connections.
                    foreach ($this->masterDbMap as $connName) {
                        if ($connName === 'duanju_master') {
                            continue;
                        }

                        $query = call_user_func($this->builder, Db::connect($connName)->name($this->table));
                        if (!method_exists($query, $method)) {
                            continue;
                        }

                        $lastResult = call_user_func_array([$query, $method], $args);
                        if ($lastResult || $lastResult === 0) {
                            return $lastResult;
                        }
                    }

                    return $lastResult;
                }

                // Reads go to the currently selected readable connections.
                $readable = DatabaseRoute::getAllSelectDb();
                foreach ($this->dbMap as $connName) {
                    if (!in_array($connName, $readable)) {
                        continue;
                    }

                    $query = call_user_func($this->builder, Db::connect($connName)->name($this->table));
                    if (!method_exists($query, $method)) {
                        continue;
                    }

                    $lastResult = call_user_func_array([$query, $method], $args);

                    // find() yields null, select() an empty collection and count() 0 when
                    // a shard has nothing; only a non-empty / truthy result ends the search.
                    $found = $lastResult instanceof Collection
                        ? !$lastResult->isEmpty()
                        : (bool) $lastResult;

                    if ($found) {
                        return $lastResult;
                    }
                }
            } catch (\Throwable $failure) {
                Log::error("错误信息".$failure->getMessage());
                throw $failure;
            }

            return $lastResult;
        }
    };
}
|
||
|
||
/**
 * Return the five slave connection names for the index chosen by getIndex().
 *
 * Each call advances that index, so consecutive calls may return different
 * name sets. The names have the form "duanju<index>-<shard>_slave" with shard
 * running from 0 to 4.
 *
 * @return string[] Five connection names, in shard order.
 */
public static function getAllSelectDb()
{
    $index = self::getIndex();

    $names = [];
    for ($shard = 0; $shard < 5; $shard++) {
        $names[] = "duanju{$index}-{$shard}_slave";
    }

    return $names;
}
|
||
|
||
|
||
/**
 * Advance and return the rotating index stored under the 'db_index' cache key.
 *
 * The stored value (0 when nothing usable is cached) is incremented, wrapped
 * modulo 4, written back to the cache and returned, so results cycle through
 * 0..3. The read and the write are two separate cache calls.
 *
 * @return int The new index.
 */
private static function getIndex()
{
    $cursor = cache('db_index');

    // Loose comparison on purpose: null and other empty values all restart at 0.
    if ($cursor == null) {
        $cursor = 0;
    }

    $next = ++$cursor % 4;
    cache('db_index', $next);

    return $next;
}
|
||
|
||
|
||
/**
 * Resolve the name of the connection that holds a table's shard for a given row.
 *
 * Steps:
 *  1. Tables without an entry in `think-orm.route` use the fallback name.
 *  2. The sharding key is `user_id` for tables whose name contains "user" or
 *     that appear in the list below, and `course_id` otherwise.
 *  3. The shard number is `abs(key % 5)`; it replaces the `{$<key>%5}`
 *     placeholder in the route's 'master' or 'slave' template.
 *  4. For names containing "slave", "duanju" is replaced by "duanju<n>" where
 *     <n> comes from getIndex() (which advances the rotating index).
 *  5. Names missing from `think-orm.connections` fall back as well.
 *
 * The fallback is 'duanju-master' for writes and 'duanju-slave' for reads.
 * NOTE(review): other methods in this class compare against 'duanju_master'
 * (underscore) — confirm which spelling the configuration actually uses.
 *
 * @param string $table   Table name.
 * @param array  $data    Row data / conditions; must contain the sharding key to be routed.
 * @param bool   $isWrite Whether the connection is needed for a write.
 * @return string Connection name suitable for `Db::connect()`.
 */
public static function getConnection($table, $data = [], $isWrite = false)
{
    $fallback = $isWrite ? 'duanju-master' : 'duanju-slave';

    $routeConfig = config('think-orm.route');
    if (!isset($routeConfig[$table])) {
        return $fallback;
    }

    // Tables sharded by user id even though some names do not contain "user".
    $userKeyedTables = [
        'orders', 'course_collect', 'pay_details', 'disc_spinning_record',
        'cash_out', 'course_user', 'tb_user', 'task_center_record',
        'user_money', 'user_sign_record', 'invite_achievement',
        'invite_money', 'user_info', 'sys_user', 'user_money_details', 'sys_user_money_details',
    ];
    $isUserKeyed = str_contains($table, 'user') || in_array($table, $userKeyedTables, true);
    $keyField = $isUserKeyed ? 'user_id' : 'course_id';

    if (!isset($data[$keyField])) {
        Log::warning("分库警告: 表={$table}, 数据中缺少 {$keyField} 字段");
        return $fallback;
    }

    $index = abs($data[$keyField] % 5);
    $connectionTemplate = $isWrite
        ? $routeConfig[$table]['master']
        : $routeConfig[$table]['slave'];

    // The placeholder is the literal text "{$user_id%5}" / "{$course_id%5}".
    $connectionName = str_replace('{$' . $keyField . '%5}', (string) $index, $connectionTemplate);

    // str_contains() instead of a bare strpos(): strpos() returns 0 (falsy) when the
    // needle is at the very start of the string, which would be read as "not found".
    if (str_contains($connectionName, 'slave')) {
        $connectionName = str_replace('duanju', 'duanju' . self::getIndex(), $connectionName);
    }

    // Make sure the resolved connection is actually configured.
    $connections = config('think-orm.connections');
    if (!isset($connections[$connectionName])) {
        Log::error("分库错误: 表={$table}, 连接={$connectionName} 不存在");
        return $fallback;
    }

    return $connectionName;
}
|
||
} |