_calcTime($loopType, $loopCycleContent, $startedAt, $doneTimes);
        $taskModel = new TaskQueue();
        $taskModel->MAIN_ID = $mainId ? $mainId : '1';
        $taskModel->TYPE = $type;
        $taskModel->CONTENT = $content;
        $taskModel->PARAMS = Json::encode($params);
        $taskModel->TOTAL_LOOP = $totalLoop;
        $taskModel->DONE_TIMES = $doneTimes;
        $taskModel->LOOP_TYPE = $loopType;
        $taskModel->LOOP_CYCLE_CONTENT = $loopCycleContent;
        $taskModel->NEXT_AT = $timeResult['nextAt'];
        $taskModel->STARTED_AT = $timeResult['startedAt'];
        $taskModel->CREATED_AT = Date::nowTime();
        if(!$taskModel->save()){
            // todo: record the failure in the system error log
            return false;
        }
        if($taskModel->MAIN_ID == '1'){
            $taskModel->MAIN_ID = $taskModel->ID;
            if(!$taskModel->save()){
                // todo: record the failure in the system error log
                return false;
            }
        }
        // Push the freshly stored task straight into the Redis queue cache.
        $task = TaskQueue::findOneAsArray('ID=:ID', [':ID'=>$taskModel->ID]);
        $this->addTaskToRedis($task);
        unset($taskModel, $task);
        return true;
    }

    /**
     * Consume the queue: run every cached task whose scheduled time has arrived.
     *
     * Each due task is removed from the sorted set and from the content hash
     * before it is executed, so a task that throws is not retried from Redis.
     *
     * @throws InvalidConfigException
     * @throws InvalidRouteException
     * @throws \yii\console\Exception
     */
    public function consumeTask()
    {
        $redis = Yii::$app->redis;

        // IDs of all queued tasks whose score (start time) is <= now.
        $dueTaskIds = $redis->zrangebyscore(self::REDIS_LIST_KEY, 0, Date::nowTime());

        foreach ($dueTaskIds as $taskId) {
            // Use the removal as a claim: when nothing was removed, another
            // consumer already picked this task up, so do not run it twice.
            // NOTE(review): assumes zrem reports the number of removed members
            // (the Redis ZREM reply) - confirm for the configured client.
            if ((int) $redis->zrem(self::REDIS_LIST_KEY, $taskId) === 0) {
                continue;
            }

            // Read the task payload, then drop it from the content hash.
            $payload = $redis->hget(self::REDIS_CONTENT_KEY, $taskId);
            $redis->hdel(self::REDIS_CONTENT_KEY, $taskId);

            // The payload may be gone (e.g. the cache was reset in between);
            // there is nothing to execute in that case.
            if ($payload === null || $payload === false || $payload === '') {
                continue;
            }

            $task = Json::decode($payload);
            if (!is_array($task)) {
                continue;
            }

            $this->runTask($task);
        }
    }

    /**
     * Execute one task, mark it finished in the database and, when the task
     * repeats, enqueue its next occurrence.
     *
     * A route task whose controller cannot be resolved is not executed, but it
     * is still marked as finished and rescheduled like any other task.
     *
     * @param array $task task row as stored by addTaskToRedis(); the keys read
     *                    here are ID, MAIN_ID, TYPE, CONTENT, PARAMS, LOOP_TYPE,
     *                    LOOP_CYCLE_CONTENT, TOTAL_LOOP, DONE_TIMES and NEXT_AT
     * @throws InvalidConfigException
     * @throws InvalidRouteException
     * @throws \yii\console\Exception
     */
    public function runTask($task)
    {
        // Loose comparison on purpose: values coming back from the database or
        // from JSON may be strings while the class constants may be integers.
        $isFunction = $task['TYPE'] == self::TYPE_FUNC;
        $isRoute = $task['TYPE'] == self::TYPE_ROUTE;

        if (!$isFunction && !$isRoute) {
            // todo: record the failure in the system error log
            return;
        }

        // Decode once. The raw value is forwarded unchanged when rescheduling,
        // while the call itself always receives an array (null becomes []).
        $params = Json::decode($task['PARAMS']);
        $callParams = (array) $params;

        if ($isFunction) {
            // Function task: CONTENT is the callable to invoke.
            call_user_func_array($task['CONTENT'], $callParams);
        } else {
            // Route task: only run it when the route resolves to a controller.
            $parts = Yii::$app->createController($task['CONTENT']);
            if (is_array($parts)) {
                Yii::$app->runAction($task['CONTENT'], $callParams);
            }
        }

        // The task has run: mark its database row as finished.
        TaskQueue::updateAll([
            'FINISHED_AT' => Date::nowTime(),
        ], 'ID=:ID', [':ID' => $task['ID']]);

        // A non-zero TOTAL_LOOP caps the number of runs; once DONE_TIMES has
        // reached it, remove every row belonging to this task chain.
        if (($task['TOTAL_LOOP'] != 0) && ($task['TOTAL_LOOP'] <= $task['DONE_TIMES'])) {
            TaskQueue::deleteAll('MAIN_ID=:MAIN_ID', [':MAIN_ID' => $task['MAIN_ID']]);
            return;
        }

        // A next execution time exists: enqueue the following occurrence.
        if ($task['NEXT_AT'] > 0) {
            $this->addTask(
                $task['TYPE'],
                $task['CONTENT'],
                $params,
                $task['LOOP_TYPE'],
                $task['LOOP_CYCLE_CONTENT'],
                $task['TOTAL_LOOP'],
                $task['MAIN_ID'],
                $task['DONE_TIMES'] + 1,
                $task['NEXT_AT']
            );
        }
    }

    /**
     * Rebuild the Redis cache of the task queue from the database.
     *
     * Both Redis keys are cleared, every task whose STARTED_AT is not in the
     * past is cached again, and finished tasks that started in the past are
     * deleted from the database.
     */
    public function initRedis()
    {
        $redis = Yii::$app->redis;
        $now = Date::nowTime();

        // Drop the whole cached queue (schedule and payloads).
        $redis->del(self::REDIS_LIST_KEY);
        $redis->del(self::REDIS_CONTENT_KEY);

        // Re-cache every task that is still due to start now or later.
        $pendingTasks = TaskQueue::findAllAsArray('STARTED_AT>=:NOW_TIME', [':NOW_TIME' => $now]);
        foreach ($pendingTasks as $task) {
            $this->addTaskToRedis($task);
        }

        // Purge tasks that are both finished and in the past.
        TaskQueue::deleteAll('FINISHED_AT<>0 AND STARTED_AT<:NOW_TIME', [':NOW_TIME' => $now]);
    }

    /**
     * Add one task to the Redis queue.
     *
     * The task ID goes into the sorted set scored by its start time, and the
     * JSON-encoded task row goes into the content hash under the same ID.
     *
     * @param array $task task row; STARTED_AT and ID are read here
     */
    public function addTaskToRedis($task)
    {
        $redis = Yii::$app->redis;
        $redis->zadd(self::REDIS_LIST_KEY, intval($task['STARTED_AT']), $task['ID']);
        $redis->hset(self::REDIS_CONTENT_KEY, $task['ID'], Json::encode($task));
    }

    /**
     * Compute the start time of a task and the time of its following run.
     *
     * When $startedAt is null the first occurrence is derived from the current
     * time; otherwise $startedAt is kept and only the following run is derived.
     *
     * Format of $loopCycleContent per loop type (comma separated):
     *  - year:    "7,20,10:00:00"  every year on July 20th at 10:00
     *  - month:   "2,10:00:00"     every month on the 2nd at 10:00
     *  - week:    "2,10:00:00"     every Tuesday at 10:00 (0 means Sunday)
     *  - day:     "10:00:00"       every day at 10:00
     *  - hour:    not used         every 3600 seconds
     *  - unfixed: "300,600,900"    second run 300s after the first, third run
     *                              600s after the second, fourth run 900s after
     *                              the third (4 runs in total, TOTAL_LOOP = 4)
     *
     * @param mixed $loopType one of the self::LOOP_TYPE_* constants; any other
     *                        value yields a one-off task
     * @param string|null $loopCycleContent cycle definition, see above
     * @param int|null $startedAt timestamp of this run, or null to compute it
     * @param int $doneTimes 1-based number of this run (used by "unfixed" only)
     * @return array ['startedAt' => int, 'nextAt' => int]; nextAt is 0 when
     *               there is no further run
     */
    private function _calcTime($loopType, $loopCycleContent, $startedAt, $doneTimes)
    {
        // Split the cycle definition; whitespace around the commas is tolerated
        // so that "7, 20, 10:00:00" does not produce a malformed date string.
        $parts = [];
        if ((string) $loopCycleContent !== '') {
            $parts = array_map('trim', explode(',', $loopCycleContent));
        }

        // All calendar arithmetic is anchored on the given start time, or on
        // the current time when the start still has to be determined.
        $baseTime = $startedAt !== null ? $startedAt : Date::nowTime();
        $nextAt = 0;

        switch ($loopType) {
            case self::LOOP_TYPE_YEAR:
                $year = (int) date('Y', $baseTime);
                $monthDayTime = '-' . $parts[0] . '-' . $parts[1] . ' ' . $parts[2];
                $thisYearAt = strtotime($year . $monthDayTime);
                $nextYearAt = strtotime(($year + 1) . $monthDayTime);

                if ($startedAt !== null) {
                    $nextAt = $nextYearAt;
                } elseif ($baseTime <= $thisYearAt) {
                    // This year's date is still ahead.
                    $startedAt = $thisYearAt;
                    $nextAt = $nextYearAt;
                } else {
                    // Already passed this year: start next year.
                    $startedAt = $nextYearAt;
                    $nextAt = strtotime(($year + 2) . $monthDayTime);
                }
                break;

            case self::LOOP_TYPE_MONTH:
                $timeOfDay = $parts[1];
                $thisMonthAt = strtotime(date('Y-m-', $baseTime) . $parts[0] . ' ' . $timeOfDay);
                $nextMonthAt = strtotime(date('Y-m-d ', Date::nextMonthDay($thisMonthAt)) . $timeOfDay);

                // Strict null check, consistent with every other branch: a
                // start time of 0 is a given value, not "not provided".
                if ($startedAt !== null) {
                    $nextAt = $nextMonthAt;
                } elseif ($baseTime <= $thisMonthAt) {
                    $startedAt = $thisMonthAt;
                    $nextAt = $nextMonthAt;
                } else {
                    $startedAt = $nextMonthAt;
                    $nextAt = strtotime(date('Y-m-d ', Date::nextMonthDay($nextMonthAt)) . $timeOfDay);
                }
                break;

            case self::LOOP_TYPE_WEEK:
                $weekdayNames = ['sunday', 'monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday'];

                // Anything that is not an integer from 0 to 6 falls back to Sunday.
                $weekdayIndex = 0;
                if (
                    is_numeric($parts[0])
                    && $parts[0] >= 0
                    && $parts[0] <= 6
                    && (int) $parts[0] == $parts[0]
                ) {
                    $weekdayIndex = (int) $parts[0];
                }
                $weekdayName = $weekdayNames[$weekdayIndex];
                $timeOfDay = $parts[1];

                if ($startedAt === null) {
                    $todayAt = strtotime(date('Y-m-d ', $baseTime) . $timeOfDay);
                    if ((int) date('w', $baseTime) === $weekdayIndex && $baseTime <= $todayAt) {
                        // Today is the target weekday and the time has not
                        // passed yet: start today instead of in a week.
                        $startedAt = $todayAt;
                    } else {
                        $startDay = strtotime('next ' . $weekdayName, $baseTime);
                        $startedAt = strtotime(date('Y-m-d ', $startDay) . $timeOfDay);
                    }
                }

                // The following run is the next such weekday after the start,
                // computed on the calendar so the time of day is kept across
                // daylight-saving changes.
                $nextDay = strtotime('next ' . $weekdayName, $startedAt);
                $nextAt = strtotime(date('Y-m-d ', $nextDay) . $timeOfDay);
                break;

            case self::LOOP_TYPE_DAY:
                $todayAt = strtotime(date('Y-m-d ', $baseTime) . $parts[0]);
                if ($startedAt === null) {
                    // Today if the time is still ahead, otherwise tomorrow.
                    $startedAt = $baseTime < $todayAt ? $todayAt : $todayAt + 24 * 60 * 60;
                }
                $nextAt = $startedAt + 24 * 60 * 60;
                break;

            case self::LOOP_TYPE_HOUR:
                if ($startedAt === null) {
                    $startedAt = $baseTime;
                }
                $nextAt = $startedAt + 60 * 60;
                break;

            case self::LOOP_TYPE_UNFIXED:
                if ($startedAt === null) {
                    $startedAt = $baseTime;
                }
                // The interval after run N is the N-th list entry; when the
                // list is exhausted there is no further run.
                $nextAt = isset($parts[$doneTimes - 1])
                    ? $startedAt + $parts[$doneTimes - 1]
                    : 0;
                break;

            default:
                // Unknown or absent loop type: one-off task starting at the base time.
                $nextAt = 0;
                $startedAt = $baseTime;
                break;
        }

        return [
            'startedAt' => $startedAt,
            'nextAt' => $nextAt,
        ];
    }
}