好得很程序员自学网

<tfoot draggable='sEl'></tfoot>

PHP模拟supervisor的进程管理

推荐:《PHP视频教程》

前言

模拟supervisor进程管理DEMO(简易实现)

没错,是造轮子!目的在于学习!

截图:

在图中自己实现了一个 Copy 子进程的功能。如果用在AMQP增减消费者时,我觉得应该会很有用。

实现

1、在主进程循环内启动子进程执行命令
2、在web输入 127.0.0.1:7865 获取子进程状态
3、socket接收请求消息,并且执行相应操作,返回web页面
4、回收子进程,防止称为僵尸进程

不足:无法持续监听错误页面。由于socket得到的响应是通过 include 函数加载的,所以在加载的页面内不能出现 tail -f 命令,否则stream就会掉入了死循环了~。我想应该有方案解决(写了socket+多进程模式,模仿fpm在接收到请求之后就启动一个子进程去处理的模式,但是执行有问题。因此将代码贴出来希望得到大家的指点)。
延伸:由于对进程可以很好的管理(期望如此),那么就可以定制化自己的一些需求,比如:(1)定制AMQP的消费者进程管理服务。(2)模拟crontab定时服务。

知识点

代码实现的过程中,有很多的细节是值得学习的。
1、在while()循环中,启用了stream的非阻塞模式。所以不能在循环中使用 sleep(1) ,而是用 stream_select($read, $write, $except, 1) 让stream内部阻塞。
关于阻塞非阻塞模式,可以参阅这里
2、能够执行外部程序的函数很多,但是都稍有不同。这里采用的是 proc_open ,是一个很强大的函数。在这之前我曾用 pcntl_exec 执行过外部程序,但是需要先 pcntl_fork 。而用其他的如 exec , shell_exec 无法对子进程进行管理。
3、重启或停止等操作子进程时,只是先更改主进程中该子进程在内存中的的状态,并不是真正的对子进程操作。在统一处 init() 处理子进程。如此才能防止因为子进程启动时的上下文导致的一些怪异的现象。

代码

由于代码过多,所以如果你对我的方案有更好的建议可以在github这里看。

主进程代码:Process.php

