1
0
mirror of https://github.com/danog/process.git synced 2024-11-26 12:14:43 +01:00

Refactor for full Windows compatibility

This commit is contained in:
Niklas Keller 2017-09-17 19:07:13 +02:00 committed by Aaron Piotrowski
parent 88f8865a6a
commit 7cf91efd08
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
16 changed files with 168 additions and 93 deletions

Binary file not shown.

Binary file not shown.

View File

@ -6,7 +6,11 @@ use Amp\ByteStream\Message;
use Amp\Process\Process;
Amp\Loop::run(function () {
$process = yield Process::start("echo 'Hello, world!'");
// "echo" is a shell internal command on Windows and doesn't work.
$command = DIRECTORY_SEPARATOR === "\\" ? "cmd /c echo Hello World!" : "echo 'Hello, world!'";
$process = new Process($command);
$process->start();
echo yield new Message($process->getStdout());

View File

@ -6,19 +6,18 @@ use Amp\Process\Process;
use Amp\Promise;
use function Amp\Promise\all;
function show_process_output(Promise $promise): \Generator
function show_process_output(Process $process): \Generator
{
/** @var Process $process */
$process = yield $promise;
$stream = $process->getStdout();
while ($chunk = yield $stream->read()) {
while (null !== $chunk = yield $stream->read()) {
echo $chunk;
}
$code = yield $process->join();
$pid = yield $process->getPid();
echo "Process {$process->getPid()} exited with {$code}\n";
echo "Process {$pid} exited with {$code}\n";
}
Amp\Loop::run(function () {
@ -27,7 +26,9 @@ Amp\Loop::run(function () {
$promises = [];
foreach ($hosts as $host) {
$promises[] = new \Amp\Coroutine(show_process_output(Process::start("ping {$host}")));
$process = new Process("ping {$host}");
$process->start();
$promises[] = new \Amp\Coroutine(show_process_output($process));
}
yield all($promises);

View File

@ -5,10 +5,12 @@ include dirname(__DIR__) . "/vendor/autoload.php";
use Amp\Process\Process;
Amp\Loop::run(function () {
$process = yield Process::start("echo 1; sleep 1; echo 2; sleep 1; echo 3; exit 42");
$process = new Process("echo 1; sleep 1; echo 2; sleep 1; echo 3; exit 42");
$process->start();
$stream = $process->getStdout();
while ($chunk = yield $stream->read()) {
while (null !== $chunk = yield $stream->read()) {
echo $chunk;
}

View File

@ -6,7 +6,13 @@ use Amp\ByteStream\Message;
use Amp\Process\Process;
Amp\Loop::run(function () {
$process = yield Process::start('read ; echo "$REPLY"');
if (DIRECTORY_SEPARATOR === "\\") {
echo "This example doesn't work on Windows." . PHP_EOL;
exit(1);
}
$process = new Process('read; echo "$REPLY"');
$process->start();
/* send to stdin */
$process->getStdin()->write("abc\n");

View File

@ -24,6 +24,9 @@ final class Handle extends ProcessHandle {
/** @var string */
public $extraDataPipeWatcher;
/** @var string */
public $extraDataPipeStartWatcher;
/** @var int */
public $originalParentPid;
}

View File

@ -24,6 +24,7 @@ final class Runner implements ProcessRunner {
public static function onProcessEndExtraDataPipeReadable($watcher, $stream, Handle $handle) {
Loop::cancel($watcher);
$handle->extraDataPipeWatcher = null;
$handle->status = ProcessStatus::ENDED;
@ -55,14 +56,13 @@ final class Runner implements ProcessRunner {
$handle->status = ProcessStatus::RUNNING;
$handle->pidDeferred->resolve((int) $pid);
$deferreds[0]->resolve(new ResourceOutputStream($pipes[0]));
$deferreds[1]->resolve(new ResourceInputStream($pipes[1]));
$deferreds[2]->resolve(new ResourceInputStream($pipes[2]));
$deferreds[0]->resolve($pipes[0]);
$deferreds[1]->resolve($pipes[1]);
$deferreds[2]->resolve($pipes[2]);
$handle->extraDataPipeWatcher = Loop::onReadable($stream, [self::class, 'onProcessEndExtraDataPipeReadable'], $handle);
Loop::unreference($handle->extraDataPipeWatcher);
$handle->sockets->resolve();
if ($handle->extraDataPipeWatcher !== null) {
Loop::enable($handle->extraDataPipeWatcher);
}
}
/** @inheritdoc */
@ -104,10 +104,18 @@ final class Runner implements ProcessRunner {
\stream_set_blocking($pipes[3], false);
Loop::onReadable($pipes[3], [self::class, 'onProcessStartExtraDataPipeReadable'], [$handle, $pipes, [
$handle->extraDataPipeStartWatcher = Loop::onReadable($pipes[3], [self::class, 'onProcessStartExtraDataPipeReadable'], [$handle, [
new ResourceOutputStream($pipes[0]),
new ResourceInputStream($pipes[1]),
new ResourceInputStream($pipes[2]),
], [
$stdinDeferred, $stdoutDeferred, $stderrDeferred
]]);
$handle->extraDataPipeWatcher = Loop::onReadable($pipes[3], [self::class, 'onProcessEndExtraDataPipeReadable'], $handle);
Loop::unreference($handle->extraDataPipeWatcher);
Loop::disable($handle->extraDataPipeWatcher);
return $handle;
}
@ -124,13 +132,21 @@ final class Runner implements ProcessRunner {
/** @inheritdoc */
public function kill(ProcessHandle $handle) {
/** @var Handle $handle */
if ($handle->extraDataPipeWatcher !== null) {
Loop::cancel($handle->extraDataPipeWatcher);
$handle->extraDataPipeWatcher = null;
}
/** @var Handle $handle */
if ($handle->extraDataPipeStartWatcher !== null) {
Loop::cancel($handle->extraDataPipeStartWatcher);
$handle->extraDataPipeStartWatcher = null;
}
if (!\proc_terminate($handle->proc, 9)) { // Forcefully kill the process using SIGKILL.
throw new ProcessException("Terminating process failed");
}
Loop::cancel($handle->extraDataPipeWatcher);
$handle->extraDataPipeWatcher = null;
$handle->status = ProcessStatus::ENDED;
$handle->joinDeferred->fail(new ProcessException("The process was killed"));
}
@ -147,11 +163,23 @@ final class Runner implements ProcessRunner {
public function destroy(ProcessHandle $handle) {
/** @var Handle $handle */
if ($handle->status < ProcessStatus::ENDED && \getmypid() === $handle->originalParentPid) {
$this->kill($handle);
try {
$this->kill($handle);
} catch (ProcessException $e) {
// ignore
}
}
/** @var Handle $handle */
if ($handle->extraDataPipeWatcher !== null) {
Loop::cancel($handle->extraDataPipeWatcher);
$handle->extraDataPipeWatcher = null;
}
/** @var Handle $handle */
if ($handle->extraDataPipeStartWatcher !== null) {
Loop::cancel($handle->extraDataPipeStartWatcher);
$handle->extraDataPipeStartWatcher = null;
}
if (\is_resource($handle->extraDataPipe)) {

View File

@ -8,6 +8,7 @@ use Amp\Process\Internal\ProcessHandle;
final class Handle extends ProcessHandle {
public function __construct() {
$this->joinDeferred = new Deferred;
$this->pidDeferred = new Deferred;
}
/** @var Deferred */
@ -16,6 +17,9 @@ final class Handle extends ProcessHandle {
/** @var string */
public $exitCodeWatcher;
/** @var bool */
public $exitCodeRequested = false;
/** @var resource */
public $proc;
@ -26,7 +30,7 @@ final class Handle extends ProcessHandle {
public $wrapperStderrPipe;
/** @var resource[] */
public $sockets;
public $sockets = [];
/** @var Deferred[] */
public $stdioDeferreds;

View File

@ -27,7 +27,7 @@ final class Runner implements ProcessRunner {
private $socketConnector;
private function makeCommand(string $command, string $workingDirectory): string {
private function makeCommand(string $workingDirectory): string {
$result = sprintf(
'%s --address=%s --port=%d --token-size=%d',
\escapeshellarg(self::WRAPPER_EXE_PATH),
@ -40,8 +40,6 @@ final class Runner implements ProcessRunner {
$result .= ' ' . \escapeshellarg('--cwd=' . \rtrim($workingDirectory, '\\'));
}
$result .= ' ' . $command;
return $result;
}
@ -51,12 +49,14 @@ final class Runner implements ProcessRunner {
/** @inheritdoc */
public function start(string $command, string $cwd = null, array $env = [], array $options = []): ProcessHandle {
$command = $this->makeCommand($command, $cwd ?? '');
if (strpos($command, "\0") !== false) {
throw new ProcessException("Can't execute commands that contain null bytes.");
}
$options['bypass_shell'] = true;
$handle = new Handle;
$handle->proc = @\proc_open($command, self::FD_SPEC, $pipes, $cwd ?: null, $env ?: null, $options);
$handle->proc = @\proc_open($this->makeCommand($cwd ?? ''), self::FD_SPEC, $pipes, $cwd ?: null, $env ?: null, $options);
if (!\is_resource($handle->proc)) {
$message = "Could not start process";
@ -74,16 +74,16 @@ final class Runner implements ProcessRunner {
}
$securityTokens = \random_bytes(SocketConnector::SECURITY_TOKEN_SIZE * 6);
$written = \fwrite($pipes[0], $securityTokens);
$written = \fwrite($pipes[0], $securityTokens . "\0" . $command . "\0");
\fclose($pipes[0]);
\fclose($pipes[1]);
if ($written !== SocketConnector::SECURITY_TOKEN_SIZE * 6) {
if ($written !== SocketConnector::SECURITY_TOKEN_SIZE * 6 + \strlen($command) + 2) {
\fclose($pipes[2]);
\proc_close($handle->proc);
throw new ProcessException("Could not send security tokens to process wrapper");
throw new ProcessException("Could not send security tokens / command to process wrapper");
}
$handle->securityTokens = \str_split($securityTokens, SocketConnector::SECURITY_TOKEN_SIZE);
@ -91,13 +91,16 @@ final class Runner implements ProcessRunner {
$handle->wrapperStderrPipe = $pipes[2];
$stdinDeferred = new Deferred;
$handle->stdioDeferreds[] = new ProcessOutputStream($stdinDeferred->promise());
$handle->stdioDeferreds[] = $stdinDeferred;
$handle->stdin = new ProcessOutputStream($stdinDeferred->promise());
$stdoutDeferred = new Deferred;
$handle->stdioDeferreds[] = new ProcessInputStream($stdoutDeferred->promise());
$handle->stdioDeferreds[] = $stdoutDeferred;
$handle->stdout = new ProcessInputStream($stdoutDeferred->promise());
$stderrDeferred = new Deferred;
$handle->stdioDeferreds[] = new ProcessInputStream($stderrDeferred->promise());
$handle->stdioDeferreds[] = $stderrDeferred;
$handle->stderr = new ProcessInputStream($stderrDeferred->promise());
$this->socketConnector->registerPendingProcess($handle);
@ -107,6 +110,8 @@ final class Runner implements ProcessRunner {
/** @inheritdoc */
public function join(ProcessHandle $handle): Promise {
/** @var Handle $handle */
$handle->exitCodeRequested = true;
if ($handle->exitCodeWatcher !== null) {
Loop::reference($handle->exitCodeWatcher);
}
@ -122,8 +127,10 @@ final class Runner implements ProcessRunner {
throw new ProcessException("Terminating process failed");
}
Loop::cancel($handle->exitCodeWatcher);
$handle->exitCodeWatcher = null;
if ($handle->exitCodeWatcher !== null) {
Loop::cancel($handle->exitCodeWatcher);
$handle->exitCodeWatcher = null;
}
$handle->status = ProcessStatus::ENDED;
$handle->joinDeferred->fail(new ProcessException("The process was killed"));
@ -137,12 +144,17 @@ final class Runner implements ProcessRunner {
/** @inheritdoc */
public function destroy(ProcessHandle $handle) {
/** @var Handle $handle */
if ($handle->status < ProcessStatus::ENDED) {
$this->kill($handle);
if ($handle->status < ProcessStatus::ENDED && \is_resource($handle->proc)) {
try {
$this->kill($handle);
} catch (ProcessException $e) {
// ignore
}
}
if ($handle->exitCodeWatcher !== null) {
Loop::cancel($handle->exitCodeWatcher);
$handle->exitCodeWatcher = null;
}
for ($i = 0; $i < 4; $i++) {
@ -151,8 +163,8 @@ final class Runner implements ProcessRunner {
}
}
\stream_get_contents($handle->wrapperStderrPipe);
\fclose($handle->wrapperStderrPipe);
@\stream_get_contents($handle->wrapperStderrPipe);
@\fclose($handle->wrapperStderrPipe);
if (\is_resource($handle->proc)) {
\proc_close($handle->proc);

View File

@ -46,7 +46,7 @@ final class SocketConnector {
Loop::unreference(Loop::onReadable($this->server, [$this, 'onServerSocketReadable']));
}
private function failClientHandshake($socket, int $code): void {
private function failClientHandshake($socket, int $code) {
\fwrite($socket, \chr(SignalCode::HANDSHAKE_ACK) . \chr($code));
\fclose($socket);
@ -84,10 +84,6 @@ final class SocketConnector {
$data = \fread($socket, $length);
if ($data === false || $data === '') {
\fclose($socket);
Loop::cancel($state->readWatcher);
Loop::cancel($state->timeoutWatcher);
unset($this->pendingClients[(int) $socket]);
return null;
}
@ -204,16 +200,15 @@ final class SocketConnector {
}
public function onReadableChildPid($watcher, $socket, Handle $handle) {
Loop::cancel($watcher);
Loop::cancel($handle->connectTimeoutWatcher);
$data = \fread($socket, 5);
if ($data === false || $data === '') {
$this->failHandleStart($handle, 'Failed to read PID from wrapper: No data received');
return;
}
Loop::cancel($watcher);
Loop::cancel($handle->connectTimeoutWatcher);
if (\strlen($data) !== 5) {
$this->failHandleStart(
$handle, 'Failed to read PID from wrapper: Received %d of 5 expected bytes', \strlen($data)
@ -237,27 +232,27 @@ final class SocketConnector {
$handle->stdioDeferreds[2]->resolve(new ResourceInputStream($handle->sockets[2]));
$handle->exitCodeWatcher = Loop::onReadable($handle->sockets[0], [$this, 'onReadableExitCode'], $handle);
Loop::unreference($handle->exitCodeWatcher);
if (!$handle->exitCodeRequested) {
Loop::unreference($handle->exitCodeWatcher);
}
unset($this->pendingProcesses[$handle->wrapperPid]);
}
public function onReadableExitCode($watcher, $socket, Handle $handle) {
$handle->exitCodeWatcher = null;
Loop::cancel($watcher);
$data = \fread($socket, 5);
if ($data === false || $data === '') {
$handle->status = ProcessStatus::ENDED;
$handle->joinDeferred->fail(new ProcessException('Failed to read exit code from wrapper: No data received'));
return;
}
$handle->exitCodeWatcher = null;
Loop::cancel($watcher);
if (\strlen($data) !== 5) {
$handle->status = ProcessStatus::ENDED;
$handle->joinDeferred->fail(new ProcessException(
\sprintf('Failed to read exit code from wrapper: Recieved %d of 5 expected bytes', \strlen($data))
\sprintf('Failed to read exit code from wrapper: Received %d of 5 expected bytes', \strlen($data))
));
return;
}
@ -317,6 +312,8 @@ final class SocketConnector {
foreach ($handle->stdioDeferreds as $deferred) {
$deferred->fail($error);
}
$handle->joinDeferred->fail($error);
}
public function registerPendingProcess(Handle $handle) {

View File

@ -2,6 +2,7 @@
namespace Amp\Process;
use Amp\Loop;
use Amp\Process\Internal\Posix\Runner as PosixProcessRunner;
use Amp\Process\Internal\ProcessHandle;
use Amp\Process\Internal\ProcessRunner;
@ -11,7 +12,7 @@ use Amp\Promise;
class Process {
/** @var ProcessRunner */
private static $processRunner;
private $processRunner;
/** @var string */
private $command;
@ -57,6 +58,16 @@ class Process {
$this->cwd = $cwd;
$this->env = $envVars;
$this->options = $options;
$this->processRunner = Loop::getState(self::class);
if ($this->processRunner === null) {
$this->processRunner = \strncasecmp(\PHP_OS, "WIN", 3) === 0
? new WindowsProcessRunner
: new PosixProcessRunner;
Loop::setState(self::class, $this->processRunner);
}
}
/**
@ -64,7 +75,7 @@ class Process {
*/
public function __destruct() {
if ($this->handle !== null) {
self::$processRunner->destroy($this->handle);
$this->processRunner->destroy($this->handle);
}
}
@ -82,7 +93,7 @@ class Process {
throw new StatusError("Process has already been started.");
}
$this->handle = self::$processRunner->start($this->command, $this->cwd, $this->env, $this->options);
$this->handle = $this->processRunner->start($this->command, $this->cwd, $this->env, $this->options);
}
/**
@ -97,7 +108,7 @@ class Process {
throw new StatusError("Process has not been started.");
}
return self::$processRunner->join($this->handle);
return $this->processRunner->join($this->handle);
}
/**
@ -111,7 +122,7 @@ class Process {
throw new StatusError("The process is not running");
}
self::$processRunner->kill($this->handle);
$this->processRunner->kill($this->handle);
}
/**
@ -127,7 +138,7 @@ class Process {
throw new StatusError("The process is not running");
}
self::$processRunner->signal($this->handle, $signo);
$this->processRunner->signal($this->handle, $signo);
}
/**
@ -200,7 +211,7 @@ class Process {
* @return ProcessOutputStream
*/
public function getStdin(): ProcessOutputStream {
return $this->stdin;
return $this->handle->stdin;
}
/**
@ -213,7 +224,7 @@ class Process {
throw new StatusError("The process is not running");
}
return $this->stdout;
return $this->handle->stdout;
}
/**
@ -226,13 +237,6 @@ class Process {
throw new StatusError("The process is not running");
}
return $this->stderr;
return $this->handle->stderr;
}
}
(function () {
/** @noinspection PhpUndefinedClassInspection */
self::$processRunner = \strncasecmp(\PHP_OS, "WIN", 3) === 0
? new WindowsProcessRunner
: new PosixProcessRunner;
})->bindTo(null, Process::class)();

View File

@ -17,6 +17,9 @@ class ProcessInputStream implements InputStream {
/** @var bool */
private $shouldClose = false;
/** @var bool */
private $referenced = true;
/** @var ResourceInputStream */
private $resourceStream;
@ -27,12 +30,18 @@ class ProcessInputStream implements InputStream {
$resourceStreamPromise->onResolve(function ($error, $resourceStream) {
if ($error) {
$this->error = new StreamException("Failed to launch process", 0, $error);
$this->initialRead->fail($this->error);
if ($this->initialRead) {
$this->initialRead->fail($this->error);
}
return;
}
$this->resourceStream = $resourceStream;
if (!$this->referenced) {
$this->resourceStream->unreference();
}
if ($this->shouldClose) {
$this->resourceStream->close();
}
@ -70,6 +79,22 @@ class ProcessInputStream implements InputStream {
return $this->initialRead->promise();
}
public function reference() {
$this->referenced = true;
if ($this->resourceStream) {
$this->resourceStream->reference();
}
}
public function unreference() {
$this->referenced = false;
if ($this->resourceStream) {
$this->resourceStream->unreference();
}
}
public function close() {
$this->shouldClose = true;

View File

@ -12,7 +12,7 @@ use Amp\Promise;
class ProcessOutputStream implements OutputStream {
/** @var array */
private $queuedWrites;
private $queuedWrites = [];
/** @var bool */
private $shouldClose = false;

View File

@ -1,17 +1,5 @@
<?xml version="1.0" encoding="UTF-8"?>
<phpunit
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="http://schema.phpunit.de/4.1/phpunit.xsd"
backupGlobals="false"
backupStaticAttributes="false"
bootstrap="vendor/autoload.php"
colors="true"
convertErrorsToExceptions="true"
convertNoticesToExceptions="true"
convertWarningsToExceptions="true"
processIsolation="false"
stopOnFailure="false"
>
<phpunit xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="http://schema.phpunit.de/6.0/phpunit.xsd" bootstrap="vendor/autoload.php" colors="true">
<testsuites>
<testsuite name="Amp Process">
<directory>test</directory>

View File

@ -7,7 +7,7 @@ use Amp\Process\Process;
use PHPUnit\Framework\TestCase;
class ProcessTest extends TestCase {
const CMD_PROCESS = 'echo foo';
const CMD_PROCESS = \DIRECTORY_SEPARATOR === "\\" ? "cmd /c echo foo" : "echo foo";
/**
* @expectedException \Amp\Process\StatusError
@ -22,7 +22,7 @@ class ProcessTest extends TestCase {
public function testIsRunning() {
Loop::run(function () {
$process = new Process("exit 42");
$process = new Process(\DIRECTORY_SEPARATOR === "\\" ? "cmd /c exit 42" : "exit 42");
$process->start();
$promise = $process->join();
@ -36,8 +36,9 @@ class ProcessTest extends TestCase {
public function testExecuteResolvesToExitCode() {
Loop::run(function () {
$process = new Process("exit 42");
$process = new Process(\DIRECTORY_SEPARATOR === "\\" ? "cmd /c exit 42" : "exit 42");
$process->start();
$code = yield $process->join();
$this->assertSame(42, $code);
@ -54,7 +55,7 @@ class ProcessTest extends TestCase {
$completed = false;
$promise->onResolve(function () use (&$completed) { $completed = true; });
$this->assertFalse($completed);
$this->assertInternalType('int', $process->getPid());
$this->assertInternalType('int', yield $process->getPid());
});
}
@ -70,7 +71,7 @@ class ProcessTest extends TestCase {
$process->kill();
$code = yield $promise;
yield $promise;
});
}