兼容修改
This commit is contained in:
@@ -1,20 +0,0 @@
|
||||
<?php
|
||||
|
||||
namespace app\queue;
|
||||
|
||||
use app\api\model\Orders;
|
||||
use app\api\model\UserMoney;
|
||||
use app\common\library\DatabaseRoute;
|
||||
use think\facade\Log;
|
||||
use think\queue\Job;
|
||||
|
||||
/**
|
||||
* 奖项领取
|
||||
*/
|
||||
class ActivitiesQueue extends BaseQueue
|
||||
{
|
||||
public function run(Job $job, $data)
|
||||
{
|
||||
Orders::activities($data['userInfo'], $data['sourceUser']);
|
||||
}
|
||||
}
|
||||
@@ -1,30 +0,0 @@
|
||||
<?php
|
||||
|
||||
namespace app\queue;
|
||||
|
||||
use think\facade\Log;
|
||||
use think\queue\Job;
|
||||
|
||||
abstract class BaseQueue
|
||||
{
|
||||
public function fire(Job $job, $data)
|
||||
{
|
||||
$start = microtime(true);
|
||||
try {
|
||||
Log::info("消息队列接收到消息,当前队列: ".self::class.", 携带数据: ".json_encode($data));
|
||||
$this->run($job, $data);
|
||||
Log::info("消息队列执行成功:" . static::class);
|
||||
$job->delete();
|
||||
} catch (\Throwable $e) {
|
||||
Log::error("消息队列执行异常:" . $e->getMessage());
|
||||
Log::info($e->getTraceAsString());
|
||||
$job->release(10); // 或 $job->fail()
|
||||
}
|
||||
|
||||
$end = microtime(true);
|
||||
Log::info("消息队列执行完毕, 耗时:" . ($end - $start) . 's');
|
||||
}
|
||||
|
||||
// 子类实现具体逻辑
|
||||
abstract public function run(Job $job, $data);
|
||||
}
|
||||
@@ -1,26 +0,0 @@
|
||||
<?php
|
||||
|
||||
namespace app\queue;
|
||||
|
||||
use app\admin\model\DiscSpinning;
|
||||
use think\facade\Log;
|
||||
use think\queue\Job;
|
||||
|
||||
class DiscCompensateJob
|
||||
{
|
||||
|
||||
public function fire(Job $job, $data) {
|
||||
try {
|
||||
Log::write('准备处理DiscCompensateJob' . json_encode($data, JSON_UNESCAPED_UNICODE));
|
||||
DiscSpinning::receive1($data);
|
||||
$job->delete(); // 处理成功删除任务
|
||||
} catch (\Exception $e) {
|
||||
if ($job->attempts() < 3) {
|
||||
$job->release(5); // 重试3次,间隔5秒
|
||||
} else {
|
||||
$job->delete();
|
||||
Log::error("大转盘补偿任务最终失败:ID={$data['id']}");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
18
app/queue/redis/ActivitiesQueue.php
Normal file
18
app/queue/redis/ActivitiesQueue.php
Normal file
@@ -0,0 +1,18 @@
|
||||
<?php
|
||||
|
||||
namespace app\queue\redis;
|
||||
|
||||
use support\Log;
|
||||
use Webman\RedisQueue\Consumer;
|
||||
|
||||
/**
|
||||
* 奖项领取
|
||||
*/
|
||||
class ActivitiesQueue extends BaseQueue implements Consumer
|
||||
{
|
||||
public function run($data)
|
||||
{
|
||||
Log::info("哈哈哈哈");
|
||||
// Orders::activities($data['userInfo'], $data['sourceUser']);
|
||||
}
|
||||
}
|
||||
37
app/queue/redis/BaseQueue.php
Normal file
37
app/queue/redis/BaseQueue.php
Normal file
@@ -0,0 +1,37 @@
|
||||
<?php
|
||||
|
||||
namespace app\queue\redis;
|
||||
|
||||
|
||||
use support\Log;
|
||||
use Webman\RedisQueue\Consumer;
|
||||
|
||||
abstract class BaseQueue
|
||||
{
|
||||
public $queue;
|
||||
public $connection = 'default';
|
||||
|
||||
public function __construct()
|
||||
{
|
||||
$this->queue = class_basename(static::class);
|
||||
Log::info("消息队列启动成功,".$this->queue);
|
||||
}
|
||||
|
||||
public function consume( $data)
|
||||
{
|
||||
$start = microtime(true);
|
||||
Log::info("消息队列接收到消息,当前队列: ".self::class.", 携带数据: ".json_encode($data));
|
||||
$this->run($data);
|
||||
$end = microtime(true);
|
||||
Log::info("消息队列执行完毕, 耗时:" . ($end - $start) . 's');
|
||||
}
|
||||
|
||||
public function onConsumeFailure(\Throwable $e, $package)
|
||||
{
|
||||
Log::error("消息队列执行异常:" . $e->getMessage());
|
||||
Log::info($e->getTraceAsString());
|
||||
}
|
||||
|
||||
// 子类实现具体逻辑
|
||||
abstract public function run($data);
|
||||
}
|
||||
16
app/queue/redis/DiscCompensateJob.php
Normal file
16
app/queue/redis/DiscCompensateJob.php
Normal file
@@ -0,0 +1,16 @@
|
||||
<?php
|
||||
|
||||
namespace app\queue\redis;
|
||||
|
||||
use app\admin\model\DiscSpinning;
|
||||
use support\Log;
|
||||
use Webman\RedisQueue\Consumer;
|
||||
|
||||
class DiscCompensateJob extends BaseQueue implements Consumer
|
||||
{
|
||||
|
||||
public function run( $data) {
|
||||
Log::write('准备处理DiscCompensateJob' . json_encode($data, JSON_UNESCAPED_UNICODE));
|
||||
DiscSpinning::receive1($data);
|
||||
}
|
||||
}
|
||||
@@ -1,6 +1,6 @@
|
||||
<?php
|
||||
|
||||
namespace app\queue;
|
||||
namespace app\queue\redis;
|
||||
|
||||
use app\api\model\UserMoney;
|
||||
use app\common\library\DatabaseRoute;
|
||||
@@ -12,7 +12,7 @@ use think\queue\Job;
|
||||
*/
|
||||
class DiscReceiveQueue extends BaseQueue
|
||||
{
|
||||
public function run(Job $job, $data)
|
||||
public function run( $data)
|
||||
{
|
||||
$drawsInfo = $data['draws'];
|
||||
if ($drawsInfo['type'] != 2) {
|
||||
@@ -1,19 +1,20 @@
|
||||
<?php
|
||||
|
||||
namespace app\queue;
|
||||
namespace app\queue\redis;
|
||||
|
||||
use app\admin\controller\Cash;
|
||||
use app\api\model\UserMoney;
|
||||
use app\common\library\DatabaseRoute;
|
||||
use think\facade\Log;
|
||||
use think\queue\Job;
|
||||
use Webman\RedisQueue\Consumer;
|
||||
|
||||
/**
|
||||
* 奖项领取
|
||||
*/
|
||||
class UserPushQueue extends BaseQueue
|
||||
class UserPushQueue extends BaseQueue implements Consumer
|
||||
{
|
||||
public function run(Job $job, $data)
|
||||
public function run($data)
|
||||
{
|
||||
$userInfoList = $data['list'];
|
||||
foreach ($userInfoList as $userInfo) {
|
||||
Reference in New Issue
Block a user