<?php
require_once __DIR__ . '/Consumer.php';require_once __DIR__ . '/StreamConnection.php';require_once __DIR__ . '/Http.php';class Process{
    /** 
* 待启动的消费者数组
*/
    protected $consumers = array();
    protected $childPids = array();

    const PPID_FILE = __DIR__ . '/process';
    protected $serializerConsumer;

    public function __construct()
    {
   $this->consumers = $this->getConsumers();
    }

    // 这里是个DEMO,实际可以用读取配置文件的方式。
    public function getConsumers()
    {
   $consumer = new Consumer([
  'program' => 'test',
  'command' => '/usr/bin/php test.php',
  'directory' => __DIR__,
  'logfile' => __DIR__ . '/test.log',
  'uniqid' => uniqid(),
  'auto_restart' => false,
   ]);
   return [
  $consumer->uniqid => $consumer,
   ];
    }

    public function run()
    {
   if (empty($this->consumers)) {
  // consumer empty
  return;
   }
   if ($this->_notifyMaster()) {
  // master alive
  return;
   }

   $pid = pcntl_fork();
   if ($pid < 0) {
  exit;
   } elseif ($pid > 0) {
  exit;
   }
   if (!posix_setsid()) {
  exit;
   }

   $stream = new StreamConnection('tcp://0.0.0.0:7865');
   @cli_set_process_title('AMQP Master Process');
   // 将主进程ID写入文件
   file_put_contents(self::PPID_FILE, getmypid());
   // master进程继续
   while (true) {
  $this->init();
  pcntl_signal_dispatch();
  $this->waitpid();
  // 如果子进程被全部回收,则主进程退出
  // if (empty($this->childPids)) {
  //$stream->close($stream->getSocket());
  //break;
  // }
  $stream->accept(function ($uniqid, $action) {
 $this->handle($uniqid, $action);
 return $this->display();
  });
   }
    }

    protected function init()
    {
   foreach ($this->consumers as &$c) {
  switch ($c->state) {
 case Consumer::RUNNING:
 case Consumer::STOP:
break;
 case Consumer::NOMINAL:
 case Consumer::STARTING:
$this->fork($c);
break;
 case Consumer::STOPING:
if ($c->pid && posix_kill($c->pid, SIGTERM)) {
    $this->reset($c, Consumer::STOP);
}
break;
 case Consumer::RESTART:
if (empty($c->pid)) {
    $this->fork($c);
    break;
}
if (posix_kill($c->pid, SIGTERM)) {
    $this->reset($c, Consumer::STOP);
    $this->fork($c);
}
break;
 default:
break;
  }
   }
    }

    protected function reset(Consumer $c, $state)
    {
   $c->pid = '';
   $c->uptime = '';
   $c->state = $state;
   $c->process = null;
    }

    protected function waitpid()
    {
   foreach ($this->childPids as $uniqid => $pid) {
  $result = pcntl_waitpid($pid, $status, WNOHANG);
  if ($result == $pid || $result == -1) {
 unset($this->childPids[$uniqid]);
 $c = &$this->consumers[$uniqid];
 $state = pcntl_wifexited($status) ? Consumer::EXITED : Consumer::STOP;
 $this->reset($c, $state);
  }
   }
    }


    /**
* 父进程存活情况下,只会通知父进程信息,否则可能产生多个守护进程
*/
    private function _notifyMaster()
    {
   $ppid = file_get_contents(self::PPID_FILE );
   $isAlive = $this->checkProcessAlive($ppid);
   if (!$isAlive) return false;
   return true;
    }

    public function checkProcessAlive($pid)
    {
   if (empty($pid)) return false;
   $pidinfo = `ps co pid {$pid} | xargs`;
   $pidinfo = trim($pidinfo);
   $pattern = "/.*?PID.*?(\d+).*?/";
   preg_match($pattern, $pidinfo, $matches);
   return empty($matches) ? false : ($matches[1] == $pid ? true : false);
    }

    /**
* fork一个新的子进程
*/
    protected function fork(Consumer $c)
    {
   $descriptorspec = [2 => ['file', $c->logfile, 'a'],];
   $process = proc_open('exec ' . $c->command, $descriptorspec, $pipes, $c->directory);
   if ($process) {
  $ret = proc_get_status($process);
  if ($ret['running']) {
 $c->state = Consumer::RUNNING;
 $c->pid = $ret['pid'];
 $c->process = $process;
 $c->uptime = date('m-d H:i');
 $this->childPids[$c->uniqid] = $ret['pid'];
  } else {
 $c->state = Consumer::EXITED;
 proc_close($process);
  }
   } else {
  $c->state = Consumer::ERROR;
   }
   return $c;
    }

    public function display()
    {
   $location = 'http://127.0.0.1:7865';
   $basePath = Http::$basePath;
   $scriptName = isset($_SERVER['SCRIPT_NAME']) &&
  !empty($_SERVER['SCRIPT_NAME']) &&
  $_SERVER['SCRIPT_NAME'] != '/' ? $_SERVER['SCRIPT_NAME'] : '/index.php';
   if ($scriptName == '/index.html') {
  return Http::status_301($location);
   }

   $sourcePath = $basePath . $scriptName;
   if (!is_file($sourcePath)) {
  return Http::status_404();
   }

   ob_start();
   include $sourcePath;
   $response = ob_get_contents();
   ob_clean();

   return Http::status_200($response);
    }

    public function handle($uniqid, $action)
    {
   if (!empty($uniqid) && !isset($this->consumers[$uniqid])) {
  return;
   }
   switch ($action) {
  case 'refresh':
 break;
  case 'restartall':
 $this->killall(true);
 break;
  case 'stopall':
 $this->killall();
 break;
  case 'stop':
 $c = &$this->consumers[$uniqid];
 if ($c->state != Consumer::RUNNING) break;
 $c->state = Consumer::STOPING;
 break;
  case 'start':
 $c = &$this->consumers[$uniqid];
 if ($c->state == Consumer::RUNNING) break;
 $c->state = Consumer::STARTING;
 break;
  case 'restart':
 $c = &$this->consumers[$uniqid];
 $c->state = Consumer::RESTART;
 break;
  case 'copy':
 $c = $this->consumers[$uniqid];
 $newC = clone $c;
 $newC->uniqid = uniqid('C');
 $newC->state = Consumer::NOMINAL;
 $newC->pid = '';
 $this->consumers[$newC->uniqid] = $newC;
 break;
  default:
 break;
   }
    }

    protected function killall($restart = false)
    {
   foreach ($this->consumers as &$c) {
  $c->state = $restart ? Consumer::RESTART : Consumer::STOPING;
   }
    }}$cli = new Process();$cli->run();

