diff --git a/extend/Config/Db.php b/extend/Config/Db.php deleted file mode 100644 index 6390e50..0000000 --- a/extend/Config/Db.php +++ /dev/null @@ -1,37 +0,0 @@ -select('name,age')->from('users')->where('age>12')->query();print_r()exit - * 等价于 - * $user_array = Db::instance('db1')->query('SELECT `name`,`age` FROM `users` WHERE `age`>12'); - * @var array - */ -class Db -{ - public static $db = array( - 'host' => DB_HOST, - 'user' => DB_USER, - 'password' =>DB_PWD, - 'dbname' => DB_NAME, - 'port' => DB_PORT, - 'charset' => DB_CHARSET, - ); -} - diff --git a/extend/workermans/model/Base.php b/extend/workermans/model/Base.php index a22cc24..bdd112e 100644 --- a/extend/workermans/model/Base.php +++ b/extend/workermans/model/Base.php @@ -4,6 +4,7 @@ namespace extend\workermans\model; use ba\Exception; use ba\Random; +use support\Redis; use support\think\Db; use GatewayWorker\Lib\Gateway; use support\Log; @@ -16,7 +17,6 @@ class Base extends Model { public static $db = null; - public static $redis = null; public static $client_id; public static $shop_id; public static $account; @@ -124,12 +124,11 @@ class Base extends Model $session[$msg_id] = $time; Gateway::setSession($g_client_id, $session); $redis_name = $g_client_id . '_lock'; - $redis->get($redis_name, function ($result, $redis) use ($msg_id, $redis_name){ - if($result) { - }else { - $redis->set($redis_name, $msg_id, 3600); - } - }); + $result = Redis::get($redis_name); + if($result) { + }else { + $redis->set($redis_name, $msg_id, 3600); + } } print_r('锁单的key-----' . $msg_id . "\r\n"); // 直接保存锁定 @@ -160,11 +159,10 @@ class Base extends Model unset($session[$msg_id]); Gateway::setSession($g_client_id, $session); $redis_name = $g_client_id . '_lock'; - $redis->get($redis_name, function ($result, $redis) use ($redis_name){ - if($result) { - $redis->del($redis_name); - } - }); + $result = Redis::get($redis_name); + if($result) { + $redis->del($redis_name); + } } print_r('解锁的key-----' . $msg_id . "\r\n"); $rand = 'pay_unlock' . Random::build(); @@ -268,94 +266,89 @@ class Base extends Model public static function geteventredis_new() { print_r('定时器' . "\r\n"); - $redis = self::$redis; $gateway = Gateway::class; // 处理消息(包含推送) $redis_name = Gateway::getAllClientSessions(); foreach ($redis_name as $client_id => $client_id_data) { - $order_print_client_id = $client_id . '_orderPrint'; if(Gateway::isOnline($client_id)) { + $result = Redis::get($client_id); // 处理普通消息 - $redis->get($client_id, function ($result, $redis) use($gateway, $client_id){ - if($result) { - $msg_info_arr = json_decode($result, true); - if($msg_info_arr['send_num'] < self::RETRYCOUNT) { - // 推送类型 - $method = ''; - switch ($msg_info_arr['method']) { - case 'sendToClient': - $method = 'isOnline'; - break; - case 'sendToUid': - $method = 'isUidOnline'; - break; - } - // 如果在线就推送 - if($method && call_user_func_array([$gateway, $method], [$msg_info_arr['send_id']])) { - // 再次推送 - call_user_func_array([$gateway, $msg_info_arr['method']], [$msg_info_arr['send_id'], $result]); - print_r('推送了-----' . "\r\n"); - // 推送以后新增推送次数 - $msg_info_arr['send_num'] += 1; - $redis->set($client_id, json_encode($msg_info_arr), self::REDIS_TIME); - return true; - } + if($result) { + $msg_info_arr = json_decode($result, true); + if($msg_info_arr['send_num'] < self::RETRYCOUNT) { + // 推送类型 + $method = ''; + switch ($msg_info_arr['method']) { + case 'sendToClient': + $method = 'isOnline'; + break; + case 'sendToUid': + $method = 'isUidOnline'; + break; } - self::add_log_file('----无回执--' .date('Y-m-d H:i:s'). '--->' . $result); - } - }); - - $r_name = $client_id . '_lock'; - // 处理锁单消息 - $redis->get($r_name, function ($result, $redis) use($client_id, $gateway, $r_name){ - if($result) { - $session = $gateway::getSession($client_id); - print_r('session的内容-----' . json_encode($session) . "\r\n"); - if($session && isset($session[$result])) { - // 如果大于五秒没有更新,解除锁定 - if(time() - $session[$result] > 10) { - unset($session[$result]); - $gateway::setSession($client_id, $session); - $redis->del($r_name); - print_r('解除锁定了-----' . json_encode($session) . "\r\n"); - return true; - } - } - } - }); - - - $uid = $gateway::getUidByClientId($client_id); - // 处理打印消息 - $redis->get($uid, function ($result, $redis) use($gateway, $uid){ - if($result) { - $msg_info_arr = json_decode($result, true); - foreach ($msg_info_arr as $k => $v) { - if($v['send_num'] < self::RETRYCOUNT) { - $msg_info_arr[$k]['send_num'] += 1; - $v_json = json_encode($v); - call_user_func_array([$gateway, $v['method']], [$v['send_id'], $v_json]); - // 推送以后新增推送次数 - self::add_log_file('推送收银机订单打印(重试第' . $v['send_num'] . '次)--->' . $v_json, 'cashier'); -// $redis->set($uid, json_encode($v), self::REDIS_TIME); -// array_push($new_arr, $v); - }else { - unset($msg_info_arr[$k]); - $msg_info_arr = array_values($msg_info_arr); - self::add_log_file('----推送收银机订单打印无回执--->' . $result, 'cashier'); - } - if(empty($msg_info_arr)) { - $redis->del($uid); - }else { - $redis->set($uid, json_encode($msg_info_arr), self::REDIS_TIME); - } + // 如果在线就推送 + if($method && call_user_func_array([$gateway, $method], [$msg_info_arr['send_id']])) { + // 再次推送 + call_user_func_array([$gateway, $msg_info_arr['method']], [$msg_info_arr['send_id'], $result]); + print_r('推送了-----' . "\r\n"); + // 推送以后新增推送次数 + $msg_info_arr['send_num'] += 1; + Redis::set($client_id, json_encode($msg_info_arr), self::REDIS_TIME); return true; } } - }); + self::add_log_file('----无回执--' .date('Y-m-d H:i:s'). '--->' . $result); + } + + $r_name = $client_id . '_lock'; + // 处理锁单消息 + $result = Redis::get($r_name); + if($result) { + $session = $gateway::getSession($client_id); + print_r('session的内容-----' . json_encode($session) . "\r\n"); + if($session && isset($session[$result])) { + // 如果大于五秒没有更新,解除锁定 + if(time() - $session[$result] > 10) { + unset($session[$result]); + $gateway::setSession($client_id, $session); + Redis::del($r_name); + print_r('解除锁定了-----' . json_encode($session) . "\r\n"); + return true; + } + } + } + + + $uid = $gateway::getUidByClientId($client_id); + $result = Redis::get($uid); + // 处理打印消息 + if($result) { + $msg_info_arr = json_decode($result, true); + foreach ($msg_info_arr as $k => $v) { + if($v['send_num'] < self::RETRYCOUNT) { + $msg_info_arr[$k]['send_num'] += 1; + $v_json = json_encode($v); + call_user_func_array([$gateway, $v['method']], [$v['send_id'], $v_json]); + // 推送以后新增推送次数 + self::add_log_file('推送收银机订单打印(重试第' . $v['send_num'] . '次)--->' . $v_json, 'cashier'); +// $redis->set($uid, json_encode($v), self::REDIS_TIME); +// array_push($new_arr, $v); + }else { + unset($msg_info_arr[$k]); + $msg_info_arr = array_values($msg_info_arr); + self::add_log_file('----推送收银机订单打印无回执--->' . $result, 'cashier'); + } + if(empty($msg_info_arr)) { + Redis::del($uid); + }else { + Redis::set($uid, json_encode($msg_info_arr), self::REDIS_TIME); + } + return true; + } + } } - $redis->del($client_id); - $redis->del($uid); + Redis::del($client_id); + Redis::del($uid); } // 处理在线列表 self::saveonlinelist($redis_name); @@ -365,22 +358,20 @@ class Base extends Model public static function saveonlinelist(array $client_id_list) { if(is_array($client_id_list)) { - $redis = self::$redis; $redis_name = date('Y-m-d') . '_online_number'; - $redis->get($redis_name, function ($result, $redis) use ($redis_name, $client_id_list){ - if($result) { - $result = json_decode($result, true); - $client_id_arr = []; - foreach ($client_id_list as $client_id => $client_id_info) { - $client_id_arr[] = $client_id; - } - foreach ($result as $type => $redis_client_list) { - $s_arr = array_values(array_intersect($client_id_arr, $redis_client_list)); - $result[$type] = $s_arr; - } - $redis->set($redis_name, json_encode($result), 60 * 60 * 24); + $result = Redis::get($redis_name); + if($result) { + $result = json_decode($result, true); + $client_id_arr = []; + foreach ($client_id_list as $client_id => $client_id_info) { + $client_id_arr[] = $client_id; } - }); + foreach ($result as $type => $redis_client_list) { + $s_arr = array_values(array_intersect($client_id_arr, $redis_client_list)); + $result[$type] = $s_arr; + } + Redis::set($redis_name, json_encode($result), 60 * 60 * 24); + } } } @@ -432,86 +423,85 @@ class Base extends Model $gateway = Gateway::class; $redis = self::$redis; // 找到本时候的数据msg_id - $redis->get($redis_name, function ($result, $redis) use ($gateway, $redis_name) { - if($result) { - $msg_id_arr = json_decode($result, true); - foreach ($msg_id_arr as $k => $msg) { - $redis->get($msg, function ($result, $redis) use($gateway, $k, $msg, $redis_name, $msg_id_arr){ - if($result) { - $msg_info_arr = json_decode($result, true); - if($msg_info_arr['send_num'] < self::RETRYCOUNT) { - // 推送类型 - $method = ''; - switch ($msg_info_arr['method']) { - case 'sendToClient': - $method = 'isOnline'; - break; - case 'sendToUid': - $method = 'isUidOnline'; - break; - } - if($msg_info_arr['method'] == 'sendToGroup') { - // 发送广播并将 self::$client_id 排除在外 - Gateway::sendToGroup($msg_info_arr['send_id'], $result, !empty($msg_info_arr['no_client_id'])?$msg_info_arr['no_client_id']:[]); + $result = Redis::get($redis_name); + if($result) { + $msg_id_arr = json_decode($result, true); + foreach ($msg_id_arr as $k => $msg) { + $redis->get($msg, function ($result, $redis) use($gateway, $k, $msg, $redis_name, $msg_id_arr){ + if($result) { + $msg_info_arr = json_decode($result, true); + if($msg_info_arr['send_num'] < self::RETRYCOUNT) { + // 推送类型 + $method = ''; + switch ($msg_info_arr['method']) { + case 'sendToClient': + $method = 'isOnline'; + break; + case 'sendToUid': + $method = 'isUidOnline'; + break; + } + if($msg_info_arr['method'] == 'sendToGroup') { + // 发送广播并将 self::$client_id 排除在外 + Gateway::sendToGroup($msg_info_arr['send_id'], $result, !empty($msg_info_arr['no_client_id'])?$msg_info_arr['no_client_id']:[]); + // 推送以后新增推送次数 + $msg_info_arr['send_num'] += 1; + }else { + // 如果在线就推送 + if($method && call_user_func_array([$gateway, $method], [$msg_info_arr['send_id']])) { + // 再次推送 + call_user_func_array([$gateway, $msg_info_arr['method']], [$msg_info_arr['send_id'], $result]); // 推送以后新增推送次数 $msg_info_arr['send_num'] += 1; - }else { - // 如果在线就推送 - if($method && call_user_func_array([$gateway, $method], [$msg_info_arr['send_id']])) { - // 再次推送 - call_user_func_array([$gateway, $msg_info_arr['method']], [$msg_info_arr['send_id'], $result]); - // 推送以后新增推送次数 - $msg_info_arr['send_num'] += 1; - } } - $redis->set($msg, json_encode($msg_info_arr), 3600); - }else { - // 失败,重发次数达到最大限度值。记录日志并删除数据 - $text = '------>无回执----->'. $result . "\r\n"; - self::add_log_file($text); - if(strstr($msg,'cashier_order')) { - $serviceLog = [ - 'queue' => 'test-order.print.queue', - 'msg' => $msg_info_arr['data'], - 'type' => 'orderPrint', - 'plat' => 'Cashier', - 'create_time' => date('Y-m-d H:i:s'), - 'fail_time' => date('Y-m-d H:i:s'), - 'err_info' => '无回执', - ]; - Db::table('mq_log')->insert($serviceLog); - } - // 删除redis - $redis->del($msg); - print_r('msg---->' . $msg . '已经删除' . date('Y-m-d H:i:s') . "\r\n"); - $redis->get($redis_name, function ($result, $redis) use ($redis_name, $msg) { - $result = json_decode($result, true); - if($result) { - $key = array_search($msg, $result); - if($key === false) { + } + $redis->set($msg, json_encode($msg_info_arr), 3600); + }else { + // 失败,重发次数达到最大限度值。记录日志并删除数据 + $text = '------>无回执----->'. $result . "\r\n"; + self::add_log_file($text); + if(strstr($msg,'cashier_order')) { + $serviceLog = [ + 'queue' => 'test-order.print.queue', + 'msg' => $msg_info_arr['data'], + 'type' => 'orderPrint', + 'plat' => 'Cashier', + 'create_time' => date('Y-m-d H:i:s'), + 'fail_time' => date('Y-m-d H:i:s'), + 'err_info' => '无回执', + ]; + Db::table('mq_log')->insert($serviceLog); + } + // 删除redis + $redis->del($msg); + print_r('msg---->' . $msg . '已经删除' . date('Y-m-d H:i:s') . "\r\n"); + $redis->get($redis_name, function ($result, $redis) use ($redis_name, $msg) { + $result = json_decode($result, true); + if($result) { + $key = array_search($msg, $result); + if($key === false) { + }else { + print_r('删除前---->' . json_encode($result) . date('Y-m-d H:i:s') . "\r\n"); + unset($result[$key]); + print_r('删除后---->' . json_encode($result). date('Y-m-d H:i:s') ."\r\n"); + $result = array_values($result); + print_r('排序后---->' . json_encode($result). date('Y-m-d H:i:s') . "\r\n"); + if(empty($result)) { + $redis->del($redis_name); }else { - print_r('删除前---->' . json_encode($result) . date('Y-m-d H:i:s') . "\r\n"); - unset($result[$key]); - print_r('删除后---->' . json_encode($result). date('Y-m-d H:i:s') ."\r\n"); - $result = array_values($result); - print_r('排序后---->' . json_encode($result). date('Y-m-d H:i:s') . "\r\n"); - if(empty($result)) { - $redis->del($redis_name); - }else { - print_r('保存了---->' . json_encode($result). date('Y-m-d H:i:s') . "\r\n"); - $redis->set($redis_name, json_encode($result), 3600, function ($r) { - }); - } + print_r('保存了---->' . json_encode($result). date('Y-m-d H:i:s') . "\r\n"); + $redis->set($redis_name, json_encode($result), 3600, function ($r) { + }); } } - }); - } + } + }); } - }); - sleep(1); - } + } + }); + sleep(1); } - }); + } } // 购物车 多端同步 UID diff --git a/extend/workermans/model/RedisInit.php b/extend/workermans/model/RedisInit.php deleted file mode 100644 index 5f9993f..0000000 --- a/extend/workermans/model/RedisInit.php +++ /dev/null @@ -1,16 +0,0 @@ -reusePort = true; - $redis = Base::$redis; - $http_worker->onMessage = function (TcpConnection $connection, Request $request) use($redis) { + $http_worker->onMessage = function (TcpConnection $connection, Request $request) { $method = $request->get('method'); $account = $request->get('account'); $params = $request->get('params'); @@ -40,19 +37,18 @@ class Events $params_arr = json_decode($params, true); // 如果是订单打印并且通过uid推送, 保存redis if($params_arr['operate_type'] == 'order_print' && $method == 'sendToUid') { - $redis->get($params_arr['send_id'], function ($result)use($params_arr, $params){ - if($result) { - $msg_id_arr = json_decode($result, true); - $msg_id_arr_c = count($msg_id_arr); - $msg_id_arr[$msg_id_arr_c] = $params_arr; - $result_n = json_encode($msg_id_arr); - Base::setredis_new($result_n, $params_arr['send_id']); - }else { - $params_arr_n[] = $params_arr; - print_r('数组-------->' . json_encode($params_arr_n)) . "\r\n"; - Base::setredis_new(json_encode($params_arr_n), $params_arr['send_id']); - } - }); + $result = Redis::get($params_arr['send_id']); + if($result) { + $msg_id_arr = json_decode($result, true); + $msg_id_arr_c = count($msg_id_arr); + $msg_id_arr[$msg_id_arr_c] = $params_arr; + $result_n = json_encode($msg_id_arr); + Base::setredis_new($result_n, $params_arr['send_id']); + }else { + $params_arr_n[] = $params_arr; + print_r('数组-------->' . json_encode($params_arr_n)) . "\r\n"; + Base::setredis_new(json_encode($params_arr_n), $params_arr['send_id']); + } } return $connection->send('ok'); @@ -64,8 +60,6 @@ class Events print_r('定时器'); // 启动定时器 定时器处理消息回执/在线列表 $_SESSION['auth_timer_id'] = Timer::add(10, function (){ -// print_r('时间--->' . date('Y-m-d H:i:s') . "\r\n"); -// print_r( "\r\n"); Base::geteventredis_new(); }); } @@ -172,13 +166,11 @@ class Events $gate_way = GateWay::class; $number = $message['number']; try { - $redis = self::$redis; - $redis->select(3); - $red_number = $redis->get('shop:' . $message['shop_id'] . ':product-stock:' . $message['product_id']); + Redis::select(3); + $red_number = Redis::get('shop:' . $message['shop_id'] . ':product-stock:' . $message['product_id']); }catch (\Exception $e) { print_r( "Redis 错误: " . $e->getMessage() . "\r\n"); } - $redis->close(); print_r('库存数量-->' . $red_number . "\r\n" . date('Y-m-d H:i:s')); if(is_numeric($red_number)) { // 起售数量和库存之间的处理