diff --git a/bin/windows/ProcessWrapper.exe b/bin/windows/ProcessWrapper.exe index df431cc..e7a7b71 100644 Binary files a/bin/windows/ProcessWrapper.exe and b/bin/windows/ProcessWrapper.exe differ diff --git a/bin/windows/ProcessWrapper64.exe b/bin/windows/ProcessWrapper64.exe index ffd0dc5..bf912c3 100644 Binary files a/bin/windows/ProcessWrapper64.exe and b/bin/windows/ProcessWrapper64.exe differ diff --git a/examples/basic-command.php b/examples/basic-command.php index 09c2b72..86761e9 100644 --- a/examples/basic-command.php +++ b/examples/basic-command.php @@ -6,7 +6,11 @@ use Amp\ByteStream\Message; use Amp\Process\Process; Amp\Loop::run(function () { - $process = yield Process::start("echo 'Hello, world!'"); + // "echo" is a shell internal command on Windows and doesn't work. + $command = DIRECTORY_SEPARATOR === "\\" ? "cmd /c echo Hello World!" : "echo 'Hello, world!'"; + + $process = new Process($command); + $process->start(); echo yield new Message($process->getStdout()); diff --git a/examples/ping-many.php b/examples/ping-many.php index b618d4e..1b61775 100644 --- a/examples/ping-many.php +++ b/examples/ping-many.php @@ -6,19 +6,18 @@ use Amp\Process\Process; use Amp\Promise; use function Amp\Promise\all; -function show_process_output(Promise $promise): \Generator +function show_process_output(Process $process): \Generator { - /** @var Process $process */ - $process = yield $promise; - $stream = $process->getStdout(); - while ($chunk = yield $stream->read()) { + + while (null !== $chunk = yield $stream->read()) { echo $chunk; } $code = yield $process->join(); + $pid = yield $process->getPid(); - echo "Process {$process->getPid()} exited with {$code}\n"; + echo "Process {$pid} exited with {$code}\n"; } Amp\Loop::run(function () { @@ -27,7 +26,9 @@ Amp\Loop::run(function () { $promises = []; foreach ($hosts as $host) { - $promises[] = new \Amp\Coroutine(show_process_output(Process::start("ping {$host}"))); + $process = new Process("ping {$host}"); + $process->start(); + $promises[] = new \Amp\Coroutine(show_process_output($process)); } yield all($promises); diff --git a/examples/watch-live.php b/examples/watch-live.php index 0d1b7b9..1214eab 100644 --- a/examples/watch-live.php +++ b/examples/watch-live.php @@ -5,10 +5,12 @@ include dirname(__DIR__) . "/vendor/autoload.php"; use Amp\Process\Process; Amp\Loop::run(function () { - $process = yield Process::start("echo 1; sleep 1; echo 2; sleep 1; echo 3; exit 42"); + $process = new Process("echo 1; sleep 1; echo 2; sleep 1; echo 3; exit 42"); + $process->start(); $stream = $process->getStdout(); - while ($chunk = yield $stream->read()) { + + while (null !== $chunk = yield $stream->read()) { echo $chunk; } diff --git a/examples/write-command.php b/examples/write-command.php index 373830b..96601b7 100644 --- a/examples/write-command.php +++ b/examples/write-command.php @@ -6,7 +6,13 @@ use Amp\ByteStream\Message; use Amp\Process\Process; Amp\Loop::run(function () { - $process = yield Process::start('read ; echo "$REPLY"'); + if (DIRECTORY_SEPARATOR === "\\") { + echo "This example doesn't work on Windows." . PHP_EOL; + exit(1); + } + + $process = new Process('read; echo "$REPLY"'); + $process->start(); /* send to stdin */ $process->getStdin()->write("abc\n"); diff --git a/lib/Internal/Posix/Handle.php b/lib/Internal/Posix/Handle.php index 1747c85..c1da8a4 100644 --- a/lib/Internal/Posix/Handle.php +++ b/lib/Internal/Posix/Handle.php @@ -24,6 +24,9 @@ final class Handle extends ProcessHandle { /** @var string */ public $extraDataPipeWatcher; + /** @var string */ + public $extraDataPipeStartWatcher; + /** @var int */ public $originalParentPid; } diff --git a/lib/Internal/Posix/Runner.php b/lib/Internal/Posix/Runner.php index ba6a61b..e2585e5 100644 --- a/lib/Internal/Posix/Runner.php +++ b/lib/Internal/Posix/Runner.php @@ -24,6 +24,7 @@ final class Runner implements ProcessRunner { public static function onProcessEndExtraDataPipeReadable($watcher, $stream, Handle $handle) { Loop::cancel($watcher); + $handle->extraDataPipeWatcher = null; $handle->status = ProcessStatus::ENDED; @@ -55,14 +56,13 @@ final class Runner implements ProcessRunner { $handle->status = ProcessStatus::RUNNING; $handle->pidDeferred->resolve((int) $pid); - $deferreds[0]->resolve(new ResourceOutputStream($pipes[0])); - $deferreds[1]->resolve(new ResourceInputStream($pipes[1])); - $deferreds[2]->resolve(new ResourceInputStream($pipes[2])); + $deferreds[0]->resolve($pipes[0]); + $deferreds[1]->resolve($pipes[1]); + $deferreds[2]->resolve($pipes[2]); - $handle->extraDataPipeWatcher = Loop::onReadable($stream, [self::class, 'onProcessEndExtraDataPipeReadable'], $handle); - Loop::unreference($handle->extraDataPipeWatcher); - - $handle->sockets->resolve(); + if ($handle->extraDataPipeWatcher !== null) { + Loop::enable($handle->extraDataPipeWatcher); + } } /** @inheritdoc */ @@ -104,10 +104,18 @@ final class Runner implements ProcessRunner { \stream_set_blocking($pipes[3], false); - Loop::onReadable($pipes[3], [self::class, 'onProcessStartExtraDataPipeReadable'], [$handle, $pipes, [ + $handle->extraDataPipeStartWatcher = Loop::onReadable($pipes[3], [self::class, 'onProcessStartExtraDataPipeReadable'], [$handle, [ + new ResourceOutputStream($pipes[0]), + new ResourceInputStream($pipes[1]), + new ResourceInputStream($pipes[2]), + ], [ $stdinDeferred, $stdoutDeferred, $stderrDeferred ]]); + $handle->extraDataPipeWatcher = Loop::onReadable($pipes[3], [self::class, 'onProcessEndExtraDataPipeReadable'], $handle); + Loop::unreference($handle->extraDataPipeWatcher); + Loop::disable($handle->extraDataPipeWatcher); + return $handle; } @@ -124,13 +132,21 @@ final class Runner implements ProcessRunner { /** @inheritdoc */ public function kill(ProcessHandle $handle) { /** @var Handle $handle */ + if ($handle->extraDataPipeWatcher !== null) { + Loop::cancel($handle->extraDataPipeWatcher); + $handle->extraDataPipeWatcher = null; + } + + /** @var Handle $handle */ + if ($handle->extraDataPipeStartWatcher !== null) { + Loop::cancel($handle->extraDataPipeStartWatcher); + $handle->extraDataPipeStartWatcher = null; + } + if (!\proc_terminate($handle->proc, 9)) { // Forcefully kill the process using SIGKILL. throw new ProcessException("Terminating process failed"); } - Loop::cancel($handle->extraDataPipeWatcher); - $handle->extraDataPipeWatcher = null; - $handle->status = ProcessStatus::ENDED; $handle->joinDeferred->fail(new ProcessException("The process was killed")); } @@ -147,11 +163,23 @@ final class Runner implements ProcessRunner { public function destroy(ProcessHandle $handle) { /** @var Handle $handle */ if ($handle->status < ProcessStatus::ENDED && \getmypid() === $handle->originalParentPid) { - $this->kill($handle); + try { + $this->kill($handle); + } catch (ProcessException $e) { + // ignore + } } + /** @var Handle $handle */ if ($handle->extraDataPipeWatcher !== null) { Loop::cancel($handle->extraDataPipeWatcher); + $handle->extraDataPipeWatcher = null; + } + + /** @var Handle $handle */ + if ($handle->extraDataPipeStartWatcher !== null) { + Loop::cancel($handle->extraDataPipeStartWatcher); + $handle->extraDataPipeStartWatcher = null; } if (\is_resource($handle->extraDataPipe)) { diff --git a/lib/Internal/Windows/Handle.php b/lib/Internal/Windows/Handle.php index 196180f..58d5507 100644 --- a/lib/Internal/Windows/Handle.php +++ b/lib/Internal/Windows/Handle.php @@ -8,6 +8,7 @@ use Amp\Process\Internal\ProcessHandle; final class Handle extends ProcessHandle { public function __construct() { $this->joinDeferred = new Deferred; + $this->pidDeferred = new Deferred; } /** @var Deferred */ @@ -16,6 +17,9 @@ final class Handle extends ProcessHandle { /** @var string */ public $exitCodeWatcher; + /** @var bool */ + public $exitCodeRequested = false; + /** @var resource */ public $proc; @@ -26,7 +30,7 @@ final class Handle extends ProcessHandle { public $wrapperStderrPipe; /** @var resource[] */ - public $sockets; + public $sockets = []; /** @var Deferred[] */ public $stdioDeferreds; diff --git a/lib/Internal/Windows/Runner.php b/lib/Internal/Windows/Runner.php index 03e4681..ed00764 100644 --- a/lib/Internal/Windows/Runner.php +++ b/lib/Internal/Windows/Runner.php @@ -27,7 +27,7 @@ final class Runner implements ProcessRunner { private $socketConnector; - private function makeCommand(string $command, string $workingDirectory): string { + private function makeCommand(string $workingDirectory): string { $result = sprintf( '%s --address=%s --port=%d --token-size=%d', \escapeshellarg(self::WRAPPER_EXE_PATH), @@ -40,8 +40,6 @@ final class Runner implements ProcessRunner { $result .= ' ' . \escapeshellarg('--cwd=' . \rtrim($workingDirectory, '\\')); } - $result .= ' ' . $command; - return $result; } @@ -51,12 +49,14 @@ final class Runner implements ProcessRunner { /** @inheritdoc */ public function start(string $command, string $cwd = null, array $env = [], array $options = []): ProcessHandle { - $command = $this->makeCommand($command, $cwd ?? ''); + if (strpos($command, "\0") !== false) { + throw new ProcessException("Can't execute commands that contain null bytes."); + } $options['bypass_shell'] = true; $handle = new Handle; - $handle->proc = @\proc_open($command, self::FD_SPEC, $pipes, $cwd ?: null, $env ?: null, $options); + $handle->proc = @\proc_open($this->makeCommand($cwd ?? ''), self::FD_SPEC, $pipes, $cwd ?: null, $env ?: null, $options); if (!\is_resource($handle->proc)) { $message = "Could not start process"; @@ -74,16 +74,16 @@ final class Runner implements ProcessRunner { } $securityTokens = \random_bytes(SocketConnector::SECURITY_TOKEN_SIZE * 6); - $written = \fwrite($pipes[0], $securityTokens); + $written = \fwrite($pipes[0], $securityTokens . "\0" . $command . "\0"); \fclose($pipes[0]); \fclose($pipes[1]); - if ($written !== SocketConnector::SECURITY_TOKEN_SIZE * 6) { + if ($written !== SocketConnector::SECURITY_TOKEN_SIZE * 6 + \strlen($command) + 2) { \fclose($pipes[2]); \proc_close($handle->proc); - throw new ProcessException("Could not send security tokens to process wrapper"); + throw new ProcessException("Could not send security tokens / command to process wrapper"); } $handle->securityTokens = \str_split($securityTokens, SocketConnector::SECURITY_TOKEN_SIZE); @@ -91,13 +91,16 @@ final class Runner implements ProcessRunner { $handle->wrapperStderrPipe = $pipes[2]; $stdinDeferred = new Deferred; - $handle->stdioDeferreds[] = new ProcessOutputStream($stdinDeferred->promise()); + $handle->stdioDeferreds[] = $stdinDeferred; + $handle->stdin = new ProcessOutputStream($stdinDeferred->promise()); $stdoutDeferred = new Deferred; - $handle->stdioDeferreds[] = new ProcessInputStream($stdoutDeferred->promise()); + $handle->stdioDeferreds[] = $stdoutDeferred; + $handle->stdout = new ProcessInputStream($stdoutDeferred->promise()); $stderrDeferred = new Deferred; - $handle->stdioDeferreds[] = new ProcessInputStream($stderrDeferred->promise()); + $handle->stdioDeferreds[] = $stderrDeferred; + $handle->stderr = new ProcessInputStream($stderrDeferred->promise()); $this->socketConnector->registerPendingProcess($handle); @@ -107,6 +110,8 @@ final class Runner implements ProcessRunner { /** @inheritdoc */ public function join(ProcessHandle $handle): Promise { /** @var Handle $handle */ + $handle->exitCodeRequested = true; + if ($handle->exitCodeWatcher !== null) { Loop::reference($handle->exitCodeWatcher); } @@ -122,8 +127,10 @@ final class Runner implements ProcessRunner { throw new ProcessException("Terminating process failed"); } - Loop::cancel($handle->exitCodeWatcher); - $handle->exitCodeWatcher = null; + if ($handle->exitCodeWatcher !== null) { + Loop::cancel($handle->exitCodeWatcher); + $handle->exitCodeWatcher = null; + } $handle->status = ProcessStatus::ENDED; $handle->joinDeferred->fail(new ProcessException("The process was killed")); @@ -137,12 +144,17 @@ final class Runner implements ProcessRunner { /** @inheritdoc */ public function destroy(ProcessHandle $handle) { /** @var Handle $handle */ - if ($handle->status < ProcessStatus::ENDED) { - $this->kill($handle); + if ($handle->status < ProcessStatus::ENDED && \is_resource($handle->proc)) { + try { + $this->kill($handle); + } catch (ProcessException $e) { + // ignore + } } if ($handle->exitCodeWatcher !== null) { Loop::cancel($handle->exitCodeWatcher); + $handle->exitCodeWatcher = null; } for ($i = 0; $i < 4; $i++) { @@ -151,8 +163,8 @@ final class Runner implements ProcessRunner { } } - \stream_get_contents($handle->wrapperStderrPipe); - \fclose($handle->wrapperStderrPipe); + @\stream_get_contents($handle->wrapperStderrPipe); + @\fclose($handle->wrapperStderrPipe); if (\is_resource($handle->proc)) { \proc_close($handle->proc); diff --git a/lib/Internal/Windows/SocketConnector.php b/lib/Internal/Windows/SocketConnector.php index 853e3bc..5f516eb 100644 --- a/lib/Internal/Windows/SocketConnector.php +++ b/lib/Internal/Windows/SocketConnector.php @@ -46,7 +46,7 @@ final class SocketConnector { Loop::unreference(Loop::onReadable($this->server, [$this, 'onServerSocketReadable'])); } - private function failClientHandshake($socket, int $code): void { + private function failClientHandshake($socket, int $code) { \fwrite($socket, \chr(SignalCode::HANDSHAKE_ACK) . \chr($code)); \fclose($socket); @@ -84,10 +84,6 @@ final class SocketConnector { $data = \fread($socket, $length); if ($data === false || $data === '') { - \fclose($socket); - Loop::cancel($state->readWatcher); - Loop::cancel($state->timeoutWatcher); - unset($this->pendingClients[(int) $socket]); return null; } @@ -204,16 +200,15 @@ final class SocketConnector { } public function onReadableChildPid($watcher, $socket, Handle $handle) { - Loop::cancel($watcher); - Loop::cancel($handle->connectTimeoutWatcher); - $data = \fread($socket, 5); if ($data === false || $data === '') { - $this->failHandleStart($handle, 'Failed to read PID from wrapper: No data received'); return; } + Loop::cancel($watcher); + Loop::cancel($handle->connectTimeoutWatcher); + if (\strlen($data) !== 5) { $this->failHandleStart( $handle, 'Failed to read PID from wrapper: Received %d of 5 expected bytes', \strlen($data) @@ -237,27 +232,27 @@ final class SocketConnector { $handle->stdioDeferreds[2]->resolve(new ResourceInputStream($handle->sockets[2])); $handle->exitCodeWatcher = Loop::onReadable($handle->sockets[0], [$this, 'onReadableExitCode'], $handle); - Loop::unreference($handle->exitCodeWatcher); + if (!$handle->exitCodeRequested) { + Loop::unreference($handle->exitCodeWatcher); + } unset($this->pendingProcesses[$handle->wrapperPid]); } public function onReadableExitCode($watcher, $socket, Handle $handle) { - $handle->exitCodeWatcher = null; - Loop::cancel($watcher); - $data = \fread($socket, 5); if ($data === false || $data === '') { - $handle->status = ProcessStatus::ENDED; - $handle->joinDeferred->fail(new ProcessException('Failed to read exit code from wrapper: No data received')); return; } + $handle->exitCodeWatcher = null; + Loop::cancel($watcher); + if (\strlen($data) !== 5) { $handle->status = ProcessStatus::ENDED; $handle->joinDeferred->fail(new ProcessException( - \sprintf('Failed to read exit code from wrapper: Recieved %d of 5 expected bytes', \strlen($data)) + \sprintf('Failed to read exit code from wrapper: Received %d of 5 expected bytes', \strlen($data)) )); return; } @@ -317,6 +312,8 @@ final class SocketConnector { foreach ($handle->stdioDeferreds as $deferred) { $deferred->fail($error); } + + $handle->joinDeferred->fail($error); } public function registerPendingProcess(Handle $handle) { diff --git a/lib/Process.php b/lib/Process.php index fcfe91c..9e7cf4d 100644 --- a/lib/Process.php +++ b/lib/Process.php @@ -2,6 +2,7 @@ namespace Amp\Process; +use Amp\Loop; use Amp\Process\Internal\Posix\Runner as PosixProcessRunner; use Amp\Process\Internal\ProcessHandle; use Amp\Process\Internal\ProcessRunner; @@ -11,7 +12,7 @@ use Amp\Promise; class Process { /** @var ProcessRunner */ - private static $processRunner; + private $processRunner; /** @var string */ private $command; @@ -57,6 +58,16 @@ class Process { $this->cwd = $cwd; $this->env = $envVars; $this->options = $options; + + $this->processRunner = Loop::getState(self::class); + + if ($this->processRunner === null) { + $this->processRunner = \strncasecmp(\PHP_OS, "WIN", 3) === 0 + ? new WindowsProcessRunner + : new PosixProcessRunner; + + Loop::setState(self::class, $this->processRunner); + } } /** @@ -64,7 +75,7 @@ class Process { */ public function __destruct() { if ($this->handle !== null) { - self::$processRunner->destroy($this->handle); + $this->processRunner->destroy($this->handle); } } @@ -82,7 +93,7 @@ class Process { throw new StatusError("Process has already been started."); } - $this->handle = self::$processRunner->start($this->command, $this->cwd, $this->env, $this->options); + $this->handle = $this->processRunner->start($this->command, $this->cwd, $this->env, $this->options); } /** @@ -97,7 +108,7 @@ class Process { throw new StatusError("Process has not been started."); } - return self::$processRunner->join($this->handle); + return $this->processRunner->join($this->handle); } /** @@ -111,7 +122,7 @@ class Process { throw new StatusError("The process is not running"); } - self::$processRunner->kill($this->handle); + $this->processRunner->kill($this->handle); } /** @@ -127,7 +138,7 @@ class Process { throw new StatusError("The process is not running"); } - self::$processRunner->signal($this->handle, $signo); + $this->processRunner->signal($this->handle, $signo); } /** @@ -200,7 +211,7 @@ class Process { * @return ProcessOutputStream */ public function getStdin(): ProcessOutputStream { - return $this->stdin; + return $this->handle->stdin; } /** @@ -213,7 +224,7 @@ class Process { throw new StatusError("The process is not running"); } - return $this->stdout; + return $this->handle->stdout; } /** @@ -226,13 +237,6 @@ class Process { throw new StatusError("The process is not running"); } - return $this->stderr; + return $this->handle->stderr; } } - -(function () { - /** @noinspection PhpUndefinedClassInspection */ - self::$processRunner = \strncasecmp(\PHP_OS, "WIN", 3) === 0 - ? new WindowsProcessRunner - : new PosixProcessRunner; -})->bindTo(null, Process::class)(); diff --git a/lib/ProcessInputStream.php b/lib/ProcessInputStream.php index ec43048..92992b1 100644 --- a/lib/ProcessInputStream.php +++ b/lib/ProcessInputStream.php @@ -17,6 +17,9 @@ class ProcessInputStream implements InputStream { /** @var bool */ private $shouldClose = false; + /** @var bool */ + private $referenced = true; + /** @var ResourceInputStream */ private $resourceStream; @@ -27,12 +30,18 @@ class ProcessInputStream implements InputStream { $resourceStreamPromise->onResolve(function ($error, $resourceStream) { if ($error) { $this->error = new StreamException("Failed to launch process", 0, $error); - $this->initialRead->fail($this->error); + if ($this->initialRead) { + $this->initialRead->fail($this->error); + } return; } $this->resourceStream = $resourceStream; + if (!$this->referenced) { + $this->resourceStream->unreference(); + } + if ($this->shouldClose) { $this->resourceStream->close(); } @@ -70,6 +79,22 @@ class ProcessInputStream implements InputStream { return $this->initialRead->promise(); } + public function reference() { + $this->referenced = true; + + if ($this->resourceStream) { + $this->resourceStream->reference(); + } + } + + public function unreference() { + $this->referenced = false; + + if ($this->resourceStream) { + $this->resourceStream->unreference(); + } + } + public function close() { $this->shouldClose = true; diff --git a/lib/ProcessOutputStream.php b/lib/ProcessOutputStream.php index de84be1..29d8f11 100644 --- a/lib/ProcessOutputStream.php +++ b/lib/ProcessOutputStream.php @@ -12,7 +12,7 @@ use Amp\Promise; class ProcessOutputStream implements OutputStream { /** @var array */ - private $queuedWrites; + private $queuedWrites = []; /** @var bool */ private $shouldClose = false; diff --git a/phpunit.xml.dist b/phpunit.xml.dist index 4856517..e535180 100644 --- a/phpunit.xml.dist +++ b/phpunit.xml.dist @@ -1,17 +1,5 @@ - + test diff --git a/test/ProcessTest.php b/test/ProcessTest.php index ac914fc..2e1ff71 100644 --- a/test/ProcessTest.php +++ b/test/ProcessTest.php @@ -7,7 +7,7 @@ use Amp\Process\Process; use PHPUnit\Framework\TestCase; class ProcessTest extends TestCase { - const CMD_PROCESS = 'echo foo'; + const CMD_PROCESS = \DIRECTORY_SEPARATOR === "\\" ? "cmd /c echo foo" : "echo foo"; /** * @expectedException \Amp\Process\StatusError @@ -22,7 +22,7 @@ class ProcessTest extends TestCase { public function testIsRunning() { Loop::run(function () { - $process = new Process("exit 42"); + $process = new Process(\DIRECTORY_SEPARATOR === "\\" ? "cmd /c exit 42" : "exit 42"); $process->start(); $promise = $process->join(); @@ -36,8 +36,9 @@ class ProcessTest extends TestCase { public function testExecuteResolvesToExitCode() { Loop::run(function () { - $process = new Process("exit 42"); + $process = new Process(\DIRECTORY_SEPARATOR === "\\" ? "cmd /c exit 42" : "exit 42"); $process->start(); + $code = yield $process->join(); $this->assertSame(42, $code); @@ -54,7 +55,7 @@ class ProcessTest extends TestCase { $completed = false; $promise->onResolve(function () use (&$completed) { $completed = true; }); $this->assertFalse($completed); - $this->assertInternalType('int', $process->getPid()); + $this->assertInternalType('int', yield $process->getPid()); }); } @@ -70,7 +71,7 @@ class ProcessTest extends TestCase { $process->kill(); - $code = yield $promise; + yield $promise; }); }