From 961550196e181ec8c5dd92745dbcc77b012a0fdf Mon Sep 17 00:00:00 2001 From: ASUS <515617283@qq.com> Date: Mon, 13 Apr 2026 09:12:51 +0800 Subject: [PATCH] =?UTF-8?q?=E8=80=97=E6=9D=90=E7=9B=91=E6=8E=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- scripts/consinfochangequeue.php | 112 ++++++++++++++++++++++++++++++++ 1 file changed, 112 insertions(+) create mode 100644 scripts/consinfochangequeue.php diff --git a/scripts/consinfochangequeue.php b/scripts/consinfochangequeue.php new file mode 100644 index 0000000..826d98d --- /dev/null +++ b/scripts/consinfochangequeue.php @@ -0,0 +1,112 @@ +channel(); + $rabbit_channel->queue_declare($queue, false, true, false, false); + + $callback = function ($msg) use ($queue){ + $date_time = date('Y-m-d H:i:s'); + $data = $msg->body; + Log::info('MQ收到消息[耗材监控]--->' . $data . '--->' . $date_time . "\n"); + // 如果是数字。则打印订单 + $is_log = false; + $curl_error = ''; + if(is_numeric($data)) { + $rand = 'con_update' . Random::build(); + $send_id = 'product_update'. $data; + $method = 'sendToGroup'; + $snd_data = [ + 'msg' => '商品更新', + 'type' => 'product', + 'operate_type' => 'product_update', + 'data_type' => 'product_update', + 'status' => 1, + 'method' => $method, + 'send_num' => 0, + 'send_id' => $send_id, + 'msg_id' => $rand, + 'data' => $data + ]; + $snd_data_json = json_encode($snd_data); + Gateway::$registerAddress = '127.0.0.1:1238'; + $res = Gateway::sendToGroup($send_id, $snd_data_json); + Log::info('耗材监控推送结果-->' . $res); + $result = Redis::get($snd_data['send_id']); + if($result) { + $msg_id_arr = json_decode($result, true); + $msg_id_arr_c = count($msg_id_arr); + $msg_id_arr[$msg_id_arr_c] = $snd_data; + $result_n = json_encode($msg_id_arr); + \extend\workermans\model\Base::setredis_new($result_n, $snd_data['send_id']); + }else { + $params_arr_n[] = $snd_data; + print_r('数组-------->' . json_encode($params_arr_n)) . "\r\n"; + \extend\workermans\model\Base::setredis_new(json_encode($params_arr_n), $snd_data['send_id']); + } + + + +// $url = 'http://127.0.0.1:8686/?method=' . $method . '&account='. $send_id .'¶ms=' . $snd_data_json; // 替换为你的服务地址和端口 +// $ch = curl_init(); +// curl_setopt($ch, CURLOPT_URL, $url); +// curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1); +// $output = curl_exec($ch); +// if (curl_errno($ch)) { +// $is_log = true; +// $curl_error = 'Curl error: ' . curl_error($ch); +// } +// curl_close($ch); +// Log::info('内部通讯结果(商品更新)--->' . $output . '通讯ID--->shop_id-->' .$send_id. '---' . date('Y-m-d H:i:s') . "\n"); + if($is_log) { + Db::table('tb_mq_log')->insert([ + 'queue' => $queue, + 'msg' => $data, + 'type' => 'product_update', + 'plat' => 'product', + 'create_time' => date('Y-m-d H:i:s'), + 'fail_time' => date('Y-m-d H:i:s'), + 'err_info' => $curl_error, + ]); + } + } + $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); + }; + $rabbit_channel->basic_consume($queue, '', false, false, false, false, $callback); + while ($rabbit_channel->is_consuming()) { + Log::info('cons.info.change.queue-MQ准备调用wait'); + $rabbit_channel->wait(); + } + $rabbit_channel->close(); + $connection->close(); + }catch (Exception $exception) { + Log::info('cons.info.change.queue-MQ错误' . $exception->getMessage()); + }