RPCServer.php 16 KB


  1. <?php
  2. /**
  3. * Swoole 实现的 server,用来处理Rpc任务
  4. * $Author: Leo $
  5. */
  6. namespace common\libs\swoole;
  7. use yii\base\Exception;
  8. use yii\console\Application;
  9. use yii\helpers\Json;
  10. use yii\validators\IpValidator;
  11. class RPCServer
  12. {
  13. protected $_buffer = array(); //buffer区
  14. protected $_headers = array(); //保存头
  15. public static $clientEnv; // 客户端环境变量 array
  16. public static $stop = false;
  17. public static $requestHeader; // 请求头 array
  18. public $packet_maxlen = 5465792000; //2M默认最大长度
  19. protected $buffer_maxlen = 1024000; //最大待处理区排队长度, 超过后将丢弃最早入队数据
  20. protected $buffer_clear_num = 128; //超过最大长度后,清理100个数据
  21. const ERR_HEADER = 9001; //错误的包头
  22. const ERR_TOOBIG = 9002; //请求包体长度超过允许的范围
  23. const ERR_SERVER_BUSY = 9003; //服务器繁忙,超过处理能力
  24. const ERR_UNPACK = 9204; //解包失败
  25. const ERR_PARAMS = 9205; //参数错误
  26. const ERR_NOFUNC = 9206; //函数不存在
  27. const ERR_CALL = 9207; //执行错误
  28. const ERR_ACCESS_DENY = 9208; //访问被拒绝,客户端主机未被授权
  29. const ERR_USER = 9209; //用户名密码错误
  30. const ERR_API = 9210; //调用API错误
  31. const HEADER_SIZE = 16;
  32. const HEADER_STRUCT = "Nlength/Ntype/Nuid/Nserid";
  33. const HEADER_PACK = "NNNN";
  34. const DECODE_PHP = 1; //使用PHP的serialize打包
  35. const DECODE_JSON = 2; //使用json_encode打包
  36. const DECODE_MSGPACK = 3; //使用msgpack打包
  37. const DECODE_SWOOLE = 4; //使用swoole_serialize打包
  38. const DECODE_GZIP = 128; //启用GZIP压缩
  39. const IS_COMPRESS = true; // 启用gzcompress压缩数据
  40. const ALLOW_IP = 1;
  41. const ALLOW_USER = 2;
  42. protected $appNamespaces = array(); //应用程序命名空间
  43. protected $ipWhiteList = array(); //IP白名单
  44. protected $userList = array(); //用户列表
  45. protected $verifyIp = true;
  46. protected $verifyUser = false;
  47. /**
  48. * swoole server 实例
  49. * @var null|\Swoole\Server
  50. */
  51. protected $server = null;
  52. /**
  53. * swoole 配置
  54. * @var array
  55. */
  56. private $setting = [];
  57. /**
  58. * Yii::$app 对象
  59. * @var Application
  60. */
  61. private $app = null;
  62. private $_swooleController;
  63. /**
  64. * SServer constructor.
  65. * @param $setting
  66. * @param $app
  67. * @param $swooleController
  68. */
  69. public function __construct($setting, $app, &$swooleController)
  70. {
  71. $this->setting = $setting;
  72. $this->app = $app;
  73. $this->_swooleController = $swooleController;
  74. $this->ipWhiteList = $this->setting['ipWhiteList'];
  75. }
  76. /**
  77. * 设置swoole进程名称
  78. * @param string $name swoole进程名称
  79. */
  80. private function setProcessName($name)
  81. {
  82. if (function_exists('cli_set_process_title')) {
  83. @cli_set_process_title($name);
  84. } else {
  85. if (function_exists('swoole_set_process_name')) {
  86. @swoole_set_process_name($name);
  87. } else {
  88. trigger_error(__METHOD__ . " failed.require cli_set_process_title or swoole_set_process_name.");
  89. }
  90. }
  91. }
  92. /**
  93. * 运行服务
  94. * @return boolean
  95. */
  96. public function run()
  97. {
  98. $this->server = new \swoole_server($this->setting['host'], $this->setting['port']);
  99. $this->server->set($this->setting);
  100. //回调函数
  101. $call = [
  102. 'start',
  103. 'workerStart',
  104. 'managerStart',
  105. 'open',
  106. 'message',
  107. 'receive',
  108. 'request',
  109. 'task',
  110. 'finish',
  111. 'close',
  112. 'workerStop',
  113. 'shutdown',
  114. ];
  115. //事件回调函数绑定
  116. foreach ($call as $v) {
  117. $m = 'on' . ucfirst($v);
  118. if (method_exists($this, $m)) {
  119. $this->server->on($v, [$this, $m]);
  120. }
  121. }
  122. echo "服务成功启动" . PHP_EOL;
  123. echo "服务运行名称:{$this->setting['process_name']}" . PHP_EOL;
  124. echo "服务运行端口:{$this->setting['host']}:{$this->setting['port']}" . PHP_EOL;
  125. return $this->server->start();
  126. }
  127. /**
  128. * 关闭连接
  129. * @param $fd
  130. */
  131. protected function close($fd)
  132. {
  133. $this->server->close($fd);
  134. unset($this->_buffer[$fd], $this->_headers[$fd]);
  135. }
  136. /**
  137. * [onStart description]
  138. * @param [type] $server [description]
  139. * @return boolean
  140. */
  141. public function onStart($server)
  142. {
  143. echo '[' . date('Y-m-d H:i:s') . "]\t swoole_server master worker start\n";
  144. $this->setProcessName($server->setting['process_name'] . '-master');
  145. //记录进程id,脚本实现自动重启
  146. $pid = "{$this->server->master_pid}\n{$this->server->manager_pid}";
  147. file_put_contents($this->setting['pidfile'], $pid);
  148. return true;
  149. }
  150. /**
  151. * [onManagerStart description]
  152. * @param [type] $server [description]
  153. */
  154. public function onManagerStart($server)
  155. {
  156. echo '[' . date('Y-m-d H:i:s') . "]\t swoole_server manager worker start\n";
  157. $this->setProcessName($server->setting['process_name'] . '-manager');
  158. }
  159. /**
  160. * [onOpen description]
  161. * @param $server
  162. * @param $request
  163. */
  164. public function onOpen($server, $request)
  165. {
  166. }
  167. public function onTask($server){
  168. }
  169. /**
  170. * [onShutdown description]
  171. */
  172. public function onClose($server, $fd)
  173. {
  174. unset($this->_buffer[$fd]);
  175. //echo '[' . date('Y-m-d H:i:s') . "]\t swoole_server shutdown\n";
  176. }
  177. /**
  178. * [onWorkerStart description]
  179. * @param [type] $server [description]
  180. * @param [type] $workerId [description]
  181. */
  182. public function onWorkerStart($server, $workerId)
  183. {
  184. if ($workerId >= $this->setting['worker_num']) {
  185. $this->setProcessName($server->setting['process_name'] . '-task');
  186. } else {
  187. $this->setProcessName($server->setting['process_name'] . '-event');
  188. }
  189. }
  190. /**
  191. * [onWorkerStop description]
  192. * @param [type] $server [description]
  193. * @param [type] $workerId [description]
  194. */
  195. public function onWorkerStop($server, $workerId)
  196. {
  197. echo '[' . date('Y-m-d H:i:s') . "]\t swoole_server[{$server->setting['process_name']} worker:{$workerId} shutdown\n";
  198. }
  199. /**
  200. * 处理请求
  201. * @param $server
  202. * @param $fd
  203. * @param $from_id
  204. * @param $data
  205. * @return bool
  206. */
  207. public function onReceive($server, $fd, $from_id, $data)
  208. {
  209. if (!isset($this->_buffer[$fd]) or $this->_buffer[$fd] === '') {
  210. //超过buffer区的最大长度了
  211. if (count($this->_buffer) >= $this->buffer_maxlen) {
  212. $n = 0;
  213. foreach ($this->_buffer as $k => $v) {
  214. $this->close($k);
  215. $n++;
  216. //清理完毕
  217. if ($n >= $this->buffer_clear_num) {
  218. break;
  219. }
  220. }
  221. $this->logger("clear $n buffer");
  222. }
  223. //解析包头
  224. $header = unpack(self::HEADER_STRUCT, substr($data, 0, self::HEADER_SIZE));
  225. //错误的包头
  226. if ($header === false) {
  227. $this->close($fd);
  228. }
  229. $header['fd'] = $fd;
  230. $this->_headers[$fd] = $header;
  231. //长度错误
  232. if ($header['length'] - self::HEADER_SIZE > $this->packet_maxlen or strlen($data) > $this->packet_maxlen) {
  233. return $this->sendErrorMessage($fd, self::ERR_TOOBIG, '数据长度错误');
  234. }
  235. //加入缓存区
  236. $this->_buffer[$fd] = substr($data, self::HEADER_SIZE);
  237. } else {
  238. $this->_buffer[$fd] .= $data;
  239. }
  240. //长度不足
  241. // if (strlen($this->_buffer[$fd]) < $this->_headers[$fd]['length']) {
  242. // return $this->sendErrorMessage($fd, self::ERR_TOOBIG);
  243. // }
  244. //数据解包
  245. $request = self::decode($this->_buffer[$fd], self::DECODE_PHP);
  246. if ($request === false) {
  247. $this->sendErrorMessage($fd, self::ERR_UNPACK, '数据解包失败');
  248. } //执行远程调用
  249. else {
  250. //当前请求的头
  251. self::$requestHeader = $_header = $this->_headers[$fd];
  252. //调用端环境变量
  253. if (!empty($request['env'])) {
  254. self::$clientEnv = $request['env'];
  255. }
  256. //socket信息
  257. self::$clientEnv['_socket'] = $this->server->connection_info($_header['fd']);
  258. $response = $this->call($request, $_header);
  259. //发送响应
  260. $ret = $this->server->send($fd, self::encode($response, self::DECODE_JSON, $_header['uid'], $_header['serid']));
  261. if ($ret === false) {
  262. trigger_error("SendToClient failed. code=" . $this->server->getLastError() . " params=" . var_export($request, true) . "\nheaders=" . var_export($_header, true), E_USER_WARNING);
  263. }
  264. //退出进程
  265. if (self::$stop) {
  266. exit(0);
  267. }
  268. }
  269. //清理缓存
  270. unset($this->_buffer[$fd], $this->_headers[$fd]);
  271. return true;
  272. }
  273. /**
  274. * 关闭服务
  275. */
  276. public function onShutdown()
  277. {
  278. echo '[' . date('Y-m-d H:i:s') . "]\t server shutdown 关闭服务完成\n";
  279. unlink($this->setting['pidfile']);
  280. }
  281. /**
  282. * 记录日志 日志文件名为当前年月(date("Y-m"))
  283. * @param string|array|object $msg 日志内容
  284. * @param string $logfile
  285. */
  286. public function logger($msg, $logfile = '')
  287. {
  288. if (empty($msg)) {
  289. return;
  290. }
  291. if (!$this->setting['debug']) {
  292. return;
  293. }
  294. if (!is_string($msg)) {
  295. if (is_object($msg) || is_array($msg)) {
  296. $msg = var_export($msg, true);
  297. } else {
  298. $msg = '未知错误';
  299. }
  300. }
  301. //日志内容
  302. $msg = '[' . date('Y-m-d H:i:s') . '] ' . $msg . PHP_EOL;
  303. //日志文件大小
  304. $maxSize = $this->setting['log_size'];
  305. //日志文件位置
  306. $file = $logfile ?: $this->setting['log_dir'] . "/" . date('Y-m') . ".log";
  307. //切割日志
  308. if (file_exists($file) && filesize($file) >= $maxSize) {
  309. $bak = $file . '-' . time();
  310. if (!rename($file, $bak)) {
  311. error_log("rename file:{$file} to {$bak} failed", 3, $file);
  312. }
  313. }
  314. error_log($msg, 3, $file);
  315. }
  316. /**
  317. * 发送错误消息
  318. * @param $fd
  319. * @param $errno
  320. * @param string $msg
  321. * @return bool
  322. */
  323. public function sendErrorMessage($fd, $errno, $msg = '')
  324. {
  325. //return $this->server->send($fd, self::encode(['errno' => $errno], $this->_headers[$fd]['type']));
  326. return $this->server->send($fd, self::encode(['errno' => $errno, 'msg' => $msg], self::DECODE_JSON));
  327. }
  328. /**
  329. * 打包数据
  330. * @param $data
  331. * @param int $type
  332. * @param int $uid
  333. * @param int $serid
  334. * @return string
  335. */
  336. public static function encode($data, $type = self::DECODE_JSON, $uid = 0, $serid = 0)
  337. {
  338. $_type = $type;
  339. switch ($_type) {
  340. case self::DECODE_JSON:
  341. $body = Json::encode($data);
  342. break;
  343. case self::DECODE_SWOOLE:
  344. $body = \swoole_serialize::pack($data);
  345. break;
  346. case self::DECODE_PHP:
  347. default:
  348. $body = serialize($data);
  349. break;
  350. }
  351. if(self::IS_COMPRESS && ($_type == self::DECODE_PHP)){
  352. $body = gzcompress($body, -1);
  353. }
  354. return pack(RPCServer::HEADER_PACK, strlen($body), $type, $uid, $serid) . $body;
  355. }
  356. /**
  357. * 解包数据
  358. * @param string $data
  359. * @param int $unseralize_type
  360. * @return string
  361. */
  362. public static function decode($data, $unseralize_type = self::DECODE_JSON)
  363. {
  364. if($data && ($unseralize_type == self::DECODE_PHP)){
  365. $data = gzuncompress($data);
  366. }
  367. switch ($unseralize_type) {
  368. case self::DECODE_JSON:
  369. $data = Json::decode($data);
  370. break;
  371. case self::DECODE_SWOOLE:
  372. $data = \swoole_serialize::unpack($data);
  373. break;
  374. case self::DECODE_PHP;
  375. default:
  376. $data = unserialize($data);
  377. }
  378. return $data;
  379. }
  380. /**
  381. * 验证IP
  382. * @param $ip
  383. * @return bool
  384. */
  385. protected function verifyIp($ip)
  386. {
  387. return empty($this->ipWhiteList) ? true : isset($this->ipWhiteList[$ip]);
  388. }
  389. /**
  390. * 验证用户名密码
  391. * @param $user
  392. * @param $password
  393. * @return bool
  394. */
  395. protected function verifyUser($user, $password)
  396. {
  397. if (!isset($this->userList[$user])) {
  398. return false;
  399. }
  400. if ($this->userList[$user] != $password) {
  401. return false;
  402. }
  403. return true;
  404. }
  405. /**
  406. * 调用远程函数
  407. * @param $request
  408. * @param $header
  409. * @return array
  410. */
  411. protected function call($request, $header)
  412. {
  413. if (empty($request['call'])) {
  414. return array('errno' => self::ERR_PARAMS, 'msg'=>'远程调用函数参数错误');
  415. }
  416. // 侦测服务器是否存活
  417. if ($request['call'] === 'PING') {
  418. return array('errno' => 0, 'data' => 'PONG');
  419. }
  420. //验证客户端IP是否被允许访问
  421. if ($this->verifyIp) {
  422. if (!$this->verifyIp(self::$clientEnv['_socket']['remote_ip'])) {
  423. return array('errno' => self::ERR_ACCESS_DENY, 'msg'=>'访问被拒绝,客户端主机未被授权');
  424. }
  425. }
  426. //验证密码是否正确
  427. if ($this->verifyUser) {
  428. if (empty(self::$clientEnv['user']) or empty(self::$clientEnv['password'])) {
  429. fail:
  430. return array('errno' => self::ERR_USER, 'msg'=>'用户名密码错误');
  431. }
  432. if (!$this->verifyUser(self::$clientEnv['user'], self::$clientEnv['password'])) {
  433. goto fail;
  434. }
  435. }
  436. //函数不存在
  437. if (!is_callable($request['call'])) {
  438. return array('errno' => self::ERR_NOFUNC, 'msg' => '函数不存在');
  439. }
  440. //前置方法
  441. if (method_exists($this, 'beforeRequest')) {
  442. $this->beforeRequest($request);
  443. }
  444. //调用接口方法
  445. try{
  446. $ret = call_user_func_array($request['call'], $request['params']);
  447. } catch (Exception $e){
  448. return array('errno' => self::ERR_API, 'msg' => $e->getMessage());
  449. }
  450. //后置方法
  451. if (method_exists($this, 'afterRequest')) {
  452. $this->afterRequest($ret);
  453. }
  454. //禁止接口返回NULL,客户端得到NULL时认为RPC调用失败
  455. if ($ret === NULL) {
  456. return array('errno' => self::ERR_CALL, 'msg' => '禁止接口返回NULL');
  457. }
  458. return array('errno' => 0, 'data' => $ret);
  459. }
  460. /**
  461. * 添加访问规则
  462. * @param $ip
  463. * @throws Exception
  464. */
  465. public function addAllowIP($ip)
  466. {
  467. $ipValidator = new IpValidator();
  468. if ($ipValidator->validate($ip)) {
  469. $this->ipWhiteList[$ip] = true;
  470. $this->verifyIp = true;
  471. } else {
  472. throw new Exception("require ip address.");
  473. }
  474. }
  475. /**
  476. * 添加用户许可
  477. * @param $user
  478. * @param $password
  479. */
  480. public function addAllowUser($user, $password)
  481. {
  482. $this->userList[$user] = $password;
  483. $this->verifyUser = true;
  484. }
  485. }