||
- <?php
- namespace common\libs\taskQueue;
- use common\helpers\Date;
- use common\models\TaskQueue;
- use Yii;
- use yii\base\InvalidConfigException;
- use yii\base\InvalidRouteException;
- use yii\base\StaticInstanceTrait;
- use yii\db\Exception;
- use yii\db\Expression;
- use yii\helpers\Json;
- class Queue
- {
- use StaticInstanceTrait;
- /**
- * 函数任务
- */
- const TYPE_FUNC = 1;
- /**
- * 路由任务
- */
- const TYPE_ROUTE = 2;
- /**
- * 任务队列缓存键
- */
- const REDIS_LIST_KEY = 'taskQueue::list';
- const REDIS_CONTENT_KEY = 'taskQueue::content';
- /**
- * 不循环,执行一次即可
- * 循环周期内容 LOOP_CYCLE_CONTENT 传值:null(不循环,任务执行一次即完成,这种情况TOTAL_LOOP传值1)
- */
- const LOOP_TYPE_NO = 0;
- /**
- * 每年循环一次
- * 循环周期内容 LOOP_CYCLE_CONTENT 传值:月份,日期,时间 例:7,20,10:00:00(每年7月20日10点执行一次任务)
- */
- const LOOP_TYPE_YEAR = 1;
- /**
- * 每月循环一次
- * 循环周期内容 LOOP_CYCLE_CONTENT 传值:日期,时间 例:2,10:00:00(每月2日10点执行一次任务)
- */
- const LOOP_TYPE_MONTH = 2;
- /**
- * 每周循环一次
- * 循环周期内容 LOOP_CYCLE_CONTENT 传值:星期,时间 例:2,10:00:00(每星期二10点执行一次任务,0代表周日)
- */
- const LOOP_TYPE_WEEK = 3;
- /**
- * 每天循环一次
- * 循环周期内容 LOOP_CYCLE_CONTENT 传值:时间 例:10:00:00(每天10点执行一次任务)
- */
- const LOOP_TYPE_DAY = 4;
- /**
- * 每小时循环一次
- * 循环周期内容 LOOP_CYCLE_CONTENT 传值:null 无需传值,直接每小时循环
- */
- const LOOP_TYPE_HOUR = 5;
- /**
- * 非固定循环周期
- * 循环周期内容 LOOP_CYCLE_CONTENT 传值:300,600,900(第一次执行完任务后间隔300秒执行第二次,第二次执行完毕后600秒执行第三次,第三次执行完毕后900秒执行第四次,这种情况共计执行4次任务,TOTAL_LOOP传值4)
- */
- const LOOP_TYPE_UNFIXED = 6;
- /**
- * 添加任务队列
- * @param $type
- * @param $content
- * 1、函数方式 TYPE_FUNC 传值: 'xxx\xxx\ClassName::methodName'或'functionName'
- * 2、路由方式 TYPE_ROUTE 传值: 'controller/action'(这个路由只能是console下的路由)
- * @param $params
- * 参数数组方式传值: ['参数1', '参数2']
- * @param int $loopType
- * 循环类型
- * @param null $loopCycleContent
- * 循环周期内容 请看上述的循环类型常量对应的内容传值
- * @param int $totalLoop
- * 总循环次数,0为不限制次数
- * @param null $mainId
- * 主任务ID
- * @param int $doneTimes
- * 本任务执行次数
- * @param $startedAt
- * 任务开始时间(第一次调用该方法无需传此值,自动计算)
- * @return bool
- */
- public function addTask($type, $content, $params = [], $loopType = self::LOOP_TYPE_NO, $loopCycleContent = null, $totalLoop = 1, $mainId = null, $doneTimes = 1, $startedAt = null){
- if(!is_string($content)){
- // todo 记录错误系统日志要求必须传值任务内容
- return false;
- }
- // 计算下一次执行任务时间
- $timeResult = $this->_calcTime($loopType, $loopCycleContent, $startedAt, $doneTimes);
- $taskModel = new TaskQueue();
- $taskModel->MAIN_ID = $mainId ? $mainId : '1';
- $taskModel->TYPE = $type;
- $taskModel->CONTENT = $content;
- $taskModel->PARAMS = Json::encode($params);
- $taskModel->TOTAL_LOOP = $totalLoop;
- $taskModel->DONE_TIMES = $doneTimes;
- $taskModel->LOOP_TYPE = $loopType;
- $taskModel->LOOP_CYCLE_CONTENT = $loopCycleContent;
- $taskModel->NEXT_AT = $timeResult['nextAt'];
- $taskModel->STARTED_AT = $timeResult['startedAt'];
- $taskModel->CREATED_AT = Date::nowTime();
- if(!$taskModel->save()){
- // todo 记录错误系统日志
- return false;
- }
- if($taskModel->MAIN_ID == '1'){
- $taskModel->MAIN_ID = $taskModel->ID;
- if(!$taskModel->save()){
- // todo 记录错误系统日志
- return false;
- }
- }
- // 第一次任务直接更新Redis缓存中的队列
- $task = TaskQueue::findOneAsArray('ID=:ID', [':ID'=>$taskModel->ID]);
- $this->addTaskToRedis($task);
- unset($taskModel, $task);
- return true;
- }
- /**
- * 消费队列
- * @throws InvalidConfigException
- * @throws InvalidRouteException
- * @throws \yii\console\Exception
- */
- public function consumeTask() {
- // 从缓存获取所有小于当前时间的队列
- $allTasks = Yii::$app->redis->zrangebyscore(self::REDIS_LIST_KEY, 0, Date::nowTime());
- // 循环所有任务
- foreach ($allTasks as $taskId){
- // 清除当前任务队列
- Yii::$app->redis->zrem(self::REDIS_LIST_KEY, $taskId);
- // 获取当前任务内容
- $task = Yii::$app->redis->hget(self::REDIS_CONTENT_KEY, $taskId);
- // 清除当前任务内容
- Yii::$app->redis->hdel(self::REDIS_CONTENT_KEY, $taskId);
- $task = Json::decode($task);
- $this->runTask($task);
- }
- unset($allTasks);
- }
- /**
- * 执行任务
- * @param $task
- * @throws InvalidConfigException
- * @throws InvalidRouteException
- * @throws \yii\console\Exception
- */
- public function runTask($task){
- // 任务类型为函数直接执行函数
- if($task['TYPE'] == self::TYPE_FUNC) {
- call_user_func_array($task['CONTENT'], Json::decode($task['PARAMS']));
- }
- // 任务类型为路由直接执行路由
- elseif($task['TYPE'] == self::TYPE_ROUTE) {
- $parts = Yii::$app->createController($task['CONTENT']);
- if (is_array($parts)) {
- Yii::$app->runAction($task['CONTENT'], Json::decode($task['PARAMS']));
- }
- }
- else {
- // todo 记录错误系统日志
- return ;
- }
- // 任务执行完毕将数据库中的任务标记为已完成
- TaskQueue::updateAll([
- 'FINISHED_AT' => Date::nowTime(),
- ], 'ID=:ID', [':ID'=>$task['ID']]);
- // 查看任务的循环总数比任务循环次数小并且循环总数不等于0的情况则删除所有该任务
- if(($task['TOTAL_LOOP'] != 0) && ($task['TOTAL_LOOP'] <= $task['DONE_TIMES'])) {
- TaskQueue::deleteAll('MAIN_ID=:MAIN_ID', [':MAIN_ID'=>$task['MAIN_ID']]);
- return ;
- }
- // 如果有下次执行时间,则继续添加任务到任务队列
- if($task['NEXT_AT'] > 0){
- $this->addTask($task['TYPE'], $task['CONTENT'], Json::decode($task['PARAMS']), $task['LOOP_TYPE'], $task['LOOP_CYCLE_CONTENT'], $task['TOTAL_LOOP'], $task['MAIN_ID'], $task['DONE_TIMES']+1, $task['NEXT_AT']);
- }
- }
- /**
- * 初始化任务队列的redis缓存
- */
- public function initRedis(){
- // 清空本身全部队列
- Yii::$app->redis->del(self::REDIS_LIST_KEY);
- Yii::$app->redis->del(self::REDIS_CONTENT_KEY);
- // 找出任务列表中所有有效任务(大于当前时间)
- $allTasks = TaskQueue::findAllAsArray('STARTED_AT>=:NOW_TIME', [':NOW_TIME'=>Date::nowTime()]);
- // 循环把这些任务加入到redis中
- foreach ($allTasks as $task){
- $this->addTaskToRedis($task);
- }
- // 清理掉数据库中所有已经完成并且过时的任务
- TaskQueue::deleteAll('FINISHED_AT<>0 AND STARTED_AT<:NOW_TIME', [':NOW_TIME'=>Date::nowTime()]);
- unset($allTasks);
- }
- /**
- * 把任务添加到redis队列中
- * @param $task
- */
- public function addTaskToRedis($task){
- Yii::$app->redis->zadd(self::REDIS_LIST_KEY, intval($task['STARTED_AT']), $task['ID']);
- Yii::$app->redis->hset(self::REDIS_CONTENT_KEY, $task['ID'], Json::encode($task));
- }
- /**
- * 计算开始和下次一次时间
- * @param $loopType
- * @param $loopCycleContent
- * @param $startedAt
- * @param $doneTimes
- * @return array
- */
- private function _calcTime($loopType, $loopCycleContent, $startedAt, $doneTimes){
- if($loopCycleContent){
- $loopCycleContent = explode(',', $loopCycleContent);
- }
- $baseTime = Date::nowTime();
- if($startedAt !== null){
- $baseTime = $startedAt;
- }
- switch($loopType){
- case self::LOOP_TYPE_YEAR:
- // 例:7,20,10:00:00(每年7月20日10点执行一次任务)
- // 本年的要执行任务的时间
- $nowYear = date('Y', $baseTime);
- $nextYear = intval($nowYear) + 1;
- $nextNextYear = intval($nowYear) + 2;
- $nowYearTaskTime = strtotime($nowYear.'-'.$loopCycleContent[0].'-'.$loopCycleContent[1].' '.$loopCycleContent[2]);
- $nextYearTaskTime = strtotime($nextYear.'-'.$loopCycleContent[0].'-'.$loopCycleContent[1].' '.$loopCycleContent[2]);
- $nextNextYearTaskTime = strtotime($nextNextYear.'-'.$loopCycleContent[0].'-'.$loopCycleContent[1].' '.$loopCycleContent[2]);
- if($startedAt === null){
- // 当前时间是否大于这个日期
- if(Date::nowTime() <= $nowYearTaskTime){
- $startedAt = $nowYearTaskTime;
- $nextAt = $nextYearTaskTime;
- } else {
- $startedAt = $nextYearTaskTime;
- $nextAt = $nextNextYearTaskTime;
- }
- } else {
- $nextAt = $nextYearTaskTime;
- }
- break;
- case self::LOOP_TYPE_MONTH:
- // 例:2,10:00:00(每月2日10点执行一次任务)
- $nowMonthDate = \date('Y-m-'.$loopCycleContent[0].' '.$loopCycleContent[1], $baseTime);
- $nowMonthTime = strtotime($nowMonthDate);
- $nextMonthTime = strtotime(date('Y-m-d '.$loopCycleContent[1], Date::nextMonthDay($nowMonthTime)));
- $nextNextMonthTime = strtotime(date('Y-m-d '.$loopCycleContent[1], Date::nextMonthDay($nextMonthTime)));
- if($startedAt == null){
- if(Date::nowTime() <= $nowMonthTime){
- $startedAt = $nowMonthTime;
- $nextAt = $nextMonthTime;
- } else {
- $startedAt = $nextMonthTime;
- $nextAt = $nextNextMonthTime;
- }
- } else {
- $nextAt = $nextMonthTime;
- }
- break;
- case self::LOOP_TYPE_WEEK:
- // 例:2,10:00:00(每星期二10点执行一次任务,0代表周日)
- $weekStr = 'sunday';
- switch ($loopCycleContent[0]){
- case 0:
- $weekStr = 'sunday';
- break;
- case 1:
- $weekStr = 'monday';
- break;
- case 2:
- $weekStr = 'tuesday';
- break;
- case 3:
- $weekStr = 'wednesday';
- break;
- case 4:
- $weekStr = 'thursday';
- break;
- case 5:
- $weekStr = 'friday';
- break;
- case 6:
- $weekStr = 'saturday';
- break;
- }
- if($startedAt === null) {
- $startedAtDay = strtotime('next '.$weekStr, $baseTime);
- $startedAt = strtotime(date('Y-m-d '.$loopCycleContent[1], $startedAtDay));
- $nextAt = $startedAt + 7 * 24 * 60 * 60;
- } else {
- $nextAtDay = strtotime('next '.$weekStr, $baseTime);
- $nextAt = strtotime(date('Y-m-d '.$loopCycleContent[1], $nextAtDay));
- }
- break;
- case self::LOOP_TYPE_DAY:
- // 例:10:00:00(每天10点执行一次任务)
- $todayHourTime = strtotime(\date('Y-m-d '.$loopCycleContent[0], $baseTime));
- if($startedAt === null){
- if($baseTime < $todayHourTime){
- $startedAt = $todayHourTime;
- } else {
- $startedAt = $todayHourTime + 24 * 60 * 60;
- }
- }
- $nextAt = $startedAt + 24 * 60 * 60;
- break;
- case self::LOOP_TYPE_HOUR:
- if($startedAt === null){
- $startedAt = $baseTime;
- }
- $nextAt = $startedAt + 60 * 60;
- break;
- case self::LOOP_TYPE_UNFIXED:
- // 传值:300,600,900(第一次执行完任务后间隔300秒执行第二次,第二次执行完毕后600秒执行第三次,第三次执行完毕后900秒执行第四次,这种情况共计执行4次任务,TOTAL_LOOP传值4)
- if($startedAt === null){
- $startedAt = $baseTime;
- }
- if(!isset($loopCycleContent[$doneTimes-1])){
- $nextAt = 0;
- } else {
- $nextAt = $startedAt + $loopCycleContent[$doneTimes-1];
- }
- break;
- default:
- $nextAt = 0;
- $loopCycleContent = null;
- $startedAt = $baseTime;
- break;
- }
- return [
- 'startedAt' => $startedAt,
- 'nextAt' => $nextAt,
- ];
- }
- }
|