Consumer消费者对象

<?php
require_once __DIR__ . '/BaseObject.php';class Consumer extends BaseObject{
    /** 开启多少个消费者 */
    public $numprocs = 1;
    /** 当前配置的唯一标志 */
    public $program;
    /** 执行的命令 */
    public $command;
    /** 当前工作的目录 */
    public $directory;

    /** 通过 $qos $queueName $duplicate 生成的 $queue */
    public $queue;
    /** 程序执行日志记录 */
    public $logfile = '';
    /** 消费进程的唯一ID */
    public $uniqid;
    /** 进程IDpid */
    public $pid;
    /** 进程状态 */
    public $state = self::NOMINAL;
    /** 自启动 */
    public $auto_restart = false;

    public $process;
    /** 启动时间 */
    public $uptime;

    const RUNNING = 'running';
    const STOP = 'stoped';
    const NOMINAL = 'nominal';
    const RESTART = 'restart';
    const STOPING = 'stoping';
    const STARTING = 'stating';
    const ERROR = 'error';
    const BLOCKED = 'blocked';
    const EXITED = 'exited';
    const FATEL = 'fatel';}

stream相关代码:StreamConnection.php

<?php
class StreamConnection{
    protected $socket;
    protected $timeout = 2; //s
    protected $client;

    public function __construct($host)
    {
   $this->socket = $this->connect($host);
    }

    public function connect($host)
    {
   $socket = stream_socket_server($host, $errno, $errstr);
   if (!$socket) {
  exit('stream error');
   }
   stream_set_timeout($socket, $this->timeout);
   stream_set_chunk_size($socket, 1024);
   stream_set_blocking($socket, false);
   $this->client = [$socket];
   return $socket;
    }

    public function accept(Closure $callback)
    {
   $read = $this->client;
   if (stream_select($read, $write, $except, 1) < 1) return;
   if (in_array($this->socket, $read)) {
  $cs = stream_socket_accept($this->socket);
  $this->client[] = $cs;
   }
   foreach ($read as $s) {
  if ($s == $this->socket) continue;
  $header = fread($s, 1024);
  if (empty($header)) {
 $index = array_search($s, $this->client);
 if ($index)
unset($this->client[$index]);
 $this->close($s);
 continue;
  }
  Http::parse_http($header);
  $uniqid = isset($_GET['uniqid']) ? $_GET['uniqid'] : '';
  $action = isset($_GET['action']) ? $_GET['action'] : '';
  $response = $callback($uniqid, $action);
  $this->write($s, $response);
  $index = array_search($s, $this->client);
  if ($index)
 unset($this->client[$index]);
  $this->close($s);
   }
    }

    public function write($socket, $response)
    {
   $ret = fwrite($socket, $response, strlen($response));
    }

    public function close($socket)
    {
   $flag = fclose($socket);
    }

    public function getSocket()
    {
   return $this->socket;
    }}

Http响应代码:Http.php

<?php
class Http{

    public static $basePath = __DIR__ . '/views';
    public static $max_age = 120; //秒

