SwooleAsyncTimerComponent.php 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. <?php
  2. /**
  3. * 异步组件
  4. * $Id: SwooleAsyncComponent.php 9507 2016-09-29 06:48:44Z mevyen $
  5. * $Date: 2016-09-29 14:48:44 +0800 (Wed, 07 Sep 2016) $
  6. * $Author: mevyen $
  7. */
  8. namespace anlity\swooleAsyncTimer;
  9. use anlity\swooleAsyncTimer\src\SocketSecurity;
  10. use anlity\swooleAsyncTimer\src\SwooleClient;
  11. use Yii;
  12. use anlity\swooleAsyncTimer\src\SCurl;
  13. use yii\helpers\Json;
  14. class SwooleAsyncTimerComponent extends \yii\base\Component implements SocketInterface
  15. {
  16. public $swooleServer;
  17. /**
  18. * 获取服务端对象
  19. * @return mixed
  20. */
  21. public function getSwooleServer(){
  22. return $this->swooleServer;
  23. }
  24. /**
  25. * 异步执行入口
  26. * $data.data 定义需要执行的任务列表,其中如果指定多个任务(以数组形式),则server将顺序执行
  27. * $data.finish 定义了data中的任务执行完成后的回调任务,执行方式同$data.data
  28. * @param [json] $data 结构如下
  29. * [
  30. * 'data' => [
  31. * [
  32. * 'a' => 'test1/mail1' #要执行的console控制器和action
  33. * 'p' => ['p1','p2','p3'] // action参数列表
  34. * ],
  35. * [
  36. * 'a' => 'test2/mail2' #要执行的console控制器和action
  37. * 'p' => ['p1','p2','p3'] // action参数列表
  38. * ]
  39. * ],
  40. * 'finish' => [
  41. * [
  42. * 'a' => 'test3/mail3' #要执行的console控制器和action
  43. * 'p' => ['p1','p2','p3'] // action参数列表
  44. * ],
  45. * [
  46. * 'a' => 'test4/mail4' #要执行的console控制器和action
  47. * 'p' => ['p1','p2','p3'] // action参数列表
  48. * ]
  49. * ]
  50. * ]
  51. * @return [type] [description]
  52. */
  53. public function async($data)
  54. {
  55. $data = $this->paresData($data);
  56. $data = ['type'=>'async', 'data'=>$data];
  57. return $this->requestServer($data);
  58. }
  59. /**
  60. * 用于从页面端实现webSocket推送消息
  61. * @param $fd
  62. * @param $data
  63. * @return bool
  64. * @throws \Exception
  65. */
  66. public function pushMsg($fd, $data){
  67. if(!$fd){
  68. return false;
  69. }
  70. $data = $this->paresData($data);
  71. $data = ['type'=>'pushMsg', 'fd' => $fd, 'data'=>$data];
  72. return $this->requestServer($data);
  73. }
  74. /**
  75. * 用于从页面端实现webSocket推送消息给所有已连接的会员
  76. * @param $data
  77. * @return bool
  78. * @throws \Exception
  79. */
  80. public function pushMsgAll($data){
  81. $data = $this->paresData($data);
  82. $data = ['type'=>'pushMsgAll', 'data'=>$data];
  83. return $this->requestServer($data);
  84. }
  85. /**
  86. * 从服务端的cli直接推送消息到客户端
  87. * @param $fd
  88. * @param $data
  89. * @return bool
  90. */
  91. public function pushMsgByCli($fd, $data){
  92. if(!$fd){
  93. return false;
  94. }
  95. $data = $this->paresData($data);
  96. return $this->swooleServer->push($fd, $data);
  97. }
  98. /**
  99. * 广播发送消息
  100. * @param $data
  101. */
  102. public function pushMsgAllByCli($data){
  103. $data = $this->paresData($data);
  104. foreach($this->swooleServer->connections as $fd){
  105. $this->swooleServer->push($fd, $data);
  106. }
  107. }
  108. /**
  109. * 请求服务端
  110. * @param $data
  111. * @return bool
  112. * @throws \Exception
  113. */
  114. public function requestServer($data){
  115. $settings = Yii::$app->params['swooleAsyncTimer'];
  116. $socketSecurity = new SocketSecurity($settings);
  117. $data = $socketSecurity->paramsFormat($data);
  118. if($settings['sender_client'] == 'swoole'){
  119. try {
  120. $client = new SwooleClient();
  121. $client->setOption('host', $settings['host']);
  122. $client->setOption('port', $settings['port']);
  123. $client->setOption('timeout', $settings['client_timeout']);
  124. $client->setOption('data', Json::encode($data));
  125. $response = $client->post();
  126. } catch (\Exception $e){
  127. $response = false;
  128. }
  129. } else {
  130. $client = new SCurl();
  131. $client->setOption(CURLOPT_POSTFIELDS, $data);
  132. $client->setOption(CURLOPT_TIMEOUT, $settings['client_timeout']);
  133. $response = $client->post("http://".$settings['host'].":".$settings['port']);
  134. }
  135. if($response === false){
  136. return false;
  137. }
  138. if($response === 'false'){
  139. return false;
  140. }
  141. return true;
  142. }
  143. /**
  144. * 处理数据
  145. * @param $data
  146. * @return string
  147. */
  148. public function paresData($data){
  149. if(!is_string($data)){
  150. $data = Json::encode($data);
  151. }
  152. return $data;
  153. }
  154. /**
  155. * swoole进程服务开始时的回调
  156. * @param $server
  157. * @param $workerId
  158. */
  159. public function onWorkerStart($server, $workerId){
  160. }
  161. /**
  162. * swoole进程服务结束时的回调
  163. * @param $server
  164. * @param $workerId
  165. */
  166. public function onWorkerStop($server, $workerId){
  167. }
  168. /**
  169. * swoole进程退出时的回调
  170. * @param $server
  171. * @param $workerId
  172. */
  173. public function onWorkerExit($server, $workerId){
  174. }
  175. /**
  176. * 需继承此方法,用于定时器的回调方法
  177. * @param $timerId
  178. * @param $server
  179. */
  180. public function timerCallback($timerId, $server){
  181. }
  182. /**
  183. * 需继承此方法,用于websocket的握手记录fd
  184. * @param $fd
  185. */
  186. public function onOpen($fd){
  187. }
  188. /**
  189. * 需继承此方法,用于websocket的清除fd
  190. * @param $fd
  191. */
  192. public function onClose($fd){
  193. }
  194. /**
  195. * 需继承此方法,用于websocket的接受客户端消息
  196. * @param $fd
  197. * @param $data
  198. */
  199. public function onMessage($fd, $data){
  200. }
  201. /**
  202. * 任务运行开始
  203. * @param $server
  204. * @param $workerId
  205. * @param $action
  206. * @param $params
  207. */
  208. public function onTaskRunActionStart($server, $workerId, $action, $params){
  209. }
  210. /**
  211. * 任务运行完成
  212. * @param $server
  213. * @param $workerId
  214. * @param $action
  215. * @param $params
  216. */
  217. public function onTaskRunActionFinish($server, $workerId, $action, $params){
  218. }
  219. /**
  220. * 任务运行发生错误时
  221. * @param $fd
  222. * @param $data
  223. * @param $action
  224. * @param $params
  225. * @param $errorMessage
  226. */
  227. public function onTaskRunActionError($fd, $data, $action, $params, $errorMessage){
  228. }
  229. }