setting = $setting;
        $this->app = $app;
        $this->_swooleController = $swooleController;
        $this->ipWhiteList = $this->setting['ipWhiteList'];
    }

    /**
     * Set the title of the current process.
     *
     * Uses cli_set_process_title() when available, otherwise swoole_set_process_name().
     * Failures of either call are suppressed (best effort); an error is triggered only
     * when neither function exists.
     *
     * @param string $name process title
     */
    private function setProcessName($name)
    {
        if (function_exists('cli_set_process_title')) {
            @cli_set_process_title($name);
        } elseif (function_exists('swoole_set_process_name')) {
            @swoole_set_process_name($name);
        } else {
            trigger_error(__METHOD__ . " failed.require cli_set_process_title or swoole_set_process_name.");
        }
    }

    /**
     * Create the swoole server, bind the event callbacks and start it.
     *
     * @return boolean result of swoole_server::start()
     */
    public function run()
    {
        $this->server = new \swoole_server($this->setting['host'], $this->setting['port']);
        $this->server->set($this->setting);

        // Candidate events; an event is bound only when this object has a matching on<Event>() method.
        $call = [
            'start',
            'workerStart',
            'managerStart',
            'open',
            'message',
            'receive',
            'request',
            'task',
            'finish',
            'close',
            'workerStop',
            'shutdown',
        ];
        foreach ($call as $v) {
            $m = 'on' . ucfirst($v);
            if (method_exists($this, $m)) {
                $this->server->on($v, [$this, $m]);
            }
        }

        echo "服务成功启动" . PHP_EOL;
        echo "服务运行名称:{$this->setting['process_name']}" . PHP_EOL;
        echo "服务运行端口:{$this->setting['host']}:{$this->setting['port']}" . PHP_EOL;

        return $this->server->start();
    }

    /**
     * Close a connection and drop its buffered payload and stored header.
     *
     * @param int $fd connection descriptor
     */
    protected function close($fd)
    {
        $this->server->close($fd);
        unset($this->_buffer[$fd], $this->_headers[$fd]);
    }

    /**
     * "start" event: name the master process and write the pid file.
     *
     * The pid file contains the master pid and the manager pid, one per line, so that
     * external scripts can restart the service.
     *
     * @param \swoole_server $server
     * @return boolean always true
     */
    public function onStart($server)
    {
        echo '[' . date('Y-m-d H:i:s') . "]\t swoole_server master worker start\n";
        $this->setProcessName($server->setting['process_name'] . '-master');

        $pid = "{$this->server->master_pid}\n{$this->server->manager_pid}";
        file_put_contents($this->setting['pidfile'], $pid);

        return true;
    }

    /**
     * "managerStart" event: name the manager process.
     *
     * @param \swoole_server $server
     */
    public function onManagerStart($server)
    {
        echo '[' . date('Y-m-d H:i:s') . "]\t swoole_server manager worker start\n";
        $this->setProcessName($server->setting['process_name'] . '-manager');
    }

    /**
     * "open" event: intentionally a no-op.
     *
     * @param \swoole_server $server
     * @param mixed $request
     */
    public function onOpen($server, $request)
    {
    }

    /**
     * "task" event: intentionally a no-op.
     *
     * @param \swoole_server $server
     */
    public function onTask($server)
    {
    }

    /**
     * "close" event: release the per-connection state.
     *
     * Both the buffer and the stored header are dropped; the header used to be left
     * behind when a connection went away before its request completed.
     *
     * @param \swoole_server $server
     * @param int $fd connection descriptor
     */
    public function onClose($server, $fd)
    {
        unset($this->_buffer[$fd], $this->_headers[$fd]);
    }

    /**
     * "workerStart" event: name the process according to its role.
     *
     * Ids at or above the configured worker_num get the "-task" suffix, all others "-event".
     *
     * @param \swoole_server $server
     * @param int $workerId
     */
    public function onWorkerStart($server, $workerId)
    {
        $suffix = $workerId >= $this->setting['worker_num'] ? '-task' : '-event';
        $this->setProcessName($server->setting['process_name'] . $suffix);
    }

    /**
     * "workerStop" event: print a shutdown line for the worker.
     *
     * @param \swoole_server $server
     * @param int $workerId
     */
    public function onWorkerStop($server, $workerId)
    {
        echo '[' . date('Y-m-d H:i:s') . "]\t swoole_server[{$server->setting['process_name']} worker:{$workerId} shutdown\n";
    }

    /**
     * "receive" event: parse the packet, run the remote call and send the response.
     *
     * @param \swoole_server $server
     * @param int $fd connection descriptor
     * @param int $from_id
     * @param string $data raw bytes received
     * @return bool false when the header is invalid (the connection is closed), otherwise
     *              the result of the error send for oversized packets, or true
     */
    public function onReceive($server, $fd, $from_id, $data)
    {
        if (!isset($this->_buffer[$fd]) || $this->_buffer[$fd] === '') {
            // Too many buffered connections: close the oldest entries to make room.
            if (count($this->_buffer) >= $this->buffer_maxlen) {
                $n = 0;
                foreach ($this->_buffer as $k => $v) {
                    $this->close($k);
                    $n++;
                    if ($n >= $this->buffer_clear_num) {
                        break;
                    }
                }
                $this->logger("clear $n buffer");
            }

            // Parse the packet header; input shorter than a header cannot be valid.
            $header = strlen($data) >= self::HEADER_SIZE
                ? unpack(self::HEADER_STRUCT, substr($data, 0, self::HEADER_SIZE))
                : false;
            if ($header === false) {
                // Stop here: continuing with a false header would store garbage state.
                $this->close($fd);
                return false;
            }
            $header['fd'] = $fd;

            // Reject oversized packets before any per-connection state is stored.
            if ($header['length'] - self::HEADER_SIZE > $this->packet_maxlen || strlen($data) > $this->packet_maxlen) {
                return $this->sendErrorMessage($fd, self::ERR_TOOBIG, '数据长度错误');
            }

            $this->_headers[$fd] = $header;
            $this->_buffer[$fd] = substr($data, self::HEADER_SIZE);
        } else {
            $this->_buffer[$fd] .= $data;
        }

        // NOTE(review): the payload is decoded on every receive and the buffer is cleared
        // below, so a packet split across several receive events is presumably reported as
        // an unpack failure unless the server settings guarantee whole packets - confirm.
        $request = self::decode($this->_buffer[$fd], self::DECODE_PHP);
        if ($request === false) {
            $this->sendErrorMessage($fd, self::ERR_UNPACK, '数据解包失败');
        } else {
            // Header of the request being served.
            self::$requestHeader = $_header = $this->_headers[$fd];

            // Caller environment. It is reset for every request: the static would otherwise
            // keep the previous client's user/password for a request that sends no env.
            self::$clientEnv = (isset($request['env']) && is_array($request['env'])) ? $request['env'] : [];
            self::$clientEnv['_socket'] = $this->server->connection_info($_header['fd']);

            $response = $this->call($request, $_header);

            $ret = $this->server->send($fd, self::encode($response, self::DECODE_JSON, $_header['uid'], $_header['serid']));
            if ($ret === false) {
                trigger_error("SendToClient failed. code=" . $this->server->getLastError() . " params=" . var_export($request, true) . "\nheaders=" . var_export($_header, true), E_USER_WARNING);
            }

            // Terminate the process when a stop was requested.
            if (self::$stop) {
                exit(0);
            }
        }

        // Release the per-connection state.
        unset($this->_buffer[$fd], $this->_headers[$fd]);

        return true;
    }

    /**
     * "shutdown" event: print a message and remove the pid file.
     */
    public function onShutdown()
    {
        echo '[' . date('Y-m-d H:i:s') . "]\t server shutdown 关闭服务完成\n";

        // The pid file may already be gone; avoid the warning unlink() raises in that case.
        if (is_file($this->setting['pidfile'])) {
            unlink($this->setting['pidfile']);
        }
    }

    /**
     * Append a message to the log file.
     *
     * Nothing is written when the message is empty or the "debug" setting is off. Arrays
     * and objects are rendered with var_export(); any other non-string becomes a fixed
     * placeholder. Without $logfile the file is "<log_dir>/<Y-m>.log". A file that has
     * reached "log_size" bytes is first renamed to "<file>-<timestamp>".
     *
     * @param string|array|object $msg log content
     * @param string $logfile optional explicit log file path
     */
    public function logger($msg, $logfile = '')
    {
        if (empty($msg) || !$this->setting['debug']) {
            return;
        }

        if (!is_string($msg)) {
            $msg = (is_object($msg) || is_array($msg)) ? var_export($msg, true) : '未知错误';
        }
        $msg = '[' . date('Y-m-d H:i:s') . '] ' . $msg . PHP_EOL;

        $maxSize = $this->setting['log_size'];
        $file = $logfile ?: $this->setting['log_dir'] . "/" . date('Y-m') . ".log";

        // Rotate the log once it has reached the size limit.
        if (file_exists($file) && filesize($file) >= $maxSize) {
            $bak = $file . '-' . time();
            if (!rename($file, $bak)) {
                error_log("rename file:{$file} to {$bak} failed", 3, $file);
            }
        }

        error_log($msg, 3, $file);
    }

    /**
     * Send a JSON-encoded error packet to a connection.
     *
     * @param int $fd connection descriptor
     * @param int $errno error code
     * @param string $msg error message
     * @return bool result of swoole_server::send()
     */
    public function sendErrorMessage($fd, $errno, $msg = '')
    {
        return $this->server->send($fd, self::encode(['errno' => $errno, 'msg' => $msg], self::DECODE_JSON));
    }

    /**
     * Serialize data and prepend the packed header.
     *
     * The body is compressed only for DECODE_PHP and only when IS_COMPRESS is enabled.
     * Unknown types are serialized with serialize(), like DECODE_PHP.
     *
     * @param mixed $data
     * @param int $type one of the DECODE_* constants
     * @param int $uid
     * @param int $serid
     * @return string header followed by the body
     */
    public static function encode($data, $type = self::DECODE_JSON, $uid = 0, $serid = 0)
    {
        switch ($type) {
            case self::DECODE_JSON:
                $body = Json::encode($data);
                break;
            case self::DECODE_SWOOLE:
                $body = \swoole_serialize::pack($data);
                break;
            case self::DECODE_PHP:
            default:
                $body = serialize($data);
                break;
        }

        if (self::IS_COMPRESS && ($type == self::DECODE_PHP)) {
            $body = gzcompress($body, -1);
        }

        return pack(RPCServer::HEADER_PACK, strlen($body), $type, $uid, $serid) . $body;
    }

    /**
     * Decode a packet body (without header).
     *
     * Mirrors encode(): the body is decompressed only for DECODE_PHP and only when
     * IS_COMPRESS is enabled, so both directions agree whatever the constant's value.
     *
     * @param string $data
     * @param int $unseralize_type one of the DECODE_* constants
     * @return mixed decoded value; false when decompression or unserialize() fails
     */
    public static function decode($data, $unseralize_type = self::DECODE_JSON)
    {
        if ($data && self::IS_COMPRESS && ($unseralize_type == self::DECODE_PHP)) {
            $data = gzuncompress($data);
            if ($data === false) {
                // Corrupt or uncompressed input: report failure instead of unserializing false.
                return false;
            }
        }

        switch ($unseralize_type) {
            case self::DECODE_JSON:
                $data = Json::decode($data);
                break;
            case self::DECODE_SWOOLE:
                $data = \swoole_serialize::unpack($data);
                break;
            case self::DECODE_PHP:
            default:
                // NOTE(review): unserialize() on data received from the network can
                // instantiate arbitrary classes; consider restricting allowed_classes.
                $data = unserialize($data);
        }

        return $data;
    }

    /**
     * Check a client IP against the whitelist.
     *
     * @param string $ip
     * @return bool true when the whitelist is empty or contains the IP as a key
     */
    protected function verifyIp($ip)
    {
        return empty($this->ipWhiteList) || isset($this->ipWhiteList[$ip]);
    }

    /**
     * Check a user name / password pair against the registered users.
     *
     * The comparison is a strict string comparison via hash_equals(); a loose comparison
     * would treat different numeric-looking strings (e.g. "0e1" and "0e2") as equal.
     *
     * @param string $user
     * @param string $password
     * @return bool
     */
    protected function verifyUser($user, $password)
    {
        // Credentials come from the client; anything that is not a scalar cannot match.
        if (!is_scalar($user) || !is_scalar($password)) {
            return false;
        }
        if (!isset($this->userList[$user])) {
            return false;
        }

        return hash_equals((string) $this->userList[$user], (string) $password);
    }

    /**
     * Execute the remote call described by a decoded request.
     *
     * @param array $request decoded request; reads the "call" and "params" keys
     * @param array $header packet header of the request
     * @return array ['errno' => 0, 'data' => mixed] on success,
     *               ['errno' => int, 'msg' => string] on failure
     */
    protected function call($request, $header)
    {
        if (empty($request['call'])) {
            return ['errno' => self::ERR_PARAMS, 'msg' => '远程调用函数参数错误'];
        }

        // Liveness probe.
        if ($request['call'] === 'PING') {
            return ['errno' => 0, 'data' => 'PONG'];
        }

        // IP whitelist check, enabled once addAllowIP() has been used.
        if ($this->verifyIp && !$this->verifyIp(self::$clientEnv['_socket']['remote_ip'])) {
            return ['errno' => self::ERR_ACCESS_DENY, 'msg' => '访问被拒绝,客户端主机未被授权'];
        }

        // Credential check, enabled once addAllowUser() has been used.
        if ($this->verifyUser) {
            if (
                empty(self::$clientEnv['user'])
                || empty(self::$clientEnv['password'])
                || !$this->verifyUser(self::$clientEnv['user'], self::$clientEnv['password'])
            ) {
                return ['errno' => self::ERR_USER, 'msg' => '用户名密码错误'];
            }
        }

        // NOTE(review): any callable named by the client is accepted here, built-in
        // functions included; confirm that this is restricted elsewhere.
        if (!is_callable($request['call'])) {
            return ['errno' => self::ERR_NOFUNC, 'msg' => '函数不存在'];
        }

        // Optional pre-call hook.
        if (method_exists($this, 'beforeRequest')) {
            $this->beforeRequest($request);
        }

        // A missing argument list means "no arguments"; anything but an array is invalid.
        $params = isset($request['params']) ? $request['params'] : [];
        if (!is_array($params)) {
            return ['errno' => self::ERR_PARAMS, 'msg' => '远程调用函数参数错误'];
        }

        try {
            $ret = call_user_func_array($request['call'], $params);
        } catch (Exception $e) {
            return ['errno' => self::ERR_API, 'msg' => $e->getMessage()];
        }

        // Optional post-call hook.
        if (method_exists($this, 'afterRequest')) {
            $this->afterRequest($ret);
        }

        // NULL results are rejected: the client treats NULL as a failed RPC call.
        if ($ret === null) {
            return ['errno' => self::ERR_CALL, 'msg' => '禁止接口返回NULL'];
        }

        return ['errno' => 0, 'data' => $ret];
    }

    /**
     * Add an IP to the whitelist and enable IP verification.
     *
     * @param string $ip
     * @throws Exception when $ip does not pass the IpValidator
     */
    public function addAllowIP($ip)
    {
        $ipValidator = new IpValidator();
        if (!$ipValidator->validate($ip)) {
            throw new Exception("require ip address.");
        }

        $this->ipWhiteList[$ip] = true;
        $this->verifyIp = true;
    }

    /**
     * Register a user / password pair and enable credential verification.
     *
     * @param string $user
     * @param string $password
     */
    public function addAllowUser($user, $password)
    {
        $this->userList[$user] = $password;
        $this->verifyUser = true;
    }
}