| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240 |
- <?php
- /**
- * Created by PhpStorm.
- * User: leo
- * Date: 2018/4/9
- * Time: 下午5:15
- */
- namespace common\components;
- use anlity\swooleAsyncTimer\SocketInterface;
- use anlity\swooleAsyncTimer\SwooleAsyncTimerComponent;
- use backendApi\modules\v1\models\Admin;
- use common\components\Redis;
- use common\helpers\bonus\AutoClosePeriod;
- use common\helpers\bonus\Calc\CalcConsole;
- use common\helpers\Cache;
- use common\helpers\Date;
- use common\helpers\Tool;
- use common\libs\logging\system\ApiSystem;
- use common\libs\logging\system\AsyncSystem;
- use common\libs\taskQueue\Queue;
- use common\libs\taskQueue\TaskFunc;
- use common\models\UserInfo;
- use Yii;
- use anlity\swooleAsyncTimer\SwooleAsyncTimerController;
- use yii\base\Exception;
- use yii\helpers\Json;
/**
 * Application-side Swoole component: runs the periodic timer jobs, binds
 * WebSocket connections to user ids, dispatches asynchronous tasks and pushes
 * their results/progress back to connected clients.
 *
 * Method signatures are left untyped on purpose: they override/implement
 * methods declared by SwooleAsyncTimerComponent / SocketInterface, whose
 * declarations are not visible from this file.
 */
class SwooleAsyncTimer extends SwooleAsyncTimerComponent implements SocketInterface
{
    /** "handle" tag sent with async-task result messages pushed to an admin. */
    const HANDLE_ADMIN_ASYNC = 'adminAsync';
    /** "handle" tag for admin pull messages (not referenced inside this class). */
    const HANDLE_ADMIN_PULL_MESSAGE = 'adminPullMsg';
    /** "handle" tag for user async messages (not referenced inside this class). */
    const HANDLE_USER_ASYNC = 'userAsync';
    /** "handle" tag sent with messages pushed to front-end users. */
    const HANDLE_USER_PULL_MESSAGE = 'userPullMsg';
    /** "handle" tag sent with async progress-percentage broadcasts. */
    const HANDLE_ADMIN_ASYNC_PERCENT = 'adminAsyncPercent';
    /** Command name "pushTo" (not referenced inside this class). */
    const COMMAND_PUSH_TO = 'pushTo';

    /**
     * Timer tick callback: runs the periodic background jobs in a fixed order.
     *
     * @param mixed $timerId timer identifier passed in by the caller (unused here)
     * @param mixed $server  server instance passed in by the caller (unused here)
     * @return bool always true
     * @throws \yii\base\InvalidConfigException
     * @throws \yii\base\InvalidRouteException
     * @throws \yii\console\Exception
     * @throws \yii\db\Exception
     *         (the @throws list is carried over from the original annotations; the
     *         exceptions originate in the helpers called below and cannot be
     *         verified from this file)
     */
    public function timerCallback($timerId, $server)
    {
        // Automatically close the current period.
        AutoClosePeriod::instance()->autoClose();
        // Consume pending tasks from the task queue.
        Queue::instance()->consumeTask();
        // Watch the period-table fields modified by the calculation system.
        CalcConsole::listenCalcPeriod();
        // Start-up logic for the business system's pre-calculation.
        CalcConsole::listenAutoPerfPeriod();

        return true;
    }

    /**
     * Worker start hook: performs one-off initialisation on worker #1 only.
     *
     * @param mixed $server   server instance (unused here)
     * @param mixed $workerId id of the worker being started
     */
    public function onWorkerStart($server, $workerId)
    {
        // Loose comparison kept from the original so a numeric-string id still matches.
        if ($workerId != 1) {
            return;
        }

        // Refresh the cached auto-close time / period state.
        AutoClosePeriod::instance()->setCloseTimeAndPeriodStat();
        // Initialise the task queue.
        Queue::instance()->initRedis();
        // Initialise the historical bonus-balance backup (currently disabled):
        // TaskFunc::initAutoBakBalance();
        // Initialise automatic DingTalk push messages (currently disabled):
        // if (YII_ENV == YII_ENV_PROD) {
        //     TaskFunc::initAutoSendDingTalk();
        // }
    }

    /**
     * Worker stop hook; intentionally empty.
     *
     * @param mixed $server
     * @param mixed $workerId
     */
    public function onWorkerStop($server, $workerId)
    {
    }

