From 791ebc7239ff591921ab275d89f19ef6ca74e7e7 Mon Sep 17 00:00:00 2001 From: justgoodman Date: Mon, 25 May 2015 01:55:23 +0300 Subject: [PATCH 1/2] first implementation --- .gitignore | 1 + daemon.php | 22 ++++ firstTask.php | 9 ++ model/Task.php | 116 ++++++++++++++++++++ secondTask.php | 9 ++ socketTask/BaseSocketTask.php | 149 +++++++++++++++++++++++++ socketTask/SimpleSocketTask.php | 73 ++++++++++++ socketTask/TaskController.php | 189 ++++++++++++++++++++++++++++++++ 8 files changed, 568 insertions(+) create mode 100644 .gitignore create mode 100644 daemon.php create mode 100644 firstTask.php create mode 100644 model/Task.php create mode 100644 secondTask.php create mode 100644 socketTask/BaseSocketTask.php create mode 100644 socketTask/SimpleSocketTask.php create mode 100644 socketTask/TaskController.php diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..85e7c1d --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/.idea/ diff --git a/daemon.php b/daemon.php new file mode 100644 index 0000000..c2cfcca --- /dev/null +++ b/daemon.php @@ -0,0 +1,22 @@ + /dev/null 2>&1 &" +require_once('socketTask\TaskController.php'); + +use SocketTask\TaskController; +use Model\Task; + +ob_implicit_flush(); + +$firstTask = new Task(); +$firstTask + ->setName('first') + ->setPort(10007) + ->setCommand('php firstTask.php'); +$secondTask = new Task(); +$secondTask + ->setName('second') + ->setPort(10008) + ->setCommand('php secondTask.php'); + +$testSocket = new TaskController('0.0.0.0', 10006,['first' => $firstTask, 'second' => $secondTask], 'Task controller'); +$testSocket->run(); diff --git a/firstTask.php b/firstTask.php new file mode 100644 index 0000000..f15d88e --- /dev/null +++ b/firstTask.php @@ -0,0 +1,9 @@ +run(); diff --git a/model/Task.php b/model/Task.php new file mode 100644 index 0000000..52b86f7 --- /dev/null +++ b/model/Task.php @@ -0,0 +1,116 @@ +status; + } + + /** + * @param string $status + */ + public function setStatus($status) + { + $this->status = $status; + } + + /** + * @return int + */ + public function getPort() + { + return $this->port; + } + + /** + * @param int $port + */ + public function setPort($port) + { + $this->port = $port; + } + + /** + * @return string + */ + public function getCommand() + { + return $this->command; + } + + /** + * @param $command + * @return $this + */ + public function setCommand($command) + { + $this->command = $command; + return $this; + } + + /** + * @return string + */ + public function getName() + { + return $this->name; + } + + /** + * @param $name + * @return $this + */ + public function setName($name) + { + $this->name = $name; + return $this; + } + + /** + * @return int + */ + public function getPid() + { + return $this->pid; + } + + /** + * @param $pid + * @return $this + */ + public function setPid($pid) + { + $this->pid = $pid; + return $this; + } +} diff --git a/secondTask.php b/secondTask.php new file mode 100644 index 0000000..5167bc8 --- /dev/null +++ b/secondTask.php @@ -0,0 +1,9 @@ +run(); diff --git a/socketTask/BaseSocketTask.php b/socketTask/BaseSocketTask.php new file mode 100644 index 0000000..15cd6c3 --- /dev/null +++ b/socketTask/BaseSocketTask.php @@ -0,0 +1,149 @@ + command + */ + protected $tasks; + + /** + * @param string $host + * @param int $port + * @param string $greetingText + * @param int $backlog + * @throws Exception + */ + public function __construct($host, $port, $greetingText, $backlog = 5) + { + $this->greetingText = $greetingText; + $this->socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP); + if (!$this->socket) { + throw new Exception( + sprintf( + 'Can\'t create socket message: "%s"', + socket_strerror(socket_last_error()) + ) + ); + } + if (!socket_bind($this->socket, $host, $port)) { + throw new Exception( + sprintf( + 'Can\'t bind socket host: "%s", port: "%s", message: "%s"', + $host, + $port, + socket_strerror(socket_last_error($this->socket)) + ) + ); + } + + if (!socket_listen($this->socket, $backlog)) { + throw new Exception( + sprintf('Can\'t listen socket message: "%s"', socket_strerror(socket_last_error($this->socket))) + ); + } + $this->id = $this->getId(); + } + + /** + * @throws Exception + */ + public function run() + { + do { + $this->connect(); + $this->write($this->greetingText . "\n"); + try { + do { + $command = $this->read(); + } while ($this->processCommand($command)); + } catch (Exception $e) { + $this->logError($e->getMessage()); + } + $this->closeConnection(); + } while (true); + } + + /** + * simple Logging can be override + * @param $message + */ + protected function logError($message) + { + echo 'Error: ' . $message; + } + + protected function connect() + { + $this->conn = socket_accept($this->socket); + if (!($this->conn)) { + throw new Exception( + sprintf('Can\'t accept connection message: "%s"', socket_strerror(socket_last_error($this->socket))) + ); + } + } + + protected function closeConnection() + { + socket_close($this->conn); + } + + /** + * @param string $text + */ + protected function write($text) + { + socket_write($this->conn, $text, strlen($text)); + } + + /** + * @throws Exception + * @return string + */ + protected function read() + { + $message = socket_read($this->conn, 2048, PHP_NORMAL_READ); + if (!$message) { + throw new Exception( + sprintf('Can\'t read message: "%s"', socket_strerror(socket_last_error($this->conn))) + ); + } + return trim($message); + } + /** + * @throws Exception + */ + protected function processCommand($command) + { + switch ($command) { + case 'exit': + return false; + default: + $this->write('Command :' . $command . '!'); + } + return true; + } +} diff --git a/socketTask/SimpleSocketTask.php b/socketTask/SimpleSocketTask.php new file mode 100644 index 0000000..70f0986 --- /dev/null +++ b/socketTask/SimpleSocketTask.php @@ -0,0 +1,73 @@ +connect(); + $this->write($this->greetingText . "\n"); + try { + do { + $command = $this->read(); + } while ($this->processCommand($command)); + } catch (Exception $e) { + $this->logError($e->getMessage()); + } + $this->closeConnection(); + } + + /** + * @param $command + * @return bool (need close process) + */ + protected function processCommand($command) + { + switch ($command) { + case 'status': + $status = $this->getStatus(); + $this->write($this->getStatus()); + if ($status == 'Success') { + return true; + } + break; + case 'terminate': + $this->write('Exit'); + return false; + default: + $this->write('errorInput'); + } + $this->doSomeIterationOperation(); + return true; + } + + /** + * In this place we do some operations + */ + public function doSomeIterationOperation() + { + + } + + /** + * @return string + */ + function getStatus() + { + switch (rand(1,3)) { + case 1: + return 'Proccess OK'; + case 2: + return 'Error'; + case 3: + return 'Success'; + } + } +} diff --git a/socketTask/TaskController.php b/socketTask/TaskController.php new file mode 100644 index 0000000..d4e7f51 --- /dev/null +++ b/socketTask/TaskController.php @@ -0,0 +1,189 @@ + command + */ + protected $tasks; + + private $socket; + + private $socketTasks; + + /** + * @param string $host + * @param int $port + * @param string $greetingText + * @param int $backlog + * @throws Exception + */ + public function __construct($host, $port, $tasks, $greetingText, $backlog = 5) + { + parent::__construct($host, $port, $greetingText, $backlog); + } + + + /** + * @throws Exception + */ + public function run() + { + do { + try { + + $this->connect(); + $this->write($this->greetingText . "\n"); + do { + $command = $this->read(); + } while ($this->processCommand($command)); + $this->closeConnection(); + } catch (Exception $e) { + $this->logError($e->getMessage()); + // Нужно сделать закрытие коннеткта если ошибка была в $this->processCommand($command)) + } + } while (true); + } + + + public function terminateTask($id) + { + exec(sprintf("kill %d", $id)); + } + + /** + * public function taskInfo() + * { + * ps -eo pcpu,pmem + * } + */ + public function getId() + { + return exec('echo $$'); + } + + /** + * @param $id + * @return bool + */ + function isTaskRunning($id) + { + $result = shell_exec(sprintf("ps %d", $id)); + if (count(preg_split("/\n/", $result)) > 2) { + return true; + } + return false; + } + + + /** + * @param $command + * @return bool + * @throws Exception + */ + protected function processCommand($command) + { + $taskName = $this->getTaskNameFromMessage($command); + $command = $this->getCommandFormMessage($command); + if (!isset($this->tasks[$taskName])) { + return true; + } + /** + * @var Task + */ + $task = $this->tasks[$taskName]; + switch ($command) { + case 'task start': + $this->runTask($task); + return false; + case 'task stop': + $this->sentCommandTask($task, 'terminrate'); + break; + case 'task info': + $status = $this->sentCommandTask($task, 'status'); + $task->setStatus($status); + break; + case 'daemon stat': + // Обходим массив тасок собираем по каждой инорфмацию по каждой таксе + // можно либо запрашивать данные у таски либо самому узнат используя метод getProcessParams + // $res = $this->getProcessParams($this->id); + break; + default: + $this->write('Command :' . $command . '!'); + } + return true; + } + + protected function getProcessParams($pId) + { + $res = exec(sprintf('ps %d -eo pcpu,pmem', $pId)); + // Парсим результат (не успел сделать) + return $res; + } + + /** + * I did not have time to do + * @param $command + */ + protected function getTaskNameFromMessage($command) + { + + } + + /** + * I did not have time to do + * @param $command + */ + protected function getCommandFormMessage($command) + { + + } + + + public function sentCommandTask($task, $command) + { + if (!socket_connect($this->socketTasks, '0.0.0.0', $task->getPort())) { + throw new Exception( + sprintf( + 'Can\'t connect task %s message %s', + $task->getName(), + socket_strerror(socket_last_error($this->socket)) + ) + ); + } + $this->socket_write($this->socketTasks, $command, strlen($command)); + try { + $answer = $this->read(); + } catch (Exception $e) { + $task->setStatus('Exit Error'); + $task->setPid(0); + } + socket_close($this->socketTasks); + return $answer; + } + + /** + * @param $cmd + * @param $outputfile + * @param $pidfile + */ + public function runTask(Task $task) + { + $task->setPid( + exec(sprintf("%s > /dev/null 2>&1 & echo $!", $task->getCommand())) + ); + if (!$task->getPid()) { + throw new Exception(sprintf('Can\'t run task %s', $task->getName())); + } + $status = $this->sentCommandTask($task, 'status'); + $task->setStatus($status); + } + +} From 2255b837ed04fb33dc1f782764262da3e2cb8ce7 Mon Sep 17 00:00:00 2001 From: justgoodman Date: Mon, 25 May 2015 10:34:01 +0300 Subject: [PATCH 2/2] second implementration --- composer.json | 10 + composer.lock | 129 +++++++++ daemon.php | 25 +- firstTask.php | 6 +- model/Task.php | 116 -------- secondTask.php | 6 +- socketTask/TaskController.php | 189 ------------- src/ClientTask/ControllerClientTask.php | 264 ++++++++++++++++++ src/ClientTask/SocketClient.php | 60 ++++ .../ServerTask/BaseServerTask.php | 34 ++- src/ServerTask/ControllerServerTask.php | 256 +++++++++++++++++ .../ServerTask/SimpleServerTask.php | 33 ++- 12 files changed, 784 insertions(+), 344 deletions(-) create mode 100644 composer.json create mode 100644 composer.lock delete mode 100644 model/Task.php delete mode 100644 socketTask/TaskController.php create mode 100644 src/ClientTask/ControllerClientTask.php create mode 100644 src/ClientTask/SocketClient.php rename socketTask/BaseSocketTask.php => src/ServerTask/BaseServerTask.php (86%) create mode 100644 src/ServerTask/ControllerServerTask.php rename socketTask/SimpleSocketTask.php => src/ServerTask/SimpleServerTask.php (59%) diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..0ed5e1a --- /dev/null +++ b/composer.json @@ -0,0 +1,10 @@ +{ + "require": { + "monolog/monolog": "~1.13" + }, + "autoload": { + "psr-4": { + "SocketDaemon\\": "src" + } + } +} diff --git a/composer.lock b/composer.lock new file mode 100644 index 0000000..0e8b1ab --- /dev/null +++ b/composer.lock @@ -0,0 +1,129 @@ +{ + "_readme": [ + "This file locks the dependencies of your project to a known state", + "Read more about it at http://getcomposer.org/doc/01-basic-usage.md#composer-lock-the-lock-file", + "This file is @generated automatically" + ], + "hash": "bc0c38f29d7f74457128b2183cb11cd3", + "packages": [ + { + "name": "monolog/monolog", + "version": "1.13.1", + "source": { + "type": "git", + "url": "https://github.com/Seldaek/monolog.git", + "reference": "c31a2c4e8db5da8b46c74cf275d7f109c0f249ac" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/Seldaek/monolog/zipball/c31a2c4e8db5da8b46c74cf275d7f109c0f249ac", + "reference": "c31a2c4e8db5da8b46c74cf275d7f109c0f249ac", + "shasum": "" + }, + "require": { + "php": ">=5.3.0", + "psr/log": "~1.0" + }, + "provide": { + "psr/log-implementation": "1.0.0" + }, + "require-dev": { + "aws/aws-sdk-php": "~2.4, >2.4.8", + "doctrine/couchdb": "~1.0@dev", + "graylog2/gelf-php": "~1.0", + "phpunit/phpunit": "~4.0", + "raven/raven": "~0.5", + "ruflin/elastica": "0.90.*", + "swiftmailer/swiftmailer": "~5.3", + "videlalvaro/php-amqplib": "~2.4" + }, + "suggest": { + "aws/aws-sdk-php": "Allow sending log messages to AWS services like DynamoDB", + "doctrine/couchdb": "Allow sending log messages to a CouchDB server", + "ext-amqp": "Allow sending log messages to an AMQP server (1.0+ required)", + "ext-mongo": "Allow sending log messages to a MongoDB server", + "graylog2/gelf-php": "Allow sending log messages to a GrayLog2 server", + "raven/raven": "Allow sending log messages to a Sentry server", + "rollbar/rollbar": "Allow sending log messages to Rollbar", + "ruflin/elastica": "Allow sending log messages to an Elastic Search server", + "videlalvaro/php-amqplib": "Allow sending log messages to an AMQP server using php-amqplib" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "1.13.x-dev" + } + }, + "autoload": { + "psr-4": { + "Monolog\\": "src/Monolog" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Jordi Boggiano", + "email": "j.boggiano@seld.be", + "homepage": "http://seld.be" + } + ], + "description": "Sends your logs to files, sockets, inboxes, databases and various web services", + "homepage": "http://github.com/Seldaek/monolog", + "keywords": [ + "log", + "logging", + "psr-3" + ], + "time": "2015-03-09 09:58:04" + }, + { + "name": "psr/log", + "version": "1.0.0", + "source": { + "type": "git", + "url": "https://github.com/php-fig/log.git", + "reference": "fe0936ee26643249e916849d48e3a51d5f5e278b" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/php-fig/log/zipball/fe0936ee26643249e916849d48e3a51d5f5e278b", + "reference": "fe0936ee26643249e916849d48e3a51d5f5e278b", + "shasum": "" + }, + "type": "library", + "autoload": { + "psr-0": { + "Psr\\Log\\": "" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "PHP-FIG", + "homepage": "http://www.php-fig.org/" + } + ], + "description": "Common interface for logging libraries", + "keywords": [ + "log", + "psr", + "psr-3" + ], + "time": "2012-12-21 11:40:51" + } + ], + "packages-dev": [], + "aliases": [], + "minimum-stability": "stable", + "stability-flags": [], + "prefer-stable": false, + "prefer-lowest": false, + "platform": [], + "platform-dev": [] +} diff --git a/daemon.php b/daemon.php index c2cfcca..dfc9bfe 100644 --- a/daemon.php +++ b/daemon.php @@ -1,22 +1,31 @@ /dev/null 2>&1 &" -require_once('socketTask\TaskController.php'); +$loader = include __DIR__ . '/vendor/autoload.php'; -use SocketTask\TaskController; -use Model\Task; +use SocketDaemon\ClientTask\ControllerClientTask; +use SocketDaemon\ServerTask\ControllerServerTask; +use Monolog\Logger; ob_implicit_flush(); -$firstTask = new Task(); +$firstTask = new ControllerClientTask(); $firstTask ->setName('first') - ->setPort(10007) + ->setHost('0.0.0.0') + ->setPort(10009) ->setCommand('php firstTask.php'); -$secondTask = new Task(); +$secondTask = new ControllerClientTask(); $secondTask ->setName('second') - ->setPort(10008) + ->setHost('0.0.0.0') + ->setPort(10009) ->setCommand('php secondTask.php'); -$testSocket = new TaskController('0.0.0.0', 10006,['first' => $firstTask, 'second' => $secondTask], 'Task controller'); +$testSocket = new ControllerServerTask( + '0.0.0.0', + 10003, + ['first' => $firstTask, 'second' => $secondTask], + 'Task controller', + new Logger('TaskController') +); $testSocket->run(); diff --git a/firstTask.php b/firstTask.php index f15d88e..241635f 100644 --- a/firstTask.php +++ b/firstTask.php @@ -1,9 +1,9 @@ run(); diff --git a/model/Task.php b/model/Task.php deleted file mode 100644 index 52b86f7..0000000 --- a/model/Task.php +++ /dev/null @@ -1,116 +0,0 @@ -status; - } - - /** - * @param string $status - */ - public function setStatus($status) - { - $this->status = $status; - } - - /** - * @return int - */ - public function getPort() - { - return $this->port; - } - - /** - * @param int $port - */ - public function setPort($port) - { - $this->port = $port; - } - - /** - * @return string - */ - public function getCommand() - { - return $this->command; - } - - /** - * @param $command - * @return $this - */ - public function setCommand($command) - { - $this->command = $command; - return $this; - } - - /** - * @return string - */ - public function getName() - { - return $this->name; - } - - /** - * @param $name - * @return $this - */ - public function setName($name) - { - $this->name = $name; - return $this; - } - - /** - * @return int - */ - public function getPid() - { - return $this->pid; - } - - /** - * @param $pid - * @return $this - */ - public function setPid($pid) - { - $this->pid = $pid; - return $this; - } -} diff --git a/secondTask.php b/secondTask.php index 5167bc8..241635f 100644 --- a/secondTask.php +++ b/secondTask.php @@ -1,9 +1,9 @@ run(); diff --git a/socketTask/TaskController.php b/socketTask/TaskController.php deleted file mode 100644 index d4e7f51..0000000 --- a/socketTask/TaskController.php +++ /dev/null @@ -1,189 +0,0 @@ - command - */ - protected $tasks; - - private $socket; - - private $socketTasks; - - /** - * @param string $host - * @param int $port - * @param string $greetingText - * @param int $backlog - * @throws Exception - */ - public function __construct($host, $port, $tasks, $greetingText, $backlog = 5) - { - parent::__construct($host, $port, $greetingText, $backlog); - } - - - /** - * @throws Exception - */ - public function run() - { - do { - try { - - $this->connect(); - $this->write($this->greetingText . "\n"); - do { - $command = $this->read(); - } while ($this->processCommand($command)); - $this->closeConnection(); - } catch (Exception $e) { - $this->logError($e->getMessage()); - // Нужно сделать закрытие коннеткта если ошибка была в $this->processCommand($command)) - } - } while (true); - } - - - public function terminateTask($id) - { - exec(sprintf("kill %d", $id)); - } - - /** - * public function taskInfo() - * { - * ps -eo pcpu,pmem - * } - */ - public function getId() - { - return exec('echo $$'); - } - - /** - * @param $id - * @return bool - */ - function isTaskRunning($id) - { - $result = shell_exec(sprintf("ps %d", $id)); - if (count(preg_split("/\n/", $result)) > 2) { - return true; - } - return false; - } - - - /** - * @param $command - * @return bool - * @throws Exception - */ - protected function processCommand($command) - { - $taskName = $this->getTaskNameFromMessage($command); - $command = $this->getCommandFormMessage($command); - if (!isset($this->tasks[$taskName])) { - return true; - } - /** - * @var Task - */ - $task = $this->tasks[$taskName]; - switch ($command) { - case 'task start': - $this->runTask($task); - return false; - case 'task stop': - $this->sentCommandTask($task, 'terminrate'); - break; - case 'task info': - $status = $this->sentCommandTask($task, 'status'); - $task->setStatus($status); - break; - case 'daemon stat': - // Обходим массив тасок собираем по каждой инорфмацию по каждой таксе - // можно либо запрашивать данные у таски либо самому узнат используя метод getProcessParams - // $res = $this->getProcessParams($this->id); - break; - default: - $this->write('Command :' . $command . '!'); - } - return true; - } - - protected function getProcessParams($pId) - { - $res = exec(sprintf('ps %d -eo pcpu,pmem', $pId)); - // Парсим результат (не успел сделать) - return $res; - } - - /** - * I did not have time to do - * @param $command - */ - protected function getTaskNameFromMessage($command) - { - - } - - /** - * I did not have time to do - * @param $command - */ - protected function getCommandFormMessage($command) - { - - } - - - public function sentCommandTask($task, $command) - { - if (!socket_connect($this->socketTasks, '0.0.0.0', $task->getPort())) { - throw new Exception( - sprintf( - 'Can\'t connect task %s message %s', - $task->getName(), - socket_strerror(socket_last_error($this->socket)) - ) - ); - } - $this->socket_write($this->socketTasks, $command, strlen($command)); - try { - $answer = $this->read(); - } catch (Exception $e) { - $task->setStatus('Exit Error'); - $task->setPid(0); - } - socket_close($this->socketTasks); - return $answer; - } - - /** - * @param $cmd - * @param $outputfile - * @param $pidfile - */ - public function runTask(Task $task) - { - $task->setPid( - exec(sprintf("%s > /dev/null 2>&1 & echo $!", $task->getCommand())) - ); - if (!$task->getPid()) { - throw new Exception(sprintf('Can\'t run task %s', $task->getName())); - } - $status = $this->sentCommandTask($task, 'status'); - $task->setStatus($status); - } - -} diff --git a/src/ClientTask/ControllerClientTask.php b/src/ClientTask/ControllerClientTask.php new file mode 100644 index 0000000..fd433cc --- /dev/null +++ b/src/ClientTask/ControllerClientTask.php @@ -0,0 +1,264 @@ +port; + } + + /** + * @param int $port + * @return $this + */ + public function setPort($port) + { + $this->port = $port; + return $this; + } + + /** + * @return string + */ + public function getCommand() + { + return $this->command; + } + + /** + * @param $command + * @return $this + */ + public function setCommand($command) + { + $this->command = $command; + return $this; + } + + /** + * @return string + */ + public function getName() + { + return $this->name; + } + + /** + * @param $name + * @return $this + */ + public function setName($name) + { + $this->name = $name; + return $this; + } + + /** + * @return int + */ + public function getPid() + { + return $this->pid; + } + + /** + * @param $pid + * @return $this + */ + public function setPid($pid) + { + $this->pid = $pid; + return $this; + } + + /** + * @return SocketClient + */ + public function getSocketClient() + { + return $this->socketClient; + } + + /** + * @param SocketClient $socketClient + */ + public function setSocketClient($socketClient) + { + $this->socketClient = $socketClient; + } + + /** + * @return string + */ + public function getHost() + { + return $this->host; + } + + /** + * @param string $host + * @return $this + */ + public function setHost($host) + { + $this->host = $host; + return $this; + } + + /** + * @return int + */ + public function getFailedCount() + { + return $this->failedCount; + } + + /** + * @param int $failedCount + */ + public function setFailedCount($failedCount) + { + $this->failedCount = $failedCount; + } + + /** + * @return int + */ + public function getRunCount() + { + return $this->runCount; + } + + /** + * @param int $runCount + */ + public function setRunCount($runCount) + { + $this->runCount = $runCount; + } + + public function checkConnection() + { + if ($this->getSocketClient()) { + return $this->socketClient->ping(); + } + return false; + } + + public function connect() + { + $this->socketClient = new SocketClient(); + $this->socketClient->connect($this->host, $this->port); + } + + public function run() + { + $this->setRunCount($this->getRunCount()+1); + $this->setPid( + exec(sprintf("%s > /dev/null 2>&1 & echo $!", $this->getCommand())) + ); + if (!$this->getPid()) { + throw new Exception(sprintf('Can\'t run task %s', $this->getName())); + } + $this->connect(); + } + + /** + * @return bool + */ + public function isRun() + { + if (!$this->getPid()) { + return false; + } + return $this->checkConnection(); + } + + /** + * @return string + */ + public function getStatus() + { + $this->socketClient->write('status'); + return $this->socketClient->read(); + } + + public function sendTerminate() + { + $this->socketClient->write('exit'); + } + + public function closeConnection() + { + $this->socketClient->connectionClose(); + } + + /** + * @return string + */ + public function getLastStatus() + { + return $this->lastStatus; + } + + /** + * @param string $lastStatus + */ + public function setLastStatus($lastStatus) + { + $this->lastStatus = $lastStatus; + } + + +} diff --git a/src/ClientTask/SocketClient.php b/src/ClientTask/SocketClient.php new file mode 100644 index 0000000..70d8d03 --- /dev/null +++ b/src/ClientTask/SocketClient.php @@ -0,0 +1,60 @@ +socket = $socket; + + } + + public function connect($host, $port) + { + if (!socket_connect($this->socket, $host, $port)) { + throw new Exception( + sprintf( + 'Can\'t connect message %s', + socket_strerror(socket_last_error($this->socket)) + ) + ); + } + } + + /** + * @return boolean + */ + public function ping() + { + $this->write('ping'); + return ($this->read() === 'pong'); + } + + public function read() + { + return socket_read($this->socket, 2048); + } + + public function write($text) + { + socket_write($this->socket, $text, strlen($text)); + } + + public function connectionClose() + { + socket_close($this->socket); + } + + +} diff --git a/socketTask/BaseSocketTask.php b/src/ServerTask/BaseServerTask.php similarity index 86% rename from socketTask/BaseSocketTask.php rename to src/ServerTask/BaseServerTask.php index 15cd6c3..cb32ed7 100644 --- a/socketTask/BaseSocketTask.php +++ b/src/ServerTask/BaseServerTask.php @@ -1,10 +1,15 @@ logger = $logger; $this->greetingText = $greetingText; $this->socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP); if (!$this->socket) { @@ -75,27 +82,19 @@ public function run() { do { $this->connect(); + $this->logger->debug('Connected'); $this->write($this->greetingText . "\n"); try { do { $command = $this->read(); } while ($this->processCommand($command)); } catch (Exception $e) { - $this->logError($e->getMessage()); + $this->logger->error($e->getMessage()); } $this->closeConnection(); } while (true); } - /** - * simple Logging can be override - * @param $message - */ - protected function logError($message) - { - echo 'Error: ' . $message; - } - protected function connect() { $this->conn = socket_accept($this->socket); @@ -146,4 +145,13 @@ protected function processCommand($command) } return true; } + + /** + * @return int + */ + public function getId() + { + return exec('echo $$'); + } + } diff --git a/src/ServerTask/ControllerServerTask.php b/src/ServerTask/ControllerServerTask.php new file mode 100644 index 0000000..ffc4043 --- /dev/null +++ b/src/ServerTask/ControllerServerTask.php @@ -0,0 +1,256 @@ + command + */ + protected $tasks; + + /** + * @param string $host + * @param int $port + * @param array $tasks + * @param string $greetingText + * @param LoggerInterface $logger + * @param int $backlog + * @throws Exception + */ + public function __construct($host, $port, array $tasks, $greetingText, LoggerInterface $logger, $backlog = 5) + { + parent::__construct($host, $port, $greetingText, $logger, $backlog); + $this->tasks = $tasks; + } + + + /** + * @throws Exception + */ + public function run() + { + do { + try { + $this->connect(); + $this->write($this->greetingText . "\n"); + do { + $command = $this->read(); + } while ($this->processCommand($command)); + $this->closeConnection(); + } catch (Exception $e) { + $this->logger->error($e->getMessage()); + // Нужно сделать закрытие коннеткта если ошибка была в $this->processCommand($command)) + } + } while (true); + } + + + /** + * @param $command + * @return int pId + */ + public function runProcess($command) + { + $this->logger->alert('Run command :'.$command); + return exec(sprintf("%s > /dev/null 2>&1 & echo $!", $command)); + } + + + /** + * @param $id + * @return bool + */ + function isProcessRunning($id) + { + if (!$id) { + return false; + } + $result = shell_exec(sprintf("ps %d", $id)); + if (count(preg_split("/\n/", $result)) > 2) { + return true; + } + return false; + } + + public function killProcess($id) + { + exec(sprintf("kill %d", $id)); + } + + /** + * @param $command + * @return bool + * @throws Exception + */ + protected function processCommand($command) + { + if (!$command) { + return true; + } + $this->logger->debug(sprintf('Process message: "%s"', $command)); + $commandParams = $this->getCommandParams($command); + if (!$commandParams) { + return true; + } + $this->logger->debug(print_r($commandParams, 1)); + if (!isset($this->tasks[$commandParams['taskName']])) { + $this->logger->debug(sprintf('Task is not supported: "%s"', $command)); + return true; + } + /** + * @var $task ControllerClientTask + */ + $task = $this->tasks[$commandParams['taskName']]; + $this->logger->alert($task->getPid()); + switch ($commandParams['command']) { + case 'task start': + if ($this->isTaskRunning($task)) { + $this->write('Task is already running' . "\n"); + break; + } + $this->write(sprintf('Task run pid: "%d"', $this->runTask($task))); + break; + case 'task stop': + if (!$this->isTaskRunning($task)) { + $this->write('Task is not running' . "\n"); + break; + } + $this->terminateTask($task); + $this->write('Terminate task success' . "\n"); + break; + case 'task info': + if (!$this->isTaskRunning($task)) { + $this->write(sprintf('Task is not running. Last status : "%s"', $task->getLastStatus())); + break; + } + $this->logger->alert('Go'); + $status = $task->getStatus(); + // Check if task done + switch ($status) { + case 'taskDone': + $this->clearTask($task); + break; + case 'errorDone': + $this->clearTask($task); + $task->setFailedCount($task->getFailedCount() + 1); + break; + }; + $task->setLastStatus($status); + $this->write(sprintf('Status : "%s"', $status)); + break; + case 'daemon stat': + // Обходим массив тасок собираем по каждой инорфмацию по каждой таксе + // можно либо запрашивать данные у таски либо самому узнат используя метод getProcessParams + // $res = $this->getProcessParams($this->id); + break; + default: + $this->write('Command :' . $command . '!'); + } + return true; + } + + /** + * @param $command + * @return array|null ['command' => ','taskName' = >] + */ + protected function getCommandParams($command) + { + $commandParams = explode(' ', $command); + if (count($commandParams) != 3) { + return null; + } + return [ + 'taskName' => $commandParams[2], + 'command' => $commandParams[0] . ' ' . $commandParams[1] + ]; + } + + protected function isTaskRunning(ControllerClientTask $task) + { + if (!$task->checkConnection()) { + if ($this->isProcessRunning($task->getPid())) { + $this->killProcess($task->getPid()); + // Free memory for socket + $task->closeConnection(); + $task->setPid(0); + } + return false; + } + return true; + } + + protected function getProcessParams($pId) + { + $res = exec(sprintf('ps %d -eo pcpu,pmem', $pId)); + // Парсим результат (не успел сделать) + return $res; + } + + /** + * I did not have time to do + * @param $command + */ + protected function getTaskNameFromMessage($command) + { + + } + + /** + * I did not have time to do + * @param $command + */ + protected function getCommandFormMessage($command) + { + + } + + + /** + * @param ControllerClientTask $task + * @return int + * @throws Exception + */ + public function runTask(ControllerClientTask $task) + { + $pId = $this->runProcess($task->getCommand()); + $this->logger->alert('Pid:'.$pId); + if (!$pId) { + throw new Exception(sprintf('Can\'t run task %s', $task->getName())); + } + $task->setRunCount($task->getRunCount() + 1); + sleep(2); + $task->connect(); + $task->setPid($pId); + return $pId; + } + + /** + * @param $task + */ + public function clearTask(ControllerClientTask $task) + { + // Free memory for socket + $task->closeConnection(); + $task->setPid(0); + } + + /** + * @param ControllerClientTask $task + */ + public function terminateTask(ControllerClientTask $task) + { + $task->sendTerminate(); + if ($this->isProcessRunning($task->getPid())) { + $this->killProcess($task->getPid()); + } + $this->clearTask($task); + $task->setFailedCount($task->getFailedCount() + 1); + } + +} diff --git a/socketTask/SimpleSocketTask.php b/src/ServerTask/SimpleServerTask.php similarity index 59% rename from socketTask/SimpleSocketTask.php rename to src/ServerTask/SimpleServerTask.php index 70f0986..83b62d2 100644 --- a/socketTask/SimpleSocketTask.php +++ b/src/ServerTask/SimpleServerTask.php @@ -1,12 +1,12 @@ read(); + $this->logger->debug(sprintf('Read message: "%s"', $command)); } while ($this->processCommand($command)); } catch (Exception $e) { - $this->logError($e->getMessage()); + $this->logger->error($e->getMessage()); } $this->closeConnection(); } @@ -30,19 +31,27 @@ public function run() */ protected function processCommand($command) { + if (!$command) { + return true; + } + $this->logger->debug(sprintf('Process message: "%s"', $command)); switch ($command) { case 'status': $status = $this->getStatus(); - $this->write($this->getStatus()); + $this->write($this->getStatus() . "\n"); if ($status == 'Success') { return true; } break; case 'terminate': - $this->write('Exit'); + $this->write('Exit' . "\n"); return false; + case 'ping': + $this->write('pong' . "\n"); + return true; default: - $this->write('errorInput'); + $this->write('errorInput' . "\n"); + break; } $this->doSomeIterationOperation(); return true; @@ -61,13 +70,13 @@ public function doSomeIterationOperation() */ function getStatus() { - switch (rand(1,3)) { + switch (rand(0, 4)) { case 1: - return 'Proccess OK'; + return 'InProcess'; case 2: - return 'Error'; + return 'ErrorDone'; case 3: - return 'Success'; + return 'TaskDone'; } } }