| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767 |
- <?php
- /**
- * Created by PhpStorm.
- * User: liyunlong
- * Date: 2018-12-24
- * Time: 11:30
- */
- namespace common\libs\swoole;
- use yii\base\Exception;
- class RPCClient
- {
- const OK = 0;
- /**
- * 版本号
- */
- const VERSION = 1005;
- /**
- * Server的实例列表
- * @var array
- */
- protected $servers = array();
- protected $requestIndex = 0;
- protected $env = array();
- /**
- * 连接到服务器
- * @var array
- */
- protected $connections = array();
- protected $waitList = array();
- protected $timeout = 0.5;
- protected $packet_maxlen = 2097152; //最大不超过2M的数据包
- /**
- * 启用长连接
- * @var bool
- */
- protected $keepConnection = false;
- protected static $_instances = array();
- protected $encode_gzip = false;
- protected $encode_type = RPCServer::DECODE_PHP;
- protected $user;
- protected $password;
- private $keepSocket = false; //让整个对象保持同一个socket,不再重新分配
- private $keepSocketServer = array(); //对象保持同一个socket的服务器信息
- static $enableCoroutine = false;
- function __construct($id = null)
- {
- $key = empty($id) ? 'default' : $id;
- self::$_instances[$key] = $this;
- }
- /**
- * @param bool $keepSocket
- */
- public function setKeepSocket($keepSocket)
- {
- $this->keepSocket = $keepSocket;
- }
- /**
- * 设置编码类型
- * @param $type
- * @param $gzip
- * @throws \Exception
- */
- function setEncodeType($type, $gzip)
- {
- //兼容老版本,老版本true代表用json false代表serialize
- if ($type === true)
- {
- $type = RPCServer::DECODE_JSON;
- }
- if ($type === false)
- {
- $type = RPCServer::DECODE_PHP;
- }
- if ($type === RPCServer::DECODE_SWOOLE and (substr(PHP_VERSION, 0, 1) != '7'))
- {
- throw new \Exception("swoole_serialize only use in phpng");
- }
- else
- {
- $this->encode_type = $type;
- }
- if ($gzip)
- {
- $this->encode_gzip = true;
- }
- }
- /**
- * 获取SOA服务实例
- * @param $id
- * @return RPCClient
- */
- static function getInstance($id = null)
- {
- $key = empty($id) ? 'default' : $id;
- // if (self::$enableCoroutine)
- // {
- // return new self($id);
- // }
- if (empty(self::$_instances[$key]))
- {
- $object = new static($id);
- }
- else
- {
- $object = self::$_instances[$key];
- }
- return $object;
- }
- protected function beforeRequest($retObj)
- {
- }
- protected function afterRequest($retObj)
- {
- }
- /**
- * 生成请求串号
- * @return int
- */
- static function getRequestId()
- {
- $us = strstr(microtime(), ' ', true);
- return intval(strval($us * 1000 * 1000) . rand(100, 999));
- }
- protected function closeConnection($host, $port)
- {
- $conn_key = $host . ':' . $port;
- if (!isset($this->connections[$conn_key]))
- {
- return false;
- }
- $socket = $this->connections[$conn_key];
- $socket->close(true);
- unset($this->connections[$conn_key]);
- return true;
- }
- protected function getConnection($host, $port)
- {
- $ret = false;
- $conn_key = $host.':'.$port;
- if (isset($this->connections[$conn_key]))
- {
- return $this->connections[$conn_key];
- }
- //基于Swoole扩展
- $socket = new \swoole_client(SWOOLE_SOCK_TCP | SWOOLE_KEEP, SWOOLE_SOCK_SYNC);
- $socket->set(array(
- 'open_length_check' => true,
- 'package_max_length' => $this->packet_maxlen,
- 'package_length_type' => 'N',
- 'package_body_offset' => RPCServer::HEADER_SIZE,
- 'package_length_offset' => 0,
- ));
- /**
- * 尝试重连一次
- */
- for ($i = 0; $i < 2; $i++)
- {
- $ret = $socket->connect($host, $port, $this->timeout);
- if ($ret === false and ($socket->errCode == 114 or $socket->errCode == 115))
- {
- //强制关闭,重连
- $socket->close(true);
- continue;
- }
- else
- {
- break;
- }
- }
- if ($ret)
- {
- $this->connections[$conn_key] = $socket;
- return $socket;
- }
- else
- {
- return false;
- }
- }
- /**
- * 验证缓存中是否存在已经配置为下线的连接 防止短时间内配置恢复后连接错误使用
- * @param $servers
- */
- protected function validConnection($servers)
- {
- $offline = [];
- foreach ($servers as $k => $svr)
- {
- if (!empty($svr['status']) and $svr['status'] == 'offline')
- {
- $offline[] = $svr;
- }
- }
- if (!empty($offline)) {
- foreach ($offline as $svr)
- {
- $conn_key = $svr['host'].':'.$svr['port'];
- if (isset($this->connections[$conn_key]))
- {
- $socket = $this->connections[$conn_key];
- $socket->close(true);
- unset($this->connections[$conn_key]);
- }
- }
- }
- }
- /**
- * 连接到服务器
- * @param $retObj
- * @return bool
- * @throws Exception
- */
- protected function connectToServer($retObj)
- {
- $servers = $this->servers;
- //循环连接
- while (count($servers) > 0)
- {
- $svr = $this->getServer($servers);
- if (empty($svr))
- {
- return false;
- }
- $this->validConnection($servers);
- $socket = $this->getConnection($svr['host'], $svr['port']);
- //连接失败,服务器节点不可用
- //TODO 如果连接失败,需要上报机器存活状态
- if ($socket === false)
- {
- foreach($servers as $k => $v)
- {
- if ($v['host'] == $svr['host'] and $v['port'] == $svr['port'])
- {
- //从Server列表中移除
- unset($servers[$k]);
- }
- }
- if ($this->keepSocket)
- {
- //若连接失败,则清除掉该server
- $this->keepSocketServer = array();
- }
- }
- else
- {
- $retObj->socket = $socket;
- $retObj->server_host = $svr['host'];
- $retObj->server_port = $svr['port'];
- return true;
- }
- }
- return false;
- }
- /**
- * RPCResult
- * @param $send
- * @param RPCResult $retObj
- * @return bool
- * @throws Exception
- */
- protected function request($send, RPCResult $retObj)
- {
- $retObj->send = $send;
- $this->beforeRequest($retObj);
- $retObj->index = $this->requestIndex++;
- connect_to_server:
- if ($this->connectToServer($retObj) === false)
- {
- $retObj->code = RPCResult::ERR_CONNECT;
- return false;
- }
- //请求串号
- $retObj->requestId = self::getRequestId();
- //打包格式
- $encodeType = $this->encode_type;
- if ($this->encode_gzip)
- {
- $encodeType |= RPCServer::DECODE_GZIP;
- }
- //发送失败了
- if ($retObj->socket->send(RPCServer::encode($retObj->send, $encodeType, 0, $retObj->requestId)) === false)
- {
- $this->closeConnection($retObj->server_host, $retObj->server_port);
- //连接被重置了,重现连接到服务器
- if ($retObj->socket->errCode == 104)
- {
- goto connect_to_server;
- }
- $retObj->code = RPCResult::ERR_SEND;
- unset($retObj->socket);
- return false;
- }
- $retObj->code = RPCResult::ERR_RECV;
- //加入wait_list
- $this->waitList[$retObj->requestId] = $retObj;
- return true;
- }
- /**
- * 设置环境变量
- * @return array
- */
- public function getEnv()
- {
- return $this->env;
- }
- /**
- * 获取环境变量
- * @param array $env
- */
- public function setEnv($env)
- {
- $this->env = $env;
- }
- /**
- * 设置一项环境变量
- * @param $k
- * @param $v
- */
- public function putEnv($k, $v)
- {
- $this->env[$k] = $v;
- }
- /**
- * 设置超时时间,包括连接超时和接收超时
- * @param $timeout
- */
- public function setTimeout($timeout)
- {
- $this->timeout = $timeout;
- }
- /**
- * 设置用户名和密码
- * @param $user
- * @param $password
- */
- public function auth($user, $password)
- {
- $this->putEnv('user', $user);
- $this->putEnv('password', $password);
- }
- /**
- * 完成请求
- * @param $retData
- * @param $retObj RPCResult
- */
- protected function finish($retData, $retObj)
- {
- //解包失败了
- if ($retData === false)
- {
- $retObj->code = RPCResult::ERR_UNPACK;
- }
- //调用成功
- elseif ($retData['errno'] === self::OK)
- {
- $retObj->code = self::OK;
- $retObj->data = $retData['data'];
- $retObj->msg = null;
- }
- //服务器返回失败
- else
- {
- $retObj->code = $retData['errno'];
- $retObj->data = null;
- $retObj->msg = $retData['msg'];
- }
- unset($this->waitList[$retObj->requestId]);
- //执行after钩子函数
- $this->afterRequest($retObj);
- //执行回调函数
- if ($retObj->callback)
- {
- call_user_func($retObj->callback, $retObj);
- }
- }
- /**
- * 添加服务器
- * @param array $servers
- * @throws Exception
- */
- public function addServers(array $servers)
- {
- if (isset($servers['host']))
- {
- self::formatServerConfig($servers);
- $this->servers[] = $servers;
- }
- else
- {
- //兼容老的写法
- foreach ($servers as $svr)
- {
- // 127.0.0.1:8001 的写法
- if (is_string($svr))
- {
- list($config['host'], $config['port']) = explode(':', $svr);
- }
- else
- {
- $config = $svr;
- }
- self::formatServerConfig($config);
- $this->servers[] = $config;
- }
- }
- }
- /**
- * @param $config
- * @throws Exception
- */
- static protected function formatServerConfig(&$config)
- {
- if (empty($config['host']))
- {
- throw new Exception("require 'host' option.");
- }
- if (empty($config['port']))
- {
- throw new Exception("require 'port' option.");
- }
- if (empty($config['status']))
- {
- $config['status'] = 'online';
- }
- if (empty($config['weight']))
- {
- $config['weight'] = 100;
- }
- }
- /**
- * 设置服务器
- * @param array $servers
- * @throws Exception
- */
- public function setServers(array $servers)
- {
- foreach($servers as &$svr)
- {
- self::formatServerConfig($svr);
- }
- $this->servers = $servers;
- }
- /**
- * 从配置中取出一个服务器配置
- * @param $servers
- * @return array|mixed
- * @throws Exception
- */
- public function getServer($servers)
- {
- if (empty($servers))
- {
- throw new Exception("servers config empty.");
- }
- if ($this->keepSocket)
- {
- if (is_array($this->keepSocketServer) && count($this->keepSocketServer))
- {
- return $this->keepSocketServer;
- }
- else
- {
- $this->keepSocketServer = self::toolGetServer($servers);
- return $this->keepSocketServer;
- }
- }
- //保留老的server获取方式
- return self::toolGetServer($servers);
- }
- /**
- * RPC调用
- * @param $function
- * @param array $params
- * @param null $callback
- * @return RPCResult
- * @throws Exception
- */
- public function task($function, $params = array(), $callback = null)
- {
- $retObj = new RPCResult($this);
- $send = array('call' => $function, 'params' => $params);
- if (count($this->env) > 0)
- {
- //调用端环境变量
- $send['env'] = $this->env;
- }
- $this->request($send, $retObj);
- $retObj->callback = $callback;
- return $retObj;
- }
- /**
- * 侦测服务器是否存活
- * @return bool
- * @throws \Exception
- */
- public function ping()
- {
- return $this->task('PING')->getResult() === 'PONG';
- }
- /**
- * @param $connection
- * @param float $timeout
- * @return bool|string
- */
- protected function recvPacket($connection, $timeout=0.5)
- {
- return $connection->recv();
- }
- /**
- * select等待数据接收事件
- * @param $read
- * @param $write
- * @param $error
- * @param $timeout
- * @return int
- */
- protected function select($read, $write, $error, $timeout)
- {
- return swoole_client_select($read, $write, $error, $timeout);
- }
- protected function freeConnection($socket)
- {
- }
- /**
- * 接收响应
- * @param $timeout
- * @return int
- */
- public function wait($timeout = 0.5)
- {
- $st = microtime(true);
- $success_num = 0;
- $read = [];
- while (count($this->waitList) > 0)
- {
- $write = $error = array();
- foreach ($this->waitList as $obj)
- {
- /**
- * @var $obj RPCResult
- */
- if ($obj->socket !== null)
- {
- $read[] = $obj->socket;
- }
- }
- if (empty($read))
- {
- break;
- }
- //去掉重复的socket
- self::arrayUnique($read);
- //等待可读事件
- $n = $this->select($read, $write, $error, $timeout);
- if ($n > 0)
- {
- //可读
- foreach($read as $connection)
- {
- $data = $this->recvPacket($connection,$timeout);
- //socket被关闭了
- if ($data === "")
- {
- foreach($this->waitList as $retObj)
- {
- if ($retObj->socket == $connection)
- {
- $retObj->code = RPCResult::ERR_CLOSED;
- unset($this->waitList[$retObj->requestId]);
- $this->closeConnection($retObj->server_host, $retObj->server_port);
- //执行after钩子函数
- $this->afterRequest($retObj);
- }
- }
- continue;
- }
- elseif ($data === false)
- {
- continue;
- }
- $header = unpack(RPCServer::HEADER_STRUCT, substr($data, 0, RPCServer::HEADER_SIZE));
- //不在请求列表中,错误的请求串号
- if (!isset($this->waitList[$header['serid']]))
- {
- trigger_error(__CLASS__ . " invalid responseId[{$header['serid']}].", E_USER_WARNING);
- continue;
- }
- $retObj = $this->waitList[$header['serid']];
- //成功处理
- $this->finish(RPCServer::decode(substr($data, RPCServer::HEADER_SIZE), $header['type']), $retObj);
- $success_num++;
- }
- }
- //发生超时
- if ((microtime(true) - $st) > $timeout)
- {
- foreach ($this->waitList as $obj)
- {
- $obj->code = ($obj->socket->isConnected()) ? RPCResult::ERR_TIMEOUT : RPCResult::ERR_CONNECT;
- $this->closeConnection($obj->server_host, $obj->server_port);
- //执行after钩子函数
- $this->afterRequest($obj);
- }
- //清空当前列表
- $this->waitList = array();
- foreach($read as $r)
- {
- $this->freeConnection($r);
- }
- return $success_num;
- }
- }
- foreach($read as $r)
- {
- $this->freeConnection($r);
- }
- //未发生任何超时
- $this->waitList = array();
- $this->requestIndex = 0;
- return $success_num;
- }
- /**
- * 关闭所有连接
- */
- public function close()
- {
- foreach ($this->connections as $key => $socket)
- {
- /**
- * @var $socket \swoole_client
- */
- $socket->close(true);
- unset($this->connections[$key]);
- }
- }
- /**
- * @param array $servers
- * @return mixed
- */
- public static function toolGetServer(array $servers)
- {
- $weight = 0;
- //移除不在线的节点
- foreach ($servers as $k => $svr)
- {
- //节点已掉线
- if (!empty($svr['status']) and $svr['status'] == 'offline')
- {
- unset($servers[$k]);
- }
- else
- {
- $weight += $svr['weight'];
- }
- }
- //计算权重并随机选择一台机器
- $use = rand(0, $weight - 1);
- $weight = 0;
- foreach ($servers as $k => $svr)
- {
- //默认100权重
- if (empty($svr['weight']))
- {
- $svr['weight'] = 100;
- }
- $weight += $svr['weight'];
- //在权重范围内
- if ($use < $weight)
- {
- return $svr;
- }
- }
- //绝不会到这里
- $servers = array_values($servers);
- return $servers[0];
- }
- /**
- * 数组去重
- * @param array $arr
- */
- public static function arrayUnique(array &$arr)
- {
- $map = array();
- foreach ($arr as $k => $v)
- {
- if (is_object($v))
- {
- $hash = spl_object_hash($v);
- }
- elseif (is_resource($v))
- {
- $hash = intval($v);
- }
- else
- {
- $hash = $v;
- }
- if (isset($map[$hash]))
- {
- unset($arr[$k]);
- }
- else
- {
- $map[$hash] = true;
- }
- }
- }
- }
|