Queue.php 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360
  1. <?php
  2. namespace common\libs\taskQueue;
  3. use common\helpers\Date;
  4. use common\models\TaskQueue;
  5. use Yii;
  6. use yii\base\InvalidConfigException;
  7. use yii\base\InvalidRouteException;
  8. use yii\base\StaticInstanceTrait;
  9. use yii\db\Exception;
  10. use yii\db\Expression;
  11. use yii\helpers\Json;
  12. class Queue
  13. {
  14. use StaticInstanceTrait;
  15. /**
  16. * 函数任务
  17. */
  18. const TYPE_FUNC = 1;
  19. /**
  20. * 路由任务
  21. */
  22. const TYPE_ROUTE = 2;
  23. /**
  24. * 任务队列缓存键
  25. */
  26. const REDIS_LIST_KEY = 'taskQueue::list';
  27. const REDIS_CONTENT_KEY = 'taskQueue::content';
  28. /**
  29. * 不循环,执行一次即可
  30. * 循环周期内容 LOOP_CYCLE_CONTENT 传值:null(不循环,任务执行一次即完成,这种情况TOTAL_LOOP传值1)
  31. */
  32. const LOOP_TYPE_NO = 0;
  33. /**
  34. * 每年循环一次
  35. * 循环周期内容 LOOP_CYCLE_CONTENT 传值:月份,日期,时间 例:7,20,10:00:00(每年7月20日10点执行一次任务)
  36. */
  37. const LOOP_TYPE_YEAR = 1;
  38. /**
  39. * 每月循环一次
  40. * 循环周期内容 LOOP_CYCLE_CONTENT 传值:日期,时间 例:2,10:00:00(每月2日10点执行一次任务)
  41. */
  42. const LOOP_TYPE_MONTH = 2;
  43. /**
  44. * 每周循环一次
  45. * 循环周期内容 LOOP_CYCLE_CONTENT 传值:星期,时间 例:2,10:00:00(每星期二10点执行一次任务,0代表周日)
  46. */
  47. const LOOP_TYPE_WEEK = 3;
  48. /**
  49. * 每天循环一次
  50. * 循环周期内容 LOOP_CYCLE_CONTENT 传值:时间 例:10:00:00(每天10点执行一次任务)
  51. */
  52. const LOOP_TYPE_DAY = 4;
  53. /**
  54. * 每小时循环一次
  55. * 循环周期内容 LOOP_CYCLE_CONTENT 传值:null 无需传值,直接每小时循环
  56. */
  57. const LOOP_TYPE_HOUR = 5;
  58. /**
  59. * 非固定循环周期
  60. * 循环周期内容 LOOP_CYCLE_CONTENT 传值:300,600,900(第一次执行完任务后间隔300秒执行第二次,第二次执行完毕后600秒执行第三次,第三次执行完毕后900秒执行第四次,这种情况共计执行4次任务,TOTAL_LOOP传值4)
  61. */
  62. const LOOP_TYPE_UNFIXED = 6;
  63. /**
  64. * 添加任务队列
  65. * @param $type
  66. * @param $content
  67. * 1、函数方式 TYPE_FUNC 传值: 'xxx\xxx\ClassName::methodName'或'functionName'
  68. * 2、路由方式 TYPE_ROUTE 传值: 'controller/action'(这个路由只能是console下的路由)
  69. * @param $params
  70. * 参数数组方式传值: ['参数1', '参数2']
  71. * @param int $loopType
  72. * 循环类型
  73. * @param null $loopCycleContent
  74. * 循环周期内容 请看上述的循环类型常量对应的内容传值
  75. * @param int $totalLoop
  76. * 总循环次数,0为不限制次数
  77. * @param null $mainId
  78. * 主任务ID
  79. * @param int $doneTimes
  80. * 本任务执行次数
  81. * @param $startedAt
  82. * 任务开始时间(第一次调用该方法无需传此值,自动计算)
  83. * @return bool
  84. */
  85. public function addTask($type, $content, $params = [], $loopType = self::LOOP_TYPE_NO, $loopCycleContent = null, $totalLoop = 1, $mainId = null, $doneTimes = 1, $startedAt = null){
  86. if(!is_string($content)){
  87. // todo 记录错误系统日志要求必须传值任务内容
  88. return false;
  89. }
  90. // 计算下一次执行任务时间
  91. $timeResult = $this->_calcTime($loopType, $loopCycleContent, $startedAt, $doneTimes);
  92. $taskModel = new TaskQueue();
  93. $taskModel->MAIN_ID = $mainId ? $mainId : '1';
  94. $taskModel->TYPE = $type;
  95. $taskModel->CONTENT = $content;
  96. $taskModel->PARAMS = Json::encode($params);
  97. $taskModel->TOTAL_LOOP = $totalLoop;
  98. $taskModel->DONE_TIMES = $doneTimes;
  99. $taskModel->LOOP_TYPE = $loopType;
  100. $taskModel->LOOP_CYCLE_CONTENT = $loopCycleContent;
  101. $taskModel->NEXT_AT = $timeResult['nextAt'];
  102. $taskModel->STARTED_AT = $timeResult['startedAt'];
  103. $taskModel->CREATED_AT = Date::nowTime();
  104. if(!$taskModel->save()){
  105. // todo 记录错误系统日志
  106. return false;
  107. }
  108. if($taskModel->MAIN_ID == '1'){
  109. $taskModel->MAIN_ID = $taskModel->ID;
  110. if(!$taskModel->save()){
  111. // todo 记录错误系统日志
  112. return false;
  113. }
  114. }
  115. // 第一次任务直接更新Redis缓存中的队列
  116. $task = TaskQueue::findOneAsArray('ID=:ID', [':ID'=>$taskModel->ID]);
  117. $this->addTaskToRedis($task);
  118. unset($taskModel, $task);
  119. return true;
  120. }
  121. /**
  122. * 消费队列
  123. * @throws InvalidConfigException
  124. * @throws InvalidRouteException
  125. * @throws \yii\console\Exception
  126. */
  127. public function consumeTask() {
  128. // 从缓存获取所有小于当前时间的队列
  129. $allTasks = Yii::$app->redis->zrangebyscore(self::REDIS_LIST_KEY, 0, Date::nowTime());
  130. // 循环所有任务
  131. foreach ($allTasks as $taskId){
  132. // 清除当前任务队列
  133. Yii::$app->redis->zrem(self::REDIS_LIST_KEY, $taskId);
  134. // 获取当前任务内容
  135. $task = Yii::$app->redis->hget(self::REDIS_CONTENT_KEY, $taskId);
  136. // 清除当前任务内容
  137. Yii::$app->redis->hdel(self::REDIS_CONTENT_KEY, $taskId);
  138. $task = Json::decode($task);
  139. $this->runTask($task);
  140. }
  141. unset($allTasks);
  142. }
  143. /**
  144. * 执行任务
  145. * @param $task
  146. * @throws InvalidConfigException
  147. * @throws InvalidRouteException
  148. * @throws \yii\console\Exception
  149. */
  150. public function runTask($task){
  151. // 任务类型为函数直接执行函数
  152. if($task['TYPE'] == self::TYPE_FUNC) {
  153. call_user_func_array($task['CONTENT'], Json::decode($task['PARAMS']));
  154. }
  155. // 任务类型为路由直接执行路由
  156. elseif($task['TYPE'] == self::TYPE_ROUTE) {
  157. $parts = Yii::$app->createController($task['CONTENT']);
  158. if (is_array($parts)) {
  159. Yii::$app->runAction($task['CONTENT'], Json::decode($task['PARAMS']));
  160. }
  161. }
  162. else {
  163. // todo 记录错误系统日志
  164. return ;
  165. }
  166. // 任务执行完毕将数据库中的任务标记为已完成
  167. TaskQueue::updateAll([
  168. 'FINISHED_AT' => Date::nowTime(),
  169. ], 'ID=:ID', [':ID'=>$task['ID']]);
  170. // 查看任务的循环总数比任务循环次数小并且循环总数不等于0的情况则删除所有该任务
  171. if(($task['TOTAL_LOOP'] != 0) && ($task['TOTAL_LOOP'] <= $task['DONE_TIMES'])) {
  172. TaskQueue::deleteAll('MAIN_ID=:MAIN_ID', [':MAIN_ID'=>$task['MAIN_ID']]);
  173. return ;
  174. }
  175. // 如果有下次执行时间,则继续添加任务到任务队列
  176. if($task['NEXT_AT'] > 0){
  177. $this->addTask($task['TYPE'], $task['CONTENT'], Json::decode($task['PARAMS']), $task['LOOP_TYPE'], $task['LOOP_CYCLE_CONTENT'], $task['TOTAL_LOOP'], $task['MAIN_ID'], $task['DONE_TIMES']+1, $task['NEXT_AT']);
  178. }
  179. }
  180. /**
  181. * 初始化任务队列的redis缓存
  182. */
  183. public function initRedis(){
  184. // 清空本身全部队列
  185. Yii::$app->redis->del(self::REDIS_LIST_KEY);
  186. Yii::$app->redis->del(self::REDIS_CONTENT_KEY);
  187. // 找出任务列表中所有有效任务(大于当前时间)
  188. $allTasks = TaskQueue::findAllAsArray('STARTED_AT>=:NOW_TIME', [':NOW_TIME'=>Date::nowTime()]);
  189. // 循环把这些任务加入到redis中
  190. foreach ($allTasks as $task){
  191. $this->addTaskToRedis($task);
  192. }
  193. // 清理掉数据库中所有已经完成并且过时的任务
  194. TaskQueue::deleteAll('FINISHED_AT<>0 AND STARTED_AT<:NOW_TIME', [':NOW_TIME'=>Date::nowTime()]);
  195. unset($allTasks);
  196. }
  197. /**
  198. * 把任务添加到redis队列中
  199. * @param $task
  200. */
  201. public function addTaskToRedis($task){
  202. Yii::$app->redis->zadd(self::REDIS_LIST_KEY, intval($task['STARTED_AT']), $task['ID']);
  203. Yii::$app->redis->hset(self::REDIS_CONTENT_KEY, $task['ID'], Json::encode($task));
  204. }
  205. /**
  206. * 计算开始和下次一次时间
  207. * @param $loopType
  208. * @param $loopCycleContent
  209. * @param $startedAt
  210. * @param $doneTimes
  211. * @return array
  212. */
  213. private function _calcTime($loopType, $loopCycleContent, $startedAt, $doneTimes){
  214. if($loopCycleContent){
  215. $loopCycleContent = explode(',', $loopCycleContent);
  216. }
  217. $baseTime = Date::nowTime();
  218. if($startedAt !== null){
  219. $baseTime = $startedAt;
  220. }
  221. switch($loopType){
  222. case self::LOOP_TYPE_YEAR:
  223. // 例:7,20,10:00:00(每年7月20日10点执行一次任务)
  224. // 本年的要执行任务的时间
  225. $nowYear = date('Y', $baseTime);
  226. $nextYear = intval($nowYear) + 1;
  227. $nextNextYear = intval($nowYear) + 2;
  228. $nowYearTaskTime = strtotime($nowYear.'-'.$loopCycleContent[0].'-'.$loopCycleContent[1].' '.$loopCycleContent[2]);
  229. $nextYearTaskTime = strtotime($nextYear.'-'.$loopCycleContent[0].'-'.$loopCycleContent[1].' '.$loopCycleContent[2]);
  230. $nextNextYearTaskTime = strtotime($nextNextYear.'-'.$loopCycleContent[0].'-'.$loopCycleContent[1].' '.$loopCycleContent[2]);
  231. if($startedAt === null){
  232. // 当前时间是否大于这个日期
  233. if(Date::nowTime() <= $nowYearTaskTime){
  234. $startedAt = $nowYearTaskTime;
  235. $nextAt = $nextYearTaskTime;
  236. } else {
  237. $startedAt = $nextYearTaskTime;
  238. $nextAt = $nextNextYearTaskTime;
  239. }
  240. } else {
  241. $nextAt = $nextYearTaskTime;
  242. }
  243. break;
  244. case self::LOOP_TYPE_MONTH:
  245. // 例:2,10:00:00(每月2日10点执行一次任务)
  246. $nowMonthDate = \date('Y-m-'.$loopCycleContent[0].' '.$loopCycleContent[1], $baseTime);
  247. $nowMonthTime = strtotime($nowMonthDate);
  248. $nextMonthTime = strtotime(date('Y-m-d '.$loopCycleContent[1], Date::nextMonthDay($nowMonthTime)));
  249. $nextNextMonthTime = strtotime(date('Y-m-d '.$loopCycleContent[1], Date::nextMonthDay($nextMonthTime)));
  250. if($startedAt == null){
  251. if(Date::nowTime() <= $nowMonthTime){
  252. $startedAt = $nowMonthTime;
  253. $nextAt = $nextMonthTime;
  254. } else {
  255. $startedAt = $nextMonthTime;
  256. $nextAt = $nextNextMonthTime;
  257. }
  258. } else {
  259. $nextAt = $nextMonthTime;
  260. }
  261. break;
  262. case self::LOOP_TYPE_WEEK:
  263. // 例:2,10:00:00(每星期二10点执行一次任务,0代表周日)
  264. $weekStr = 'sunday';
  265. switch ($loopCycleContent[0]){
  266. case 0:
  267. $weekStr = 'sunday';
  268. break;
  269. case 1:
  270. $weekStr = 'monday';
  271. break;
  272. case 2:
  273. $weekStr = 'tuesday';
  274. break;
  275. case 3:
  276. $weekStr = 'wednesday';
  277. break;
  278. case 4:
  279. $weekStr = 'thursday';
  280. break;
  281. case 5:
  282. $weekStr = 'friday';
  283. break;
  284. case 6:
  285. $weekStr = 'saturday';
  286. break;
  287. }
  288. if($startedAt === null) {
  289. $startedAtDay = strtotime('next '.$weekStr, $baseTime);
  290. $startedAt = strtotime(date('Y-m-d '.$loopCycleContent[1], $startedAtDay));
  291. $nextAt = $startedAt + 7 * 24 * 60 * 60;
  292. } else {
  293. $nextAtDay = strtotime('next '.$weekStr, $baseTime);
  294. $nextAt = strtotime(date('Y-m-d '.$loopCycleContent[1], $nextAtDay));
  295. }
  296. break;
  297. case self::LOOP_TYPE_DAY:
  298. // 例:10:00:00(每天10点执行一次任务)
  299. $todayHourTime = strtotime(\date('Y-m-d '.$loopCycleContent[0], $baseTime));
  300. if($startedAt === null){
  301. if($baseTime < $todayHourTime){
  302. $startedAt = $todayHourTime;
  303. } else {
  304. $startedAt = $todayHourTime + 24 * 60 * 60;
  305. }
  306. }
  307. $nextAt = $startedAt + 24 * 60 * 60;
  308. break;
  309. case self::LOOP_TYPE_HOUR:
  310. if($startedAt === null){
  311. $startedAt = $baseTime;
  312. }
  313. $nextAt = $startedAt + 60 * 60;
  314. break;
  315. case self::LOOP_TYPE_UNFIXED:
  316. // 传值:300,600,900(第一次执行完任务后间隔300秒执行第二次,第二次执行完毕后600秒执行第三次,第三次执行完毕后900秒执行第四次,这种情况共计执行4次任务,TOTAL_LOOP传值4)
  317. if($startedAt === null){
  318. $startedAt = $baseTime;
  319. }
  320. if(!isset($loopCycleContent[$doneTimes-1])){
  321. $nextAt = 0;
  322. } else {
  323. $nextAt = $startedAt + $loopCycleContent[$doneTimes-1];
  324. }
  325. break;
  326. default:
  327. $nextAt = 0;
  328. $loopCycleContent = null;
  329. $startedAt = $baseTime;
  330. break;
  331. }
  332. return [
  333. 'startedAt' => $startedAt,
  334. 'nextAt' => $nextAt,
  335. ];
  336. }
  337. }