diff --git a/src/Process/Process.php b/src/Process/Process.php index 9b6230a..a8fc9d6 100644 --- a/src/Process/Process.php +++ b/src/Process/Process.php @@ -4,7 +4,8 @@ namespace Icicle\Concurrent\Process; use Exception; use Icicle\Concurrent\Exception\ProcessException; use Icicle\Concurrent\Exception\StatusError; -use Icicle\Coroutine\Coroutine; +use Icicle\Loop; +use Icicle\Promise\Promise; use Icicle\Socket\Stream\ReadableStream; use Icicle\Socket\Stream\WritableStream; @@ -56,10 +57,15 @@ class Process private $pid = 0; /** - * @var \Icicle\Promise\PromiseInterface + * @var \Icicle\Promise\PromiseInterface|null */ private $promise; + /** + * @var \Icicle\Loop\Events\SocketEventInterface|null + */ + private $poll; + /** * @param string $command Command to run. * @param string|null $cwd Working directory or use an empty string to use the working directory of the current @@ -91,6 +97,7 @@ class Process { $this->process = null; $this->promise = null; + $this->poll = null; $this->pid = 0; $this->stdin = null; $this->stdout = null; @@ -135,12 +142,22 @@ class Process $this->stdout = new ReadableStream($pipes[1]); $this->stderr = new ReadableStream($pipes[2]); - $stream = new ReadableStream($pipes[3]); + $stream = $pipes[3]; + stream_set_blocking($stream, 0); - $this->promise = (new Coroutine($stream->read())) + $this->promise = new Promise(function (callable $resolve) use ($stream) { + $this->poll = Loop\poll($stream, function ($resource) use ($resolve) { + $resolve((string) fread($resource, 1)); + }); + + $this->poll->unreference(); + $this->poll->listen(); + }); + + $this->promise = $this->promise ->then(function ($code) { if ('' === $code) { - throw new Exception('Process ended unexpectedly.'); + throw new Exception('Process ended unexpectedly without providing a status code.'); } return (int) $code; }) @@ -150,7 +167,7 @@ class Process $this->process = null; } $this->stdin->close(); - $stream->close(); + $this->poll->free(); }); } @@ -167,6 +184,8 @@ class Process throw new StatusError('The process has not been started.'); } + $this->poll->reference(); + try { yield $this->promise; } finally {