1
0
mirror of https://github.com/danog/parallel.git synced 2024-11-26 20:34:40 +01:00

Refactor process to unreference exit code stream

This commit is contained in:
Aaron Piotrowski 2015-09-18 15:07:59 -05:00
parent dbda7e6506
commit 4a804536f2

View File

@ -4,7 +4,8 @@ namespace Icicle\Concurrent\Process;
use Exception; use Exception;
use Icicle\Concurrent\Exception\ProcessException; use Icicle\Concurrent\Exception\ProcessException;
use Icicle\Concurrent\Exception\StatusError; use Icicle\Concurrent\Exception\StatusError;
use Icicle\Coroutine\Coroutine; use Icicle\Loop;
use Icicle\Promise\Promise;
use Icicle\Socket\Stream\ReadableStream; use Icicle\Socket\Stream\ReadableStream;
use Icicle\Socket\Stream\WritableStream; use Icicle\Socket\Stream\WritableStream;
@ -56,10 +57,15 @@ class Process
private $pid = 0; private $pid = 0;
/** /**
* @var \Icicle\Promise\PromiseInterface * @var \Icicle\Promise\PromiseInterface|null
*/ */
private $promise; private $promise;
/**
* @var \Icicle\Loop\Events\SocketEventInterface|null
*/
private $poll;
/** /**
* @param string $command Command to run. * @param string $command Command to run.
* @param string|null $cwd Working directory or use an empty string to use the working directory of the current * @param string|null $cwd Working directory or use an empty string to use the working directory of the current
@ -91,6 +97,7 @@ class Process
{ {
$this->process = null; $this->process = null;
$this->promise = null; $this->promise = null;
$this->poll = null;
$this->pid = 0; $this->pid = 0;
$this->stdin = null; $this->stdin = null;
$this->stdout = null; $this->stdout = null;
@ -135,12 +142,22 @@ class Process
$this->stdout = new ReadableStream($pipes[1]); $this->stdout = new ReadableStream($pipes[1]);
$this->stderr = new ReadableStream($pipes[2]); $this->stderr = new ReadableStream($pipes[2]);
$stream = new ReadableStream($pipes[3]); $stream = $pipes[3];
stream_set_blocking($stream, 0);
$this->promise = (new Coroutine($stream->read())) $this->promise = new Promise(function (callable $resolve) use ($stream) {
$this->poll = Loop\poll($stream, function ($resource) use ($resolve) {
$resolve((string) fread($resource, 1));
});
$this->poll->unreference();
$this->poll->listen();
});
$this->promise = $this->promise
->then(function ($code) { ->then(function ($code) {
if ('' === $code) { if ('' === $code) {
throw new Exception('Process ended unexpectedly.'); throw new Exception('Process ended unexpectedly without providing a status code.');
} }
return (int) $code; return (int) $code;
}) })
@ -150,7 +167,7 @@ class Process
$this->process = null; $this->process = null;
} }
$this->stdin->close(); $this->stdin->close();
$stream->close(); $this->poll->free();
}); });
} }
@ -167,6 +184,8 @@ class Process
throw new StatusError('The process has not been started.'); throw new StatusError('The process has not been started.');
} }
$this->poll->reference();
try { try {
yield $this->promise; yield $this->promise;
} finally { } finally {