1
0
mirror of https://github.com/danog/parallel.git synced 2025-01-22 14:01:14 +01:00

Fix some bugs, clean up, and update doc blocks

This commit is contained in:
Aaron Piotrowski 2017-01-10 17:21:47 -06:00
parent 9dc54a9b47
commit ed73b1dd7b
2 changed files with 32 additions and 66 deletions

View File

@ -146,12 +146,14 @@ class Process implements ProcessContext {
$this->pid = $status["pid"];
foreach ($pipes as $pipe) {
\stream_set_blocking($pipe, false);
}
$this->stdin = $stdin = $pipes[0];
$this->stdout = $pipes[1];
$this->stderr = $pipes[2];
$stream = $pipes[3];
\stream_set_blocking($stream, false);
$process = &$this->process;

View File

@ -12,10 +12,10 @@ class StreamedProcess implements ProcessContext {
/** @var \Amp\Parallel\Process\Process */
private $process;
/** @var \Amp\Emitter|null */
/** @var \Amp\Emitter Emits bytes read from STDOUT. */
private $stdoutEmitter;
/** @var \Amp\Emitter|null */
/** @var \Amp\Emitter Emits bytes read from STDERR. */
private $stderrEmitter;
/** @var string|null */
@ -27,9 +27,10 @@ class StreamedProcess implements ProcessContext {
/** @var string|null */
private $stderrWatcher;
/** @var \SplQueue|null */
/** @var \SplQueue Queue of data to write to STDIN. */
private $writeQueue;
/** @var \AsyncInterop\Promise Promise resolved when process ends. */
private $promise;
/**
@ -43,21 +44,7 @@ class StreamedProcess implements ProcessContext {
$this->process = new Process($command, $cwd, $env, $options);
$this->stdoutEmitter = new Emitter;
$this->stderrEmitter = new Emitter;
}
public function __destruct() {
if ($this->stdinWatcher) {
Loop::cancel($this->stdinWatcher);
}
if ($this->stdoutWatcher) {
Loop::cancel($this->stdoutWatcher);
}
if ($this->stderrWatcher) {
Loop::cancel($this->stderrWatcher);
}
$this->writeQueue = new \SplQueue;
}
/**
@ -70,6 +57,8 @@ class StreamedProcess implements ProcessContext {
$this->stderrWatcher = null;
$this->stdoutEmitter = new Emitter;
$this->stderrEmitter = new Emitter;
$this->writeQueue = new \SplQueue;
$this->promise = null;
}
/**
@ -78,7 +67,7 @@ class StreamedProcess implements ProcessContext {
public function start() {
$this->process->start();
$this->writeQueue = $writes = new \SplQueue;
$writes = $this->writeQueue;
$this->stdinWatcher = Loop::onWritable($this->process->getStdIn(), static function ($watcher, $resource) use ($writes) {
while (!$writes->isEmpty()) {
/** @var \Amp\Deferred $deferred */
@ -114,56 +103,35 @@ class StreamedProcess implements ProcessContext {
});
Loop::disable($this->stdinWatcher);
$stdout = &$this->stdoutEmitter;
$this->stdoutWatcher = Loop::onReadable($this->process->getStdOut(), static function ($watcher, $resource) use (&$stdout) {
if (@\feof($resource)) {
Loop::cancel($watcher);
return;
}
$callback = static function ($watcher, $resource, Emitter $emitter) {
// Error reporting suppressed since fread() produces a warning if the stream unexpectedly closes.
$data = @\fread($resource, self::CHUNK_SIZE);
if ($data === false) {
Loop::cancel($watcher);
if (@\feof($resource) || ($data = @\fread($resource, self::CHUNK_SIZE)) === false) {
Loop::disable($watcher);
return;
}
if ($data !== "") {
$stdout->emit($data);
$emitter->emit($data);
}
});
};
$stderr = &$this->stderrEmitter;
$this->stderrWatcher = Loop::onReadable($this->process->getStdErr(), static function ($watcher, $resource) use (&$stderr) {
if (@\feof($resource)) {
Loop::cancel($watcher);
return;
}
// Error reporting suppressed since fread() produces a warning if the stream unexpectedly closes.
$data = @\fread($resource, self::CHUNK_SIZE);
if ($data === false) {
Loop::cancel($watcher);
return;
}
if ($data !== "") {
$stderr->emit($data);
}
});
$this->stdoutWatcher = Loop::onReadable($this->process->getStdOut(), $callback, $this->stdoutEmitter);
$this->stderrWatcher = Loop::onReadable($this->process->getStdErr(), $callback, $this->stderrEmitter);
$this->promise = $this->process->join();
$this->promise->when(static function (\Throwable $exception = null, int $code = null) use (&$stdout, &$stderr) {
$this->promise->when(function (\Throwable $exception = null, int $code = null) {
Loop::cancel($this->stdinWatcher);
Loop::cancel($this->stdoutWatcher);
Loop::cancel($this->stderrWatcher);
if ($exception) {
$stdout->fail($exception);
$stderr->fail($exception);
$this->stdoutEmitter->fail($exception);
$this->stderrEmitter->fail($exception);
return;
}
$stdout->resolve($code);
$stderr->resolve($code);
$this->stdoutEmitter->resolve($code);
$this->stderrEmitter->resolve($code);
});
}
@ -230,18 +198,10 @@ class StreamedProcess implements ProcessContext {
}
public function getStdOut(): Stream {
if ($this->stdoutEmitter === null) {
throw new StatusError("The process has not been started");
}
return $this->stdoutEmitter->stream();
}
public function getStdErr(): Stream {
if ($this->stderrEmitter === null) {
throw new StatusError("The process has not been started");
}
return $this->stderrEmitter->stream();
}
@ -249,6 +209,10 @@ class StreamedProcess implements ProcessContext {
* {@inheritdoc}
*/
public function join(): Promise {
if ($this->promise === null) {
throw new StatusError("The process has not been started");
}
return $this->promise;
}