78 lines
2.8 KiB
PHP
78 lines
2.8 KiB
PHP
<?php
|
||
|
||
namespace app\command;
|
||
|
||
use app\common\library\DatabaseRoute;
|
||
use app\queue\redis\DiscCompensateJob;
|
||
use Symfony\Component\Console\Command\Command;
|
||
use Symfony\Component\Console\Input\InputInterface;
|
||
use Symfony\Component\Console\Input\InputOption;
|
||
use Symfony\Component\Console\Input\InputArgument;
|
||
use Symfony\Component\Console\Output\OutputInterface;
|
||
use support\Log;
|
||
|
||
|
||
class SpinningTask3 extends Command
|
||
{
|
||
protected static $defaultName = 'SpinningTask3';
|
||
protected static $defaultDescription = 'SpinningTask3';
|
||
|
||
/**
|
||
* @return void
|
||
*/
|
||
protected function configure()
|
||
{
|
||
$this->addArgument('name', InputArgument::OPTIONAL, 'Name description');
|
||
}
|
||
|
||
/**
|
||
* @param InputInterface $input
|
||
* @param OutputInterface $output
|
||
* @return int
|
||
*/
|
||
protected function execute(InputInterface $input, OutputInterface $output): int
|
||
{
|
||
$params = 1;
|
||
if($input->hasOption('params')) {
|
||
$params = $input->getOption('params');
|
||
}
|
||
// 计算时间范围(当前时间 - N*5分钟 到 当前时间 - (N*5+15)分钟)
|
||
$now = time();
|
||
$offsetMinutes = (int)$params * -5; // N*5分钟前
|
||
$fiveMinutesAgo = date('Y-m-d H:i:s', strtotime("{$offsetMinutes} minutes", $now));
|
||
$tenMinutesAgo = date('Y-m-d H:i:s', strtotime("-15 minutes", strtotime($fiveMinutesAgo)));
|
||
Log::info("大转盘到账补偿时间范围:{$tenMinutesAgo}-----{$fiveMinutesAgo}");
|
||
$list = DatabaseRoute::getAllDbData('disc_spinning_record', function ($query) use($fiveMinutesAgo, $tenMinutesAgo) {
|
||
return $query->whereNull('target')
|
||
->whereNull('target_id')
|
||
->where('type', 2)
|
||
->where('create_time', '>=', $tenMinutesAgo) // 大于等于(N*5+15)分钟前
|
||
->where('create_time', '<=', $fiveMinutesAgo);
|
||
})->select();
|
||
if($list) {
|
||
$list = $list->toArray();
|
||
Log::info('需要补偿的总条数' . count($list));
|
||
if(count($list) > 0) {
|
||
// 推进队列
|
||
$this->execAsync($list);
|
||
}
|
||
}
|
||
$output->writeln("大转盘到账补偿机制结束");
|
||
Log::info("大转盘到账补偿机制结束");
|
||
$output->writeln('Hello SpinningTask3');
|
||
return self::SUCCESS;
|
||
}
|
||
public function execAsync($list)
|
||
{
|
||
foreach ($list as $k => $data) {
|
||
$queue = class_basename(DiscCompensateJob::class);
|
||
$data[] = [
|
||
'queueId' => uuid(),
|
||
];
|
||
Log::info("消息队列发送消息,对列名: $queue, 携带数据: ".json_encode($data).', 延时时间: 0');
|
||
// 投递延迟消息,消息会在60秒后处理
|
||
\Webman\RedisQueue\Redis::send($queue, $data);
|
||
}
|
||
}
|
||
}
|