    /*
    *  函数:parse_http
    *  描述:解析http协议
    */
    public static function parse_http($http)
    {
   // 初始化
   $_POST = $_GET = $_COOKIE = $_REQUEST = $_SESSION = $_FILES =  array();
   $GLOBALS['HTTP_RAW_POST_DATA'] = '';
   // 需要设置的变量名
   $_SERVER = array(
  'QUERY_STRING' => '',
  'REQUEST_METHOD' => '',
  'REQUEST_URI' => '',
  'SERVER_PROTOCOL' => '',
  'SERVER_SOFTWARE' => '',
  'SERVER_NAME' => '',
  'HTTP_HOST' => '',
  'HTTP_USER_AGENT' => '',
  'HTTP_ACCEPT' => '',
  'HTTP_ACCEPT_LANGUAGE' => '',
  'HTTP_ACCEPT_ENCODING' => '',
  'HTTP_COOKIE' => '',
  'HTTP_CONNECTION' => '',
  'REMOTE_ADDR' => '',
  'REMOTE_PORT' => '0',
  'SCRIPT_NAME' => '',
  'HTTP_REFERER' => '',
  'CONTENT_TYPE' => '',
  'HTTP_IF_NONE_MATCH' => '',
   );

   // 将header分割成数组
   list($http_header, $http_body) = explode("\r\n\r\n", $http, 2);
   $header_data = explode("\r\n", $http_header);

   list($_SERVER['REQUEST_METHOD'], $_SERVER['REQUEST_URI'], $_SERVER['SERVER_PROTOCOL']) = explode(' ', $header_data[0]);

   unset($header_data[0]);
   foreach ($header_data as $content) {
  // \r\n\r\n
  if (empty($content)) {
 continue;
  }
  list($key, $value) = explode(':', $content, 2);
  $key = strtolower($key);
  $value = trim($value);
  switch ($key) {
 case 'host':
$_SERVER['HTTP_HOST'] = $value;
$tmp = explode(':', $value);
$_SERVER['SERVER_NAME'] = $tmp[0];
if (isset($tmp[1])) {
    $_SERVER['SERVER_PORT'] = $tmp[1];
}
break;
 case 'cookie':
$_SERVER['HTTP_COOKIE'] = $value;
parse_str(str_replace('; ', '&', $_SERVER['HTTP_COOKIE']), $_COOKIE);
break;
 case 'user-agent':
$_SERVER['HTTP_USER_AGENT'] = $value;
break;
 case 'accept':
$_SERVER['HTTP_ACCEPT'] = $value;
break;
 case 'accept-language':
$_SERVER['HTTP_ACCEPT_LANGUAGE'] = $value;
break;
 case 'accept-encoding':
$_SERVER['HTTP_ACCEPT_ENCODING'] = $value;
break;
 case 'connection':
$_SERVER['HTTP_CONNECTION'] = $value;
break;
 case 'referer':
$_SERVER['HTTP_REFERER'] = $value;
break;
 case 'if-modified-since':
$_SERVER['HTTP_IF_MODIFIED_SINCE'] = $value;
break;
 case 'if-none-match':
$_SERVER['HTTP_IF_NONE_MATCH'] = $value;
break;
 case 'content-type':
if (!preg_match('/boundary="?(\S+)"?/', $value, $match)) {
    $_SERVER['CONTENT_TYPE'] = $value;
} else {
    $_SERVER['CONTENT_TYPE'] = 'multipart/form-data';
    $http_post_boundary = '--' . $match[1];
}
break;
  }
   }

   // script_name
   $_SERVER['SCRIPT_NAME'] = parse_url($_SERVER['REQUEST_URI'], PHP_URL_PATH);

   // QUERY_STRING
   $_SERVER['QUERY_STRING'] = parse_url($_SERVER['REQUEST_URI'], PHP_URL_QUERY);
   if ($_SERVER['QUERY_STRING']) {
  // $GET
  parse_str($_SERVER['QUERY_STRING'], $_GET);
   } else {
  $_SERVER['QUERY_STRING'] = '';
   }

   // REQUEST
   $_REQUEST = array_merge($_GET, $_POST);

   return array('get' => $_GET, 'post' => $_POST, 'cookie' => $_COOKIE, 'server' => $_SERVER, 'files' => $_FILES);
    }

    public static function status_404()
    {
   return <<<EOFHTTP/1.1 404 OK
content-type: text/htmlEOF;
    }

    public static function status_301($location)
    {
   return <<<EOFHTTP/1.1 301 Moved Permanently
Content-Length: 0
Content-Type: text/plain
Location: $locationCache-Control: no-cacheEOF;
    }

    public static function status_304()
    {
   return <<<EOFHTTP/1.1 304 Not Modified
Content-Length: 0EOF;
    }

    public static function status_200($response)
    {
   $contentType = $_SERVER['CONTENT_TYPE'];
   $length = strlen($response);
   $header = '';
   if ($contentType)
  $header = 'Cache-Control: max-age=180';
   return <<<EOFHTTP/1.1 200 OK
Content-Type: $contentTypeContent-Length: $length$header$responseEOF;
    }}

待执行的脚本:test.php

<?php
while(true) {
    file_put_contents(__DIR__  .  '/test.log', date('Y-m-d H:i:s'));
    sleep(1);}

在当前目录下的视图页面:
|- Process.php
|- Http.php
|- StreamConnection.php
|- Consumer.php
|- BaseObject.php
|- views/

更多编程相关知识,请访问:编程教学!!

以上就是PHP模拟supervisor的进程管理的详细内容!

查看更多关于PHP模拟supervisor的进程管理的详细内容...

  阅读:42次