| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530 |
- <?php
- /**
- * Swoole 实现的 server,用来处理Rpc任务
- * $Author: Leo $
- */
- namespace common\libs\swoole;
- use yii\base\Exception;
- use yii\console\Application;
- use yii\helpers\Json;
- use yii\validators\IpValidator;
/**
 * Swoole-based TCP server that executes RPC requests.
 *
 * Wire format: a 16-byte header (four unsigned 32-bit big-endian integers:
 * length, type, uid, serid) followed by the body. Requests are decoded as
 * gz-compressed PHP-serialized arrays; responses are sent as JSON.
 */
class RPCServer
{
    /** @var array Pending request bodies, keyed by connection fd. */
    protected $_buffer = [];
    /** @var array Parsed packet headers, keyed by connection fd. */
    protected $_headers = [];
    /** @var array Environment of the client whose request is being handled (plus the '_socket' info). */
    public static $clientEnv;
    /** @var bool When true, the worker process exits after sending the current response. */
    public static $stop = false;
    /** @var array Header of the request being handled. */
    public static $requestHeader;
    /**
     * Maximum accepted packet size in bytes.
     * NOTE(review): the original comment called this a "2M default", but the
     * value is roughly 5.4 GB — confirm which one is intended.
     */
    public $packet_maxlen = 5465792000;
    /** @var int Maximum number of connections with pending buffers before the oldest ones are dropped. */
    protected $buffer_maxlen = 1024000;
    /** @var int Number of buffered connections closed in one cleanup pass. */
    protected $buffer_clear_num = 128;

    const ERR_HEADER = 9001; // malformed packet header
    const ERR_TOOBIG = 9002; // request body exceeds the allowed size
    const ERR_SERVER_BUSY = 9003; // server busy, over capacity
    const ERR_UNPACK = 9204; // request body could not be decoded
    const ERR_PARAMS = 9205; // invalid parameters
    const ERR_NOFUNC = 9206; // requested function does not exist
    const ERR_CALL = 9207; // execution error
    const ERR_ACCESS_DENY = 9208; // client host is not authorized
    const ERR_USER = 9209; // wrong user name or password
    const ERR_API = 9210; // the called API raised an error

    const HEADER_SIZE = 16;
    const HEADER_STRUCT = "Nlength/Ntype/Nuid/Nserid";
    const HEADER_PACK = "NNNN";

    const DECODE_PHP = 1; // body packed with PHP serialize()
    const DECODE_JSON = 2; // body packed with JSON
    const DECODE_MSGPACK = 3; // body packed with msgpack
    const DECODE_SWOOLE = 4; // body packed with swoole_serialize
    const DECODE_GZIP = 128; // GZIP compression flag
    const IS_COMPRESS = true; // gzcompress bodies packed with DECODE_PHP

    const ALLOW_IP = 1;
    const ALLOW_USER = 2;

    /** @var array Application namespaces (not consulted anywhere in this class). */
    protected $appNamespaces = [];
    /** @var array IP whitelist; lookups are done by key (ip => anything). */
    protected $ipWhiteList = [];
    /** @var array Allowed users as user => password. */
    protected $userList = [];
    /** @var bool Whether call() checks the client IP against the whitelist. */
    protected $verifyIp = true;
    /** @var bool Whether call() checks the client user name and password. */
    protected $verifyUser = false;

    /**
     * Swoole server instance, created in run().
     * @var null|\Swoole\Server
     */
    protected $server = null;

    /**
     * Swoole configuration, also used for host/port/pidfile/logging options.
     * @var array
     */
    private $setting = [];

    /**
     * Yii::$app object.
     * @var Application
     */
    private $app = null;

    /** @var mixed Controller handed in by the caller; stored but not used by this class. */
    private $_swooleController;

    /**
     * @param array $setting Swoole/server settings; 'ipWhiteList' is optional
     * @param Application $app Yii application
     * @param mixed $swooleController controller that started the server
     */
    public function __construct($setting, $app, &$swooleController)
    {
        $this->setting = $setting;
        $this->app = $app;
        $this->_swooleController = $swooleController;
        // A missing whitelist means "no restriction" instead of an undefined-index notice.
        $this->ipWhiteList = isset($setting['ipWhiteList']) ? $setting['ipWhiteList'] : [];
    }