    /**
     * Worker exit hook; intentionally empty.
     *
     * @param mixed $server
     * @param mixed $workerId
     */
    public function onWorkerExit($server, $workerId)
    {
    }

    /**
     * Connection-opened hook; intentionally empty.
     *
     * @param mixed $fd connection descriptor
     */
    public function onOpen($fd)
    {
    }

    /**
     * Connection-closed hook; intentionally empty.
     *
     * @param mixed $fd connection descriptor
     */
    public function onClose($fd)
    {
    }

    /**
     * Handles an incoming WebSocket message.
     *
     * When the decoded message carries a non-empty "userId" together with an
     * "app" key, the connection descriptor is bound to that user in the cache
     * so later pushes can find it. Anything else is ignored.
     *
     * The message comes from a remote client, so it is treated as untrusted:
     * malformed JSON or a missing "app" key must not raise out of this callback.
     *
     * @param mixed  $fd   connection descriptor of the sender
     * @param string $data raw message body, expected to be a JSON object
     */
    public function onMessage($fd, $data)
    {
        try {
            $payload = Json::decode($data);
        } catch (\Exception $e) {
            // Json::decode() throws on invalid JSON; drop the message instead of failing the callback.
            Yii::warning(['message' => 'Ignoring malformed WebSocket message', 'fd' => $fd, 'error' => $e->getMessage()], __METHOD__);
            return;
        }

        // Both keys are required: the original read $data['app'] without checking it exists.
        if (is_array($payload) && isset($payload['userId'], $payload['app']) && $payload['userId'] != '') {
            Cache::setWebSocketFd($payload['app'], $payload['userId'], $fd);
        }
    }

    /**
     * Dispatches an asynchronous task.
     *
     * The parameters (plus the id/name of the current web user, when available)
     * are stored through Cache::setAsyncParams(); only the resulting key is sent
     * to the task, which is expected to load the parameters back by that key.
     *
     * @param string $path     route of the action to run asynchronously
     * @param array  $params   parameters for the action
     * @param array  $settings settings forwarded to async()
     * @return mixed|false the key returned by Cache::setAsyncParams() when the task
     *                     was dispatched, false when async() reported failure
     * @throws \yii\base\Exception (carried over from the original annotation)
     */
    public function asyncHandle($path, array $params = [], array $settings = [])
    {
        // Record which user triggered the task.
        $handleUserId = Yii::$app->user->id;
        if ($handleUserId) {
            $params['handleUserId'] = $handleUserId;
            $userInfo = Yii::$app->user->userInfo;
            // handleUserName is deliberately left untouched when no admin name is
            // available, so a value supplied by the caller survives.
            if ($userInfo && isset($userInfo['adminName'])) {
                $params['handleUserName'] = $userInfo['adminName'];
            }
        } else {
            $params['handleUserId'] = null;
            $params['handleUserName'] = null;
        }

        $taskKey = Cache::setAsyncParams($params);
        $request = [
            'data' => [
                [
                    'a' => $path,
                    'p' => [$taskKey],
                ],
            ],
        ];

        return $this->async(Json::encode($request), $settings) ? $taskKey : false;
    }

    /**
     * Pushes the success/failure result of an async task to one admin's page.
     *
     * Best effort: nothing happens when $userId is empty or the admin has no
     * cached connection, and a failing push is logged rather than raised.
     *
     * @param mixed  $userId  id of the admin to notify
     * @param string $message message text
     * @param bool   $success whether the task succeeded
     * @param string $type    message|progress
     */
    public function pushAsyncResultToAdmin($userId, $message = '', $success = true, $type = 'message')
    {
        if (!$userId) {
            return;
        }

        $fd = Cache::getWebSocketFd(Cache::SOCKET_ADMIN, $userId);
        if (!$fd) {
            return;
        }

        try {
            $this->pushMsgByCli($fd, [
                'success' => $success,
                'handle'  => self::HANDLE_ADMIN_ASYNC,
                'message' => $message,
                'type'    => $type,
            ]);
        } catch (\Exception $e) {
            // Still non-fatal, but no longer invisible.
            Yii::warning(['message' => 'Failed to push async result to admin', 'userId' => $userId, 'error' => $e->getMessage()], __METHOD__);
        }
    }

