addArgument('name', InputArgument::OPTIONAL, 'Name description'); } /** * @param InputInterface $input * @param OutputInterface $output * @return int */ protected function execute(InputInterface $input, OutputInterface $output): int { $params = 1; if($input->hasOption('params')) { $params = $input->getOption('params'); } // 计算时间范围(当前时间 - N*5分钟 到 当前时间 - (N*5+15)分钟) $now = time(); $offsetMinutes = (int)$params * -5; // N*5分钟前 $fiveMinutesAgo = date('Y-m-d H:i:s', strtotime("{$offsetMinutes} minutes", $now)); $tenMinutesAgo = date('Y-m-d H:i:s', strtotime("-15 minutes", strtotime($fiveMinutesAgo))); Log::info("大转盘到账补偿时间范围:{$tenMinutesAgo}-----{$fiveMinutesAgo}"); $list = DatabaseRoute::getAllDbData('disc_spinning_record', function ($query) use($fiveMinutesAgo, $tenMinutesAgo) { return $query->whereNull('target') ->whereNull('target_id') ->where('type', 2) ->where('create_time', '>=', $tenMinutesAgo) // 大于等于(N*5+15)分钟前 ->where('create_time', '<=', $fiveMinutesAgo); })->select(); if($list) { $list = $list->toArray(); Log::info('需要补偿的总条数' . count($list)); if(count($list) > 0) { // 推进队列 $this->execAsync($list); } } $output->writeln("大转盘到账补偿机制结束"); Log::info("大转盘到账补偿机制结束"); $output->writeln('Hello SpinningTask3'); return self::SUCCESS; } public function execAsync($list) { foreach ($list as $k => $data) { $queue = class_basename(DiscCompensateJob::class); $data[] = [ 'queueId' => uuid(), ]; Log::info("消息队列发送消息,对列名: $queue, 携带数据: ".json_encode($data).', 延时时间: 0'); // 投递延迟消息,消息会在60秒后处理 \Webman\RedisQueue\Redis::send($queue, $data); } } }