SwooleAsyncTimer.php 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. <?php
  2. /**
  3. * Created by PhpStorm.
  4. * User: leo
  5. * Date: 2018/4/9
  6. * Time: 下午5:15
  7. */
  8. namespace common\components;
  9. use anlity\swooleAsyncTimer\SocketInterface;
  10. use anlity\swooleAsyncTimer\SwooleAsyncTimerComponent;
  11. use backendApi\modules\v1\models\Admin;
  12. use common\components\Redis;
  13. use common\helpers\bonus\AutoClosePeriod;
  14. use common\helpers\bonus\Calc\CalcConsole;
  15. use common\helpers\Cache;
  16. use common\helpers\Date;
  17. use common\helpers\Tool;
  18. use common\libs\logging\system\ApiSystem;
  19. use common\libs\logging\system\AsyncSystem;
  20. use common\libs\taskQueue\Queue;
  21. use common\libs\taskQueue\TaskFunc;
  22. use common\models\UserInfo;
  23. use Yii;
  24. use anlity\swooleAsyncTimer\SwooleAsyncTimerController;
  25. use yii\base\Exception;
  26. use yii\helpers\Json;
  /**
   * Project-side implementation of the Swoole async timer component.
   *
   * Covers four concerns visible in this class: the periodic timer work,
   * binding WebSocket connections to users, dispatching async tasks, and
   * pushing messages to admin / user sockets.
   */
  class SwooleAsyncTimer extends SwooleAsyncTimerComponent implements SocketInterface
  {
      // Values placed in the 'handle' field of pushed WebSocket payloads.
      const HANDLE_ADMIN_ASYNC = 'adminAsync';
      const HANDLE_ADMIN_PULL_MESSAGE = 'adminPullMsg'; // not referenced in this class
      const HANDLE_USER_ASYNC = 'userAsync'; // not referenced in this class
      const HANDLE_USER_PULL_MESSAGE = 'userPullMsg';
      const HANDLE_ADMIN_ASYNC_PERCENT = 'adminAsyncPercent';

      // Not referenced in this class; presumably consumed by the parent component or callers.
      const COMMAND_PUSH_TO = 'pushTo';

      /**
       * Work executed on every timer tick.
       *
       * @param mixed $timerId timer identifier (unused here)
       * @param mixed $server  server instance (unused here)
       * @return bool always true
       * @throws \yii\base\InvalidConfigException
       * @throws \yii\base\InvalidRouteException
       * @throws \yii\console\Exception
       * @throws \yii\db\Exception
       */
      public function timerCallback($timerId, $server)
      {
          // Automatic period closing is currently disabled:
          // AutoClosePeriod::instance()->autoClose();

          // Consume pending tasks from the task queue.
          Queue::instance()->consumeTask();

          // Watch for changes the calculation system made to the period table.
          CalcConsole::listenCalcPeriod();

          // Start-up logic for the business system's pre-calculation.
          CalcConsole::autoPrePerf();
          // CalcConsole::listenAutoPerfPeriod();

          return true;
      }

      /**
       * Worker start hook: performs one-off initialisation in worker 1 only.
       *
       * @param mixed $server
       * @param mixed $workerId
       */
      public function onWorkerStart($server, $workerId)
      {
          // NOTE(review): restricted to worker id 1, presumably so the
          // initialisation runs once per server start - confirm.
          if ($workerId == 1) {
              // Refresh the cached data used by automatic period closing.
              AutoClosePeriod::instance()->setCloseTimeAndPeriodStat();

              // Initialise the task queue.
              Queue::instance()->initRedis();

              // Disabled: initialise the backup of historical bonus tables.
              // TaskFunc::initAutoBakBalance();

              // Disabled: initialise automatic DingTalk push messages.
              // if (YII_ENV == YII_ENV_PROD) {
              //     TaskFunc::initAutoSendDingTalk();
              // }
          }
      }

      /**
       * Worker stop hook. Intentionally empty.
       *
       * @param mixed $server
       * @param mixed $workerId
       */
      public function onWorkerStop($server, $workerId)
      {
      }

      /**
       * Worker exit hook. Intentionally empty.
       *
       * @param mixed $server
       * @param mixed $workerId
       */
      public function onWorkerExit($server, $workerId)
      {
      }

      /**
       * Connection-opened hook. Intentionally empty.
       *
       * @param mixed $fd connection descriptor
       */
      public function onOpen($fd)
      {
      }

      /**
       * Connection-closed hook. Intentionally empty.
       *
       * NOTE(review): the fd binding stored by onMessage() is not removed
       * here - confirm whether stale bindings are cleaned up elsewhere.
       *
       * @param mixed $fd connection descriptor
       */
      public function onClose($fd)
      {
      }

      /**
       * Handles a message received from a WebSocket client.
       *
       * When the JSON payload carries a non-empty 'userId' together with an
       * 'app', the connection descriptor is bound to that user in the cache.
       * Anything else (malformed JSON, non-array payload, missing keys) is
       * ignored, because the payload comes from the client and must not be
       * able to raise errors inside the worker.
       *
       * @param mixed  $fd   connection descriptor of the sender
       * @param string $data raw message, expected to be a JSON object
       */
      public function onMessage($fd, $data)
      {
          try {
              $payload = Json::decode($data);
          } catch (\Exception $e) {
              // Json::decode() throws on malformed input; drop the message.
              return;
          }

          // Json::decode() yields null for an empty string and scalars for scalar JSON.
          if (!is_array($payload)) {
              return;
          }

          if (isset($payload['userId'], $payload['app']) && $payload['userId'] != '') {
              Cache::setWebSocketFd($payload['app'], $payload['userId'], $fd);
          }
      }

      /**
       * Dispatches an asynchronous request.
       *
       * The parameters are stored through Cache::setAsyncParams() and only the
       * resulting key is sent to the async server. 'handleUserId' and
       * 'handleUserName' are always present in the stored parameters: they
       * describe the currently logged-in user, or are null when unknown.
       *
       * @param mixed $path     action route to run asynchronously
       * @param array $params   action parameters
       * @param array $settings settings forwarded to async()
       * @return mixed|false the key returned by Cache::setAsyncParams() on success,
       *                     false when async() reports failure
       * @throws \yii\base\Exception
       */
      public function asyncHandle($path, array $params = [], array $settings = [])
      {
          $userComponent = Yii::$app->user;
          $currentUserId = $userComponent->id;

          if ($currentUserId) {
              $params['handleUserId'] = $currentUserId;
              $userInfo = $userComponent->userInfo;
              if ($userInfo && isset($userInfo['adminName'])) {
                  $params['handleUserName'] = $userInfo['adminName'];
              } elseif (!array_key_exists('handleUserName', $params)) {
                  // Keep a caller-supplied name, but never leave the key undefined.
                  $params['handleUserName'] = null;
              }
          } else {
              $params['handleUserId'] = null;
              $params['handleUserName'] = null;
          }

          $taskKey = Cache::setAsyncParams($params);
          $request = [
              'data' => [
                  [
                      'a' => $path,
                      'p' => [$taskKey],
                  ],
              ],
          ];

          return $this->async(Json::encode($request), $settings) ? $taskKey : false;
      }

      /**
       * Pushes the result of an async task to an admin's page.
       *
       * Best effort: nothing happens when the user id is empty or no
       * connection is cached for that admin, and push errors are discarded.
       *
       * @param mixed  $userId  admin user id
       * @param string $message message text
       * @param bool   $success whether the task succeeded
       * @param string $type    message|progress
       */
      public function pushAsyncResultToAdmin($userId, $message = '', $success = true, $type = 'message')
      {
          if (!$userId) {
              return;
          }

          $fd = Cache::getWebSocketFd(Cache::SOCKET_ADMIN, $userId);
          if (!$fd) {
              return;
          }

          try {
              $this->pushMsgByCli($fd, [
                  'success' => $success,
                  'handle' => self::HANDLE_ADMIN_ASYNC,
                  'message' => $message,
                  'type' => $type,
              ]);
          } catch (\Exception $e) {
              // Deliberately ignored: a failed notification must not fail the task.
          }
      }

      /**
       * Broadcasts the progress percentage of an async task.
       *
       * Best effort: push errors are discarded.
       *
       * @param mixed $percent progress value
       * @param array $other   additional data sent along with the percentage
       */
      public function pushAsyncPercentToAdmin($percent, $other = [])
      {
          try {
              $this->pushMsgAllByCli([
                  'handle' => self::HANDLE_ADMIN_ASYNC_PERCENT,
                  'percent' => $percent,
                  'other' => $other,
              ]);
          } catch (\Exception $e) {
              // Deliberately ignored: progress updates are optional.
          }
      }

      /**
       * Sends a WebSocket message to a front-end user, or to everyone.
       *
       * With a user id, the message goes to the user's cached app connection
       * and then to the cached PC connection. The PC push is skipped when the
       * app push returned false. Without a user id the message is broadcast.
       *
       * @param mixed|null $userId  target user id, or null/empty to broadcast
       * @param string     $message message text
       * @param bool       $success success flag sent in the payload
       * @return mixed result of the last push performed; true when the user has
       *               no cached connection
       * @throws \Exception
       */
      public function pushMsgToUser($userId = null, $message = '', $success = true)
      {
          $payload = [
              'success' => $success,
              'handle' => self::HANDLE_USER_PULL_MESSAGE,
              'message' => $message,
          ];

          if (!$userId) {
              return $this->pushMsgAll($payload);
          }

          $result = true;

          $appFd = Cache::getWebSocketFd(Cache::SOCKET_USER_APP, $userId);
          if ($appFd) {
              $result = $this->pushMsg($appFd, $payload);
          }

          $pcFd = Cache::getWebSocketFd(Cache::SOCKET_USER_PC, $userId);
          if ($pcFd && $result !== false) {
              $result = $this->pushMsg($pcFd, $payload);
          }

          return $result;
      }

      /**
       * Task-action start hook: writes a log entry for the action.
       *
       * Nothing is logged when no cached parameters are found for the task
       * key in $params[0], or when the route contains 'log/'.
       * Exceptions are swallowed so that the task itself keeps running.
       *
       * @param mixed $server
       * @param mixed $workerId
       * @param mixed $action route of the task action
       * @param mixed $params task parameters; index 0 is used as the cache key
       */
      public function onTaskRunActionStart($server, $workerId, $action, $params)
      {
          try {
              if (!isset($params[0])) {
                  return;
              }

              $actionParams = Cache::getAsyncParamsWithoutDel($params[0]);
              if (!$actionParams) {
                  return;
              }

              // Routes containing 'log/' are not logged.
              if (strpos($action, 'log/') !== false) {
                  return;
              }

              $asyncLog = new AsyncSystem();
              $asyncLog
                  ->setRequestRoute($action)
                  ->setRequestContent($actionParams)
                  ->saveByConsole([
                      'apiName' => '异步'.$action,
                      'optUser' => isset($actionParams['handleUserName']) ? $actionParams['handleUserName'] : null,
                  ]);
          } catch (\Exception $e) {
              // Deliberately ignored: logging must not stop the task.
          }
      }

      /**
       * Task-action error hook: writes a log entry with the error message.
       *
       * Exceptions raised while logging are swallowed.
       *
       * @param mixed $fd
       * @param mixed $data
       * @param mixed $action       route of the failed task action
       * @param mixed $params
       * @param mixed $errorMessage error message stored in the log entry
       */
      public function onTaskRunActionError($fd, $data, $action, $params, $errorMessage)
      {
          try {
              $asyncLog = new AsyncSystem();
              $asyncLog
                  ->setRequestRoute($action)
                  ->setResponseContent(['errorMessage' => $errorMessage])
                  ->saveByConsole([
                      'apiName' => '异步错误'.$action,
                  ]);
          } catch (\Exception $e) {
              // Deliberately ignored: a logging failure must not raise a second error.
          }
      }
  }