SwooleAsyncTimerComponent.php 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260
  1. <?php
  2. /**
  3. * 异步组件
  4. * $Id: SwooleAsyncComponent.php 9507 2016-09-29 06:48:44Z mevyen $
  5. * $Date: 2016-09-29 14:48:44 +0800 (Wed, 07 Sep 2016) $
  6. * $Author: mevyen $
  7. */
  8. namespace anlity\swooleAsyncTimer;
  9. use anlity\swooleAsyncTimer\src\SocketSecurity;
  10. use anlity\swooleAsyncTimer\src\SwooleClient;
  11. use Yii;
  12. use anlity\swooleAsyncTimer\src\SCurl;
  13. use yii\helpers\Json;
  14. class SwooleAsyncTimerComponent extends \yii\base\Component implements SocketInterface
  15. {
  16. public $swooleServer;
  17. /**
  18. * 获取服务端对象
  19. * @return mixed
  20. */
  21. public function getSwooleServer(){
  22. return $this->swooleServer;
  23. }
  24. /**
  25. * 异步执行入口
  26. * $data.data 定义需要执行的任务列表,其中如果指定多个任务(以数组形式),则server将顺序执行
  27. * $data.finish 定义了data中的任务执行完成后的回调任务,执行方式同$data.data
  28. * @param [json] $data 结构如下
  29. * [
  30. * 'data' => [
  31. * [
  32. * 'a' => 'test1/mail1' #要执行的console控制器和action
  33. * 'p' => ['p1','p2','p3'] // action参数列表
  34. * ],
  35. * [
  36. * 'a' => 'test2/mail2' #要执行的console控制器和action
  37. * 'p' => ['p1','p2','p3'] // action参数列表
  38. * ]
  39. * ],
  40. * 'finish' => [
  41. * [
  42. * 'a' => 'test3/mail3' #要执行的console控制器和action
  43. * 'p' => ['p1','p2','p3'] // action参数列表
  44. * ],
  45. * [
  46. * 'a' => 'test4/mail4' #要执行的console控制器和action
  47. * 'p' => ['p1','p2','p3'] // action参数列表
  48. * ]
  49. * ]
  50. * ]
  51. * @return [type] [description]
  52. */
  53. public function async($data, $settings=[])
  54. {
  55. $data = $this->paresData($data);
  56. $data = ['type'=>'async', 'data'=>$data];
  57. return $this->requestServer($data, $settings);
  58. }
  59. /**
  60. * 用于从页面端实现webSocket推送消息
  61. * @param $fd
  62. * @param $data
  63. * @return bool
  64. * @throws \Exception
  65. */
  66. public function pushMsg($fd, $data){
  67. if(!$fd){
  68. return false;
  69. }
  70. $data = $this->paresData($data);
  71. $data = ['type'=>'pushMsg', 'fd' => $fd, 'data'=>$data];
  72. return $this->requestServer($data);
  73. }
  74. /**
  75. * 用于从页面端实现webSocket推送消息给所有已连接的会员
  76. * @param $data
  77. * @return bool
  78. * @throws \Exception
  79. */
  80. public function pushMsgAll($data){
  81. $data = $this->paresData($data);
  82. $data = ['type'=>'pushMsgAll', 'data'=>$data];
  83. return $this->requestServer($data);
  84. }
  85. /**
  86. * 从服务端的cli直接推送消息到客户端
  87. * @param $fd
  88. * @param $data
  89. * @return bool
  90. */
  91. public function pushMsgByCli($fd, $data){
  92. if(!$fd){
  93. return false;
  94. }
  95. $data = $this->paresData($data);
  96. return $this->swooleServer->push($fd, $data);
  97. }
  98. /**
  99. * 广播发送消息
  100. * @param $data
  101. */
  102. public function pushMsgAllByCli($data){
  103. $data = $this->paresData($data);
  104. foreach($this->swooleServer->connections as $fd){
  105. $this->swooleServer->push($fd, $data);
  106. }
  107. }
  108. /**
  109. * 请求服务端
  110. * @param $data
  111. * @param $settings
  112. * @return bool
  113. * @throws \Exception
  114. */
  115. public function requestServer($data, array $settings=[]){
  116. if ( empty($settings) ) {
  117. $settings = Yii::$app->params['swooleAsyncTimer'];
  118. }
  119. $socketSecurity = new SocketSecurity($settings);
  120. $data = $socketSecurity->paramsFormat($data);
  121. if($settings['sender_client'] == 'swoole'){
  122. try {
  123. $client = new SwooleClient();
  124. $client->setOption('host', $settings['host']);
  125. $client->setOption('port', $settings['port']);
  126. $client->setOption('timeout', $settings['client_timeout']);
  127. $client->setOption('data', Json::encode($data));
  128. $response = $client->post();
  129. } catch (\Exception $e){
  130. $response = false;
  131. }
  132. } else {
  133. $client = new SCurl();
  134. $client->setOption(CURLOPT_POSTFIELDS, $data);
  135. $client->setOption(CURLOPT_TIMEOUT, $settings['client_timeout']);
  136. $response = $client->post("http://".$settings['host'].":".$settings['port']);
  137. }
  138. if($response === false){
  139. return false;
  140. }
  141. if($response === 'false'){
  142. return false;
  143. }
  144. return true;
  145. }
  146. /**
  147. * 处理数据
  148. * @param $data
  149. * @return string
  150. */
  151. public function paresData($data){
  152. if(!is_string($data)){
  153. $data = Json::encode($data);
  154. }
  155. return $data;
  156. }
  157. /**
  158. * swoole进程服务开始时的回调
  159. * @param $server
  160. * @param $workerId
  161. */
  162. public function onWorkerStart($server, $workerId){
  163. }
  164. /**
  165. * swoole进程服务结束时的回调
  166. * @param $server
  167. * @param $workerId
  168. */
  169. public function onWorkerStop($server, $workerId){
  170. }
  171. /**
  172. * swoole进程退出时的回调
  173. * @param $server
  174. * @param $workerId
  175. */
  176. public function onWorkerExit($server, $workerId){
  177. }
  178. /**
  179. * 需继承此方法,用于定时器的回调方法
  180. * @param $timerId
  181. * @param $server
  182. */
  183. public function timerCallback($timerId, $server){
  184. }
  185. /**
  186. * 需继承此方法,用于websocket的握手记录fd
  187. * @param $fd
  188. */
  189. public function onOpen($fd){
  190. }
  191. /**
  192. * 需继承此方法,用于websocket的清除fd
  193. * @param $fd
  194. */
  195. public function onClose($fd){
  196. }
  197. /**
  198. * 需继承此方法,用于websocket的接受客户端消息
  199. * @param $fd
  200. * @param $data
  201. */
  202. public function onMessage($fd, $data){
  203. }
  204. /**
  205. * 任务运行开始
  206. * @param $server
  207. * @param $workerId
  208. * @param $action
  209. * @param $params
  210. */
  211. public function onTaskRunActionStart($server, $workerId, $action, $params){
  212. }
  213. /**
  214. * 任务运行完成
  215. * @param $server
  216. * @param $workerId
  217. * @param $action
  218. * @param $params
  219. */
  220. public function onTaskRunActionFinish($server, $workerId, $action, $params){
  221. }
  222. /**
  223. * 任务运行发生错误时
  224. * @param $fd
  225. * @param $data
  226. * @param $action
  227. * @param $params
  228. * @param $errorMessage
  229. */
  230. public function onTaskRunActionError($fd, $data, $action, $params, $errorMessage){
  231. }
  232. }