keepSocket = $keepSocket;
    }

    /**
     * Select the payload encoding and, optionally, turn gzip compression on.
     *
     * For backward compatibility a boolean $type is accepted: true selects
     * RPCServer::DECODE_JSON and false selects RPCServer::DECODE_PHP.
     *
     * @param mixed $type one of the RPCServer::DECODE_* encodings, or a legacy boolean
     * @param mixed $gzip truthy to enable gzip; a falsy value leaves the current setting untouched
     * @throws \Exception when RPCServer::DECODE_SWOOLE is requested and PHP_VERSION does not start with "7"
     */
    public function setEncodeType($type, $gzip)
    {
        // Legacy callers pass a boolean: true means JSON, false means PHP serialize.
        if ($type === true) {
            $type = RPCServer::DECODE_JSON;
        }
        if ($type === false) {
            $type = RPCServer::DECODE_PHP;
        }

        // NOTE(review): only the first character of PHP_VERSION is inspected, so
        // every major version other than 7 (including 8+) is rejected. Left as is
        // because relaxing it is only safe if swoole_serialize exists there - confirm.
        if ($type === RPCServer::DECODE_SWOOLE && (substr(PHP_VERSION, 0, 1) != '7')) {
            throw new \Exception("swoole_serialize only use in phpng");
        } else {
            $this->encode_type = $type;
        }

        if ($gzip) {
            $this->encode_gzip = true;
        }
    }

    /**
     * Fetch the client instance registered under $id, creating one when absent.
     *
     * This method does not write to self::$_instances itself.
     * NOTE(review): presumably the constructor registers the new instance - confirm.
     *
     * @param mixed $id instance identifier; an empty value maps to the key 'default'
     * @return RPCClient
     */
    public static function getInstance($id = null)
    {
        $key = empty($id) ? 'default' : $id;

        if (empty(self::$_instances[$key])) {
            return new static($id);
        }

        return self::$_instances[$key];
    }

    /**
     * Hook invoked by request() before a request is sent. No-op by default.
     *
     * @param RPCResult $retObj
     */
    protected function beforeRequest($retObj)
    {
    }

    /**
     * Hook invoked once a request has completed, was closed by the peer, or
     * timed out. No-op by default.
     *
     * @param RPCResult $retObj
     */
    protected function afterRequest($retObj)
    {
    }

    /**
     * Generate a request serial number.
     *
     * The id is the microsecond part of microtime() followed by a random
     * three-digit suffix.
     *
     * @return int
     */
    public static function getRequestId()
    {
        // microtime() returns "usec sec"; keep only the fractional-second part.
        $us = strstr(microtime(), ' ', true);

        return intval(strval($us * 1000 * 1000) . rand(100, 999));
    }

    /**
     * Close and forget the cached connection to $host:$port.
     *
     * @param string $host
     * @param mixed $port
     * @return bool false when no connection is cached for that endpoint, true otherwise
     */
    protected function closeConnection($host, $port)
    {
        $conn_key = $host . ':' . $port;
        if (!isset($this->connections[$conn_key])) {
            return false;
        }

        $socket = $this->connections[$conn_key];
        $socket->close(true);
        unset($this->connections[$conn_key]);

        return true;
    }

    /**
     * Return the cached connection to $host:$port, opening a new one if needed.
     *
     * @param string $host
     * @param mixed $port
     * @return \swoole_client|bool the connected client, or false when connecting failed
     */
    protected function getConnection($host, $port)
    {
        $ret = false;
        $conn_key = $host . ':' . $port;
        if (isset($this->connections[$conn_key])) {
            return $this->connections[$conn_key];
        }

        // Synchronous keep-alive TCP client provided by the Swoole extension.
        $socket = new \swoole_client(SWOOLE_SOCK_TCP | SWOOLE_KEEP, SWOOLE_SOCK_SYNC);
        $socket->set(array(
            'open_length_check' => true,
            'package_max_length' => $this->packet_maxlen,
            'package_length_type' => 'N',
            'package_body_offset' => RPCServer::HEADER_SIZE,
            'package_length_offset' => 0,
        ));

        // Try at most twice: a failure with errCode 114 or 115 force-closes the
        // socket and reconnects once. NOTE(review): presumably EALREADY /
        // EINPROGRESS left behind by a reused keep-alive socket - confirm.
        for ($i = 0; $i < 2; $i++) {
            $ret = $socket->connect($host, $port, $this->timeout);
            if ($ret === false && ($socket->errCode == 114 || $socket->errCode == 115)) {
                $socket->close(true);
                continue;
            }
            break;
        }

        if ($ret) {
            $this->connections[$conn_key] = $socket;
            return $socket;
        }

        return false;
    }

    /**
     * Drop cached connections to servers that are configured as offline, so a
     * connection is not wrongly reused when the configuration is restored
     * shortly afterwards.
     *
     * @param array $servers server configs, each carrying 'host', 'port' and optionally 'status'
     */
    protected function validConnection($servers)
    {
        foreach ($servers as $svr) {
            if (!empty($svr['status']) && $svr['status'] == 'offline') {
                // closeConnection() is a no-op when nothing is cached for the endpoint.
                $this->closeConnection($svr['host'], $svr['port']);
            }
        }
    }

    /**
     * Connect to one of the configured servers, trying others on failure.
     *
     * On success the socket, host and port are stored on $retObj.
     *
     * @param RPCResult $retObj
     * @return bool true once a connection is available, false when every candidate failed
     * @throws Exception
     */
    protected function connectToServer($retObj)
    {
        $servers = $this->servers;

        // Keep trying until a connection succeeds or no candidate is left.
        while (count($servers) > 0) {
            $svr = $this->getServer($servers);
            if (empty($svr)) {
                return false;
            }

            $this->validConnection($servers);
            $socket = $this->getConnection($svr['host'], $svr['port']);

            if ($socket !== false) {
                $retObj->socket = $socket;
                $retObj->server_host = $svr['host'];
                $retObj->server_port = $svr['port'];
                return true;
            }

            // Connection failed: the node is unavailable.
            // TODO report the liveness state of the machine when a connection fails.
            foreach ($servers as $k => $v) {
                if ($v['host'] == $svr['host'] && $v['port'] == $svr['port']) {
                    // Remove the node from the local candidate list.
                    unset($servers[$k]);
                }
            }
            if ($this->keepSocket) {
                // Forget the pinned server so the next round selects a new one.
                $this->keepSocketServer = array();
            }
        }

        return false;
    }

    /**
     * Encode and send one request, then register it in the wait list.
     *
     * When the send fails with errCode 104 (connection reset) the connection is
     * re-established and the send retried. Retries are bounded by the number of
     * configured servers plus one, so a peer that keeps resetting cannot trap
     * the caller in an endless loop.
     *
     * @param mixed $send payload to encode
     * @param RPCResult $retObj
     * @return bool true when the request was sent, false on connect or send failure
     * @throws Exception
     */
    protected function request($send, RPCResult $retObj)
    {
        $retObj->send = $send;
        $this->beforeRequest($retObj);
        $retObj->index = $this->requestIndex++;

        // Each failed attempt discards one cached connection, so at most one
        // attempt per configured server can be lost to a stale connection.
        $maxAttempts = count($this->servers) + 1;

        for ($attempt = 1; ; $attempt++) {
            if ($this->connectToServer($retObj) === false) {
                $retObj->code = RPCResult::ERR_CONNECT;
                return false;
            }

            // Request serial number.
            $retObj->requestId = self::getRequestId();

            // Packing format.
            $encodeType = $this->encode_type;
            if ($this->encode_gzip) {
                $encodeType |= RPCServer::DECODE_GZIP;
            }

            $packet = RPCServer::encode($retObj->send, $encodeType, 0, $retObj->requestId);
            if ($retObj->socket->send($packet) !== false) {
                break;
            }

            // Sending failed: the cached connection is unusable either way.
            $this->closeConnection($retObj->server_host, $retObj->server_port);

            // The connection was reset: reconnect and try again.
            if ($retObj->socket->errCode == 104 && $attempt < $maxAttempts) {
                continue;
            }

            $retObj->code = RPCResult::ERR_SEND;
            unset($retObj->socket);
            return false;
        }

        // Stays ERR_RECV until a response is processed by wait().
        $retObj->code = RPCResult::ERR_RECV;
        $this->waitList[$retObj->requestId] = $retObj;

        return true;
    }

    /**
     * Get the environment variables sent along with each call.
     *
     * @return array
     */
    public function getEnv()
    {
        return $this->env;
    }

    /**
     * Replace the environment variables sent along with each call.
     *
     * @param array $env
     */
    public function setEnv($env)
    {
        $this->env = $env;
    }

    /**
     * Set a single environment variable.
     *
     * @param mixed $k
     * @param mixed $v
     */
    public function putEnv($k, $v)
    {
        $this->env[$k] = $v;
    }

    /**
     * Set the timeout stored on the client; it is passed to connect().
     *
     * @param mixed $timeout
     */
    public function setTimeout($timeout)
    {
        $this->timeout = $timeout;
    }

    /**
     * Set the user name and password, stored as the 'user' and 'password'
     * environment variables.
     *
     * @param mixed $user
     * @param mixed $password
     */
    public function auth($user, $password)
    {
        $this->putEnv('user', $user);
        $this->putEnv('password', $password);
    }

    /**
     * Complete a request: record the outcome, remove it from the wait list,
     * run the after hook and invoke the callback if one is set.
     *
     * @param array|bool $retData decoded response, or false when unpacking failed
     * @param RPCResult $retObj
     */
    protected function finish($retData, $retObj)
    {
        if ($retData === false) {
            // Unpacking failed.
            $retObj->code = RPCResult::ERR_UNPACK;
        } elseif ($retData['errno'] === self::OK) {
            // The call succeeded.
            $retObj->code = self::OK;
            $retObj->data = $retData['data'];
            $retObj->msg = null;
        } else {
            // The server reported a failure.
            $retObj->code = $retData['errno'];
            $retObj->data = null;
            $retObj->msg = $retData['msg'];
        }

        unset($this->waitList[$retObj->requestId]);

        // Run the after hook.
        $this->afterRequest($retObj);

        // Run the user callback.
        if ($retObj->callback) {
            call_user_func($retObj->callback, $retObj);
        }
    }

    /**
     * Append one or more servers to the configured list.
     *
     * Accepts a single config array (with a 'host' key), or a list whose items
     * are either config arrays or "host:port" strings.
     *
     * @param array $servers
     * @throws Exception when a config lacks 'host' or 'port'
     */
    public function addServers(array $servers)
    {
        if (isset($servers['host'])) {
            self::formatServerConfig($servers);
            $this->servers[] = $servers;
            return;
        }

        // Legacy form: a list of configs.
        foreach ($servers as $svr) {
            if (is_string($svr)) {
                // "127.0.0.1:8001" form. Build a fresh config for every entry so
                // that options of a previous entry never leak into this one.
                $parts = explode(':', $svr);
                $config = array(
                    'host' => $parts[0],
                    'port' => isset($parts[1]) ? $parts[1] : null,
                );
            } else {
                $config = $svr;
            }
            self::formatServerConfig($config);
            $this->servers[] = $config;
        }
    }

    /**
     * Validate a server config in place and fill in defaults: 'status'
     * defaults to 'online' and 'weight' to 100.
     *
     * @param array $config modified by reference
     * @throws Exception when 'host' or 'port' is empty
     */
    protected static function formatServerConfig(&$config)
    {
        if (empty($config['host'])) {
            throw new Exception("require 'host' option.");
        }
        if (empty($config['port'])) {
            throw new Exception("require 'port' option.");
        }
        if (empty($config['status'])) {
            $config['status'] = 'online';
        }
        if (empty($config['weight'])) {
            $config['weight'] = 100;
        }
    }

    /**
     * Replace the configured server list.
     *
     * @param array $servers list of server config arrays
     * @throws Exception when a config lacks 'host' or 'port'
     */
    public function setServers(array $servers)
    {
        foreach ($servers as &$svr) {
            self::formatServerConfig($svr);
        }
        // Break the reference so the last element is stored as a plain value.
        unset($svr);

        $this->servers = $servers;
    }

    /**
     * Pick one server config out of $servers.
     *
     * With keepSocket enabled the previously picked server is returned again
     * until it is cleared; otherwise a server is selected on every call.
     *
     * @param array $servers
     * @return array|mixed
     * @throws Exception when $servers is empty
     */
    public function getServer($servers)
    {
        if (empty($servers)) {
            throw new Exception("servers config empty.");
        }

        if ($this->keepSocket) {
            if (is_array($this->keepSocketServer) && count($this->keepSocketServer)) {
                return $this->keepSocketServer;
            }
            $this->keepSocketServer = self::toolGetServer($servers);
            return $this->keepSocketServer;
        }

        // Original behaviour: select a server on every call.
        return self::toolGetServer($servers);
    }

    /**
     * Issue an RPC call.
     *
     * The request is sent immediately; the response is collected by wait().
     *
     * @param mixed $function name of the remote function
     * @param array $params
     * @param callable|null $callback invoked with the RPCResult once the request finishes
     * @return RPCResult
     * @throws Exception
     */
    public function task($function, $params = array(), $callback = null)
    {
        $retObj = new RPCResult($this);
        $send = array('call' => $function, 'params' => $params);
        if (count($this->env) > 0) {
            // Environment variables of the calling side.
            $send['env'] = $this->env;
        }

        $this->request($send, $retObj);
        $retObj->callback = $callback;

        return $retObj;
    }

    /**
     * Probe whether the server is alive by calling 'PING'.
     *
     * @return bool true when the result is exactly 'PONG'
     * @throws \Exception
     */
    public function ping()
    {
        return $this->task('PING')->getResult() === 'PONG';
    }

    /**
     * Receive one packet from $connection.
     *
     * @param \swoole_client $connection
     * @param float $timeout not used by this implementation
     * @return bool|string
     */
    protected function recvPacket($connection, $timeout = 0.5)
    {
        return $connection->recv();
    }

    /**
     * Wait for readable events via swoole_client_select().
     *
     * The arrays are received by value, so the caller's arrays are not changed.
     *
     * @param array $read
     * @param array $write
     * @param array $error
     * @param float $timeout
     * @return int
     */
    protected function select($read, $write, $error, $timeout)
    {
        return swoole_client_select($read, $write, $error, $timeout);
    }

    /**
     * Hook called by wait() for every socket it used. No-op by default.
     *
     * @param mixed $socket
     */
    protected function freeConnection($socket)
    {
    }

    /**
     * Receive the responses of all pending requests.
     *
     * @param float $timeout
     * @return int number of responses that were processed
     */
    public function wait($timeout = 0.5)
    {
        $st = microtime(true);
        $success_num = 0;

        // Accumulated across iterations (and de-duplicated below): every socket
        // seen here is handed to freeConnection() before returning.
        // NOTE(review): sockets closed in an earlier iteration therefore stay in
        // this list and are polled again - confirm whether that is intended.
        $read = array();

        while (count($this->waitList) > 0) {
            $write = $error = array();
            foreach ($this->waitList as $obj) {
                /**
                 * @var $obj RPCResult
                 */
                if ($obj->socket !== null) {
                    $read[] = $obj->socket;
                }
            }
            if (empty($read)) {
                break;
            }

            // Remove duplicate sockets.
            self::arrayUnique($read);

            // Wait for readable events.
            $n = $this->select($read, $write, $error, $timeout);
            if ($n > 0) {
                foreach ($read as $connection) {
                    $data = $this->recvPacket($connection, $timeout);

                    if ($data === "") {
                        // The socket was closed: fail every request bound to it.
                        foreach ($this->waitList as $retObj) {
                            if ($retObj->socket === $connection) {
                                $retObj->code = RPCResult::ERR_CLOSED;
                                unset($this->waitList[$retObj->requestId]);
                                $this->closeConnection($retObj->server_host, $retObj->server_port);
                                // Run the after hook.
                                $this->afterRequest($retObj);
                            }
                        }
                        continue;
                    } elseif ($data === false) {
                        continue;
                    }

                    $header = unpack(RPCServer::HEADER_STRUCT, substr($data, 0, RPCServer::HEADER_SIZE));

                    // Not in the wait list: wrong request serial number.
                    if (!isset($this->waitList[$header['serid']])) {
                        trigger_error(__CLASS__ . " invalid responseId[{$header['serid']}].", E_USER_WARNING);
                        continue;
                    }

                    $retObj = $this->waitList[$header['serid']];
                    // Processed successfully.
                    $this->finish(RPCServer::decode(substr($data, RPCServer::HEADER_SIZE), $header['type']), $retObj);
                    $success_num++;
                }
            }

            // A timeout occurred.
            if ((microtime(true) - $st) > $timeout) {
                // Decide every result code before closing anything: requests that
                // share one connection must all observe its state prior to teardown.
                foreach ($this->waitList as $obj) {
                    $obj->code = ($obj->socket->isConnected()) ? RPCResult::ERR_TIMEOUT : RPCResult::ERR_CONNECT;
                }
                foreach ($this->waitList as $obj) {
                    $this->closeConnection($obj->server_host, $obj->server_port);
                    // Run the after hook.
                    $this->afterRequest($obj);
                }

                // Clear the pending list.
                $this->waitList = array();
                foreach ($read as $r) {
                    $this->freeConnection($r);
                }

                return $success_num;
            }
        }

        foreach ($read as $r) {
            $this->freeConnection($r);
        }

        // No timeout occurred.
        $this->waitList = array();
        $this->requestIndex = 0;

        return $success_num;
    }

    /**
     * Close all cached connections.
     */
    public function close()
    {
        foreach ($this->connections as $key => $socket) {
            /**
             * @var $socket \swoole_client
             */
            $socket->close(true);
            unset($this->connections[$key]);
        }
    }

    /**
     * Pick one online server at random, weighted by its 'weight' option.
     *
     * Servers whose 'status' is 'offline' are skipped; a missing or empty
     * weight counts as 100.
     *
     * @param array $servers
     * @return mixed the selected server config, or null when no server is online
     */
    public static function toolGetServer(array $servers)
    {
        $candidates = array();
        $total = 0;

        // Drop offline nodes and apply the default weight before summing, so
        // the random range matches the weights used for the selection below.
        foreach ($servers as $svr) {
            if (!empty($svr['status']) && $svr['status'] == 'offline') {
                continue;
            }
            if (empty($svr['weight'])) {
                $svr['weight'] = 100;
            }
            $total += $svr['weight'];
            $candidates[] = $svr;
        }

        if (empty($candidates)) {
            return null;
        }

        // Pick a point inside the total weight and find the server covering it.
        $use = rand(0, $total - 1);
        $upper = 0;
        foreach ($candidates as $svr) {
            $upper += $svr['weight'];
            if ($use < $upper) {
                return $svr;
            }
        }

        // Fallback, not expected to be reached.
        return $candidates[0];
    }

    /**
     * Remove duplicate values from $arr in place, keeping the first occurrence
     * and the original keys.
     *
     * Objects are identified by spl_object_hash(), resources by their integer
     * id, and any other value by itself.
     *
     * @param array $arr modified by reference
     */
    public static function arrayUnique(array &$arr)
    {
        $map = array();
        foreach ($arr as $k => $v) {
            if (is_object($v)) {
                $hash = spl_object_hash($v);
            } elseif (is_resource($v)) {
                $hash = intval($v);
            } else {
                $hash = $v;
            }

            if (isset($map[$hash])) {
                unset($arr[$k]);
            } else {
                $map[$hash] = true;
            }
        }
    }
}