    /**
     * Broadcasts the progress percentage of an async task.
     *
     * Best effort: a failing push is logged rather than raised.
     *
     * @param mixed $percent progress value sent as "percent"
     * @param array $other   extra data sent as "other"
     */
    public function pushAsyncPercentToAdmin($percent, $other = [])
    {
        try {
            $this->pushMsgAllByCli([
                'handle'  => self::HANDLE_ADMIN_ASYNC_PERCENT,
                'percent' => $percent,
                'other'   => $other,
            ]);
        } catch (\Exception $e) {
            // Still non-fatal, but no longer invisible.
            Yii::warning(['message' => 'Failed to push async progress', 'error' => $e->getMessage()], __METHOD__);
        }
    }

    /**
     * Sends a WebSocket message to a front-end user, or to everyone.
     *
     * With a $userId the message goes to the user's cached app connection and
     * then to the cached PC connection; the PC push is skipped when the app
     * push returned false. Without a $userId the message is broadcast.
     *
     * @param mixed|null $userId  target user id, or null/empty to broadcast
     * @param string     $message message text
     * @param bool       $success success flag carried in the message
     * @return bool|mixed result of the last push performed; true when the user
     *                    has no cached connection at all
     * @throws \Exception
     */
    public function pushMsgToUser($userId = null, $message = '', $success = true)
    {
        $payload = [
            'success' => $success,
            'handle'  => self::HANDLE_USER_PULL_MESSAGE,
            'message' => $message,
        ];

        if (!$userId) {
            return $this->pushMsgAll($payload);
        }

        $result = true;

        $appFd = Cache::getWebSocketFd(Cache::SOCKET_USER_APP, $userId);
        if ($appFd) {
            $result = $this->pushMsg($appFd, $payload);
        }

        $pcFd = Cache::getWebSocketFd(Cache::SOCKET_USER_PC, $userId);
        if ($pcFd && $result !== false) {
            $result = $this->pushMsg($pcFd, $payload);
        }

        return $result;
    }

    /**
     * Hook invoked when a task action starts: writes an audit log entry.
     *
     * Nothing is logged when the task has no cached parameters or when the
     * action route contains "log/". Logging failures never interrupt the task.
     *
     * @param mixed  $server   server instance (unused here)
     * @param mixed  $workerId worker id (unused here)
     * @param string $action   route of the action being run
     * @param array  $params   task parameters; $params[0] is the cache key of the real parameters
     */
    public function onTaskRunActionStart($server, $workerId, $action, $params)
    {
        // Errors are caught here so that the task itself keeps running.
        // NOTE(review): only \Exception is caught; on PHP 7+ an \Error would still escape.
        try {
            if (!isset($params[0])) {
                return;
            }

            $actionParams = Cache::getAsyncParamsWithoutDel($params[0]);
            if (!$actionParams) {
                return;
            }

            // Actions whose route contains "log/" are not logged.
            if (strpos($action, 'log/') !== false) {
                return;
            }

            $asyncLog = new AsyncSystem();
            $asyncLog->setRequestRoute($action)->setRequestContent($actionParams)->saveByConsole([
                'apiName' => '异步'.$action,
                'optUser' => isset($actionParams['handleUserName']) ? $actionParams['handleUserName'] : null,
            ]);
        } catch (\Exception $e) {
            // Still non-fatal, but no longer invisible.
            Yii::warning(['message' => 'Failed to write async task start log', 'action' => $action, 'error' => $e->getMessage()], __METHOD__);
        }
    }

    /**
     * Hook invoked when a task action fails: writes an error log entry.
     *
     * Logging failures never interrupt the task; if the log entry cannot be
     * saved, the original task error is written to the application log so it
     * is not lost.
     *
     * @param mixed  $fd           connection descriptor (unused here)
     * @param mixed  $data         task data (unused here)
     * @param string $action       route of the action that failed
     * @param array  $params       task parameters (unused here)
     * @param mixed  $errorMessage error reported for the task
     */
    public function onTaskRunActionError($fd, $data, $action, $params, $errorMessage)
    {
        // Errors are caught here so that the task itself keeps running.
        try {
            $asyncLog = new AsyncSystem();
            $asyncLog->setRequestRoute($action)->setResponseContent(['errorMessage' => $errorMessage])->saveByConsole([
                'apiName' => '异步错误'.$action,
            ]);
        } catch (\Exception $e) {
            // Fallback: keep the task error even though the log entry could not be saved.
            Yii::error(['message' => 'Failed to write async task error log', 'action' => $action, 'errorMessage' => $errorMessage, 'error' => $e->getMessage()], __METHOD__);
        }
    }
}
|