processName = $processName; $this->mpid = posix_getpid(); $this->maxPrecess = $workerNum; }catch (\Exception $e){ throw new \yii\base\Exception('ALL ERROR: '.$e->getMessage()); } }

    /**
     * Start the configured number of worker processes and block until all
     * of them have exited.
     *
     * @param callable $callback Invoked inside every child as
     *                           $callback($workId, $masterPid); $workId is 1-based.
     */
    public function run(callable $callback)
    {
        for ($i = 0; $i < $this->maxPrecess; $i++) {
            $this->CreateProcess($callback, $i + 1);
        }

        $this->processWait();
    }

    /**
     * Create and start a single child process.
     *
     * The child renames itself to "<processName><workId>" (best effort) and
     * then runs the callback. The pid is tracked in $this->works only when
     * the process actually started.
     *
     * @param callable $callback Invoked in the child as $callback($workId, $masterPid).
     * @param mixed    $workId   Identifier passed to the callback and appended to the process name.
     * @return int|false Pid of the started child, or false when start() failed.
     */
    public function CreateProcess(callable $callback, $workId)
    {
        $process = new \Swoole\Process(function (\Swoole\Process $worker) use ($callback, $workId) {
            // Renaming can fail on some platforms; the original code deliberately
            // suppresses that warning, so the suppression is kept.
            @swoole_set_process_name($this->processName . $workId);
            $callback($workId, $this->mpid);
        }, false, false);

        $pid = $process->start();

        // start() yields false when the child could not be created. Tracking that
        // value would leave an entry that no reaped pid can ever match, so
        // processWait() would never see an empty list.
        if ($pid !== false) {
            $this->works[] = $pid;
        }

        return $pid;
    }

    /**
     * Reap child processes until no tracked child remains.
     *
     * Blocks in \Swoole\Process::wait() and removes every reaped pid from
     * $this->works. If the system reports that there are no children left
     * while entries are still tracked, the list is cleared and the loop ends
     * instead of spinning forever.
     */
    public function processWait()
    {
        // errno value for "no child processes"; 10 on Linux, macOS and the BSDs.
        $noChildErrno = defined('PCNTL_ECHILD') ? PCNTL_ECHILD : 10;

        while (count($this->works) > 0) {
            $ret = \Swoole\Process::wait();

            if (!$ret) {
                if (swoole_errno() === $noChildErrno) {
                    // Nothing can ever be reaped again: drop the stale entries.
                    $this->works = [];
                    break;
                }

                // Any other failure (e.g. an interrupted wait): retry, as before.
                continue;
            }

            foreach ($this->works as $key => $pid) {
                if ($pid == $ret['pid']) {
                    unset($this->works[$key]);
                }
            }
        }
    }

    /**
     * Tell whether every tracked child process has finished.
     *
     * @return bool True when no child pid is tracked any more.
     */
    public function isFinish()
    {
        return count($this->works) === 0;
    }
}