    /**
     * Set the title of the current process.
     * @param string $name process title
     */
    private function setProcessName($name)
    {
        // Errors are suppressed on purpose: some platforms provide the
        // function but refuse to change the title.
        if (function_exists('cli_set_process_title')) {
            @cli_set_process_title($name);
            return;
        }
        if (function_exists('swoole_set_process_name')) {
            @swoole_set_process_name($name);
            return;
        }
        trigger_error(__METHOD__ . " failed.require cli_set_process_title or swoole_set_process_name.");
    }

    /**
     * Create the Swoole server, bind the event callbacks and start it.
     * @return boolean result of \swoole_server::start()
     */
    public function run()
    {
        $this->server = new \swoole_server($this->setting['host'], $this->setting['port']);
        $this->server->set($this->setting);

        // Every event for which this object defines an on<Event> method is bound.
        $events = [
            'start',
            'workerStart',
            'managerStart',
            'open',
            'message',
            'receive',
            'request',
            'task',
            'finish',
            'close',
            'workerStop',
            'shutdown',
        ];
        foreach ($events as $event) {
            $handler = 'on' . ucfirst($event);
            if (method_exists($this, $handler)) {
                $this->server->on($event, [$this, $handler]);
            }
        }

        echo "服务成功启动" . PHP_EOL;
        echo "服务运行名称:{$this->setting['process_name']}" . PHP_EOL;
        echo "服务运行端口:{$this->setting['host']}:{$this->setting['port']}" . PHP_EOL;

        return $this->server->start();
    }

    /**
     * Close a connection and drop its buffered state.
     * @param int $fd connection descriptor
     */
    protected function close($fd)
    {
        $this->server->close($fd);
        unset($this->_buffer[$fd], $this->_headers[$fd]);
    }

    /**
     * Master process started: set its title and write the pid file.
     * @param \Swoole\Server $server
     * @return boolean
     */
    public function onStart($server)
    {
        echo '[' . date('Y-m-d H:i:s') . "]\t swoole_server master worker start\n";
        $this->setProcessName($server->setting['process_name'] . '-master');
        // Master and manager pids, one per line, so an external script can restart the service.
        $pid = "{$this->server->master_pid}\n{$this->server->manager_pid}";
        file_put_contents($this->setting['pidfile'], $pid);
        return true;
    }

    /**
     * Manager process started: set its title.
     * @param \Swoole\Server $server
     */
    public function onManagerStart($server)
    {
        echo '[' . date('Y-m-d H:i:s') . "]\t swoole_server manager worker start\n";
        $this->setProcessName($server->setting['process_name'] . '-manager');
    }

    /**
     * Connection-open hook; intentionally empty.
     * @param $server
     * @param $request
     */
    public function onOpen($server, $request)
    {
    }

    /**
     * Task hook; intentionally empty.
     * @param $server
     */
    public function onTask($server)
    {
    }

    /**
     * Connection closed: release everything buffered for it.
     * @param \Swoole\Server $server
     * @param int $fd connection descriptor
     */
    public function onClose($server, $fd)
    {
        // The header must go too, otherwise it leaks for connections that
        // disappear before their request was processed.
        unset($this->_buffer[$fd], $this->_headers[$fd]);
    }

    /**
     * Worker started: title it as task worker or event worker.
     * @param \Swoole\Server $server
     * @param int $workerId ids at or above 'worker_num' are task workers
     */
    public function onWorkerStart($server, $workerId)
    {
        $suffix = $workerId >= $this->setting['worker_num'] ? '-task' : '-event';
        $this->setProcessName($server->setting['process_name'] . $suffix);
    }

    /**
     * Worker stopped.
     * @param \Swoole\Server $server
     * @param int $workerId
     */
    public function onWorkerStop($server, $workerId)
    {
        echo '[' . date('Y-m-d H:i:s') . "]\t swoole_server[{$server->setting['process_name']} worker:{$workerId} shutdown\n";
    }

