diff --git a/examples/worker-process.php b/examples/worker-process.php new file mode 100755 index 0000000..5d73627 --- /dev/null +++ b/examples/worker-process.php @@ -0,0 +1,22 @@ +#!/usr/bin/env php +start(); + + $returnValue = (yield $worker->enqueue(new HelloTask())); + + printf("Return value: %d\n", $returnValue); + + $code = (yield $worker->shutdown()); + printf("Code: %d\n", $code); +})->done(); + +Loop\run(); diff --git a/examples/worker-thread.php b/examples/worker-thread.php index 8641c8b..68335cf 100755 --- a/examples/worker-thread.php +++ b/examples/worker-thread.php @@ -2,26 +2,17 @@ start(); $returnValue = (yield $worker->enqueue(new HelloTask())); yield $worker->shutdown(); -}); +})->done(); Loop\run(); diff --git a/src/Process/Process.php b/src/Process/Process.php new file mode 100644 index 0000000..457cf10 --- /dev/null +++ b/src/Process/Process.php @@ -0,0 +1,314 @@ +command = (string) $command; + + if ('' !== $cwd) { + $this->cwd = (string) $cwd; + } + + foreach ($env as $key => $value) { + if (!is_array($value)) { // $env cannot accept array values. + $this->env[(string) $key] = (string) $value; + } + } + + $this->options = $options; + } + + /** + * Stops the process if it is still running. + */ + public function __destruct() + { + $this->kill(); // Will only terminate if the process is still running. + } + + /** + * Resets process values. + */ + public function __clone() + { + $this->process = null; + $this->promise = null; + $this->pid = 0; + $this->stdin = null; + $this->stdout = null; + $this->stderr = null; + } + + /** + * @throws \Exception + */ + public function start() + { + if (null !== $this->promise) { + throw new Exception('The process has already been started.'); + } + + $fd = [ + ['pipe', 'r'], // stdin + ['pipe', 'w'], // stdout + ['pipe', 'a'], // stderr + ['pipe', 'w'], // exit code pipe + ]; + + $command = sprintf('(%s) 3>/dev/null; code=$?; echo $code >&3; exit $code', $this->command); + + $this->process = proc_open($command, $fd, $pipes, $this->cwd ?: null, $this->env ?: null, $this->options); + + if (!is_resource($this->process)) { + throw new Exception('Could not start process.'); + } + + $status = proc_get_status($this->process); + + $this->pid = $status['pid']; + + list($this->stdin, $this->stdout, $this->stderr, $stream) = $pipes; + +// $this->stdin = new WritableStream($pipes[0]); +// $this->stdout = new ReadableStream($pipes[1]); +// $this->stderr = new ReadableStream($pipes[2]); + + $stream = new ReadableStream($stream); + + $this->promise = new Coroutine($stream->read()); + $this->promise = $this->promise + ->then(function ($code) { + if ('' === $code) { + throw new Exception('Process ended unexpectedly.'); + } + return (int) $code; + }) + ->cleanup(function () use ($stream) { + proc_close($this->process); + $this->process = null; + //$this->stdin->close(); + $stream->close(); + }); + } + + /** + * @return \Generator + * + * @throws \Exception + */ + public function join() + { + if (null === $this->promise) { + throw new Exception('The process has not been started.'); + } + + try { + yield $this->promise; + } finally { + //$this->stdout->close(); + //$this->stderr->close(); + } + } + + /** + */ + public function kill() + { + if (is_resource($this->process)) { + proc_terminate($this->process, 9); // Sends SIGKILL. + $this->process = null; + } + } + + /** + * Sends the given signal to the process. + * + * @param int $signo Signal number to send to process. + * + * @throws Exception + */ + public function signal($signo) + { + if (!$this->isRunning()) { + throw new Exception('The process is not running.'); + } + + proc_terminate($this->process, (int) $signo); + } + + /** + * Returns the PID of the child process. Value is only meaningful if the process has been started and PHP was not + * compiled with --enable-sigchild. + * + * @return int + */ + public function getPid() + { + return $this->pid; + } + + /** + * Returns the command to execute. + * + * @return string The command to execute. + */ + public function getCommand() + { + return $this->command; + } + + /** + * Gets the current working directory. + * + * @return string The current working directory or null if inherited from the current PHP process. + */ + public function getWorkingDirectory() + { + if ('' === $this->cwd) { + return getcwd() ?: ''; + } + + return $this->cwd; + } + + /** + * Gets the environment variables array. + * + * @return mixed[] Array of environment variables. + */ + public function getEnv() + { + return $this->env; + } + + /** + * Gets the options to pass to proc_open(). + * + * @return mixed[] Array of options. + */ + public function getOptions() + { + return $this->options; + } + + /** + * Determines if the process is still running. + * + * @return bool + */ + public function isRunning() + { + return is_resource($this->process); + } + + /** + * Gets the process input stream (STDIN). + * + * @return resource + * + * @throws Exception If the process has not been started. + */ + public function getStdIn() + { + if (null === $this->stdin) { + throw new Exception('The process has not been started.'); + } + + return $this->stdin; + } + + /** + * Gets the process output stream (STDOUT). + * + * @return resource + * + * @throws Exception If the process has not been started. + */ + public function getStdOut() + { + if (null === $this->stdout) { + throw new Exception('The process has not been started.'); + } + + return $this->stdout; + } + + /** + * Gets the process error stream (STDERR). + * + * @return resource + * + * @throws Exception If the process has not been started. + */ + public function getStdErr() + { + if (null === $this->stderr) { + throw new Exception('The process has not been started.'); + } + + return $this->stderr; + } +} \ No newline at end of file diff --git a/src/Worker/HelloTask.php b/src/Worker/HelloTask.php new file mode 100644 index 0000000..c1cd635 --- /dev/null +++ b/src/Worker/HelloTask.php @@ -0,0 +1,11 @@ +process = new Process(sprintf('%s %s/process.php', PHP_BINARY, __DIR__), __DIR__); + } + + /** + * {@inheritdoc} + */ + public function isRunning() + { + return $this->process->isRunning(); + } + + /** + * {@inheritdoc} + */ + public function isIdle() + { + return false; + } + + /** + * {@inheritdoc} + */ + public function start() + { + $this->process->start(); + + $this->channel = new Channel( + $this->process->getStdOut(), + $this->process->getStdIn() + ); + } + + /** + * {@inheritdoc} + */ + public function kill() + { + $this->process->kill(); + } + + /** + * @return \Generator + */ + public function join() + { + return $this->process->join(); + } + + /** + * {@inheritdoc} + */ + public function shutdown() + { + yield $this->channel->send(1); + yield $this->process->join(); + //var_dump('SENT SHUTDOWN'); + } + + /** + * {@inheritdoc} + */ + public function enqueue(TaskInterface $task) + { + if (!$this->channel instanceof ChannelInterface) { + throw new SynchronizationError('Worker has not been started.'); + } + + yield $this->channel->send($task); + + yield $this->channel->receive(); + } +} \ No newline at end of file diff --git a/src/Worker/process.php b/src/Worker/process.php new file mode 100644 index 0000000..140d9e3 --- /dev/null +++ b/src/Worker/process.php @@ -0,0 +1,41 @@ +receive()); + + // Shutdown request + if ($task === 1) { + break; + } + + if (!($task instanceof TaskInterface)) { + throw new \Exception('Invalid message.'); + } + + yield $channel->send(yield $task->run()); + } + } finally { + $channel->close(); + } +} + +ob_start(function ($data) { + $written = fwrite(STDERR, $data); + return ''; +}, 1); + +$coroutine = new Coroutine(run(new Channel(STDIN, STDOUT))); +$coroutine->done(); + +Loop\run();