1
0
mirror of https://github.com/danog/process.git synced 2024-11-26 12:14:43 +01:00

More refactoring for Windows compatibility with improved BC

This commit is contained in:
Niklas Keller 2017-09-17 17:58:05 +02:00 committed by Aaron Piotrowski
parent 6d2b28f4f5
commit 88f8865a6a
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
16 changed files with 468 additions and 298 deletions

View File

@ -1,8 +1,9 @@
{
"name": "amphp/process",
"homepage": "https://github.com/amphp/process",
"description": "Asynchronous process manager",
"description": "Asynchronous process manager.",
"require": {
"php": ">=7",
"amphp/amp": "^2",
"amphp/byte-stream": "^1"
},
@ -21,6 +22,10 @@
{
"name": "Aaron Piotrowski",
"email": "aaron@trowski.com"
},
{
"name": "Niklas Keller",
"email": "me@kelunik.com"
}
],
"autoload": {

View File

@ -5,25 +5,21 @@ namespace Amp\Process\Internal\Posix;
use Amp\Deferred;
use Amp\Process\Internal\ProcessHandle;
final class Handle extends ProcessHandle
{
final class Handle extends ProcessHandle {
public function __construct() {
$this->startDeferred = new Deferred;
$this->endDeferred = new Deferred;
$this->pidDeferred = new Deferred;
$this->joinDeferred = new Deferred;
$this->originalParentPid = \getmypid();
}
/** @var Deferred */
public $endDeferred;
/** @var Deferred */
public $startDeferred;
public $joinDeferred;
/** @var resource */
public $proc;
/** @var resource[] */
public $pipes;
/** @var resource */
public $extraDataPipe;
/** @var string */
public $extraDataPipeWatcher;

View File

@ -4,15 +4,17 @@ namespace Amp\Process\Internal\Posix;
use Amp\ByteStream\ResourceInputStream;
use Amp\ByteStream\ResourceOutputStream;
use Amp\Deferred;
use Amp\Loop;
use Amp\Process\Internal\ProcessHandle;
use Amp\Process\Internal\ProcessRunner;
use Amp\Process\Internal\ProcessStatus;
use Amp\Process\ProcessException;
use Amp\Process\ProcessInputStream;
use Amp\Process\ProcessOutputStream;
use Amp\Promise;
final class Runner implements ProcessRunner
{
final class Runner implements ProcessRunner {
const FD_SPEC = [
["pipe", "r"], // stdin
["pipe", "w"], // stdout
@ -20,44 +22,51 @@ final class Runner implements ProcessRunner
["pipe", "w"], // exit code pipe
];
public function onProcessEndExtraDataPipeReadable($watcher, $stream, Handle $handle) {
public static function onProcessEndExtraDataPipeReadable($watcher, $stream, Handle $handle) {
Loop::cancel($watcher);
$handle->status = ProcessStatus::ENDED;
if (!\is_resource($stream) || \feof($stream)) {
$handle->endDeferred->fail(new ProcessException("Process ended unexpectedly"));
$handle->joinDeferred->fail(new ProcessException("Process ended unexpectedly"));
} else {
$handle->endDeferred->resolve((int) \rtrim(@\stream_get_contents($stream)));
$handle->joinDeferred->resolve((int) \rtrim(@\stream_get_contents($stream)));
}
}
public function onProcessStartExtraDataPipeReadable($watcher, $stream, Handle $handle) {
public static function onProcessStartExtraDataPipeReadable($watcher, $stream, $data) {
Loop::cancel($watcher);
$pid = \rtrim(@\fgets($stream));
/** @var $deferreds Deferred[] */
list($handle, $pipes, $deferreds) = $data;
if (!$pid || !\is_numeric($pid)) {
$handle->startDeferred->fail(new ProcessException("Could not determine PID"));
$error = new ProcessException("Could not determine PID");
$handle->pidDeferred->fail($error);
$handle->joinDeferred->fail($error);
foreach ($deferreds as $deferred) {
/** @var $deferred Deferred */
$deferred->fail($error);
}
return;
}
$handle->status = ProcessStatus::RUNNING;
$handle->pid = (int) $pid;
$handle->stdin = new ResourceOutputStream($handle->pipes[0]);
$handle->stdout = new ResourceInputStream($handle->pipes[1]);
$handle->stderr = new ResourceInputStream($handle->pipes[2]);
$handle->pidDeferred->resolve((int) $pid);
$deferreds[0]->resolve(new ResourceOutputStream($pipes[0]));
$deferreds[1]->resolve(new ResourceInputStream($pipes[1]));
$deferreds[2]->resolve(new ResourceInputStream($pipes[2]));
$handle->extraDataPipeWatcher = Loop::onReadable($stream, [$this, 'onProcessEndExtraDataPipeReadable'], $handle);
$handle->extraDataPipeWatcher = Loop::onReadable($stream, [self::class, 'onProcessEndExtraDataPipeReadable'], $handle);
Loop::unreference($handle->extraDataPipeWatcher);
$handle->startDeferred->resolve($handle);
$handle->sockets->resolve();
}
/**
* {@inheritdoc}
*/
public function start(string $command, string $cwd = null, array $env = [], array $options = []): Promise {
/** @inheritdoc */
public function start(string $command, string $cwd = null, array $env = [], array $options = []): ProcessHandle {
$command = \sprintf(
'{ (%s) <&3 3<&- 3>/dev/null & } 3<&0;' .
'pid=$!; echo $pid >&3; wait $pid; RC=$?; echo $RC >&3; exit $RC',
@ -65,8 +74,7 @@ final class Runner implements ProcessRunner
);
$handle = new Handle;
$handle->proc = @\proc_open($command, self::FD_SPEC, $handle->pipes, $cwd ?: null, $env ?: null, $options);
$handle->proc = @\proc_open($command, self::FD_SPEC, $pipes, $cwd ?: null, $env ?: null, $options);
if (!\is_resource($handle->proc)) {
$message = "Could not start process";
@ -83,36 +91,40 @@ final class Runner implements ProcessRunner
throw new ProcessException("Could not get process status");
}
\stream_set_blocking($handle->pipes[3], false);
$stdinDeferred = new Deferred;
$handle->stdin = new ProcessOutputStream($stdinDeferred->promise());
/* It's fine to use an instance method here because this object is assigned to a static var in Process and never
needs to be dtor'd before the process ends */
Loop::onReadable($handle->pipes[3], [$this, 'onProcessStartExtraDataPipeReadable'], $handle);
$stdoutDeferred = new Deferred;
$handle->stdout = new ProcessInputStream($stdoutDeferred->promise());
return $handle->startDeferred->promise();
$stderrDeferred = new Deferred;
$handle->stderr = new ProcessInputStream($stderrDeferred->promise());
$handle->extraDataPipe = $pipes[3];
\stream_set_blocking($pipes[3], false);
Loop::onReadable($pipes[3], [self::class, 'onProcessStartExtraDataPipeReadable'], [$handle, $pipes, [
$stdinDeferred, $stdoutDeferred, $stderrDeferred
]]);
return $handle;
}
/**
* {@inheritdoc}
*/
/** @inheritdoc */
public function join(ProcessHandle $handle): Promise {
/** @var Handle $handle */
if ($handle->extraDataPipeWatcher !== null) {
Loop::reference($handle->extraDataPipeWatcher);
}
return $handle->endDeferred->promise();
return $handle->joinDeferred->promise();
}
/**
* {@inheritdoc}
*/
/** @inheritdoc */
public function kill(ProcessHandle $handle) {
/** @var Handle $handle */
// Forcefully kill the process using SIGKILL.
if (!\proc_terminate($handle->proc, 9)) {
if (!\proc_terminate($handle->proc, 9)) { // Forcefully kill the process using SIGKILL.
throw new ProcessException("Terminating process failed");
}
@ -120,28 +132,21 @@ final class Runner implements ProcessRunner
$handle->extraDataPipeWatcher = null;
$handle->status = ProcessStatus::ENDED;
$handle->endDeferred->fail(new ProcessException("The process was killed"));
$handle->joinDeferred->fail(new ProcessException("The process was killed"));
}
/**
* {@inheritdoc}
*/
/** @inheritdoc */
public function signal(ProcessHandle $handle, int $signo) {
/** @var Handle $handle */
if (!\proc_terminate($handle->proc, $signo)) {
throw new ProcessException("Sending signal to process failed");
}
}
/**
* {@inheritdoc}
*/
/** @inheritdoc */
public function destroy(ProcessHandle $handle) {
/** @var Handle $handle */
if (\getmypid() === $handle->originalParentPid && $handle->status < ProcessStatus::ENDED) {
if ($handle->status < ProcessStatus::ENDED && \getmypid() === $handle->originalParentPid) {
$this->kill($handle);
}
@ -149,12 +154,14 @@ final class Runner implements ProcessRunner
Loop::cancel($handle->extraDataPipeWatcher);
}
for ($i = 0; $i < 4; $i++) {
if (\is_resource($handle->pipes[$i] ?? null)) {
\fclose($handle->pipes[$i]);
}
if (\is_resource($handle->extraDataPipe)) {
\fclose($handle->extraDataPipe);
}
$handle->stdin->close();
$handle->stdout->close();
$handle->stderr->close();
if (\is_resource($handle->proc)) {
\proc_close($handle->proc);
}

View File

@ -2,22 +2,25 @@
namespace Amp\Process\Internal;
use Amp\ByteStream\ResourceInputStream;
use Amp\ByteStream\ResourceOutputStream;
use Amp\Deferred;
use Amp\Process\ProcessInputStream;
use Amp\Process\ProcessOutputStream;
use Amp\Struct;
abstract class ProcessHandle
{
/** @var ResourceOutputStream */
abstract class ProcessHandle {
use Struct;
/** @var ProcessOutputStream */
public $stdin;
/** @var ResourceInputStream */
/** @var ProcessInputStream */
public $stdout;
/** @var ResourceInputStream */
/** @var ProcessInputStream */
public $stderr;
/** @var int */
public $pid = 0;
/** @var Deferred */
public $pidDeferred;
/** @var bool */
public $status = ProcessStatus::STARTING;

View File

@ -2,54 +2,56 @@
namespace Amp\Process\Internal;
use Amp\Process\ProcessException;
use Amp\Promise;
interface ProcessRunner
{
interface ProcessRunner {
/**
* Start a process using the supplied parameters
* Start a process using the supplied parameters.
*
* @param string $command The command to execute
* @param string|null $cwd The working directory for the child process
* @param array $env Environment variables to pass to the child process
* @param array $options proc_open() options
* @return Promise <ProcessInfo> Succeeds with a process descriptor or fails if the process cannot be started
* @throws \Amp\Process\ProcessException If starting the process fails.
* @param string $command The command to execute.
* @param string|null $cwd The working directory for the child process.
* @param array $env Environment variables to pass to the child process.
* @param array $options `proc_open()` options.
*
* @return ProcessHandle
*
* @throws ProcessException If starting the process fails.
*/
function start(string $command, string $cwd = null, array $env = [], array $options = []): Promise;
public function start(string $command, string $cwd = null, array $env = [], array $options = []): ProcessHandle;
/**
* Wait for the child process to end
* Wait for the child process to end.
*
* @param ProcessHandle $handle The process descriptor.
*
* @param ProcessHandle $handle The process descriptor
* @return Promise <int> Succeeds with exit code of the process or fails if the process is killed.
*/
function join(ProcessHandle $handle): Promise;
public function join(ProcessHandle $handle): Promise;
/**
* Forcibly end the child process
* Forcibly end the child process.
*
* @param ProcessHandle $handle The process descriptor
* @return void
* @throws \Amp\Process\ProcessException If terminating the process fails
* @param ProcessHandle $handle The process descriptor.
*
* @throws ProcessException If terminating the process fails.
*/
function kill(ProcessHandle $handle);
public function kill(ProcessHandle $handle);
/**
* Send a signal signal to the child process
*
* @param ProcessHandle $handle The process descriptor
* @param int $signo Signal number to send to process.
* @return void
* @throws \Amp\Process\ProcessException If sending the signal fails.
* @param ProcessHandle $handle The process descriptor.
* @param int $signo Signal number to send to process.
*
* @throws ProcessException If sending the signal fails.
*/
function signal(ProcessHandle $handle, int $signo);
public function signal(ProcessHandle $handle, int $signo);
/**
* Release all resources held by the process handle
* Release all resources held by the process handle.
*
* @param ProcessHandle $handle The process descriptor
* @return void
* @param ProcessHandle $handle The process descriptor.
*/
function destroy(ProcessHandle $handle);
public function destroy(ProcessHandle $handle);
}

View File

@ -2,11 +2,12 @@
namespace Amp\Process\Internal;
final class ProcessStatus
{
final class ProcessStatus {
const STARTING = 0;
const RUNNING = 1;
const ENDED = 2;
private function __construct() { }
private function __construct() {
// empty to prevent instances of this class
}
}

View File

@ -5,18 +5,13 @@ namespace Amp\Process\Internal\Windows;
use Amp\Deferred;
use Amp\Process\Internal\ProcessHandle;
final class Handle extends ProcessHandle
{
final class Handle extends ProcessHandle {
public function __construct() {
$this->startDeferred = new Deferred;
$this->endDeferred = new Deferred;
$this->joinDeferred = new Deferred;
}
/** @var Deferred */
public $startDeferred;
/** @var Deferred */
public $endDeferred;
public $joinDeferred;
/** @var string */
public $exitCodeWatcher;
@ -33,6 +28,9 @@ final class Handle extends ProcessHandle
/** @var resource[] */
public $sockets;
/** @var Deferred[] */
public $stdioDeferreds;
/** @var string */
public $connectTimeoutWatcher;

View File

@ -2,8 +2,7 @@
namespace Amp\Process\Internal\Windows;
final class HandshakeStatus
{
final class HandshakeStatus {
const SUCCESS = 0;
const SIGNAL_UNEXPECTED = 0x01;
const INVALID_STREAM_ID = 0x02;
@ -11,5 +10,7 @@ final class HandshakeStatus
const DUPLICATE_STREAM_ID = 0x04;
const INVALID_CLIENT_TOKEN = 0x05;
private function __construct() { }
private function __construct() {
// empty to prevent instances of this class
}
}

View File

@ -2,11 +2,14 @@
namespace Amp\Process\Internal\Windows;
final class PendingSocketClient
{
use Amp\Struct;
final class PendingSocketClient {
use Struct;
public $readWatcher;
public $timeoutWatcher;
public $recievedDataBuffer = '';
public $receivedDataBuffer = '';
public $pid;
public $streamId;
}

View File

@ -2,22 +2,25 @@
namespace Amp\Process\Internal\Windows;
use Amp\Deferred;
use Amp\Loop;
use Amp\Process\Internal\ProcessHandle;
use Amp\Process\Internal\ProcessRunner;
use Amp\Process\Internal\ProcessStatus;
use Amp\Process\ProcessException;
use Amp\Process\ProcessInputStream;
use Amp\Process\ProcessOutputStream;
use Amp\Promise;
use const Amp\Process\BIN_DIR;
final class Runner implements ProcessRunner
{
final class Runner implements ProcessRunner {
const FD_SPEC = [
["pipe", "r"], // stdin
["pipe", "w"], // stdout
["pipe", "w"], // stderr
["pipe", "w"], // exit code pipe
];
const WRAPPER_EXE_PATH = PHP_INT_SIZE === 8
? BIN_DIR . '\\windows\\ProcessWrapper64.exe'
: BIN_DIR . '\\windows\\ProcessWrapper.exe';
@ -26,8 +29,8 @@ final class Runner implements ProcessRunner
private function makeCommand(string $command, string $workingDirectory): string {
$result = sprintf(
'"%s" --address=%s --port=%d --token-size=%d',
self::WRAPPER_EXE_PATH,
'%s --address=%s --port=%d --token-size=%d',
\escapeshellarg(self::WRAPPER_EXE_PATH),
$this->socketConnector->address,
$this->socketConnector->port,
SocketConnector::SECURITY_TOKEN_SIZE
@ -46,11 +49,8 @@ final class Runner implements ProcessRunner
$this->socketConnector = new SocketConnector;
}
/**
* {@inheritdoc}
*/
public function start(string $command, string $cwd = null, array $env = [], array $options = []): Promise
{
/** @inheritdoc */
public function start(string $command, string $cwd = null, array $env = [], array $options = []): ProcessHandle {
$command = $this->makeCommand($command, $cwd ?? '');
$options['bypass_shell'] = true;
@ -90,33 +90,34 @@ final class Runner implements ProcessRunner
$handle->wrapperPid = $status['pid'];
$handle->wrapperStderrPipe = $pipes[2];
$stdinDeferred = new Deferred;
$handle->stdioDeferreds[] = new ProcessOutputStream($stdinDeferred->promise());
$stdoutDeferred = new Deferred;
$handle->stdioDeferreds[] = new ProcessInputStream($stdoutDeferred->promise());
$stderrDeferred = new Deferred;
$handle->stdioDeferreds[] = new ProcessInputStream($stderrDeferred->promise());
$this->socketConnector->registerPendingProcess($handle);
return $handle->startDeferred->promise();
return $handle;
}
/**
* {@inheritdoc}
*/
public function join(ProcessHandle $handle): Promise
{
/** @inheritdoc */
public function join(ProcessHandle $handle): Promise {
/** @var Handle $handle */
if ($handle->exitCodeWatcher !== null) {
Loop::reference($handle->exitCodeWatcher);
}
return $handle->endDeferred->promise();
return $handle->joinDeferred->promise();
}
/**
* {@inheritdoc}
*/
public function kill(ProcessHandle $handle)
{
/** @inheritdoc */
public function kill(ProcessHandle $handle) {
/** @var Handle $handle */
// todo: send a signal to the wrapper to kill the child instead ?
// todo: send a signal to the wrapper to kill the child instead?
if (!\proc_terminate($handle->proc)) {
throw new ProcessException("Terminating process failed");
}
@ -125,25 +126,17 @@ final class Runner implements ProcessRunner
$handle->exitCodeWatcher = null;
$handle->status = ProcessStatus::ENDED;
$handle->endDeferred->fail(new ProcessException("The process was killed"));
$handle->joinDeferred->fail(new ProcessException("The process was killed"));
}
/**
* {@inheritdoc}
*/
public function signal(ProcessHandle $handle, int $signo)
{
/** @inheritdoc */
public function signal(ProcessHandle $handle, int $signo) {
throw new ProcessException('Signals are not supported on Windows');
}
/**
* {@inheritdoc}
*/
public function destroy(ProcessHandle $handle)
{
/** @inheritdoc */
public function destroy(ProcessHandle $handle) {
/** @var Handle $handle */
if ($handle->status < ProcessStatus::ENDED) {
$this->kill($handle);
}

View File

@ -2,12 +2,13 @@
namespace Amp\Process\Internal\Windows;
final class SignalCode
{
final class SignalCode {
const HANDSHAKE = 0x01;
const HANDSHAKE_ACK = 0x02;
const CHILD_PID = 0x03;
const EXIT_CODE = 0x04;
private function __construct() { }
private function __construct() {
// empty to prevent instances of this class
}
}

View File

@ -8,8 +8,7 @@ use Amp\Loop;
use Amp\Process\Internal\ProcessStatus;
use Amp\Process\ProcessException;
final class SocketConnector
{
final class SocketConnector {
const SERVER_SOCKET_URI = 'tcp://127.0.0.1:0';
const SECURITY_TOKEN_SIZE = 16;
const CONNECT_TIMEOUT = 1000;
@ -29,12 +28,9 @@ final class SocketConnector
/** @var int */
public $port;
public function __construct() {
$this->server = \stream_socket_server(
self::SERVER_SOCKET_URI,
$errNo, $errStr,
\STREAM_SERVER_LISTEN | \STREAM_SERVER_BIND
);
public function __construct() {
$flags = \STREAM_SERVER_LISTEN | \STREAM_SERVER_BIND;
$this->server = \stream_socket_server(self::SERVER_SOCKET_URI, $errNo, $errStr, $flags);
if (!$this->server) {
throw new \Error("Failed to create TCP server socket for process wrapper: {$errNo}: {$errStr}");
@ -45,61 +41,64 @@ final class SocketConnector
}
list($this->address, $this->port) = \explode(':', \stream_socket_get_name($this->server, false));
$this->port = (int)$this->port;
$this->port = (int) $this->port;
Loop::unreference(Loop::onReadable($this->server, [$this, 'onServerSocketReadable']));
}
private function failClientHandshake($socket, int $code): void
{
private function failClientHandshake($socket, int $code): void {
\fwrite($socket, \chr(SignalCode::HANDSHAKE_ACK) . \chr($code));
\fclose($socket);
unset($this->pendingClients[(int)$socket]);
unset($this->pendingClients[(int) $socket]);
}
private function failHandleStart(Handle $handle, string $message, ...$args)
{
private function failHandleStart(Handle $handle, string $message, ...$args) {
Loop::cancel($handle->connectTimeoutWatcher);
unset($this->pendingProcesses[$handle->wrapperPid]);
foreach ($handle->sockets as $socket) {
\fclose($socket);
}
$handle->startDeferred->fail(new ProcessException(\vsprintf($message, $args)));
$error = new ProcessException(\vsprintf($message, $args));
foreach ($handle->stdioDeferreds as $deferred) {
$deferred->fail($error);
}
}
/**
* Read data from a client socket
* Read data from a client socket.
*
* This method cleans up internal state as appropriate. Returns null if the read fails or needs to be repeated.
*
* @param resource $socket
* @param int $length
* @param resource $socket
* @param int $length
* @param PendingSocketClient $state
*
* @return string|null
*/
private function readDataFromPendingClient($socket, int $length, PendingSocketClient $state)
{
private function readDataFromPendingClient($socket, int $length, PendingSocketClient $state) {
$data = \fread($socket, $length);
if ($data === false || $data === '') {
\fclose($socket);
Loop::cancel($state->readWatcher);
Loop::cancel($state->timeoutWatcher);
unset($this->pendingClients[(int)$socket]);
unset($this->pendingClients[(int) $socket]);
return null;
}
$data = $state->recievedDataBuffer . $data;
$data = $state->receivedDataBuffer . $data;
if (\strlen($data) < $length) {
$state->recievedDataBuffer = $data;
$state->receivedDataBuffer = $data;
return null;
}
$state->recievedDataBuffer = '';
$state->receivedDataBuffer = '';
Loop::cancel($state->readWatcher);
Loop::cancel($state->timeoutWatcher);
@ -107,8 +106,8 @@ final class SocketConnector
return $data;
}
public function onReadable_Handshake($watcher, $socket) {
$socketId = (int)$socket;
public function onReadableHandshake($watcher, $socket) {
$socketId = (int) $socket;
$pendingClient = $this->pendingClients[$socketId];
if (null === $data = $this->readDataFromPendingClient($socket, self::SECURITY_TOKEN_SIZE + 6, $pendingClient)) {
@ -146,7 +145,7 @@ final class SocketConnector
return;
}
if ($packet['client_token'] !== $handle->securityTokens[$packet['stream_id']]) {
if (!\hash_equals($packet['client_token'], $handle->securityTokens[$packet['stream_id']])) {
$this->failClientHandshake($socket, HandshakeStatus::INVALID_CLIENT_TOKEN);
$this->failHandleStart($handle, "Invalid client security token for stream #%d", $packet['stream_id']);
return;
@ -164,13 +163,11 @@ final class SocketConnector
$pendingClient->pid = $packet['pid'];
$pendingClient->streamId = $packet['stream_id'];
$pendingClient->readWatcher = Loop::onReadable($socket, [$this, 'onReadable_HandshakeAck']);
$pendingClient->readWatcher = Loop::onReadable($socket, [$this, 'onReadableHandshakeAck']);
}
public function onReadable_HandshakeAck($watcher, $socket)
{
$socketId = (int)$socket;
public function onReadableHandshakeAck($watcher, $socket) {
$socketId = (int) $socket;
$pendingClient = $this->pendingClients[$socketId];
// can happen if the start promise was failed
@ -202,12 +199,11 @@ final class SocketConnector
$handle->sockets[$pendingClient->streamId] = $socket;
if (count($handle->sockets) === 3) {
$pendingClient->readWatcher = Loop::onReadable($handle->sockets[0], [$this, 'onReadable_ChildPid'], $handle);
$pendingClient->readWatcher = Loop::onReadable($handle->sockets[0], [$this, 'onReadableChildPid'], $handle);
}
}
public function onReadable_ChildPid($watcher, $socket, Handle $handle)
{
public function onReadableChildPid($watcher, $socket, Handle $handle) {
Loop::cancel($watcher);
Loop::cancel($handle->connectTimeoutWatcher);
@ -220,7 +216,7 @@ final class SocketConnector
if (\strlen($data) !== 5) {
$this->failHandleStart(
$handle, 'Failed to read PID from wrapper: Recieved %d of 5 expected bytes', \strlen($data)
$handle, 'Failed to read PID from wrapper: Received %d of 5 expected bytes', \strlen($data)
);
return;
}
@ -235,20 +231,18 @@ final class SocketConnector
}
$handle->status = ProcessStatus::RUNNING;
$handle->pid = $packet['pid'];
$handle->stdin = new ResourceOutputStream($handle->sockets[0]);
$handle->stdout = new ResourceInputStream($handle->sockets[1]);
$handle->stderr = new ResourceInputStream($handle->sockets[2]);
$handle->pidDeferred->resolve($packet['pid']);
$handle->stdioDeferreds[0]->resolve(new ResourceOutputStream($handle->sockets[0]));
$handle->stdioDeferreds[1]->resolve(new ResourceInputStream($handle->sockets[1]));
$handle->stdioDeferreds[2]->resolve(new ResourceInputStream($handle->sockets[2]));
$handle->exitCodeWatcher = Loop::onReadable($handle->sockets[0], [$this, 'onReadable_ExitCode'], $handle);
$handle->exitCodeWatcher = Loop::onReadable($handle->sockets[0], [$this, 'onReadableExitCode'], $handle);
Loop::unreference($handle->exitCodeWatcher);
unset($this->pendingProcesses[$handle->wrapperPid]);
$handle->startDeferred->resolve($handle);
}
public function onReadable_ExitCode($watcher, $socket, Handle $handle)
{
public function onReadableExitCode($watcher, $socket, Handle $handle) {
$handle->exitCodeWatcher = null;
Loop::cancel($watcher);
@ -256,13 +250,13 @@ final class SocketConnector
if ($data === false || $data === '') {
$handle->status = ProcessStatus::ENDED;
$handle->endDeferred->fail(new ProcessException('Failed to read exit code from wrapper: No data received'));
$handle->joinDeferred->fail(new ProcessException('Failed to read exit code from wrapper: No data received'));
return;
}
if (\strlen($data) !== 5) {
$handle->status = ProcessStatus::ENDED;
$handle->endDeferred->fail(new ProcessException(
$handle->joinDeferred->fail(new ProcessException(
\sprintf('Failed to read exit code from wrapper: Recieved %d of 5 expected bytes', \strlen($data))
));
return;
@ -278,11 +272,11 @@ final class SocketConnector
}
$handle->status = ProcessStatus::ENDED;
$handle->endDeferred->resolve($packet['code']);
$handle->joinDeferred->resolve($packet['code']);
}
public function onClientSocketConnectTimeout($watcher, $socket) {
$id = (int)$socket;
$id = (int) $socket;
Loop::cancel($this->pendingClients[$id]->readWatcher);
unset($this->pendingClients[$id]);
@ -298,10 +292,10 @@ final class SocketConnector
}
$pendingClient = new PendingSocketClient;
$pendingClient->readWatcher = Loop::onReadable($socket, [$this, 'onReadable_Handshake']);
$pendingClient->readWatcher = Loop::onReadable($socket, [$this, 'onReadableHandshake']);
$pendingClient->timeoutWatcher = Loop::delay(self::CONNECT_TIMEOUT, [$this, 'onClientSocketConnectTimeout'], $socket);
$this->pendingClients[(int)$socket] = $pendingClient;
$this->pendingClients[(int) $socket] = $pendingClient;
}
public function onProcessConnectTimeout($watcher, Handle $handle) {
@ -319,11 +313,13 @@ final class SocketConnector
\fclose($socket);
}
$handle->startDeferred->fail(new ProcessException(\trim($error)));
$error = new ProcessException(\trim($error));
foreach ($handle->stdioDeferreds as $deferred) {
$deferred->fail($error);
}
}
public function registerPendingProcess(Handle $handle)
{
public function registerPendingProcess(Handle $handle) {
$handle->connectTimeoutWatcher = Loop::delay(self::CONNECT_TIMEOUT, [$this, 'onProcessConnectTimeout'], $handle);
$this->pendingProcesses[$handle->wrapperPid] = $handle;

View File

@ -2,9 +2,6 @@
namespace Amp\Process;
use Amp\ByteStream\ResourceInputStream;
use Amp\ByteStream\ResourceOutputStream;
use Amp\Deferred;
use Amp\Process\Internal\Posix\Runner as PosixProcessRunner;
use Amp\Process\Internal\ProcessHandle;
use Amp\Process\Internal\ProcessRunner;
@ -32,52 +29,15 @@ class Process {
private $handle;
/**
* @param string $command Command to run.
* @param string $cwd Working directory of child process.
* @param array $env Environment variables for child process.
* @param array $options Options for proc_open().
* @param ProcessHandle $handle Handle for the created process.
*/
private function __construct(string $command, string $cwd, array $env, array $options, ProcessHandle $handle) {
$this->command = $command;
$this->cwd = $cwd;
$this->env = $env;
$this->options = $options;
$this->handle = $handle;
}
/**
* Stops the process if it is still running.
*/
public function __destruct() {
if ($this->handle !== null) {
self::$processRunner->destroy($this->handle);
}
}
/**
* Throw to prevent cloning
*
* @throws \Error
*/
public function __clone() {
throw new \Error(self::class . ' instances cannot be cloned');
}
/**
* Start a new process.
*
* @param string|string[] $command Command to run.
* @param string|null $cwd Working directory or use an empty string to use the working directory of the current
* PHP process.
* @param mixed[] $env Environment variables or use an empty array to inherit from the current PHP process.
* @param mixed[] $options Options for proc_open().
* @return Promise <Process> Fails with a ProcessException if starting the process fails.
* @param string|null $cwd Working directory or use an empty string to use the working directory of the
* parent.
* @param mixed[] $env Environment variables or use an empty array to inherit from the parent.
* @param mixed[] $options Options for `proc_open()`.
*
* @throws \Error If the arguments are invalid.
* @throws \Amp\Process\StatusError If the process is already running.
* @throws \Amp\Process\ProcessException If starting the process fails.
*/
public static function start($command, string $cwd = null, array $env = [], array $options = []): Promise {
public function __construct($command, string $cwd = null, array $env = [], array $options = []) {
$command = \is_array($command)
? \implode(" ", \array_map("escapeshellarg", $command))
: (string) $command;
@ -93,35 +53,58 @@ class Process {
$envVars[(string) $key] = (string) $value;
}
$deferred = new Deferred;
self::$processRunner->start($command, $cwd, $env, $options)
->onResolve(function($error, $handle) use($deferred, $command, $cwd, $env, $options) {
if ($error) {
$deferred->fail($error);
} else {
$deferred->resolve(new Process($command, $cwd, $env, $options, $handle));
}
});
return $deferred->promise();
$this->command = $command;
$this->cwd = $cwd;
$this->env = $envVars;
$this->options = $options;
}
/**
* Wait for the process to end..
* Stops the process if it is still running.
*/
public function __destruct() {
if ($this->handle !== null) {
self::$processRunner->destroy($this->handle);
}
}
public function __clone() {
throw new \Error("Cloning is not allowed!");
}
/**
* Start the process.
*
* @throws StatusError If the process has already been started.
*/
public function start() {
if ($this->handle) {
throw new StatusError("Process has already been started.");
}
$this->handle = self::$processRunner->start($this->command, $this->cwd, $this->env, $this->options);
}
/**
* Wait for the process to end.
*
* @return Promise <int> Succeeds with process exit code or fails with a ProcessException if the process is killed.
*
* @throws StatusError If the process has already been started.
*/
public function join(): Promise {
if (!$this->handle) {
throw new StatusError("Process has not been started.");
}
return self::$processRunner->join($this->handle);
}
/**
* Forcibly end the process.
*
* @return void
* @throws \Amp\Process\StatusError If the process is not running.
* @throws \Amp\Process\ProcessException If terminating the process fails.
* @throws StatusError If the process is not running.
* @throws ProcessException If terminating the process fails.
*/
public function kill() {
if (!$this->isRunning()) {
@ -135,9 +118,9 @@ class Process {
* Send a signal signal to the process.
*
* @param int $signo Signal number to send to process.
* @return void
* @throws \Amp\Process\StatusError If the process is not running.
* @throws \Amp\Process\ProcessException If sending the signal fails.
*
* @throws StatusError If the process is not running.
* @throws ProcessException If sending the signal fails.
*/
public function signal(int $signo) {
if (!$this->isRunning()) {
@ -150,11 +133,16 @@ class Process {
/**
* Returns the PID of the child process.
*
* @return int
* @throws \Amp\Process\StatusError If the process has not started.
* @return Promise<int>
*
* @throws StatusError If the process has not started.
*/
public function getPid(): int {
return $this->handle->pid;
public function getPid(): Promise {
if (!$this->handle) {
throw new StatusError("The process has not been started");
}
return $this->handle->pidDeferred->promise();
}
/**
@ -203,55 +191,48 @@ class Process {
* @return bool
*/
public function isRunning(): bool {
return $this->handle->status === ProcessStatus::RUNNING;
return $this->handle && $this->handle->status !== ProcessStatus::ENDED;
}
/**
* Gets the process input stream (STDIN).
*
* @return \Amp\ByteStream\ResourceOutputStream
* @throws \Amp\Process\StatusError If the process is not running.
* @return ProcessOutputStream
*/
public function getStdin(): ResourceOutputStream {
if (!$this->isRunning()) {
throw new StatusError("The process is not running");
}
return $this->handle->stdin;
public function getStdin(): ProcessOutputStream {
return $this->stdin;
}
/**
* Gets the process output stream (STDOUT).
*
* @return \Amp\ByteStream\ResourceInputStream
* @throws \Amp\Process\StatusError If the process is not running.
* @return ProcessInputStream
*/
public function getStdout(): ResourceInputStream {
public function getStdout(): ProcessInputStream {
if (!$this->isRunning()) {
throw new StatusError("The process is not running");
}
return $this->handle->stdout;
return $this->stdout;
}
/**
* Gets the process error stream (STDERR).
*
* @return \Amp\ByteStream\ResourceInputStream
* @throws \Amp\Process\StatusError If the process is not running.
* @return ProcessInputStream
*/
public function getStderr(): ResourceInputStream {
public function getStderr(): ProcessInputStream {
if (!$this->isRunning()) {
throw new StatusError("The process is not running");
}
return $this->handle->stderr;
return $this->stderr;
}
}
(function() {
(function () {
/** @noinspection PhpUndefinedClassInspection */
self::$processRunner = \strncasecmp(\PHP_OS, "WIN", 3) === 0
? new WindowsProcessRunner()
: new PosixProcessRunner();
? new WindowsProcessRunner
: new PosixProcessRunner;
})->bindTo(null, Process::class)();

View File

@ -0,0 +1,80 @@
<?php
namespace Amp\Process;
use Amp\ByteStream\InputStream;
use Amp\ByteStream\PendingReadError;
use Amp\ByteStream\ResourceInputStream;
use Amp\ByteStream\StreamException;
use Amp\Deferred;
use Amp\Failure;
use Amp\Promise;
class ProcessInputStream implements InputStream {
/** @var Deferred */
private $initialRead;
/** @var bool */
private $shouldClose = false;
/** @var ResourceInputStream */
private $resourceStream;
/** @var StreamException|null */
private $error;
public function __construct(Promise $resourceStreamPromise) {
$resourceStreamPromise->onResolve(function ($error, $resourceStream) {
if ($error) {
$this->error = new StreamException("Failed to launch process", 0, $error);
$this->initialRead->fail($this->error);
return;
}
$this->resourceStream = $resourceStream;
if ($this->shouldClose) {
$this->resourceStream->close();
}
if ($this->initialRead) {
$initialRead = $this->initialRead;
$this->initialRead = null;
$initialRead->resolve($this->shouldClose ? null : "");
}
});
}
/**
* Reads data from the stream.
*
* @return Promise Resolves with a string when new data is available or `null` if the stream has closed.
*
* @throws PendingReadError Thrown if another read operation is still pending.
*/
public function read(): Promise {
if ($this->initialRead) {
throw new PendingReadError;
}
if ($this->error) {
return new Failure($this->error);
}
if ($this->resourceStream) {
return $this->resourceStream->read();
}
$this->initialRead = new Deferred;
return $this->initialRead->promise();
}
public function close() {
$this->shouldClose = true;
if ($this->resourceStream) {
$this->resourceStream->close();
}
}
}

104
lib/ProcessOutputStream.php Normal file
View File

@ -0,0 +1,104 @@
<?php
namespace Amp\Process;
use Amp\ByteStream\ClosedException;
use Amp\ByteStream\OutputStream;
use Amp\ByteStream\ResourceOutputStream;
use Amp\ByteStream\StreamException;
use Amp\Deferred;
use Amp\Failure;
use Amp\Promise;
class ProcessOutputStream implements OutputStream {
/** @var array */
private $queuedWrites;
/** @var bool */
private $shouldClose = false;
/** @var ResourceOutputStream */
private $resourceStream;
/** @var StreamException|null */
private $error;
public function __construct(Promise $resourceStreamPromise) {
$resourceStreamPromise->onResolve(function ($error, $resourceStream) {
if ($error) {
$this->error = new StreamException("Failed to launch process", 0, $error);
while ($write = array_shift($this->queuedWrites)) {
/** @var $deferred Deferred */
list(, $deferred) = $write;
$deferred->fail($this->error);
}
return;
}
$this->resourceStream = $resourceStream;
$queue = $this->queuedWrites;
$this->queuedWrites = [];
foreach ($queue as list($data, $deferred)) {
$deferred->resolve($this->resourceStream->write($data));
}
if ($this->shouldClose) {
$this->resourceStream->close();
}
});
}
/** @inheritdoc */
public function write(string $data): Promise {
if ($this->resourceStream) {
return $this->resourceStream->write($data);
}
if ($this->error) {
return new Failure($this->error);
}
if ($this->shouldClose) {
throw new ClosedException("Stream has already been closed.");
}
$deferred = new Deferred;
$this->queuedWrites[] = [$data, $deferred];
return $deferred->promise();
}
/** @inheritdoc */
public function end(string $finalData = ""): Promise {
if ($this->resourceStream) {
return $this->resourceStream->end($finalData);
}
if ($this->error) {
return new Failure($this->error);
}
if ($this->shouldClose) {
throw new ClosedException("Stream has already been closed.");
}
$deferred = new Deferred;
$this->queuedWrites[] = [$finalData, $deferred];
$this->shouldClose = true;
return $deferred->promise();
}
public function close() {
$this->shouldClose = true;
if ($this->resourceStream) {
$this->resourceStream->close();
}
}
}

View File

@ -34,7 +34,6 @@ class ProcessTest extends TestCase {
});
}
public function testExecuteResolvesToExitCode() {
Loop::run(function () {
$process = new Process("exit 42");