    /**
     * Handle incoming data: parse the header, decode the body, run the call
     * and send the JSON response.
     *
     * @param \Swoole\Server $server
     * @param int $fd connection descriptor
     * @param int $from_id reactor id
     * @param string $data raw bytes received
     * @return bool false when the packet header was rejected, otherwise the
     *              send result (oversize error) or true
     */
    public function onReceive($server, $fd, $from_id, $data)
    {
        if (!isset($this->_buffer[$fd]) or $this->_buffer[$fd] === '') {
            // Too many connections have pending data: close the oldest ones.
            if (count($this->_buffer) >= $this->buffer_maxlen) {
                $n = 0;
                foreach ($this->_buffer as $k => $v) {
                    $this->close($k);
                    $n++;
                    if ($n >= $this->buffer_clear_num) {
                        break;
                    }
                }
                $this->logger("clear $n buffer");
            }

            // Parse the header; data shorter than a header cannot be unpacked.
            $header = strlen($data) >= self::HEADER_SIZE
                ? unpack(self::HEADER_STRUCT, substr($data, 0, self::HEADER_SIZE))
                : false;
            if ($header === false) {
                // Stop here: there is no header to work with.
                $this->sendErrorMessage($fd, self::ERR_HEADER, '错误的包头');
                $this->close($fd);
                return false;
            }
            $header['fd'] = $fd;

            // Reject oversized packets before buffering anything.
            if ($header['length'] - self::HEADER_SIZE > $this->packet_maxlen or strlen($data) > $this->packet_maxlen) {
                unset($this->_headers[$fd]);
                return $this->sendErrorMessage($fd, self::ERR_TOOBIG, '数据长度错误');
            }

            $this->_headers[$fd] = $header;
            $this->_buffer[$fd] = substr($data, self::HEADER_SIZE);
        } else {
            $this->_buffer[$fd] .= $data;
        }

        // NOTE(review): there is no "wait until header length bytes have
        // arrived" step, so a request split over several reads is decoded
        // prematurely and the append branch above is never reached. This is
        // only correct if Swoole delivers whole packets (e.g. open_length_check).
        // Restoring the wait requires confirming what the client puts in the
        // header length field.

        // NOTE(review): the body is always decoded as DECODE_PHP, whatever the
        // header type says, and that path runs unserialize() on network input.
        try {
            $request = self::decode($this->_buffer[$fd], self::DECODE_PHP);
        } catch (\Exception $e) {
            // e.g. a warning turned into an exception by the error handler
            $request = false;
        } catch (\Throwable $e) {
            $request = false;
        }

        if ($request === false || !is_array($request)) {
            $this->sendErrorMessage($fd, self::ERR_UNPACK, '数据解包失败');
        } else {
            // Header of the current request
            self::$requestHeader = $_header = $this->_headers[$fd];
            // Start from a clean environment on every request. Keeping the
            // previous value would let a request without 'env' reuse another
            // client's user name and password.
            self::$clientEnv = (!empty($request['env']) && is_array($request['env'])) ? $request['env'] : [];
            // Socket information of the caller
            self::$clientEnv['_socket'] = $this->server->connection_info($_header['fd']);

            $response = $this->call($request, $_header);

            $ret = $this->server->send($fd, self::encode($response, self::DECODE_JSON, $_header['uid'], $_header['serid']));
            if ($ret === false) {
                trigger_error("SendToClient failed. code=" . $this->server->getLastError() . " params=" . var_export($request, true) . "\nheaders=" . var_export($_header, true), E_USER_WARNING);
            }
            // A handler asked this worker to stop
            if (self::$stop) {
                exit(0);
            }
        }

        // The request is finished either way: drop its state.
        unset($this->_buffer[$fd], $this->_headers[$fd]);
        return true;
    }

    /**
     * Server shut down: remove the pid file.
     */
    public function onShutdown()
    {
        echo '[' . date('Y-m-d H:i:s') . "]\t server shutdown 关闭服务完成\n";
        $pidFile = isset($this->setting['pidfile']) ? $this->setting['pidfile'] : null;
        if ($pidFile !== null && is_file($pidFile)) {
            unlink($pidFile);
        }
    }

