1
0
mirror of https://github.com/danog/process.git synced 2025-01-22 05:41:17 +01:00

Refactor StreamedProcess

This commit is contained in:
Aaron Piotrowski 2017-06-15 12:02:50 -05:00
parent 594a5ef039
commit 18ef28bf71
6 changed files with 83 additions and 197 deletions

View File

@ -3,11 +3,12 @@
"homepage": "https://github.com/amphp/process",
"description": "Asynchronous process manager",
"require": {
"amphp/amp": "dev-master as 2.0"
"amphp/amp": "^2",
"amphp/byte-stream": "^1"
},
"require-dev": {
"phpunit/phpunit": "^5.0",
"friendsofphp/php-cs-fixer": "~1.9",
"friendsofphp/php-cs-fixer": "^2.3",
"kelunik/fqn-check": "^0.1.3"
},
"license": "MIT",

View File

@ -2,14 +2,15 @@
include dirname(__DIR__) . "/vendor/autoload.php";
use Amp\ByteStream\Message;
use Amp\Process\StreamedProcess;
Amp\Loop::run(function() {
$process = new StreamedProcess("echo 'Hello, world!'");
$promise = $process->execute();
$process->start();
echo yield $process->getStdout();
echo yield new Message($process);
$code = yield $promise;
$code = yield $process->join();
echo "Process exited with {$code}.\n";
});

View File

