1
0
mirror of https://github.com/danog/process.git synced 2024-12-02 09:37:55 +01:00

Refactor for full Windows compatibility

This commit is contained in:
Chris Wright 2017-09-14 19:34:18 +02:00 committed by Niklas Keller
parent 5aa6040fcf
commit 068c0f6f9e
22 changed files with 1004 additions and 242 deletions

1
.gitignore vendored
View File

@ -1,3 +1,4 @@
/.idea
build
composer.lock
phpunit.xml

BIN
bin/windows/.gitattributes vendored Normal file

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -26,6 +26,7 @@
"autoload": {
"psr-4": {
"Amp\\Process\\": "lib"
}
},
"files": ["lib/functions.php"]
}
}

View File

@ -6,8 +6,7 @@ use Amp\ByteStream\Message;
use Amp\Process\Process;
Amp\Loop::run(function () {
$process = new Process("echo 'Hello, world!'");
$process->start();
$process = yield Process::start("echo 'Hello, world!'");
echo yield new Message($process->getStdout());

34
examples/ping-many.php Normal file
View File

@ -0,0 +1,34 @@
<?php
include dirname(__DIR__) . "/vendor/autoload.php";
use Amp\Process\Process;
use Amp\Promise;
use function Amp\Promise\all;
function show_process_output(Promise $promise): \Generator
{
/** @var Process $process */
$process = yield $promise;
$stream = $process->getStdout();
while ($chunk = yield $stream->read()) {
echo $chunk;
}
$code = yield $process->join();
echo "Process {$process->getPid()} exited with {$code}\n";
}
Amp\Loop::run(function () {
$hosts = ['8.8.8.8', '8.8.4.4', 'google.com', 'stackoverflow.com', 'github.com'];
$promises = [];
foreach ($hosts as $host) {
$promises[] = new \Amp\Coroutine(show_process_output(Process::start("ping {$host}")));
}
yield all($promises);
});

View File

@ -5,8 +5,7 @@ include dirname(__DIR__) . "/vendor/autoload.php";
use Amp\Process\Process;
Amp\Loop::run(function () {
$process = new Process("echo 1; sleep 1; echo 2; sleep 1; echo 3; exit 42");
$process->start();
$process = yield Process::start("echo 1; sleep 1; echo 2; sleep 1; echo 3; exit 42");
$stream = $process->getStdout();
while ($chunk = yield $stream->read()) {

View File

@ -6,8 +6,7 @@ use Amp\ByteStream\Message;
use Amp\Process\Process;
Amp\Loop::run(function () {
$process = new Process('read ; echo "$REPLY"');
$process->start();
$process = yield Process::start('read ; echo "$REPLY"');
/* send to stdin */
$process->getStdin()->write("abc\n");

View File

@ -0,0 +1,33 @@
<?php
namespace Amp\Process\Internal\Posix;
use Amp\Deferred;
use Amp\Process\Internal\ProcessHandle;
final class Handle extends ProcessHandle
{
public function __construct() {
$this->startDeferred = new Deferred;
$this->endDeferred = new Deferred;
$this->originalParentPid = \getmypid();
}
/** @var Deferred */
public $endDeferred;
/** @var Deferred */
public $startDeferred;
/** @var resource */
public $proc;
/** @var resource[] */
public $pipes;
/** @var string */
public $extraDataPipeWatcher;
/** @var int */
public $originalParentPid;
}

View File

@ -0,0 +1,162 @@
<?php
namespace Amp\Process\Internal\Posix;
use Amp\ByteStream\ResourceInputStream;
use Amp\ByteStream\ResourceOutputStream;
use Amp\Loop;
use Amp\Process\Internal\ProcessHandle;
use Amp\Process\Internal\ProcessRunner;
use Amp\Process\Internal\ProcessStatus;
use Amp\Process\ProcessException;
use Amp\Promise;
final class Runner implements ProcessRunner
{
const FD_SPEC = [
["pipe", "r"], // stdin
["pipe", "w"], // stdout
["pipe", "w"], // stderr
["pipe", "w"], // exit code pipe
];
public function onProcessEndExtraDataPipeReadable($watcher, $stream, Handle $handle) {
Loop::cancel($watcher);
$handle->status = ProcessStatus::ENDED;
if (!\is_resource($stream) || \feof($stream)) {
$handle->endDeferred->fail(new ProcessException("Process ended unexpectedly"));
} else {
$handle->endDeferred->resolve((int) \rtrim(@\stream_get_contents($stream)));
}
}
public function onProcessStartExtraDataPipeReadable($watcher, $stream, Handle $handle) {
Loop::cancel($watcher);
$pid = \rtrim(@\fgets($stream));
if (!$pid || !\is_numeric($pid)) {
$handle->startDeferred->fail(new ProcessException("Could not determine PID"));
return;
}
$handle->status = ProcessStatus::RUNNING;
$handle->pid = (int) $pid;
$handle->stdin = new ResourceOutputStream($handle->pipes[0]);
$handle->stdout = new ResourceInputStream($handle->pipes[1]);
$handle->stderr = new ResourceInputStream($handle->pipes[2]);
$handle->extraDataPipeWatcher = Loop::onReadable($stream, [$this, 'onProcessEndExtraDataPipeReadable'], $handle);
Loop::unreference($handle->extraDataPipeWatcher);
$handle->startDeferred->resolve($handle);
}
/**
* {@inheritdoc}
*/
public function start(string $command, string $cwd = null, array $env = [], array $options = []): Promise {
$command = \sprintf(
'{ (%s) <&3 3<&- 3>/dev/null & } 3<&0;' .
'pid=$!; echo $pid >&3; wait $pid; RC=$?; echo $RC >&3; exit $RC',
$command
);
$handle = new Handle;
$handle->proc = @\proc_open($command, self::FD_SPEC, $handle->pipes, $cwd ?: null, $env ?: null, $options);
if (!\is_resource($handle->proc)) {
$message = "Could not start process";
if ($error = \error_get_last()) {
$message .= \sprintf(" Errno: %d; %s", $error["type"], $error["message"]);
}
throw new ProcessException($message);
}
$status = \proc_get_status($handle->proc);
if (!$status) {
\proc_close($handle->proc);
throw new ProcessException("Could not get process status");
}
\stream_set_blocking($handle->pipes[3], false);
/* It's fine to use an instance method here because this object is assigned to a static var in Process and never
needs to be dtor'd before the process ends */
Loop::onReadable($handle->pipes[3], [$this, 'onProcessStartExtraDataPipeReadable'], $handle);
return $handle->startDeferred->promise();
}
/**
* {@inheritdoc}
*/
public function join(ProcessHandle $handle): Promise {
/** @var Handle $handle */
if ($handle->extraDataPipeWatcher !== null) {
Loop::reference($handle->extraDataPipeWatcher);
}
return $handle->endDeferred->promise();
}
/**
* {@inheritdoc}
*/
public function kill(ProcessHandle $handle) {
/** @var Handle $handle */
// Forcefully kill the process using SIGKILL.
if (!\proc_terminate($handle->proc, 9)) {
throw new ProcessException("Terminating process failed");
}
Loop::cancel($handle->extraDataPipeWatcher);
$handle->extraDataPipeWatcher = null;
$handle->status = ProcessStatus::ENDED;
$handle->endDeferred->fail(new ProcessException("The process was killed"));
}
/**
* {@inheritdoc}
*/
public function signal(ProcessHandle $handle, int $signo) {
/** @var Handle $handle */
if (!\proc_terminate($handle->proc, $signo)) {
throw new ProcessException("Sending signal to process failed");
}
}
/**
* {@inheritdoc}
*/
public function destroy(ProcessHandle $handle) {
/** @var Handle $handle */
if (\getmypid() === $handle->originalParentPid && $handle->status < ProcessStatus::ENDED) {
$this->kill($handle);
}
if ($handle->extraDataPipeWatcher !== null) {
Loop::cancel($handle->extraDataPipeWatcher);
}
for ($i = 0; $i < 4; $i++) {
if (\is_resource($handle->pipes[$i] ?? null)) {
\fclose($handle->pipes[$i]);
}
}
if (\is_resource($handle->proc)) {
\proc_close($handle->proc);
}
}
}

View File

@ -0,0 +1,24 @@
<?php
namespace Amp\Process\Internal;
use Amp\ByteStream\ResourceInputStream;
use Amp\ByteStream\ResourceOutputStream;
abstract class ProcessHandle
{
/** @var ResourceOutputStream */
public $stdin;
/** @var ResourceInputStream */
public $stdout;
/** @var ResourceInputStream */
public $stderr;
/** @var int */
public $pid = 0;
/** @var bool */
public $status = ProcessStatus::STARTING;
}

View File

@ -0,0 +1,55 @@
<?php
namespace Amp\Process\Internal;
use Amp\Promise;
interface ProcessRunner
{
/**
* Start a process using the supplied parameters
*
* @param string $command The command to execute
* @param string|null $cwd The working directory for the child process
* @param array $env Environment variables to pass to the child process
* @param array $options proc_open() options
* @return Promise <ProcessInfo> Succeeds with a process descriptor or fails if the process cannot be started
* @throws \Amp\Process\ProcessException If starting the process fails.
*/
function start(string $command, string $cwd = null, array $env = [], array $options = []): Promise;
/**
* Wait for the child process to end
*
* @param ProcessHandle $handle The process descriptor
* @return Promise <int> Succeeds with exit code of the process or fails if the process is killed.
*/
function join(ProcessHandle $handle): Promise;
/**
* Forcibly end the child process
*
* @param ProcessHandle $handle The process descriptor
* @return void
* @throws \Amp\Process\ProcessException If terminating the process fails
*/
function kill(ProcessHandle $handle);
/**
* Send a signal signal to the child process
*
* @param ProcessHandle $handle The process descriptor
* @param int $signo Signal number to send to process.
* @return void
* @throws \Amp\Process\ProcessException If sending the signal fails.
*/
function signal(ProcessHandle $handle, int $signo);
/**
* Release all resources held by the process handle
*
* @param ProcessHandle $handle The process descriptor
* @return void
*/
function destroy(ProcessHandle $handle);
}

View File

@ -0,0 +1,12 @@
<?php
namespace Amp\Process\Internal;
final class ProcessStatus
{
const STARTING = 0;
const RUNNING = 1;
const ENDED = 2;
private function __construct() { }
}

View File

@ -0,0 +1,41 @@
<?php
namespace Amp\Process\Internal\Windows;
use Amp\Deferred;
use Amp\Process\Internal\ProcessHandle;
final class Handle extends ProcessHandle
{
public function __construct() {
$this->startDeferred = new Deferred;
$this->endDeferred = new Deferred;
}
/** @var Deferred */
public $startDeferred;
/** @var Deferred */
public $endDeferred;
/** @var string */
public $exitCodeWatcher;
/** @var resource */
public $proc;
/** @var int */
public $wrapperPid;
/** @var resource */
public $wrapperStderrPipe;
/** @var resource[] */
public $sockets;
/** @var string */
public $connectTimeoutWatcher;
/** @var string[] */
public $securityTokens;
}

View File

@ -0,0 +1,15 @@
<?php
namespace Amp\Process\Internal\Windows;
final class HandshakeStatus
{
const SUCCESS = 0;
const SIGNAL_UNEXPECTED = 0x01;
const INVALID_STREAM_ID = 0x02;
const INVALID_PROCESS_ID = 0x03;
const DUPLICATE_STREAM_ID = 0x04;
const INVALID_CLIENT_TOKEN = 0x05;
private function __construct() { }
}

View File

@ -0,0 +1,12 @@
<?php
namespace Amp\Process\Internal\Windows;
final class PendingSocketClient
{
public $readWatcher;
public $timeoutWatcher;
public $recievedDataBuffer = '';
public $pid;
public $streamId;
}

View File

@ -0,0 +1,168 @@
<?php
namespace Amp\Process\Internal\Windows;
use Amp\Loop;
use Amp\Process\Internal\ProcessHandle;
use Amp\Process\Internal\ProcessRunner;
use Amp\Process\Internal\ProcessStatus;
use Amp\Process\ProcessException;
use Amp\Promise;
use const Amp\Process\BIN_DIR;
final class Runner implements ProcessRunner
{
const FD_SPEC = [
["pipe", "r"], // stdin
["pipe", "w"], // stdout
["pipe", "w"], // stderr
["pipe", "w"], // exit code pipe
];
const WRAPPER_EXE_PATH = PHP_INT_SIZE === 8
? BIN_DIR . '\\windows\\ProcessWrapper64.exe'
: BIN_DIR . '\\windows\\ProcessWrapper.exe';
private $socketConnector;
private function makeCommand(string $command, string $workingDirectory): string {
$result = sprintf(
'"%s" --address=%s --port=%d --token-size=%d',
self::WRAPPER_EXE_PATH,
$this->socketConnector->address,
$this->socketConnector->port,
SocketConnector::SECURITY_TOKEN_SIZE
);
if ($workingDirectory !== '') {
$result .= ' ' . \escapeshellarg('--cwd=' . \rtrim($workingDirectory, '\\'));
}
$result .= ' ' . $command;
return $result;
}
public function __construct() {
$this->socketConnector = new SocketConnector;
}
/**
* {@inheritdoc}
*/
public function start(string $command, string $cwd = null, array $env = [], array $options = []): Promise
{
$command = $this->makeCommand($command, $cwd ?? '');
$options['bypass_shell'] = true;
$handle = new Handle;
$handle->proc = @\proc_open($command, self::FD_SPEC, $pipes, $cwd ?: null, $env ?: null, $options);
if (!\is_resource($handle->proc)) {
$message = "Could not start process";
if ($error = \error_get_last()) {
$message .= \sprintf(" Errno: %d; %s", $error["type"], $error["message"]);
}
throw new ProcessException($message);
}
$status = \proc_get_status($handle->proc);
if (!$status) {
\proc_close($handle->proc);
throw new ProcessException("Could not get process status");
}
$securityTokens = \random_bytes(SocketConnector::SECURITY_TOKEN_SIZE * 6);
$written = \fwrite($pipes[0], $securityTokens);
\fclose($pipes[0]);
\fclose($pipes[1]);
if ($written !== SocketConnector::SECURITY_TOKEN_SIZE * 6) {
\fclose($pipes[2]);
\proc_close($handle->proc);
throw new ProcessException("Could not send security tokens to process wrapper");
}
$handle->securityTokens = \str_split($securityTokens, SocketConnector::SECURITY_TOKEN_SIZE);
$handle->wrapperPid = $status['pid'];
$handle->wrapperStderrPipe = $pipes[2];
$this->socketConnector->registerPendingProcess($handle);
return $handle->startDeferred->promise();
}
/**
* {@inheritdoc}
*/
public function join(ProcessHandle $handle): Promise
{
/** @var Handle $handle */
if ($handle->exitCodeWatcher !== null) {
Loop::reference($handle->exitCodeWatcher);
}
return $handle->endDeferred->promise();
}
/**
* {@inheritdoc}
*/
public function kill(ProcessHandle $handle)
{
/** @var Handle $handle */
// todo: send a signal to the wrapper to kill the child instead ?
if (!\proc_terminate($handle->proc)) {
throw new ProcessException("Terminating process failed");
}
Loop::cancel($handle->exitCodeWatcher);
$handle->exitCodeWatcher = null;
$handle->status = ProcessStatus::ENDED;
$handle->endDeferred->fail(new ProcessException("The process was killed"));
}
/**
* {@inheritdoc}
*/
public function signal(ProcessHandle $handle, int $signo)
{
throw new ProcessException('Signals are not supported on Windows');
}
/**
* {@inheritdoc}
*/
public function destroy(ProcessHandle $handle)
{
/** @var Handle $handle */
if ($handle->status < ProcessStatus::ENDED) {
$this->kill($handle);
}
if ($handle->exitCodeWatcher !== null) {
Loop::cancel($handle->exitCodeWatcher);
}
for ($i = 0; $i < 4; $i++) {
if (\is_resource($handle->sockets[$i] ?? null)) {
\fclose($handle->sockets[$i]);
}
}
\stream_get_contents($handle->wrapperStderrPipe);
\fclose($handle->wrapperStderrPipe);
if (\is_resource($handle->proc)) {
\proc_close($handle->proc);
}
}
}

View File

@ -0,0 +1,13 @@
<?php
namespace Amp\Process\Internal\Windows;
final class SignalCode
{
const HANDSHAKE = 0x01;
const HANDSHAKE_ACK = 0x02;
const CHILD_PID = 0x03;
const EXIT_CODE = 0x04;
private function __construct() { }
}

View File

@ -0,0 +1,331 @@
<?php
namespace Amp\Process\Internal\Windows;
use Amp\ByteStream\ResourceInputStream;
use Amp\ByteStream\ResourceOutputStream;
use Amp\Loop;
use Amp\Process\Internal\ProcessStatus;
use Amp\Process\ProcessException;
final class SocketConnector
{
const SERVER_SOCKET_URI = 'tcp://127.0.0.1:0';
const SECURITY_TOKEN_SIZE = 16;
const CONNECT_TIMEOUT = 1000;
/** @var resource */
private $server;
/** @var PendingSocketClient[] */
private $pendingClients = [];
/** @var Handle[] */
private $pendingProcesses = [];
/** @var string */
public $address;
/** @var int */
public $port;
public function __construct() {
$this->server = \stream_socket_server(
self::SERVER_SOCKET_URI,
$errNo, $errStr,
\STREAM_SERVER_LISTEN | \STREAM_SERVER_BIND
);
if (!$this->server) {
throw new \Error("Failed to create TCP server socket for process wrapper: {$errNo}: {$errStr}");
}
if (!\stream_set_blocking($this->server, false)) {
throw new \Error("Failed to set server socket to non-blocking mode");
}
list($this->address, $this->port) = \explode(':', \stream_socket_get_name($this->server, false));
$this->port = (int)$this->port;
Loop::unreference(Loop::onReadable($this->server, [$this, 'onServerSocketReadable']));
}
private function failClientHandshake($socket, int $code): void
{
\fwrite($socket, \chr(SignalCode::HANDSHAKE_ACK) . \chr($code));
\fclose($socket);
unset($this->pendingClients[(int)$socket]);
}
private function failHandleStart(Handle $handle, string $message, ...$args)
{
Loop::cancel($handle->connectTimeoutWatcher);
unset($this->pendingProcesses[$handle->wrapperPid]);
foreach ($handle->sockets as $socket) {
\fclose($socket);
}
$handle->startDeferred->fail(new ProcessException(\vsprintf($message, $args)));
}
/**
* Read data from a client socket
*
* This method cleans up internal state as appropriate. Returns null if the read fails or needs to be repeated.
*
* @param resource $socket
* @param int $length
* @param PendingSocketClient $state
* @return string|null
*/
private function readDataFromPendingClient($socket, int $length, PendingSocketClient $state)
{
$data = \fread($socket, $length);
if ($data === false || $data === '') {
\fclose($socket);
Loop::cancel($state->readWatcher);
Loop::cancel($state->timeoutWatcher);
unset($this->pendingClients[(int)$socket]);
return null;
}
$data = $state->recievedDataBuffer . $data;
if (\strlen($data) < $length) {
$state->recievedDataBuffer = $data;
return null;
}
$state->recievedDataBuffer = '';
Loop::cancel($state->readWatcher);
Loop::cancel($state->timeoutWatcher);
return $data;
}
public function onReadable_Handshake($watcher, $socket) {
$socketId = (int)$socket;
$pendingClient = $this->pendingClients[$socketId];
if (null === $data = $this->readDataFromPendingClient($socket, self::SECURITY_TOKEN_SIZE + 6, $pendingClient)) {
return;
}
$packet = \unpack('Csignal/Npid/Cstream_id/a*client_token', $data);
// validate the client's handshake
if ($packet['signal'] !== SignalCode::HANDSHAKE) {
$this->failClientHandshake($socket, HandshakeStatus::SIGNAL_UNEXPECTED);
return;
}
if ($packet['stream_id'] > 2) {
$this->failClientHandshake($socket, HandshakeStatus::INVALID_STREAM_ID);
return;
}
if (!isset($this->pendingProcesses[$packet['pid']])) {
$this->failClientHandshake($socket, HandshakeStatus::INVALID_PROCESS_ID);
return;
}
$handle = $this->pendingProcesses[$packet['pid']];
if (isset($handle->sockets[$packet['stream_id']])) {
$this->failClientHandshake($socket, HandshakeStatus::DUPLICATE_STREAM_ID);
\trigger_error(\sprintf(
"%s: Received duplicate socket for process #%s stream #%d",
self::class,
$handle->pid,
$packet['stream_id']
), E_USER_WARNING);
return;
}
if ($packet['client_token'] !== $handle->securityTokens[$packet['stream_id']]) {
$this->failClientHandshake($socket, HandshakeStatus::INVALID_CLIENT_TOKEN);
$this->failHandleStart($handle, "Invalid client security token for stream #%d", $packet['stream_id']);
return;
}
$ackData = \chr(SignalCode::HANDSHAKE_ACK) . \chr(HandshakeStatus::SUCCESS)
. $handle->securityTokens[$packet['stream_id'] + 3];
// Unless we set the security token size so high that it won't fit in the
// buffer, this probably shouldn't ever happen unless something has gone wrong
if (\fwrite($socket, $ackData) !== self::SECURITY_TOKEN_SIZE + 2) {
unset($this->pendingClients[$socketId]);
return;
}
$pendingClient->pid = $packet['pid'];
$pendingClient->streamId = $packet['stream_id'];
$pendingClient->readWatcher = Loop::onReadable($socket, [$this, 'onReadable_HandshakeAck']);
}
public function onReadable_HandshakeAck($watcher, $socket)
{
$socketId = (int)$socket;
$pendingClient = $this->pendingClients[$socketId];
// can happen if the start promise was failed
if (!isset($this->pendingProcesses[$pendingClient->pid])) {
\fclose($socket);
Loop::cancel($watcher);
Loop::cancel($pendingClient->timeoutWatcher);
unset($this->pendingClients[$socketId]);
return;
}
if (null === $data = $this->readDataFromPendingClient($socket, 2, $pendingClient)) {
return;
}
unset($this->pendingClients[$socketId]);
$handle = $this->pendingProcesses[$pendingClient->pid];
$packet = \unpack('Csignal/Cstatus', $data);
if ($packet['signal'] !== SignalCode::HANDSHAKE_ACK || $packet['status'] !== HandshakeStatus::SUCCESS) {
$this->failHandleStart(
$handle, "Client rejected handshake with code %d for stream #%d",
$packet['status'], $pendingClient->streamId
);
return;
}
$handle->sockets[$pendingClient->streamId] = $socket;
if (count($handle->sockets) === 3) {
$pendingClient->readWatcher = Loop::onReadable($handle->sockets[0], [$this, 'onReadable_ChildPid'], $handle);
}
}
public function onReadable_ChildPid($watcher, $socket, Handle $handle)
{
Loop::cancel($watcher);
Loop::cancel($handle->connectTimeoutWatcher);
$data = \fread($socket, 5);
if ($data === false || $data === '') {
$this->failHandleStart($handle, 'Failed to read PID from wrapper: No data received');
return;
}
if (\strlen($data) !== 5) {
$this->failHandleStart(
$handle, 'Failed to read PID from wrapper: Recieved %d of 5 expected bytes', \strlen($data)
);
return;
}
$packet = \unpack('Csignal/Npid', $data);
if ($packet['signal'] !== SignalCode::CHILD_PID) {
$this->failHandleStart(
$handle, "Failed to read PID from wrapper: Unexpected signal code %d", $packet['signal']
);
return;
}
$handle->status = ProcessStatus::RUNNING;
$handle->pid = $packet['pid'];
$handle->stdin = new ResourceOutputStream($handle->sockets[0]);
$handle->stdout = new ResourceInputStream($handle->sockets[1]);
$handle->stderr = new ResourceInputStream($handle->sockets[2]);
$handle->exitCodeWatcher = Loop::onReadable($handle->sockets[0], [$this, 'onReadable_ExitCode'], $handle);
Loop::unreference($handle->exitCodeWatcher);
unset($this->pendingProcesses[$handle->wrapperPid]);
$handle->startDeferred->resolve($handle);
}
public function onReadable_ExitCode($watcher, $socket, Handle $handle)
{
$handle->exitCodeWatcher = null;
Loop::cancel($watcher);
$data = \fread($socket, 5);
if ($data === false || $data === '') {
$handle->status = ProcessStatus::ENDED;
$handle->endDeferred->fail(new ProcessException('Failed to read exit code from wrapper: No data received'));
return;
}
if (\strlen($data) !== 5) {
$handle->status = ProcessStatus::ENDED;
$handle->endDeferred->fail(new ProcessException(
\sprintf('Failed to read exit code from wrapper: Recieved %d of 5 expected bytes', \strlen($data))
));
return;
}
$packet = \unpack('Csignal/Ncode', $data);
if ($packet['signal'] !== SignalCode::EXIT_CODE) {
$this->failHandleStart(
$handle, "Failed to read exit code from wrapper: Unexpected signal code %d", $packet['signal']
);
return;
}
$handle->status = ProcessStatus::ENDED;
$handle->endDeferred->resolve($packet['code']);
}
public function onClientSocketConnectTimeout($watcher, $socket) {
$id = (int)$socket;
Loop::cancel($this->pendingClients[$id]->readWatcher);
unset($this->pendingClients[$id]);
\fclose($socket);
}
public function onServerSocketReadable() {
$socket = \stream_socket_accept($this->server);
if (!\stream_set_blocking($socket, false)) {
throw new \Error("Failed to set client socket to non-blocking mode");
}
$pendingClient = new PendingSocketClient;
$pendingClient->readWatcher = Loop::onReadable($socket, [$this, 'onReadable_Handshake']);
$pendingClient->timeoutWatcher = Loop::delay(self::CONNECT_TIMEOUT, [$this, 'onClientSocketConnectTimeout'], $socket);
$this->pendingClients[(int)$socket] = $pendingClient;
}
public function onProcessConnectTimeout($watcher, Handle $handle) {
$status = \proc_get_status($handle->proc);
$error = null;
if (!$status['running']) {
$error = \stream_get_contents($handle->wrapperStderrPipe);
}
$error = $error ?: 'Process did not connect to server before timeout elapsed';
\fclose($handle->wrapperStderrPipe);
\proc_close($handle->proc);
foreach ($handle->sockets as $socket) {
\fclose($socket);
}
$handle->startDeferred->fail(new ProcessException(\trim($error)));
}
public function registerPendingProcess(Handle $handle)
{
$handle->connectTimeoutWatcher = Loop::delay(self::CONNECT_TIMEOUT, [$this, 'onProcessConnectTimeout'], $handle);
$this->pendingProcesses[$handle->wrapperPid] = $handle;
}
}

View File

@ -5,17 +5,16 @@ namespace Amp\Process;
use Amp\ByteStream\ResourceInputStream;
use Amp\ByteStream\ResourceOutputStream;
use Amp\Deferred;
use Amp\Delayed;
use Amp\Loop;
use Amp\Process\Internal\Posix\Runner as PosixProcessRunner;
use Amp\Process\Internal\ProcessHandle;
use Amp\Process\Internal\ProcessRunner;
use Amp\Process\Internal\ProcessStatus;
use Amp\Process\Internal\Windows\Runner as WindowsProcessRunner;
use Amp\Promise;
use function Amp\call;
class Process {
/** @var bool */
private static $onWindows;
/** @var resource|null */
private $process;
/** @var ProcessRunner */
private static $processRunner;
/** @var string */
private $command;
@ -29,278 +28,133 @@ class Process {
/** @var array */
private $options;
/** @var \Amp\ByteStream\ResourceOutputStream|null */
private $stdin;
/** @var \Amp\ByteStream\ResourceInputStream|null */
private $stdout;
/** @var \Amp\ByteStream\ResourceInputStream|null */
private $stderr;
/** @var int */
private $pid = 0;
/** @var int */
private $oid = 0;
/** @var \Amp\Deferred|null */
private $deferred;
/** @var string */
private $watcher;
/** @var bool */
private $running = false;
/** @var ProcessHandle */
private $handle;
/**
* @param string|array $command Command to run.
* @param string|null $cwd Working directory or use an empty string to use the working directory of the current
* PHP process.
* @param mixed[] $env Environment variables or use an empty array to inherit from the current PHP process.
* @param mixed[] $options Options for proc_open().
* @param string $command Command to run.
* @param string $cwd Working directory of child process.
* @param array $env Environment variables for child process.
* @param array $options Options for proc_open().
* @param ProcessHandle $handle Handle for the created process.
*/
public function __construct($command, string $cwd = null, array $env = [], array $options = []) {
if (self::$onWindows === null) {
self::$onWindows = \strncasecmp(\PHP_OS, "WIN", 3) === 0;
}
if (\is_array($command)) {
$command = \implode(" ", \array_map("escapeshellarg", $command));
}
private function __construct(string $command, string $cwd, array $env, array $options, ProcessHandle $handle) {
$this->command = $command;
$this->cwd = $cwd ?? "";
foreach ($env as $key => $value) {
if (\is_array($value)) {
throw new \Error("\$env cannot accept array values");
}
$this->env[(string) $key] = (string) $value;
}
$this->cwd = $cwd;
$this->env = $env;
$this->options = $options;
$this->handle = $handle;
}
/**
* Stops the process if it is still running.
*/
public function __destruct() {
if (\getmypid() === $this->oid) {
$this->kill(); // Will only terminate if the process is still running.
}
if ($this->watcher !== null) {
Loop::cancel($this->watcher);
}
if ($this->stdin && \is_resource($resource = $this->stdin->getResource())) {
\fclose($resource);
}
if ($this->stdout && \is_resource($resource = $this->stdout->getResource())) {
\fclose($resource);
}
if ($this->stderr && \is_resource($resource = $this->stderr->getResource())) {
\fclose($resource);
}
if (\is_resource($this->process)) {
\proc_close($this->process);
if ($this->handle !== null) {
self::$processRunner->destroy($this->handle);
}
}
/**
* Resets process values.
* Throw to prevent cloning
*
* @throws \Error
*/
public function __clone() {
$this->process = null;
$this->deferred = null;
$this->watcher = null;
$this->pid = 0;
$this->oid = 0;
$this->stdin = null;
$this->stdout = null;
$this->stderr = null;
$this->running = false;
throw new \Error(self::class . ' instances cannot be cloned');
}
/**
* @throws \Amp\Process\ProcessException If starting the process fails.
* Start a new process.
*
* @param string|string[] $command Command to run.
* @param string|null $cwd Working directory or use an empty string to use the working directory of the current
* PHP process.
* @param mixed[] $env Environment variables or use an empty array to inherit from the current PHP process.
* @param mixed[] $options Options for proc_open().
* @return Promise <Process> Fails with a ProcessException if starting the process fails.
* @throws \Error If the arguments are invalid.
* @throws \Amp\Process\StatusError If the process is already running.
* @throws \Amp\Process\ProcessException If starting the process fails.
*/
public function start() {
if ($this->deferred !== null) {
throw new StatusError("The process has already been started");
}
public static function start($command, string $cwd = null, array $env = [], array $options = []): Promise {
$command = \is_array($command)
? \implode(" ", \array_map("escapeshellarg", $command))
: (string) $command;
$this->deferred = $deferred = new Deferred;
$cwd = $cwd ?? "";
$fd = [
["pipe", "r"], // stdin
["pipe", "w"], // stdout
["pipe", "w"], // stderr
["pipe", "w"], // exit code pipe
];
if (self::$onWindows) {
$command = $this->command;
} else {
$command = \sprintf(
'{ (%s) <&3 3<&- 3>/dev/null & } 3<&0;' .
'pid=$!; echo $pid >&3; wait $pid; RC=$?; echo $RC >&3; exit $RC',
$this->command
);
}
$this->process = @\proc_open($command, $fd, $pipes, $this->cwd ?: null, $this->env ?: null, $this->options);
if (!\is_resource($this->process)) {
$message = "Could not start process";
if ($error = \error_get_last()) {
$message .= \sprintf(" Errno: %d; %s", $error["type"], $error["message"]);
}
$deferred->fail(new ProcessException($message));
return;
}
$this->oid = \getmypid();
$status = \proc_get_status($this->process);
if (!$status) {
\proc_close($this->process);
$this->process = null;
$deferred->fail(new ProcessException("Could not get process status"));
return;
}
if (self::$onWindows) {
$this->pid = $status["pid"];
$exitcode = $status["exitcode"];
} else {
// This blocking read will only block until the process scheduled, generally a few microseconds.
$pid = \rtrim(@\fgets($pipes[3]));
$exitcode = -1;
if (!$pid || !\is_numeric($pid)) {
$deferred->fail(new ProcessException("Could not determine PID"));
return;
$envVars = [];
foreach ($env as $key => $value) {
if (\is_array($value)) {
throw new \Error("\$env cannot accept array values");
}
$this->pid = (int) $pid;
$envVars[(string) $key] = (string) $value;
}
$this->stdin = new ResourceOutputStream($pipes[0]);
$this->stdout = new ResourceInputStream($pipes[1]);
$this->stderr = new ResourceInputStream($pipes[2]);
\stream_set_blocking($pipes[3], false);
$deferred = new Deferred;
$this->running = true;
$process = &$this->process;
$running = &$this->running;
$this->watcher = Loop::onReadable($pipes[3], static function ($watcher, $resource) use (
&$process, &$running, $exitcode, $deferred
) {
Loop::cancel($watcher);
$running = false;
try {
try {
if (self::$onWindows) {
// Avoid a generator on Unix
$code = call(function () use ($exitcode, $process) {
$status = \proc_get_status($process);
while ($status["running"]) {
yield new Delayed(10);
$status = \proc_get_status($process);
}
$code = $exitcode !== -1 ? $exitcode : $status["exitcode"];
return (int) $code;
});
} elseif (!\is_resource($resource) || \feof($resource)) {
throw new ProcessException("Process ended unexpectedly");
} else {
$code = (int) \rtrim(@\stream_get_contents($resource));
}
} finally {
if (\is_resource($resource)) {
\fclose($resource);
}
self::$processRunner->start($command, $cwd, $env, $options)
->onResolve(function($error, $handle) use($deferred, $command, $cwd, $env, $options) {
if ($error) {
$deferred->fail($error);
} else {
$deferred->resolve(new Process($command, $cwd, $env, $options, $handle));
}
} catch (\Throwable $exception) {
$deferred->fail($exception);
return;
}
});
$deferred->resolve($code);
});
Loop::unreference($this->watcher);
return $deferred->promise();
}
/**
* @return \Amp\Promise<int> Succeeds with exit code of the process or fails if the process is killed.
* Wait for the process to end..
*
* @return Promise <int> Succeeds with process exit code or fails with a ProcessException if the process is killed.
*/
public function join(): Promise {
if ($this->deferred === null) {
return self::$processRunner->join($this->handle);
}
/**
* Forcibly end the process.
*
* @return void
* @throws \Amp\Process\StatusError If the process is not running.
* @throws \Amp\Process\ProcessException If terminating the process fails.
*/
public function kill() {
if (!$this->isRunning()) {
throw new StatusError("The process is not running");
}
if ($this->watcher !== null && $this->running) {
Loop::reference($this->watcher);
}
return $this->deferred->promise();
self::$processRunner->kill($this->handle);
}
/**
* {@inheritdoc}
*/
public function kill() {
if ($this->running && \is_resource($this->process)) {
$this->running = false;
// Forcefully kill the process using SIGKILL.
\proc_terminate($this->process, 9);
Loop::cancel($this->watcher);
$this->deferred->fail(new ProcessException("The process was killed"));
}
}
/**
* Sends the given signal to the process.
* Send a signal signal to the process.
*
* @param int $signo Signal number to send to process.
*
* @return void
* @throws \Amp\Process\StatusError If the process is not running.
* @throws \Amp\Process\ProcessException If sending the signal fails.
*/
public function signal(int $signo) {
if (!$this->isRunning()) {
throw new StatusError("The process is not running");
}
\proc_terminate($this->process, $signo);
self::$processRunner->signal($this->handle, $signo);
}
/**
* Returns the PID of the child process. Value is only meaningful if PHP was not compiled with --enable-sigchild.
* Returns the PID of the child process.
*
* @return int
*
* @throws \Amp\Process\StatusError
* @throws \Amp\Process\StatusError If the process has not started.
*/
public function getPid(): int {
if ($this->pid === 0) {
throw new StatusError("The process has not been started");
}
return $this->pid;
return $this->handle->pid;
}
/**
@ -349,51 +203,55 @@ class Process {
* @return bool
*/
public function isRunning(): bool {
return $this->running;
return $this->handle->status === ProcessStatus::RUNNING;
}
/**
* Gets the process input stream (STDIN).
*
* @return \Amp\ByteStream\ResourceOutputStream
*
* @throws \Amp\Process\StatusError If the process is not running.
*/
public function getStdin(): ResourceOutputStream {
if ($this->stdin === null) {
throw new StatusError("The process has not been started");
if (!$this->isRunning()) {
throw new StatusError("The process is not running");
}
return $this->stdin;
return $this->handle->stdin;
}
/**
* Gets the process output stream (STDOUT).
*
* @return \Amp\ByteStream\ResourceInputStream
*
* @throws \Amp\Process\StatusError If the process is not running.
*/
public function getStdout(): ResourceInputStream {
if ($this->stdout === null) {
throw new StatusError("The process has not been started");
if (!$this->isRunning()) {
throw new StatusError("The process is not running");
}
return $this->stdout;
return $this->handle->stdout;
}
/**
* Gets the process error stream (STDERR).
*
* @return \Amp\ByteStream\ResourceInputStream
*
* @throws \Amp\Process\StatusError If the process is not running.
*/
public function getStderr(): ResourceInputStream {
if ($this->stderr === null) {
throw new StatusError("The process has not been started");
if (!$this->isRunning()) {
throw new StatusError("The process is not running");
}
return $this->stderr;
return $this->handle->stderr;
}
}
(function() {
/** @noinspection PhpUndefinedClassInspection */
self::$processRunner = \strncasecmp(\PHP_OS, "WIN", 3) === 0
? new WindowsProcessRunner()
: new PosixProcessRunner();
})->bindTo(null, Process::class)();

5
lib/functions.php Normal file
View File

@ -0,0 +1,5 @@
<?php
namespace Amp\Process;
const BIN_DIR = __DIR__ . DIRECTORY_SEPARATOR . '..' . DIRECTORY_SEPARATOR . 'bin';