    /**
     * Append a message to the log. Only active when the 'debug' setting is
     * truthy. The default file is <log_dir>/<Y-m>.log; a file that reached
     * 'log_size' bytes is renamed with a timestamp suffix first.
     *
     * @param string|array|object $msg log content
     * @param string $logfile explicit log file, overrides the default
     */
    public function logger($msg, $logfile = '')
    {
        if (empty($msg) || empty($this->setting['debug'])) {
            return;
        }
        if (!is_string($msg)) {
            $msg = (is_object($msg) || is_array($msg)) ? var_export($msg, true) : '未知错误';
        }
        $msg = '[' . date('Y-m-d H:i:s') . '] ' . $msg . PHP_EOL;

        $maxSize = $this->setting['log_size'];
        $file = $logfile ?: $this->setting['log_dir'] . "/" . date('Y-m') . ".log";

        // Rotate the log once it has reached the size limit
        if (file_exists($file) && filesize($file) >= $maxSize) {
            $bak = $file . '-' . time();
            if (!rename($file, $bak)) {
                error_log("rename file:{$file} to {$bak} failed", 3, $file);
            }
        }
        error_log($msg, 3, $file);
    }

    /**
     * Send a JSON-encoded error packet to a client.
     * @param int $fd connection descriptor
     * @param int $errno one of the ERR_* constants
     * @param string $msg human-readable message
     * @return bool result of the send
     */
    public function sendErrorMessage($fd, $errno, $msg = '')
    {
        return $this->server->send($fd, self::encode(['errno' => $errno, 'msg' => $msg], self::DECODE_JSON));
    }

    /**
     * Build a packet: 16-byte header followed by the encoded body.
     *
     * Unknown types are packed with serialize(). Only DECODE_PHP bodies are
     * gz-compressed. The header length is the length of the final body.
     *
     * @param mixed $data payload
     * @param int $type one of the DECODE_* constants
     * @param int $uid value for the header uid field
     * @param int $serid value for the header serid field
     * @return string
     */
    public static function encode($data, $type = self::DECODE_JSON, $uid = 0, $serid = 0)
    {
        switch ($type) {
            case self::DECODE_JSON:
                $body = Json::encode($data);
                break;
            case self::DECODE_SWOOLE:
                $body = \swoole_serialize::pack($data);
                break;
            case self::DECODE_PHP:
            default:
                $body = serialize($data);
                break;
        }
        if (self::IS_COMPRESS && ($type == self::DECODE_PHP)) {
            $body = gzcompress($body, -1);
        }
        return pack(self::HEADER_PACK, strlen($body), $type, $uid, $serid) . $body;
    }

    /**
     * Decode a packet body (without header).
     *
     * DECODE_PHP bodies are gz-uncompressed first. Unknown types are treated
     * as PHP-serialized data.
     *
     * @param string $data body bytes
     * @param int $unseralize_type one of the DECODE_* constants
     * @return mixed decoded value; false when a DECODE_PHP body cannot be
     *               uncompressed or unserialized
     */
    public static function decode($data, $unseralize_type = self::DECODE_JSON)
    {
        if ($data && ($unseralize_type == self::DECODE_PHP)) {
            $data = gzuncompress($data);
            if ($data === false) {
                // Not valid compressed data; nothing to unserialize.
                return false;
            }
        }
        switch ($unseralize_type) {
            case self::DECODE_JSON:
                $data = Json::decode($data);
                break;
            case self::DECODE_SWOOLE:
                $data = \swoole_serialize::unpack($data);
                break;
            case self::DECODE_PHP:
            default:
                // NOTE(review): unserialize() can instantiate arbitrary
                // classes; callers feeding it untrusted input should move to
                // JSON or restrict allowed classes.
                $data = unserialize($data);
        }
        return $data;
    }

    /**
     * Check a client IP against the whitelist. An empty whitelist allows everyone.
     * @param string $ip
     * @return bool
     */
    protected function verifyIp($ip)
    {
        if (empty($this->ipWhiteList)) {
            return true;
        }
        // Only strings/ints are valid array keys; anything else cannot match.
        return (is_string($ip) || is_int($ip)) && isset($this->ipWhiteList[$ip]);
    }

