mirror of
https://github.com/danog/process.git
synced 2025-01-22 22:01:22 +01:00
Update to Amp 1.0
This commit is contained in:
parent
76dc3701ef
commit
e7a1d052fb
70
Process.php
70
Process.php
@ -3,7 +3,6 @@
|
||||
namespace Amp;
|
||||
|
||||
class Process {
|
||||
private $reactor;
|
||||
private $cmd;
|
||||
private $options;
|
||||
private $proc;
|
||||
@ -12,8 +11,8 @@ class Process {
|
||||
private $stdout;
|
||||
private $stderr;
|
||||
|
||||
private $future;
|
||||
private $writeFutures = [];
|
||||
private $deferred;
|
||||
private $writeDeferreds = [];
|
||||
|
||||
private $writeBuf;
|
||||
private $writeTotal;
|
||||
@ -25,8 +24,7 @@ class Process {
|
||||
const BUFFER_ALL = 3;
|
||||
|
||||
/* $options are passed directly to proc_open(), "cwd" and "env" entries are passed as fourth respectively fifth parameters to proc_open() */
|
||||
public function __construct($cmd, array $options = [], Reactor $reactor = null) {
|
||||
$this->reactor = $reactor ?: getReactor();
|
||||
public function __construct($cmd, array $options = []) {
|
||||
$this->cmd = $cmd;
|
||||
$this->options = $options;
|
||||
}
|
||||
@ -45,7 +43,7 @@ class Process {
|
||||
$cwd = isset($this->options["cwd"]) ? $this->options["cwd"] : NULL;
|
||||
$env = isset($this->options["env"]) ? $this->options["env"] : NULL;
|
||||
if (!$this->proc = @proc_open($this->cmd, $fds, $pipes, $cwd, $env, $this->options)) {
|
||||
return new Failure(new RuntimeException("Failed executing command: $cmd"));
|
||||
return new Failure(new \RuntimeException("Failed executing command: $this->cmd"));
|
||||
}
|
||||
|
||||
$this->writeBuf = "";
|
||||
@ -56,7 +54,7 @@ class Process {
|
||||
stream_set_blocking($pipes[1], false);
|
||||
stream_set_blocking($pipes[2], false);
|
||||
|
||||
$this->future = new Future;
|
||||
$this->deferred = new Deferred;
|
||||
$result = new \stdClass;
|
||||
|
||||
if ($buffer & self::BUFFER_STDOUT) {
|
||||
@ -66,11 +64,11 @@ class Process {
|
||||
$result->stderr = "";
|
||||
}
|
||||
|
||||
$this->stdout = $this->reactor->onReadable($pipes[1], function($reactor, $watcher, $sock) use ($result) {
|
||||
$this->stdout = \Amp\onReadable($pipes[1], function($watcher, $sock) use ($result) {
|
||||
if ("" == $data = @fread($sock, 8192)) {
|
||||
$reactor->cancel($watcher);
|
||||
$reactor->cancel($this->stdin);
|
||||
$reactor->immediately(function() use ($result, $future) {
|
||||
\Amp\cancel($watcher);
|
||||
\Amp\cancel($this->stdin);
|
||||
\Amp\immediately(function() use ($result) {
|
||||
$status = proc_get_status($this->proc);
|
||||
assert($status["running"] === false);
|
||||
if ($status["signaled"]) {
|
||||
@ -78,40 +76,40 @@ class Process {
|
||||
}
|
||||
$result->exit = $status["exitcode"];
|
||||
$this->proc = NULL;
|
||||
$this->future->succeed($result);
|
||||
$this->deferred->succeed($result);
|
||||
|
||||
foreach ($this->writeFutures as $future) {
|
||||
$future->fail(\Exception("Write could not be completed, process finished"));
|
||||
foreach ($this->writeDeferreds as $deferred) {
|
||||
$deferred->fail(new \Exception("Write could not be completed, process finished"));
|
||||
}
|
||||
$this->writeFutures = [];
|
||||
$this->writeDeferreds = [];
|
||||
});
|
||||
} else {
|
||||
isset($result->stdout) && $result->stdout .= $data;
|
||||
$this->future->update(["out", $data]);
|
||||
$this->deferred->update(["out", $data]);
|
||||
}
|
||||
});
|
||||
$this->stderr = $this->reactor->onReadable($pipes[2], function($reactor, $watcher, $sock) use ($result) {
|
||||
$this->stderr = \Amp\onReadable($pipes[2], function($watcher, $sock) use ($result) {
|
||||
if ("" == $data = @fread($sock, 8192)) {
|
||||
$reactor->cancel($watcher);
|
||||
\Amp\cancel($watcher);
|
||||
} else {
|
||||
isset($result->stderr) && $result->stderr .= $data;
|
||||
$this->future->update(["err", $data]);
|
||||
$this->deferred->update(["err", $data]);
|
||||
}
|
||||
});
|
||||
$this->stdin = $this->reactor->onWritable($pipes[0], function($reactor, $watcher, $sock) {
|
||||
$this->stdin = \Amp\onWritable($pipes[0], function($watcher, $sock) {
|
||||
$this->writeCur += @fwrite($sock, $this->writeBuf);
|
||||
|
||||
if ($this->writeCur == $this->writeTotal) {
|
||||
$reactor->disable($watcher);
|
||||
\Amp\disable($watcher);
|
||||
}
|
||||
|
||||
while (($next = key($this->writeFutures)) !== null && $next <= $this->writeCur) {
|
||||
$this->writeFutures[$next]->succeed($this->writeCur);
|
||||
unset($this->writeFutures[$next]);
|
||||
while (($next = key($this->writeDeferreds)) !== null && $next <= $this->writeCur) {
|
||||
$this->writeDeferreds[$next]->succeed($this->writeCur);
|
||||
unset($this->writeDeferreds[$next]);
|
||||
}
|
||||
}, false);
|
||||
}, ["enable" => false]);
|
||||
|
||||
return $this->future;
|
||||
return $this->deferred->promise();
|
||||
}
|
||||
|
||||
/* Only kills process, Promise returned by exec() will succeed in the next tick */
|
||||
@ -129,15 +127,15 @@ class Process {
|
||||
}
|
||||
|
||||
$this->kill($signal);
|
||||
$this->reactor->cancel($this->stdout);
|
||||
$this->reactor->cancel($this->stderr);
|
||||
$this->reactor->cancel($this->stdin);
|
||||
$this->future->fail(new \RuntimeException("Process watching was cancelled"));
|
||||
\Amp\cancel($this->stdout);
|
||||
\Amp\cancel($this->stderr);
|
||||
\Amp\cancel($this->stdin);
|
||||
$this->deferred->fail(new \RuntimeException("Process watching was cancelled"));
|
||||
|
||||
foreach ($this->writeFutures as $future) {
|
||||
$future->fail(\Exception("Write could not be completed, process watching was cancelled"));
|
||||
foreach ($this->writeDeferreds as $deferred) {
|
||||
$deferred->fail(new \Exception("Write could not be completed, process watching was cancelled"));
|
||||
}
|
||||
$this->writeFutures = [];
|
||||
$this->writeDeferreds = [];
|
||||
}
|
||||
|
||||
/**
|
||||
@ -151,9 +149,11 @@ class Process {
|
||||
}
|
||||
|
||||
$this->writeBuf .= $str;
|
||||
$this->reactor->enable($this->stdin);
|
||||
\Amp\enable($this->stdin);
|
||||
|
||||
$this->writeTotal += strlen($str);
|
||||
return $this->writeFutures[$this->writeTotal] = new Future;
|
||||
$deferred = $this->writeDeferreds[$this->writeTotal] = new Deferred;
|
||||
|
||||
return $deferred->promise();
|
||||
}
|
||||
}
|
||||
|
@ -3,7 +3,7 @@
|
||||
"homepage": "https://github.com/amphp/process",
|
||||
"description": "Asynchronous process manager",
|
||||
"require": {
|
||||
"amphp/amp": "~0.16.0"
|
||||
"amphp/amp": "^1"
|
||||
},
|
||||
"license": "MIT",
|
||||
"authors": [
|
||||
|
Loading…
x
Reference in New Issue
Block a user