SwooleAsyncTimer.php 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. <?php
  2. /**
  3. * Created by PhpStorm.
  4. * User: leo
  5. * Date: 2018/4/9
  6. * Time: 下午5:15
  7. */
  8. namespace common\components;
  9. use anlity\swooleAsyncTimer\SocketInterface;
  10. use anlity\swooleAsyncTimer\SwooleAsyncTimerComponent;
  11. use common\helpers\Cache;
  12. use common\libs\logging\system\AsyncSystem;
  13. use common\libs\taskQueue\Queue;
  14. use Yii;
  15. use common\helpers\bonus\CalcServeBonusCalc;
  16. use common\helpers\bonus\CalcServePerfCalc;
  17. use yii\base\Exception;
  18. use yii\helpers\Json;
  19. class SwooleAsyncTimer extends SwooleAsyncTimerComponent implements SocketInterface {
  20. const HANDLE_ADMIN_ASYNC = 'adminAsync';
  21. const HANDLE_ADMIN_PULL_MESSAGE = 'adminPullMsg';
  22. const HANDLE_USER_ASYNC = 'userAsync';
  23. const HANDLE_USER_PULL_MESSAGE = 'userPullMsg';
  24. const HANDLE_ADMIN_ASYNC_PERCENT = 'adminAsyncPercent';
  25. const COMMAND_PUSH_TO = 'pushTo';
  26. /**
  27. * 定时器自动执行任务
  28. * @param $timerId
  29. * @param $server
  30. * @return bool|void
  31. * @throws \yii\base\InvalidConfigException
  32. * @throws \yii\base\InvalidRouteException
  33. * @throws \yii\console\Exception
  34. * @throws \yii\db\Exception
  35. */
  36. public function timerCallback($timerId, $server){
  37. CalcServePerfCalc::instance()->calcStep(); // 累计业绩
  38. CalcServeBonusCalc::instance()->calcStep(); // 计算奖金
  39. // 自动执行任务队列中的任务
  40. Queue::instance()->consumeTask();
  41. return true;
  42. }
  43. /**
  44. * 初始化队列
  45. * @param $server
  46. * @param $workerId
  47. */
  48. public function onWorkerStart($server, $workerId){
  49. if($workerId == 1){
  50. // 重新更新自动封期的缓存
  51. //AutoClosePeriod::instance()->setCloseTimeAndPeriodStat();
  52. // 初始化任务队列
  53. Queue::instance()->initRedis();
  54. // 初始化备份历史奖金数据表
  55. // TaskFunc::initAutoBakBalance();
  56. }
  57. }
  58. public function onWorkerStop($server, $workerId){
  59. }
  60. public function onWorkerExit($server, $workerId){
  61. }
  62. public function onOpen($fd){
  63. }
  64. public function onClose($fd){
  65. }
  66. public function onMessage($fd, $data){
  67. // 如果传过来的是一个用户ID,则把$fd和userId绑定存入缓存
  68. $data = Json::decode($data);
  69. if(isset($data['userId']) && $data['userId'] != ''){
  70. Cache::setWebSocketFd($data['app'], $data['userId'], $fd);
  71. }
  72. }
  73. /**
  74. * 处理异步请求
  75. * @param $path
  76. * @param array $params
  77. * @param array $settings
  78. * @return bool
  79. * @throws \yii\base\Exception
  80. */
  81. public function asyncHandle($path, array $params = [], array $settings = []){
  82. // 把处理会员的UserId加进数组
  83. if(Yii::$app->user->id){
  84. $params['handleUserId'] = Yii::$app->user->id;
  85. if(Yii::$app->user->userInfo && isset(Yii::$app->user->userInfo['adminName'])){
  86. $params['handleUserName'] = Yii::$app->user->userInfo['adminName'];
  87. }
  88. } else {
  89. $params['handleUserId'] = null;
  90. $params['handleUserName'] = null;
  91. }
  92. $taskKey = Cache::setAsyncParams($params);
  93. $data = [
  94. 'data' => [
  95. [
  96. 'a' => $path,
  97. 'p' => [$taskKey],
  98. ]
  99. ],
  100. ];
  101. if($this->async(Json::encode($data), $settings)){
  102. return $taskKey;
  103. } else {
  104. return false;
  105. }
  106. }
  107. /**
  108. * 异步任务结束后给页面会员推送成功或者失败消息
  109. * @param $userId
  110. * @param string $message
  111. * @param bool $success
  112. * @param string $type message|progress
  113. */
  114. public function pushAsyncResultToAdmin($userId, $message='', $success = true, $type='message'){
  115. if($userId){
  116. $fd = Cache::getWebSocketFd(Cache::SOCKET_ADMIN, $userId);
  117. if($fd){
  118. try{
  119. $this->pushMsgByCli($fd, ['success'=>$success, 'handle'=>self::HANDLE_ADMIN_ASYNC, 'message'=>$message, 'type'=>$type]);
  120. } catch (\Exception $e){
  121. }
  122. }
  123. }
  124. }
  125. /**
  126. * 向管理员发送异步进度百分比
  127. * @param $percent
  128. * @param array $other
  129. */
  130. public function pushAsyncPercentToAdmin($percent, $other=[]){
  131. try{
  132. $this->pushMsgAllByCli(['handle'=>self::HANDLE_ADMIN_ASYNC_PERCENT, 'percent'=>$percent, 'other'=>$other]);
  133. } catch (\Exception $e){
  134. }
  135. }
  136. /**
  137. * 给前台会员发送webSocket消息
  138. * @param null $userId
  139. * @param string $message
  140. * @param bool $success
  141. * @return bool
  142. * @throws \Exception
  143. */
  144. public function pushMsgToUser($userId=null, $message='', $success = true){
  145. if($userId){
  146. $return = true;
  147. $fdAppFd = Cache::getWebSocketFd(Cache::SOCKET_USER_APP, $userId);
  148. if($fdAppFd && $return !== false){
  149. $return = $this->pushMsg($fdAppFd, ['success'=>$success, 'handle'=>self::HANDLE_USER_PULL_MESSAGE, 'message'=>$message]);
  150. }
  151. $fdPcFd = Cache::getWebSocketFd(Cache::SOCKET_USER_PC, $userId);
  152. if($fdPcFd && $return !== false){
  153. $return = $this->pushMsg($fdPcFd, ['success'=>$success, 'handle'=>self::HANDLE_USER_PULL_MESSAGE, 'message'=>$message]);
  154. }
  155. return $return;
  156. } else {
  157. return $this->pushMsgAll(['success'=>$success, 'handle'=>self::HANDLE_USER_PULL_MESSAGE, 'message'=>$message]);
  158. }
  159. }
  160. /**
  161. * 任务运行后
  162. * @param $server
  163. * @param $workerId
  164. * @param $action
  165. * @param $params
  166. */
  167. public function onTaskRunActionStart($server, $workerId, $action, $params){
  168. // 为了保证任务继续执行,此处将错误捕捉到不影响任务继续执行
  169. try {
  170. if(isset($params[0]) && $actionParams = Cache::getAsyncParamsWithoutDel($params[0])){
  171. // 记录日志
  172. if(strpos($action, 'log/') === false){
  173. $logApiSystem = new AsyncSystem();
  174. $logApiSystem->setRequestRoute($action)->setRequestContent($actionParams)->saveByConsole([
  175. 'apiName' => '异步'.$action,
  176. 'optUser' => isset($actionParams['handleUserName']) ? $actionParams['handleUserName'] : null,
  177. ]);
  178. }
  179. }
  180. } catch (\Exception $e) {
  181. // 忽略错误
  182. }
  183. }
  184. /**
  185. * 任务运行发生错误时
  186. * @param $fd
  187. * @param $data
  188. * @param $action
  189. * @param $params
  190. * @param $errorMessage
  191. */
  192. public function onTaskRunActionError($fd, $data, $action, $params, $errorMessage){
  193. // 为了保证任务继续执行,此处将错误捕捉到不影响任务继续执行
  194. try {
  195. // 记录日志
  196. $logApiSystem = new AsyncSystem();
  197. $logApiSystem->setRequestRoute($action)->setResponseContent(['errorMessage' => $errorMessage])->saveByConsole([
  198. 'apiName' => '异步错误'.$action,
  199. ]);
  200. } catch (\Exception $e) {
  201. // 忽略错误
  202. }
  203. }
  204. }