    /**
     * Check a user name and password against the configured user list.
     * @param string $user
     * @param string $password
     * @return bool
     */
    protected function verifyUser($user, $password)
    {
        if (!(is_string($user) || is_int($user)) || !isset($this->userList[$user])) {
            return false;
        }
        $expected = $this->userList[$user];
        if (!is_scalar($expected) || !is_scalar($password)) {
            return false;
        }
        // Compare as strings in constant time. A loose comparison would accept
        // e.g. "0e1" for "0e2" because both look like the number zero.
        return hash_equals((string)$expected, (string)$password);
    }

    /**
     * Execute the function named in a decoded request.
     *
     * 'PING' is answered before any access check. Otherwise the IP and user
     * checks run (when enabled), the callable is invoked with 'params', and
     * the optional beforeRequest/afterRequest hooks of a subclass are called
     * around it.
     *
     * @param array $request decoded request; uses 'call' and 'params'
     * @param array $header header of the request (not used here)
     * @return array ['errno' => 0, 'data' => ...] or ['errno' => ERR_*, 'msg' => ...]
     */
    protected function call($request, $header)
    {
        if (empty($request['call'])) {
            return ['errno' => self::ERR_PARAMS, 'msg' => '远程调用函数参数错误'];
        }
        // Liveness probe
        if ($request['call'] === 'PING') {
            return ['errno' => 0, 'data' => 'PONG'];
        }
        // Is the client IP allowed?
        if ($this->verifyIp) {
            $remoteIp = isset(self::$clientEnv['_socket']['remote_ip'])
                ? self::$clientEnv['_socket']['remote_ip']
                : null;
            if (!$this->verifyIp($remoteIp)) {
                return ['errno' => self::ERR_ACCESS_DENY, 'msg' => '访问被拒绝,客户端主机未被授权'];
            }
        }
        // Are the credentials present and correct?
        if ($this->verifyUser) {
            $hasCredentials = !empty(self::$clientEnv['user']) && !empty(self::$clientEnv['password']);
            if (!$hasCredentials || !$this->verifyUser(self::$clientEnv['user'], self::$clientEnv['password'])) {
                return ['errno' => self::ERR_USER, 'msg' => '用户名密码错误'];
            }
        }
        // NOTE(review): any callable the client names is accepted, including
        // PHP built-ins; $appNamespaces is never consulted. Consider an
        // allow-list.
        if (!is_callable($request['call'])) {
            return ['errno' => self::ERR_NOFUNC, 'msg' => '函数不存在'];
        }
        // Missing params means "no arguments"; anything but an array is invalid.
        $params = isset($request['params']) ? $request['params'] : [];
        if (!is_array($params)) {
            return ['errno' => self::ERR_PARAMS, 'msg' => '远程调用函数参数错误'];
        }
        // Pre-call hook
        if (method_exists($this, 'beforeRequest')) {
            $this->beforeRequest($request);
        }
        // Leading backslashes matter: the bare name Exception is imported as
        // yii\base\Exception in this file and would miss everything else,
        // letting the failure escape and take the worker down.
        try {
            $ret = call_user_func_array($request['call'], $params);
        } catch (\Exception $e) {
            return ['errno' => self::ERR_API, 'msg' => $e->getMessage()];
        } catch (\Throwable $e) {
            return ['errno' => self::ERR_API, 'msg' => $e->getMessage()];
        }
        // Post-call hook
        if (method_exists($this, 'afterRequest')) {
            $this->afterRequest($ret);
        }
        // APIs must not return NULL; clients treat NULL as a failed RPC call.
        if ($ret === null) {
            return ['errno' => self::ERR_CALL, 'msg' => '禁止接口返回NULL'];
        }
        return ['errno' => 0, 'data' => $ret];
    }

    /**
     * Add an IP to the whitelist and enable IP verification.
     * @param string $ip
     * @throws Exception when $ip is not a valid IP address
     */
    public function addAllowIP($ip)
    {
        $ipValidator = new IpValidator();
        if (!$ipValidator->validate($ip)) {
            throw new Exception("require ip address.");
        }
        $this->ipWhiteList[$ip] = true;
        $this->verifyIp = true;
    }

    /**
     * Add an allowed user and enable user verification.
     * @param string $user
     * @param string $password
     */
    public function addAllowUser($user, $password)
    {
        $this->userList[$user] = $password;
        $this->verifyUser = true;
    }
}
|