diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..85e7c1d --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/.idea/ diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..0ed5e1a --- /dev/null +++ b/composer.json @@ -0,0 +1,10 @@ +{ + "require": { + "monolog/monolog": "~1.13" + }, + "autoload": { + "psr-4": { + "SocketDaemon\\": "src" + } + } +} diff --git a/composer.lock b/composer.lock new file mode 100644 index 0000000..0e8b1ab --- /dev/null +++ b/composer.lock @@ -0,0 +1,129 @@ +{ + "_readme": [ + "This file locks the dependencies of your project to a known state", + "Read more about it at http://getcomposer.org/doc/01-basic-usage.md#composer-lock-the-lock-file", + "This file is @generated automatically" + ], + "hash": "bc0c38f29d7f74457128b2183cb11cd3", + "packages": [ + { + "name": "monolog/monolog", + "version": "1.13.1", + "source": { + "type": "git", + "url": "https://github.com/Seldaek/monolog.git", + "reference": "c31a2c4e8db5da8b46c74cf275d7f109c0f249ac" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/Seldaek/monolog/zipball/c31a2c4e8db5da8b46c74cf275d7f109c0f249ac", + "reference": "c31a2c4e8db5da8b46c74cf275d7f109c0f249ac", + "shasum": "" + }, + "require": { + "php": ">=5.3.0", + "psr/log": "~1.0" + }, + "provide": { + "psr/log-implementation": "1.0.0" + }, + "require-dev": { + "aws/aws-sdk-php": "~2.4, >2.4.8", + "doctrine/couchdb": "~1.0@dev", + "graylog2/gelf-php": "~1.0", + "phpunit/phpunit": "~4.0", + "raven/raven": "~0.5", + "ruflin/elastica": "0.90.*", + "swiftmailer/swiftmailer": "~5.3", + "videlalvaro/php-amqplib": "~2.4" + }, + "suggest": { + "aws/aws-sdk-php": "Allow sending log messages to AWS services like DynamoDB", + "doctrine/couchdb": "Allow sending log messages to a CouchDB server", + "ext-amqp": "Allow sending log messages to an AMQP server (1.0+ required)", + "ext-mongo": "Allow sending log messages to a MongoDB server", + "graylog2/gelf-php": "Allow sending log messages to a GrayLog2 server", + "raven/raven": "Allow sending log messages to a Sentry server", + "rollbar/rollbar": "Allow sending log messages to Rollbar", + "ruflin/elastica": "Allow sending log messages to an Elastic Search server", + "videlalvaro/php-amqplib": "Allow sending log messages to an AMQP server using php-amqplib" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "1.13.x-dev" + } + }, + "autoload": { + "psr-4": { + "Monolog\\": "src/Monolog" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Jordi Boggiano", + "email": "j.boggiano@seld.be", + "homepage": "http://seld.be" + } + ], + "description": "Sends your logs to files, sockets, inboxes, databases and various web services", + "homepage": "http://github.com/Seldaek/monolog", + "keywords": [ + "log", + "logging", + "psr-3" + ], + "time": "2015-03-09 09:58:04" + }, + { + "name": "psr/log", + "version": "1.0.0", + "source": { + "type": "git", + "url": "https://github.com/php-fig/log.git", + "reference": "fe0936ee26643249e916849d48e3a51d5f5e278b" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/php-fig/log/zipball/fe0936ee26643249e916849d48e3a51d5f5e278b", + "reference": "fe0936ee26643249e916849d48e3a51d5f5e278b", + "shasum": "" + }, + "type": "library", + "autoload": { + "psr-0": { + "Psr\\Log\\": "" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "PHP-FIG", + "homepage": "http://www.php-fig.org/" + } + ], + "description": "Common interface for logging libraries", + "keywords": [ + "log", + "psr", + "psr-3" + ], + "time": "2012-12-21 11:40:51" + } + ], + "packages-dev": [], + "aliases": [], + "minimum-stability": "stable", + "stability-flags": [], + "prefer-stable": false, + "prefer-lowest": false, + "platform": [], + "platform-dev": [] +} diff --git a/daemon.php b/daemon.php new file mode 100644 index 0000000..dfc9bfe --- /dev/null +++ b/daemon.php @@ -0,0 +1,31 @@ + /dev/null 2>&1 &" +$loader = include __DIR__ . '/vendor/autoload.php'; + +use SocketDaemon\ClientTask\ControllerClientTask; +use SocketDaemon\ServerTask\ControllerServerTask; +use Monolog\Logger; + +ob_implicit_flush(); + +$firstTask = new ControllerClientTask(); +$firstTask + ->setName('first') + ->setHost('0.0.0.0') + ->setPort(10009) + ->setCommand('php firstTask.php'); +$secondTask = new ControllerClientTask(); +$secondTask + ->setName('second') + ->setHost('0.0.0.0') + ->setPort(10009) + ->setCommand('php secondTask.php'); + +$testSocket = new ControllerServerTask( + '0.0.0.0', + 10003, + ['first' => $firstTask, 'second' => $secondTask], + 'Task controller', + new Logger('TaskController') +); +$testSocket->run(); diff --git a/firstTask.php b/firstTask.php new file mode 100644 index 0000000..241635f --- /dev/null +++ b/firstTask.php @@ -0,0 +1,9 @@ +run(); diff --git a/secondTask.php b/secondTask.php new file mode 100644 index 0000000..241635f --- /dev/null +++ b/secondTask.php @@ -0,0 +1,9 @@ +run(); diff --git a/shmest.txt b/shmest.txt deleted file mode 100644 index 999ea5a..0000000 --- a/shmest.txt +++ /dev/null @@ -1 +0,0 @@ -shmest diff --git a/src/ClientTask/ControllerClientTask.php b/src/ClientTask/ControllerClientTask.php new file mode 100644 index 0000000..fd433cc --- /dev/null +++ b/src/ClientTask/ControllerClientTask.php @@ -0,0 +1,264 @@ +port; + } + + /** + * @param int $port + * @return $this + */ + public function setPort($port) + { + $this->port = $port; + return $this; + } + + /** + * @return string + */ + public function getCommand() + { + return $this->command; + } + + /** + * @param $command + * @return $this + */ + public function setCommand($command) + { + $this->command = $command; + return $this; + } + + /** + * @return string + */ + public function getName() + { + return $this->name; + } + + /** + * @param $name + * @return $this + */ + public function setName($name) + { + $this->name = $name; + return $this; + } + + /** + * @return int + */ + public function getPid() + { + return $this->pid; + } + + /** + * @param $pid + * @return $this + */ + public function setPid($pid) + { + $this->pid = $pid; + return $this; + } + + /** + * @return SocketClient + */ + public function getSocketClient() + { + return $this->socketClient; + } + + /** + * @param SocketClient $socketClient + */ + public function setSocketClient($socketClient) + { + $this->socketClient = $socketClient; + } + + /** + * @return string + */ + public function getHost() + { + return $this->host; + } + + /** + * @param string $host + * @return $this + */ + public function setHost($host) + { + $this->host = $host; + return $this; + } + + /** + * @return int + */ + public function getFailedCount() + { + return $this->failedCount; + } + + /** + * @param int $failedCount + */ + public function setFailedCount($failedCount) + { + $this->failedCount = $failedCount; + } + + /** + * @return int + */ + public function getRunCount() + { + return $this->runCount; + } + + /** + * @param int $runCount + */ + public function setRunCount($runCount) + { + $this->runCount = $runCount; + } + + public function checkConnection() + { + if ($this->getSocketClient()) { + return $this->socketClient->ping(); + } + return false; + } + + public function connect() + { + $this->socketClient = new SocketClient(); + $this->socketClient->connect($this->host, $this->port); + } + + public function run() + { + $this->setRunCount($this->getRunCount()+1); + $this->setPid( + exec(sprintf("%s > /dev/null 2>&1 & echo $!", $this->getCommand())) + ); + if (!$this->getPid()) { + throw new Exception(sprintf('Can\'t run task %s', $this->getName())); + } + $this->connect(); + } + + /** + * @return bool + */ + public function isRun() + { + if (!$this->getPid()) { + return false; + } + return $this->checkConnection(); + } + + /** + * @return string + */ + public function getStatus() + { + $this->socketClient->write('status'); + return $this->socketClient->read(); + } + + public function sendTerminate() + { + $this->socketClient->write('exit'); + } + + public function closeConnection() + { + $this->socketClient->connectionClose(); + } + + /** + * @return string + */ + public function getLastStatus() + { + return $this->lastStatus; + } + + /** + * @param string $lastStatus + */ + public function setLastStatus($lastStatus) + { + $this->lastStatus = $lastStatus; + } + + +} diff --git a/src/ClientTask/SocketClient.php b/src/ClientTask/SocketClient.php new file mode 100644 index 0000000..70d8d03 --- /dev/null +++ b/src/ClientTask/SocketClient.php @@ -0,0 +1,60 @@ +socket = $socket; + + } + + public function connect($host, $port) + { + if (!socket_connect($this->socket, $host, $port)) { + throw new Exception( + sprintf( + 'Can\'t connect message %s', + socket_strerror(socket_last_error($this->socket)) + ) + ); + } + } + + /** + * @return boolean + */ + public function ping() + { + $this->write('ping'); + return ($this->read() === 'pong'); + } + + public function read() + { + return socket_read($this->socket, 2048); + } + + public function write($text) + { + socket_write($this->socket, $text, strlen($text)); + } + + public function connectionClose() + { + socket_close($this->socket); + } + + +} diff --git a/src/ServerTask/BaseServerTask.php b/src/ServerTask/BaseServerTask.php new file mode 100644 index 0000000..cb32ed7 --- /dev/null +++ b/src/ServerTask/BaseServerTask.php @@ -0,0 +1,157 @@ + command + */ + protected $tasks; + + /** + * @param string $host + * @param int $port + * @param string $greetingText + * @param LoggerInterface $logger + * @param int $backlog + * @throws Exception + */ + public function __construct($host, $port, $greetingText, LoggerInterface $logger, $backlog = 5) + { + $this->logger = $logger; + $this->greetingText = $greetingText; + $this->socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP); + if (!$this->socket) { + throw new Exception( + sprintf( + 'Can\'t create socket message: "%s"', + socket_strerror(socket_last_error()) + ) + ); + } + if (!socket_bind($this->socket, $host, $port)) { + throw new Exception( + sprintf( + 'Can\'t bind socket host: "%s", port: "%s", message: "%s"', + $host, + $port, + socket_strerror(socket_last_error($this->socket)) + ) + ); + } + + if (!socket_listen($this->socket, $backlog)) { + throw new Exception( + sprintf('Can\'t listen socket message: "%s"', socket_strerror(socket_last_error($this->socket))) + ); + } + $this->id = $this->getId(); + } + + /** + * @throws Exception + */ + public function run() + { + do { + $this->connect(); + $this->logger->debug('Connected'); + $this->write($this->greetingText . "\n"); + try { + do { + $command = $this->read(); + } while ($this->processCommand($command)); + } catch (Exception $e) { + $this->logger->error($e->getMessage()); + } + $this->closeConnection(); + } while (true); + } + + protected function connect() + { + $this->conn = socket_accept($this->socket); + if (!($this->conn)) { + throw new Exception( + sprintf('Can\'t accept connection message: "%s"', socket_strerror(socket_last_error($this->socket))) + ); + } + } + + protected function closeConnection() + { + socket_close($this->conn); + } + + /** + * @param string $text + */ + protected function write($text) + { + socket_write($this->conn, $text, strlen($text)); + } + + /** + * @throws Exception + * @return string + */ + protected function read() + { + $message = socket_read($this->conn, 2048, PHP_NORMAL_READ); + if (!$message) { + throw new Exception( + sprintf('Can\'t read message: "%s"', socket_strerror(socket_last_error($this->conn))) + ); + } + return trim($message); + } + /** + * @throws Exception + */ + protected function processCommand($command) + { + switch ($command) { + case 'exit': + return false; + default: + $this->write('Command :' . $command . '!'); + } + return true; + } + + /** + * @return int + */ + public function getId() + { + return exec('echo $$'); + } + +} diff --git a/src/ServerTask/ControllerServerTask.php b/src/ServerTask/ControllerServerTask.php new file mode 100644 index 0000000..ffc4043 --- /dev/null +++ b/src/ServerTask/ControllerServerTask.php @@ -0,0 +1,256 @@ + command + */ + protected $tasks; + + /** + * @param string $host + * @param int $port + * @param array $tasks + * @param string $greetingText + * @param LoggerInterface $logger + * @param int $backlog + * @throws Exception + */ + public function __construct($host, $port, array $tasks, $greetingText, LoggerInterface $logger, $backlog = 5) + { + parent::__construct($host, $port, $greetingText, $logger, $backlog); + $this->tasks = $tasks; + } + + + /** + * @throws Exception + */ + public function run() + { + do { + try { + $this->connect(); + $this->write($this->greetingText . "\n"); + do { + $command = $this->read(); + } while ($this->processCommand($command)); + $this->closeConnection(); + } catch (Exception $e) { + $this->logger->error($e->getMessage()); + // Нужно сделать закрытие коннеткта если ошибка была в $this->processCommand($command)) + } + } while (true); + } + + + /** + * @param $command + * @return int pId + */ + public function runProcess($command) + { + $this->logger->alert('Run command :'.$command); + return exec(sprintf("%s > /dev/null 2>&1 & echo $!", $command)); + } + + + /** + * @param $id + * @return bool + */ + function isProcessRunning($id) + { + if (!$id) { + return false; + } + $result = shell_exec(sprintf("ps %d", $id)); + if (count(preg_split("/\n/", $result)) > 2) { + return true; + } + return false; + } + + public function killProcess($id) + { + exec(sprintf("kill %d", $id)); + } + + /** + * @param $command + * @return bool + * @throws Exception + */ + protected function processCommand($command) + { + if (!$command) { + return true; + } + $this->logger->debug(sprintf('Process message: "%s"', $command)); + $commandParams = $this->getCommandParams($command); + if (!$commandParams) { + return true; + } + $this->logger->debug(print_r($commandParams, 1)); + if (!isset($this->tasks[$commandParams['taskName']])) { + $this->logger->debug(sprintf('Task is not supported: "%s"', $command)); + return true; + } + /** + * @var $task ControllerClientTask + */ + $task = $this->tasks[$commandParams['taskName']]; + $this->logger->alert($task->getPid()); + switch ($commandParams['command']) { + case 'task start': + if ($this->isTaskRunning($task)) { + $this->write('Task is already running' . "\n"); + break; + } + $this->write(sprintf('Task run pid: "%d"', $this->runTask($task))); + break; + case 'task stop': + if (!$this->isTaskRunning($task)) { + $this->write('Task is not running' . "\n"); + break; + } + $this->terminateTask($task); + $this->write('Terminate task success' . "\n"); + break; + case 'task info': + if (!$this->isTaskRunning($task)) { + $this->write(sprintf('Task is not running. Last status : "%s"', $task->getLastStatus())); + break; + } + $this->logger->alert('Go'); + $status = $task->getStatus(); + // Check if task done + switch ($status) { + case 'taskDone': + $this->clearTask($task); + break; + case 'errorDone': + $this->clearTask($task); + $task->setFailedCount($task->getFailedCount() + 1); + break; + }; + $task->setLastStatus($status); + $this->write(sprintf('Status : "%s"', $status)); + break; + case 'daemon stat': + // Обходим массив тасок собираем по каждой инорфмацию по каждой таксе + // можно либо запрашивать данные у таски либо самому узнат используя метод getProcessParams + // $res = $this->getProcessParams($this->id); + break; + default: + $this->write('Command :' . $command . '!'); + } + return true; + } + + /** + * @param $command + * @return array|null ['command' => ','taskName' = >] + */ + protected function getCommandParams($command) + { + $commandParams = explode(' ', $command); + if (count($commandParams) != 3) { + return null; + } + return [ + 'taskName' => $commandParams[2], + 'command' => $commandParams[0] . ' ' . $commandParams[1] + ]; + } + + protected function isTaskRunning(ControllerClientTask $task) + { + if (!$task->checkConnection()) { + if ($this->isProcessRunning($task->getPid())) { + $this->killProcess($task->getPid()); + // Free memory for socket + $task->closeConnection(); + $task->setPid(0); + } + return false; + } + return true; + } + + protected function getProcessParams($pId) + { + $res = exec(sprintf('ps %d -eo pcpu,pmem', $pId)); + // Парсим результат (не успел сделать) + return $res; + } + + /** + * I did not have time to do + * @param $command + */ + protected function getTaskNameFromMessage($command) + { + + } + + /** + * I did not have time to do + * @param $command + */ + protected function getCommandFormMessage($command) + { + + } + + + /** + * @param ControllerClientTask $task + * @return int + * @throws Exception + */ + public function runTask(ControllerClientTask $task) + { + $pId = $this->runProcess($task->getCommand()); + $this->logger->alert('Pid:'.$pId); + if (!$pId) { + throw new Exception(sprintf('Can\'t run task %s', $task->getName())); + } + $task->setRunCount($task->getRunCount() + 1); + sleep(2); + $task->connect(); + $task->setPid($pId); + return $pId; + } + + /** + * @param $task + */ + public function clearTask(ControllerClientTask $task) + { + // Free memory for socket + $task->closeConnection(); + $task->setPid(0); + } + + /** + * @param ControllerClientTask $task + */ + public function terminateTask(ControllerClientTask $task) + { + $task->sendTerminate(); + if ($this->isProcessRunning($task->getPid())) { + $this->killProcess($task->getPid()); + } + $this->clearTask($task); + $task->setFailedCount($task->getFailedCount() + 1); + } + +} diff --git a/src/ServerTask/SimpleServerTask.php b/src/ServerTask/SimpleServerTask.php new file mode 100644 index 0000000..83b62d2 --- /dev/null +++ b/src/ServerTask/SimpleServerTask.php @@ -0,0 +1,82 @@ +connect(); + $this->write($this->greetingText . "\n"); + try { + do { + $command = $this->read(); + $this->logger->debug(sprintf('Read message: "%s"', $command)); + } while ($this->processCommand($command)); + } catch (Exception $e) { + $this->logger->error($e->getMessage()); + } + $this->closeConnection(); + } + + /** + * @param $command + * @return bool (need close process) + */ + protected function processCommand($command) + { + if (!$command) { + return true; + } + $this->logger->debug(sprintf('Process message: "%s"', $command)); + switch ($command) { + case 'status': + $status = $this->getStatus(); + $this->write($this->getStatus() . "\n"); + if ($status == 'Success') { + return true; + } + break; + case 'terminate': + $this->write('Exit' . "\n"); + return false; + case 'ping': + $this->write('pong' . "\n"); + return true; + default: + $this->write('errorInput' . "\n"); + break; + } + $this->doSomeIterationOperation(); + return true; + } + + /** + * In this place we do some operations + */ + public function doSomeIterationOperation() + { + + } + + /** + * @return string + */ + function getStatus() + { + switch (rand(0, 4)) { + case 1: + return 'InProcess'; + case 2: + return 'ErrorDone'; + case 3: + return 'TaskDone'; + } + } +}