autoClose();
        // Automatically run the tasks waiting in the task queue
        Queue::instance()->consumeTask();
        // Watch in real time for period-table fields modified by the calculation system
        CalcConsole::listenCalcPeriod();
        // Start-up logic for the business system's pre-calculation
        CalcConsole::listenAutoPerfPeriod();
        return true;
    }

    /**
     * Worker start hook: performs one-off initialisation on worker 1 only.
     *
     * @param mixed $server   server instance passed by the runtime (unused here)
     * @param int   $workerId id of the worker that is starting
     */
    public function onWorkerStart($server, $workerId){
        // Only worker 1 performs the initialisation so it is not repeated per worker
        if($workerId == 1){
            // Refresh the cache used by the automatic period closing
            AutoClosePeriod::instance()->setCloseTimeAndPeriodStat();
            // Initialise the task queue
            Queue::instance()->initRedis();
            // Initialise the backup table for historical bonus data
            // TaskFunc::initAutoBakBalance();
            // Initialise the automatic DingTalk push messages
            // if (YII_ENV == YII_ENV_PROD) {
            //     TaskFunc::initAutoSendDingTalk();
            // }
        }
    }

    /**
     * Worker stop hook; intentionally a no-op.
     *
     * @param mixed $server
     * @param int   $workerId
     */
    public function onWorkerStop($server, $workerId){
    }

    /**
     * Worker exit hook; intentionally a no-op.
     *
     * @param mixed $server
     * @param int   $workerId
     */
    public function onWorkerExit($server, $workerId){
    }

    /**
     * Connection-open hook; intentionally a no-op.
     *
     * @param mixed $fd connection descriptor
     */
    public function onOpen($fd){
    }

    /**
     * Connection-close hook; intentionally a no-op.
     *
     * @param mixed $fd connection descriptor
     */
    public function onClose($fd){
    }

    /**
     * Handles an incoming WebSocket frame.
     *
     * When the frame is a JSON object carrying both `app` and a non-empty
     * `userId`, the connection descriptor is bound to that user in the cache.
     * Any other frame (malformed JSON, non-object payload, missing keys) is
     * ignored, because the payload comes from the client and must not be able
     * to break the handler.
     *
     * @param mixed  $fd   connection descriptor of the sender
     * @param string $data raw frame payload, expected to be JSON
     */
    public function onMessage($fd, $data){
        try {
            $data = Json::decode($data);
        } catch (\Exception $e) {
            // Malformed frame (presumably Json is yii\helpers\Json, which throws
            // on invalid input): there is nothing to bind, so drop it.
            return;
        }
        // Both keys are required: `app` selects the socket group, `userId` the owner
        if(is_array($data) && isset($data['app'], $data['userId']) && $data['userId'] != ''){
            Cache::setWebSocketFd($data['app'], $data['userId'], $fd);
        }
    }

    /**
     * Dispatches an asynchronous request.
     *
     * The parameters are stored through Cache::setAsyncParams() and only the
     * resulting task key is sent with the async call.
     *
     * @param string $path     action route to run asynchronously
     * @param array  $params   parameters for the action
     * @param array  $settings settings forwarded to async()
     * @return mixed|false the task key returned by Cache::setAsyncParams() when
     *                     the dispatch succeeds, false otherwise
     * @throws \yii\base\Exception
     */
    public function asyncHandle($path, array $params = [], array $settings = []){
        // Record which user triggered the task
        if(Yii::$app->user->id){
            $params['handleUserId'] = Yii::$app->user->id;
            // The name is only set when the user info exposes an adminName
            if(Yii::$app->user->userInfo && isset(Yii::$app->user->userInfo['adminName'])){
                $params['handleUserName'] = Yii::$app->user->userInfo['adminName'];
            }
        } else {
            $params['handleUserId'] = null;
            $params['handleUserName'] = null;
        }

        $taskKey = Cache::setAsyncParams($params);
        $data = [
            'data' => [
                [
                    'a' => $path,
                    'p' => [$taskKey],
                ]
            ],
        ];

        if($this->async(Json::encode($data), $settings)){
            return $taskKey;
        }
        return false;
    }

    /**
     * Pushes the success/failure message of a finished async task to the
     * admin page of the given user. Does nothing when the user id is empty or
     * no admin socket is cached for that user; push errors are ignored.
     *
     * @param mixed  $userId
     * @param string $message
     * @param bool   $success
     * @param string $type message|progress
     */
    public function pushAsyncResultToAdmin($userId, $message='', $success = true, $type='message'){
        if(!$userId){
            return;
        }
        $fd = Cache::getWebSocketFd(Cache::SOCKET_ADMIN, $userId);
        if(!$fd){
            return;
        }
        try{
            $this->pushMsgByCli($fd, ['success'=>$success, 'handle'=>self::HANDLE_ADMIN_ASYNC, 'message'=>$message, 'type'=>$type]);
        } catch (\Exception $e){
            // Best effort: a failed push must not affect the caller
        }
    }

    /**
     * Sends the progress percentage of an async task to the administrators.
     * Push errors are ignored.
     *
     * @param mixed $percent
     * @param array $other extra data sent along with the percentage
     */
    public function pushAsyncPercentToAdmin($percent, $other=[]){
        try{
            $this->pushMsgAllByCli(['handle'=>self::HANDLE_ADMIN_ASYNC_PERCENT, 'percent'=>$percent, 'other'=>$other]);
        } catch (\Exception $e){
            // Best effort: a failed push must not affect the caller
        }
    }

    /**
     * Sends a WebSocket message to a front-end member.
     *
     * With a user id, the message goes to that user's app socket and then to
     * the PC socket; the PC push is skipped when the app push returned false.
     * Without a user id the message is sent through pushMsgAll().
     *
     * @param null   $userId
     * @param string $message
     * @param bool   $success
     * @return bool
     * @throws \Exception
     */
    public function pushMsgToUser($userId=null, $message='', $success = true){
        $payload = ['success'=>$success, 'handle'=>self::HANDLE_USER_PULL_MESSAGE, 'message'=>$message];

        if(!$userId){
            return $this->pushMsgAll($payload);
        }

        $return = true;
        $fdAppFd = Cache::getWebSocketFd(Cache::SOCKET_USER_APP, $userId);
        if($fdAppFd){
            $return = $this->pushMsg($fdAppFd, $payload);
        }
        $fdPcFd = Cache::getWebSocketFd(Cache::SOCKET_USER_PC, $userId);
        if($fdPcFd && $return !== false){
            $return = $this->pushMsg($fdPcFd, $payload);
        }
        return $return;
    }

    /**
     * Task hook for the start of an action run: writes a log entry for the
     * action unless its route contains "log/".
     *
     * @param mixed  $server
     * @param int    $workerId
     * @param string $action action route being run
     * @param array  $params task parameters; $params[0] is the async task key
     */
    public function onTaskRunActionStart($server, $workerId, $action, $params){
        // Errors are caught here so that they do not stop the task from running
        try {
            if(isset($params[0]) && ($actionParams = Cache::getAsyncParamsWithoutDel($params[0]))){
                // Write the log entry; routes containing "log/" are not logged
                if(strpos($action, 'log/') === false){
                    $logApiSystem = new AsyncSystem();
                    $logApiSystem->setRequestRoute($action)->setRequestContent($actionParams)->saveByConsole([
                        'apiName' => '异步'.$action,
                        'optUser' => isset($actionParams['handleUserName']) ? $actionParams['handleUserName'] : null,
                    ]);
                }
            }
        } catch (\Exception $e) {
            // Ignore the error
        }
    }

    /**
     * Task hook for a failed action run: writes a log entry carrying the
     * error message.
     *
     * @param mixed  $fd
     * @param mixed  $data
     * @param string $action       action route that failed
     * @param array  $params
     * @param string $errorMessage
     */
    public function onTaskRunActionError($fd, $data, $action, $params, $errorMessage){
        // Errors are caught here so that they do not stop the task from running
        try {
            // Write the log entry
            $logApiSystem = new AsyncSystem();
            $logApiSystem->setRequestRoute($action)->setResponseContent(['errorMessage' => $errorMessage])->saveByConsole([
                'apiName' => '异步错误'.$action,
            ]);
        } catch (\Exception $e) {
            // Ignore the error
        }
    }
}