redis edit

This commit is contained in:
2025-10-13 15:04:29 +08:00
parent 1daf66fdea
commit 87774ca20b
4 changed files with 180 additions and 251 deletions

View File

@@ -4,6 +4,7 @@ namespace extend\workermans\model;
use ba\Exception;
use ba\Random;
use support\Redis;
use support\think\Db;
use GatewayWorker\Lib\Gateway;
use support\Log;
@@ -16,7 +17,6 @@ class Base extends Model
{
public static $db = null;
public static $redis = null;
public static $client_id;
public static $shop_id;
public static $account;
@@ -124,12 +124,11 @@ class Base extends Model
$session[$msg_id] = $time;
Gateway::setSession($g_client_id, $session);
$redis_name = $g_client_id . '_lock';
$redis->get($redis_name, function ($result, $redis) use ($msg_id, $redis_name){
if($result) {
}else {
$redis->set($redis_name, $msg_id, 3600);
}
});
$result = Redis::get($redis_name);
if($result) {
}else {
$redis->set($redis_name, $msg_id, 3600);
}
}
print_r('锁单的key-----' . $msg_id . "\r\n");
// 直接保存锁定
@@ -160,11 +159,10 @@ class Base extends Model
unset($session[$msg_id]);
Gateway::setSession($g_client_id, $session);
$redis_name = $g_client_id . '_lock';
$redis->get($redis_name, function ($result, $redis) use ($redis_name){
if($result) {
$redis->del($redis_name);
}
});
$result = Redis::get($redis_name);
if($result) {
$redis->del($redis_name);
}
}
print_r('解锁的key-----' . $msg_id . "\r\n");
$rand = 'pay_unlock' . Random::build();
@@ -268,94 +266,89 @@ class Base extends Model
public static function geteventredis_new()
{
print_r('定时器' . "\r\n");
$redis = self::$redis;
$gateway = Gateway::class;
// 处理消息(包含推送)
$redis_name = Gateway::getAllClientSessions();
foreach ($redis_name as $client_id => $client_id_data) {
$order_print_client_id = $client_id . '_orderPrint';
if(Gateway::isOnline($client_id)) {
$result = Redis::get($client_id);
// 处理普通消息
$redis->get($client_id, function ($result, $redis) use($gateway, $client_id){
if($result) {
$msg_info_arr = json_decode($result, true);
if($msg_info_arr['send_num'] < self::RETRYCOUNT) {
// 推送类型
$method = '';
switch ($msg_info_arr['method']) {
case 'sendToClient':
$method = 'isOnline';
break;
case 'sendToUid':
$method = 'isUidOnline';
break;
}
// 如果在线就推送
if($method && call_user_func_array([$gateway, $method], [$msg_info_arr['send_id']])) {
// 再次推送
call_user_func_array([$gateway, $msg_info_arr['method']], [$msg_info_arr['send_id'], $result]);
print_r('推送了-----' . "\r\n");
// 推送以后新增推送次数
$msg_info_arr['send_num'] += 1;
$redis->set($client_id, json_encode($msg_info_arr), self::REDIS_TIME);
return true;
}
if($result) {
$msg_info_arr = json_decode($result, true);
if($msg_info_arr['send_num'] < self::RETRYCOUNT) {
// 推送类型
$method = '';
switch ($msg_info_arr['method']) {
case 'sendToClient':
$method = 'isOnline';
break;
case 'sendToUid':
$method = 'isUidOnline';
break;
}
self::add_log_file('----无回执--' .date('Y-m-d H:i:s'). '--->' . $result);
}
});
$r_name = $client_id . '_lock';
// 处理锁单消息
$redis->get($r_name, function ($result, $redis) use($client_id, $gateway, $r_name){
if($result) {
$session = $gateway::getSession($client_id);
print_r('session的内容-----' . json_encode($session) . "\r\n");
if($session && isset($session[$result])) {
// 如果大于五秒没有更新,解除锁定
if(time() - $session[$result] > 10) {
unset($session[$result]);
$gateway::setSession($client_id, $session);
$redis->del($r_name);
print_r('解除锁定了-----' . json_encode($session) . "\r\n");
return true;
}
}
}
});
$uid = $gateway::getUidByClientId($client_id);
// 处理打印消息
$redis->get($uid, function ($result, $redis) use($gateway, $uid){
if($result) {
$msg_info_arr = json_decode($result, true);
foreach ($msg_info_arr as $k => $v) {
if($v['send_num'] < self::RETRYCOUNT) {
$msg_info_arr[$k]['send_num'] += 1;
$v_json = json_encode($v);
call_user_func_array([$gateway, $v['method']], [$v['send_id'], $v_json]);
// 推送以后新增推送次数
self::add_log_file('推送收银机订单打印(重试第' . $v['send_num'] . '次)--->' . $v_json, 'cashier');
// $redis->set($uid, json_encode($v), self::REDIS_TIME);
// array_push($new_arr, $v);
}else {
unset($msg_info_arr[$k]);
$msg_info_arr = array_values($msg_info_arr);
self::add_log_file('----推送收银机订单打印无回执--->' . $result, 'cashier');
}
if(empty($msg_info_arr)) {
$redis->del($uid);
}else {
$redis->set($uid, json_encode($msg_info_arr), self::REDIS_TIME);
}
// 如果在线就推送
if($method && call_user_func_array([$gateway, $method], [$msg_info_arr['send_id']])) {
// 再次推送
call_user_func_array([$gateway, $msg_info_arr['method']], [$msg_info_arr['send_id'], $result]);
print_r('推送了-----' . "\r\n");
// 推送以后新增推送次数
$msg_info_arr['send_num'] += 1;
Redis::set($client_id, json_encode($msg_info_arr), self::REDIS_TIME);
return true;
}
}
});
self::add_log_file('----无回执--' .date('Y-m-d H:i:s'). '--->' . $result);
}
$r_name = $client_id . '_lock';
// 处理锁单消息
$result = Redis::get($r_name);
if($result) {
$session = $gateway::getSession($client_id);
print_r('session的内容-----' . json_encode($session) . "\r\n");
if($session && isset($session[$result])) {
// 如果大于五秒没有更新,解除锁定
if(time() - $session[$result] > 10) {
unset($session[$result]);
$gateway::setSession($client_id, $session);
Redis::del($r_name);
print_r('解除锁定了-----' . json_encode($session) . "\r\n");
return true;
}
}
}
$uid = $gateway::getUidByClientId($client_id);
$result = Redis::get($uid);
// 处理打印消息
if($result) {
$msg_info_arr = json_decode($result, true);
foreach ($msg_info_arr as $k => $v) {
if($v['send_num'] < self::RETRYCOUNT) {
$msg_info_arr[$k]['send_num'] += 1;
$v_json = json_encode($v);
call_user_func_array([$gateway, $v['method']], [$v['send_id'], $v_json]);
// 推送以后新增推送次数
self::add_log_file('推送收银机订单打印(重试第' . $v['send_num'] . '次)--->' . $v_json, 'cashier');
// $redis->set($uid, json_encode($v), self::REDIS_TIME);
// array_push($new_arr, $v);
}else {
unset($msg_info_arr[$k]);
$msg_info_arr = array_values($msg_info_arr);
self::add_log_file('----推送收银机订单打印无回执--->' . $result, 'cashier');
}
if(empty($msg_info_arr)) {
Redis::del($uid);
}else {
Redis::set($uid, json_encode($msg_info_arr), self::REDIS_TIME);
}
return true;
}
}
}
$redis->del($client_id);
$redis->del($uid);
Redis::del($client_id);
Redis::del($uid);
}
// 处理在线列表
self::saveonlinelist($redis_name);
@@ -365,22 +358,20 @@ class Base extends Model
public static function saveonlinelist(array $client_id_list)
{
if(is_array($client_id_list)) {
$redis = self::$redis;
$redis_name = date('Y-m-d') . '_online_number';
$redis->get($redis_name, function ($result, $redis) use ($redis_name, $client_id_list){
if($result) {
$result = json_decode($result, true);
$client_id_arr = [];
foreach ($client_id_list as $client_id => $client_id_info) {
$client_id_arr[] = $client_id;
}
foreach ($result as $type => $redis_client_list) {
$s_arr = array_values(array_intersect($client_id_arr, $redis_client_list));
$result[$type] = $s_arr;
}
$redis->set($redis_name, json_encode($result), 60 * 60 * 24);
$result = Redis::get($redis_name);
if($result) {
$result = json_decode($result, true);
$client_id_arr = [];
foreach ($client_id_list as $client_id => $client_id_info) {
$client_id_arr[] = $client_id;
}
});
foreach ($result as $type => $redis_client_list) {
$s_arr = array_values(array_intersect($client_id_arr, $redis_client_list));
$result[$type] = $s_arr;
}
Redis::set($redis_name, json_encode($result), 60 * 60 * 24);
}
}
}
@@ -432,86 +423,85 @@ class Base extends Model
$gateway = Gateway::class;
$redis = self::$redis;
// 找到本时候的数据msg_id
$redis->get($redis_name, function ($result, $redis) use ($gateway, $redis_name) {
if($result) {
$msg_id_arr = json_decode($result, true);
foreach ($msg_id_arr as $k => $msg) {
$redis->get($msg, function ($result, $redis) use($gateway, $k, $msg, $redis_name, $msg_id_arr){
if($result) {
$msg_info_arr = json_decode($result, true);
if($msg_info_arr['send_num'] < self::RETRYCOUNT) {
// 推送类型
$method = '';
switch ($msg_info_arr['method']) {
case 'sendToClient':
$method = 'isOnline';
break;
case 'sendToUid':
$method = 'isUidOnline';
break;
}
if($msg_info_arr['method'] == 'sendToGroup') {
// 发送广播并将 self::$client_id 排除在外
Gateway::sendToGroup($msg_info_arr['send_id'], $result, !empty($msg_info_arr['no_client_id'])?$msg_info_arr['no_client_id']:[]);
$result = Redis::get($redis_name);
if($result) {
$msg_id_arr = json_decode($result, true);
foreach ($msg_id_arr as $k => $msg) {
$redis->get($msg, function ($result, $redis) use($gateway, $k, $msg, $redis_name, $msg_id_arr){
if($result) {
$msg_info_arr = json_decode($result, true);
if($msg_info_arr['send_num'] < self::RETRYCOUNT) {
// 推送类型
$method = '';
switch ($msg_info_arr['method']) {
case 'sendToClient':
$method = 'isOnline';
break;
case 'sendToUid':
$method = 'isUidOnline';
break;
}
if($msg_info_arr['method'] == 'sendToGroup') {
// 发送广播并将 self::$client_id 排除在外
Gateway::sendToGroup($msg_info_arr['send_id'], $result, !empty($msg_info_arr['no_client_id'])?$msg_info_arr['no_client_id']:[]);
// 推送以后新增推送次数
$msg_info_arr['send_num'] += 1;
}else {
// 如果在线就推送
if($method && call_user_func_array([$gateway, $method], [$msg_info_arr['send_id']])) {
// 再次推送
call_user_func_array([$gateway, $msg_info_arr['method']], [$msg_info_arr['send_id'], $result]);
// 推送以后新增推送次数
$msg_info_arr['send_num'] += 1;
}else {
// 如果在线就推送
if($method && call_user_func_array([$gateway, $method], [$msg_info_arr['send_id']])) {
// 再次推送
call_user_func_array([$gateway, $msg_info_arr['method']], [$msg_info_arr['send_id'], $result]);
// 推送以后新增推送次数
$msg_info_arr['send_num'] += 1;
}
}
$redis->set($msg, json_encode($msg_info_arr), 3600);
}else {
// 失败,重发次数达到最大限度值。记录日志并删除数据
$text = '------>无回执----->'. $result . "\r\n";
self::add_log_file($text);
if(strstr($msg,'cashier_order')) {
$serviceLog = [
'queue' => 'test-order.print.queue',
'msg' => $msg_info_arr['data'],
'type' => 'orderPrint',
'plat' => 'Cashier',
'create_time' => date('Y-m-d H:i:s'),
'fail_time' => date('Y-m-d H:i:s'),
'err_info' => '无回执',
];
Db::table('mq_log')->insert($serviceLog);
}
// 删除redis
$redis->del($msg);
print_r('msg---->' . $msg . '已经删除' . date('Y-m-d H:i:s') . "\r\n");
$redis->get($redis_name, function ($result, $redis) use ($redis_name, $msg) {
$result = json_decode($result, true);
if($result) {
$key = array_search($msg, $result);
if($key === false) {
}
$redis->set($msg, json_encode($msg_info_arr), 3600);
}else {
// 失败,重发次数达到最大限度值。记录日志并删除数据
$text = '------>无回执----->'. $result . "\r\n";
self::add_log_file($text);
if(strstr($msg,'cashier_order')) {
$serviceLog = [
'queue' => 'test-order.print.queue',
'msg' => $msg_info_arr['data'],
'type' => 'orderPrint',
'plat' => 'Cashier',
'create_time' => date('Y-m-d H:i:s'),
'fail_time' => date('Y-m-d H:i:s'),
'err_info' => '无回执',
];
Db::table('mq_log')->insert($serviceLog);
}
// 删除redis
$redis->del($msg);
print_r('msg---->' . $msg . '已经删除' . date('Y-m-d H:i:s') . "\r\n");
$redis->get($redis_name, function ($result, $redis) use ($redis_name, $msg) {
$result = json_decode($result, true);
if($result) {
$key = array_search($msg, $result);
if($key === false) {
}else {
print_r('删除前---->' . json_encode($result) . date('Y-m-d H:i:s') . "\r\n");
unset($result[$key]);
print_r('删除后---->' . json_encode($result). date('Y-m-d H:i:s') ."\r\n");
$result = array_values($result);
print_r('排序后---->' . json_encode($result). date('Y-m-d H:i:s') . "\r\n");
if(empty($result)) {
$redis->del($redis_name);
}else {
print_r('删除前---->' . json_encode($result) . date('Y-m-d H:i:s') . "\r\n");
unset($result[$key]);
print_r('删除后---->' . json_encode($result). date('Y-m-d H:i:s') ."\r\n");
$result = array_values($result);
print_r('排序后---->' . json_encode($result). date('Y-m-d H:i:s') . "\r\n");
if(empty($result)) {
$redis->del($redis_name);
}else {
print_r('保存了---->' . json_encode($result). date('Y-m-d H:i:s') . "\r\n");
$redis->set($redis_name, json_encode($result), 3600, function ($r) {
});
}
print_r('保存了---->' . json_encode($result). date('Y-m-d H:i:s') . "\r\n");
$redis->set($redis_name, json_encode($result), 3600, function ($r) {
});
}
}
});
}
}
});
}
});
sleep(1);
}
}
});
sleep(1);
}
});
}
}
// 购物车 多端同步 UID