This commit is contained in:
parent
3fb7712f29
commit
1a1040b514
|
|
@ -17,7 +17,7 @@ use Symfony\Component\Console\Output\OutputInterface;
|
|||
use support\Log;
|
||||
use Webman\RedisQueue\Redis;
|
||||
|
||||
// 申请新短信模版
|
||||
// 申请新短信模版/营销短信发送
|
||||
class ApplySmsTemp extends Command
|
||||
{
|
||||
protected static $defaultName = 'applysmstemp';
|
||||
|
|
@ -73,11 +73,25 @@ class ApplySmsTemp extends Command
|
|||
]);
|
||||
|
||||
$callback = function ($msg) use ($queue){
|
||||
$date_time = date('Y-m-d H:i:s');
|
||||
$data = $msg->body;
|
||||
Log::info('MQ收到消息[申请新短信模版]--->' . $data . '--->' . $date_time . "\n");
|
||||
// 发给队列
|
||||
Redis::send('apply.sms.temp', $data);
|
||||
Log::info('MQ收到消息[申请新短信模版/营销短信发送/微信模版消息]--->' . $data . "\n");
|
||||
$arr = explode(',', $data);
|
||||
if(is_array($arr)) {
|
||||
$type = $arr[2];
|
||||
if($type == 'applySmsTemp') {
|
||||
// 发给队列 短信模版审核
|
||||
Redis::send('apply.sms.temp', $data);
|
||||
}elseif ($type == 'sendMarkSms') {
|
||||
// 发给队列 营销短信发送
|
||||
Redis::send('send.mark.sms', $data);
|
||||
}elseif ($type == 'sendWechatTemp') {
|
||||
// 发给队列 发送微信模版消息
|
||||
Redis::send('send.wechat.temp', $data);
|
||||
}
|
||||
}else {
|
||||
Log::info('MQ收到消息[申请新短信模版/营销短信发送/微信模版消息]格式错误');
|
||||
return false;
|
||||
}
|
||||
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
|
||||
};
|
||||
$rabbit_channel->basic_consume($queue, '', false, false, false, false, $callback);
|
||||
|
|
|
|||
|
|
@ -93,7 +93,6 @@ class QuerySmsStatus extends Command
|
|||
'pageSize' => 10,
|
||||
'currentPage' => 1,
|
||||
];
|
||||
Log::info('12312312312312' . json_encode($data));
|
||||
$res = AlibabaSms::QuerySendDetails($data);
|
||||
try {
|
||||
Db::startTrans();
|
||||
|
|
|
|||
|
|
@ -1,82 +0,0 @@
|
|||
<?php
|
||||
|
||||
namespace app\command;
|
||||
|
||||
use app\common\model\MqLog;
|
||||
use app\common\model\OrderInfo;
|
||||
use app\common\model\RabbitMqConfig;
|
||||
use ba\Random;
|
||||
use PhpAmqpLib\Connection\AMQPStreamConnection;
|
||||
use support\think\Db;
|
||||
use Symfony\Component\Console\Command\Command;
|
||||
use Symfony\Component\Console\Input\InputInterface;
|
||||
use Symfony\Component\Console\Input\InputOption;
|
||||
use Symfony\Component\Console\Input\InputArgument;
|
||||
use Symfony\Component\Console\Output\OutputInterface;
|
||||
use support\Log;
|
||||
use Webman\RedisQueue\Redis;
|
||||
|
||||
// 发送营销短信
|
||||
class SendMarkSms extends Command
|
||||
{
|
||||
protected static $defaultName = 'sendmarksms';
|
||||
protected static $defaultDescription = 'sendmarksms';
|
||||
|
||||
/**
|
||||
* @return void
|
||||
*/
|
||||
protected function configure()
|
||||
{
|
||||
$this->addArgument('name', InputArgument::OPTIONAL, 'Name description');
|
||||
}
|
||||
|
||||
/**
|
||||
* @param InputInterface $input
|
||||
* @param OutputInterface $output
|
||||
* @return int
|
||||
*/
|
||||
protected function execute(InputInterface $input, OutputInterface $output): int
|
||||
{
|
||||
|
||||
$host = RabbitMqConfig::$host;
|
||||
$port = RabbitMqConfig::$port;
|
||||
$user = RabbitMqConfig::$user;
|
||||
$password = RabbitMqConfig::$password;
|
||||
$queue = RabbitMqConfig::$queue_t . '-send.mark.sms';
|
||||
|
||||
// 防止空闲时间断线必须设置心跳
|
||||
$connection = new AMQPStreamConnection($host, $port, $user, $password,
|
||||
'/',
|
||||
false,
|
||||
'AMQPLAIN',
|
||||
null,
|
||||
'en_US',
|
||||
60,
|
||||
60,
|
||||
null,
|
||||
false,
|
||||
30
|
||||
);
|
||||
$rabbit_channel = $connection->channel();
|
||||
$rabbit_channel->queue_declare($queue, false, true, false, false, false, [
|
||||
// 'x-message-ttl' => ['I', 180000]
|
||||
]);
|
||||
|
||||
$callback = function ($msg) use ($queue){
|
||||
$date_time = date('Y-m-d H:i:s');
|
||||
$data = $msg->body;
|
||||
Log::info('MQ收到消息[发送营销短信]--->' . $data . '--->' . $date_time . "\n");
|
||||
// 发给队列
|
||||
Redis::send('send.mark.sms', $data);
|
||||
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
|
||||
};
|
||||
$rabbit_channel->basic_consume($queue, '', false, false, false, false, $callback);
|
||||
while ($rabbit_channel->is_consuming()) {
|
||||
$rabbit_channel->wait();
|
||||
}
|
||||
$rabbit_channel->close();
|
||||
$connection->close();
|
||||
return self::SUCCESS;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,82 +0,0 @@
|
|||
<?php
|
||||
|
||||
namespace app\command;
|
||||
|
||||
use app\common\model\MqLog;
|
||||
use app\common\model\OrderInfo;
|
||||
use app\common\model\RabbitMqConfig;
|
||||
use ba\Random;
|
||||
use PhpAmqpLib\Connection\AMQPStreamConnection;
|
||||
use support\think\Db;
|
||||
use Symfony\Component\Console\Command\Command;
|
||||
use Symfony\Component\Console\Input\InputInterface;
|
||||
use Symfony\Component\Console\Input\InputOption;
|
||||
use Symfony\Component\Console\Input\InputArgument;
|
||||
use Symfony\Component\Console\Output\OutputInterface;
|
||||
use support\Log;
|
||||
use Webman\RedisQueue\Redis;
|
||||
|
||||
// 发送微信模版消息
|
||||
class SendWechatTemp extends Command
|
||||
{
|
||||
protected static $defaultName = 'sendwechattemp';
|
||||
protected static $defaultDescription = 'sendwechattemp';
|
||||
|
||||
/**
|
||||
* @return void
|
||||
*/
|
||||
protected function configure()
|
||||
{
|
||||
$this->addArgument('name', InputArgument::OPTIONAL, 'Name description');
|
||||
}
|
||||
|
||||
/**
|
||||
* @param InputInterface $input
|
||||
* @param OutputInterface $output
|
||||
* @return int
|
||||
*/
|
||||
protected function execute(InputInterface $input, OutputInterface $output): int
|
||||
{
|
||||
|
||||
$host = RabbitMqConfig::$host;
|
||||
$port = RabbitMqConfig::$port;
|
||||
$user = RabbitMqConfig::$user;
|
||||
$password = RabbitMqConfig::$password;
|
||||
$queue = RabbitMqConfig::$queue_t . '-send.wechat.temp';
|
||||
|
||||
// 防止空闲时间断线必须设置心跳
|
||||
$connection = new AMQPStreamConnection($host, $port, $user, $password,
|
||||
'/',
|
||||
false,
|
||||
'AMQPLAIN',
|
||||
null,
|
||||
'en_US',
|
||||
60,
|
||||
60,
|
||||
null,
|
||||
false,
|
||||
30
|
||||
);
|
||||
$rabbit_channel = $connection->channel();
|
||||
$rabbit_channel->queue_declare($queue, false, true, false, false, false, [
|
||||
// 'x-message-ttl' => ['I', 180000]
|
||||
]);
|
||||
|
||||
$callback = function ($msg) use ($queue){
|
||||
$date_time = date('Y-m-d H:i:s');
|
||||
$data = $msg->body;
|
||||
Log::info('MQ收到消息[发送微信模版消息]--->' . $data . '--->' . $date_time . "\n");
|
||||
// 发给队列
|
||||
Redis::send('send.wechat.temp', $data);
|
||||
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
|
||||
};
|
||||
$rabbit_channel->basic_consume($queue, '', false, false, false, false, $callback);
|
||||
while ($rabbit_channel->is_consuming()) {
|
||||
$rabbit_channel->wait();
|
||||
}
|
||||
$rabbit_channel->close();
|
||||
$connection->close();
|
||||
return self::SUCCESS;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -16,23 +16,17 @@ class ExpiredRedis
|
|||
|
||||
// 如果Redis需要密码认证
|
||||
$redis->auth($resis_pwd);
|
||||
|
||||
// 选择数据库(默认0)
|
||||
$redis->select(0);
|
||||
|
||||
// 设置读取超时(-1表示不超时)
|
||||
$redis->setOption(\Redis::OPT_READ_TIMEOUT, -1);
|
||||
|
||||
// 订阅键过期事件频道
|
||||
// __keyevent@0__:expired 表示监听0号数据库的键过期事件
|
||||
$redis->psubscribe(['__keyevent@0__:expired'], function ($pattern, $channel, $key) use($redis){
|
||||
// 这里可以添加你的业务逻辑
|
||||
// 例如根据键名前缀处理不同业务
|
||||
$str = 'expired:sms:';
|
||||
if (strpos($key, $str) === 0) {
|
||||
Log::info ("替换之前: " . $key);
|
||||
$id = str_replace($str, '', $key);
|
||||
Log::info ("替换过的ID: " . $id);
|
||||
// 发给队列
|
||||
Redis::send('send.mark.sms', '1,' . $id);
|
||||
Log::info ("处理订单过期: {$id}\n{}");
|
||||
|
|
|
|||
|
|
@ -9,11 +9,29 @@ class MessagePushTask
|
|||
{
|
||||
public function onWorkerStart()
|
||||
{
|
||||
// 每月1号 查询短信发送状态
|
||||
new Crontab('0 0 1 * *', function(){
|
||||
Log::info('营销接收次数更新开始执行---->');
|
||||
\app\model\SaveUserPushNumber::savePushNumber();
|
||||
Log::info('营销接收次数更新执行结束---->');
|
||||
// 查询短信发送状态
|
||||
new Crontab('0 */5 * * * *', function(){
|
||||
$command = 'cd ' . BASE_PATH . ' && php webman querysmsstatus';
|
||||
// 存储命令输出的数组
|
||||
$output = [];
|
||||
// 存储命令返回状态码(0为成功,非0为失败)
|
||||
$status = 0;
|
||||
// 执行命令
|
||||
exec($command, $output, $status);
|
||||
// 处理执行结果
|
||||
Log::info('查询短信发送状态:执行状态'. $status . '/执行结果:' . json_encode($output, JSON_UNESCAPED_UNICODE));
|
||||
});
|
||||
// 查询模版审核状态
|
||||
new Crontab('0 */5 * * * *', function(){
|
||||
$command = 'cd ' . BASE_PATH . ' && php webman querysmstempstatus';
|
||||
// 存储命令输出的数组
|
||||
$output = [];
|
||||
// 存储命令返回状态码(0为成功,非0为失败)
|
||||
$status = 0;
|
||||
// 执行命令
|
||||
exec($command, $output, $status);
|
||||
// 处理执行结果
|
||||
Log::info('查询模版审核状态:执行状态'. $status . '/执行结果:' . json_encode($output, JSON_UNESCAPED_UNICODE));
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
@ -42,7 +42,7 @@ class SendMarkSms implements Consumer
|
|||
// 推给redis
|
||||
$stortime = strtotime($record['send_time']);
|
||||
Redis::setEx('expired:sms:'.$record['id'], $stortime - time(), 1);
|
||||
Log::info('定时发送,已存入redis');
|
||||
Log::info($record . ' 定时发送,已存入redis, 过期时间->>>' .$record['send_time'] . '/' . $stortime - time());
|
||||
}else {
|
||||
// 待发送
|
||||
if($record['status'] == 0) {
|
||||
|
|
|
|||
|
|
@ -61,7 +61,12 @@ return [
|
|||
]
|
||||
]
|
||||
],
|
||||
// redis 过期触发事件
|
||||
'ExpiredRedis' => [
|
||||
'handler' => \app\process\ExpiredRedis::class,
|
||||
],
|
||||
// 定时任务/查询短信发送状态/查询模版审核状态
|
||||
'QuerySmsStatus' => [
|
||||
'handler' => MessagePushTask::class
|
||||
]
|
||||
];
|
||||
|
|
|
|||
Loading…
Reference in New Issue