@ -6,14 +6,12 @@ use Amp\Process\StreamedProcess;
Amp\Loop::run(function() {
$process = new StreamedProcess("echo 1; sleep 1; echo 2; sleep 1; echo 3; exit 42");
$promise = $process->execute();
$process->start();
$stdout = $process->getStdout();
while (yield $stdout->advance()) {
echo $stdout->getCurrent();
while ($chunk = yield $process->read()) {
echo $chunk;
}
$code = yield $promise;
$code = yield $process->join();
echo "Process exited with {$code}.\n";
});

View File

@ -2,17 +2,18 @@
include dirname(__DIR__) . "/vendor/autoload.php";
use Amp\ByteStream\Message;
use Amp\Process\StreamedProcess;
Amp\Loop::run(function() {
$process = new StreamedProcess('read ; echo "$REPLY"');
$promise = $process->execute();
$process->start();
/* send to stdin */
$process->write("abc\n");
echo yield $process->getStdout();
echo yield new Message($process);
$code = yield $promise;
$code = yield $process->join();
echo "Process exited with {$code}.\n";
});

View File

@ -2,7 +2,9 @@
namespace Amp\Process;
use Amp\{ Deferred, Loop, Promise };
use Amp\Deferred;
use Amp\Loop;
use Amp\Promise;
class Process {
/** @var bool */

View File

@ -2,37 +2,27 @@
namespace Amp\Process;
use Amp\{ Deferred, Emitter, Failure, Loop, Message, Promise, Success };
use Amp\ByteStream\InputStream;
use Amp\ByteStream\OutputStream;
use Amp\ByteStream\ResourceInputStream;
use Amp\ByteStream\ResourceOutputStream;
use Amp\Promise;
use function Amp\call;
class StreamedProcess {
class StreamedProcess implements InputStream, OutputStream {
const CHUNK_SIZE = 8192;
/** @var \Amp\Process\Process */
private $process;
/** @var \Amp\Emitter Emits bytes read from STDOUT. */
private $stdoutEmitter;
/** @var \Amp\ByteStream\ResourceOutputStream */
private $stdin;
/** @var \Amp\Emitter Emits bytes read from STDERR. */
private $stderrEmitter;
/** @var \Amp\ByteStream\ResourceInputStream */
private $stdout;
/** @var \Amp\Message */
private $stdoutMessage;
/** @var \Amp\Message */
private $stderrMessage;
/** @var string|null */
private $stdinWatcher;
/** @var string|null */
private $stdoutWatcher;
/** @var string|null */
private $stderrWatcher;
/** @var \SplQueue Queue of data to write to STDIN. */
private $writeQueue;
/** @var \Amp\ByteStream\ResourceInputStream */
private $stderr;
/**
* @param string|array $command Command to run.
@ -43,11 +33,6 @@ class StreamedProcess {
*/
public function __construct($command, string $cwd = null, array $env = [], array $options = []) {
$this->process = new Process($command, $cwd, $env, $options);
$this->stdoutEmitter = new Emitter;
$this->stderrEmitter = new Emitter;
$this->stdoutMessage = new Message($this->stdoutEmitter->stream());
$this->stderrMessage = new Message($this->stderrEmitter->stream());
$this->writeQueue = new \SplQueue;
}
/**
@ -55,14 +40,9 @@ class StreamedProcess {
*/
public function __clone() {
$this->process = clone $this->process;
$this->stdinWatcher = null;
$this->stdoutWatcher = null;
$this->stderrWatcher = null;
$this->stdoutEmitter = new Emitter;
$this->stderrEmitter = new Emitter;
$this->stdoutMessage = new Message($this->stdoutEmitter->stream());
$this->stderrMessage = new Message($this->stderrEmitter->stream());
$this->writeQueue = new \SplQueue;
$this->stdin = null;
$this->stdout = null;
$this->stderr = null;
}
/**
@ -71,177 +51,80 @@ class StreamedProcess {
public function start() {
$this->process->start();
$process = $this->process;
$writes = $this->writeQueue;
$this->stdinWatcher = Loop::onWritable($this->process->getStdin(), static function ($watcher, $resource) use (
$process, $writes
) {
try {
while (!$writes->isEmpty()) {
/** @var \Amp\Deferred $deferred */
list($data, $previous, $deferred) = $writes->shift();
$length = \strlen($data);
if ($length === 0) {
$deferred->resolve(0);
continue;
}
// Error reporting suppressed since fwrite() emits E_WARNING if the pipe is broken or the buffer is full.
$written = @\fwrite($resource, $data);
if ($written === false || $written === 0) {
$message = "Failed to write to STDIN";
if ($error = \error_get_last()) {
$message .= \sprintf(" Errno: %d; %s", $error["type"], $error["message"]);
}
$exception = new ProcessException($message);
$deferred->fail($exception);
while (!$writes->isEmpty()) { // Empty the write queue and fail all Deferreds.
list(, , $deferred) = $writes->shift();
$deferred->fail($exception);
}
$process->kill();
return;
}
if ($length <= $written) {
$deferred->resolve($written + $previous);
continue;
}
$data = \substr($data, $written);
$writes->unshift([$data, $written + $previous, $deferred]);
return;
}
} finally {
if ($writes->isEmpty()) {
Loop::disable($watcher);
}
}
});
if ($this->writeQueue->isEmpty()) {
Loop::disable($this->stdinWatcher);
}
$callback = static function ($watcher, $resource, Emitter $emitter) {
// Error reporting suppressed since fread() produces a warning if the stream unexpectedly closes.
if (@\feof($resource) || ($data = @\fread($resource, self::CHUNK_SIZE)) === false) {
Loop::disable($watcher);
return;
}
if ($data !== "") {
$emitter->emit($data);
}
};
$this->stdoutWatcher = Loop::onReadable($this->process->getStdout(), $callback, $this->stdoutEmitter);
$this->stderrWatcher = Loop::onReadable($this->process->getStderr(), $callback, $this->stderrEmitter);
$this->process->join()->onResolve(function (\Throwable $exception = null, int $code = null) {
Loop::cancel($this->stdinWatcher);
Loop::cancel($this->stdoutWatcher);
Loop::cancel($this->stderrWatcher);
if ($exception) {
$this->stdoutEmitter->fail($exception);
$this->stderrEmitter->fail($exception);
return;
}
$this->stdoutEmitter->resolve($code);
$this->stderrEmitter->resolve($code);
});
$this->stdin = new ResourceOutputStream($this->process->getStdin());
$this->stdout = new ResourceInputStream($this->process->getStdout());
$this->stderr = new ResourceInputStream($this->process->getStderr());
}
/** @inheritdoc */
public function join(): Promise {
return $this->process->join();
return call(function () {
try {
return yield $this->process->join();
} finally {
$this->stdin->close();
$this->stdout->close();
$this->stderr->close();
}
});
}
/**
* {@inheritdoc}
*/
/** @inheritdoc */
public function isRunning(): bool {
return $this->process->isRunning();
}
/** @inheritdoc */
public function write(string $data): Promise {
if (!$this->stdin) {
throw new StatusError("The process has not been started");
}
return $this->stdin->write($data);
}
/** @inheritdoc */
public function end(string $data = ""): Promise {
if (!$this->stdin) {
throw new StatusError("The process has not been started");
}
return $this->stdin->end($data);
}
/** @inheritdoc */
public function read(): Promise {
if (!$this->stdout) {
throw new StatusError("The process has not been started");
}
return $this->stdout->read();
}
/**
* @param string $data
* Reads from stderr in the same way that read() reads from stdout.
*
* @return \Amp\Promise
*/
public function write(string $data): Promise {
$length = \strlen($data);
$written = 0;
if ($this->writeQueue->isEmpty()) {
if ($length === 0) {
return new Success(0);
}
// Error reporting suppressed since fwrite() emits E_WARNING if the pipe is broken or the buffer is full.
$written = @\fwrite($this->process->getStdIn(), $data);
if ($written === false) {
$message = "Failed to write to stream";
if ($error = \error_get_last()) {
$message .= \sprintf(" Errno: %d; %s", $error["type"], $error["message"]);
}
return new Failure(new ProcessException($message));
}
if ($length <= $written) {
return new Success($written);
}
$data = \substr($data, $written);
public function readError(): Promise {
if (!$this->stderr) {
throw new StatusError("The process has not been started");
}
$deferred = new Deferred;
$this->writeQueue->push([$data, $written, $deferred]);
if ($this->stdinWatcher) {
Loop::enable($this->stdinWatcher);
}
return $deferred->promise();
return $this->stderr->read();
}
/**
* Message buffering the output of STDOUT.
*
* @return \Amp\Message
*/
public function getStdout(): Message {
return $this->stdoutMessage;
}
/**
* Message buffering the output of STDERR.
*
* @return \Amp\Message
*/
public function getStderr(): Message {
return $this->stderrMessage;
}
/**
* {@inheritdoc}
*/
/** @inheritdoc */
public function kill() {
$this->process->kill();
}
/**
* {@inheritdoc}
*/
/** @inheritdoc */
public function getPid(): int {
return $this->process->getPid();
}
/**
* {@inheritdoc}
*/
/** @inheritdoc */
public function signal(int $signo) {
$this->process->signal($signo);
}