From 76853f0b78350e4b6eca9f0e51c72d1aca3eda49 Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Mon, 10 Feb 2020 23:33:17 +0100 Subject: [PATCH] Implement FIFO process hub --- lib/Context/Internal/ProcessHub.php | 68 +++++++++++++++++++++---- lib/Context/Internal/process-runner.php | 62 +++++++++++++++++++--- lib/Context/Process.php | 5 +- test/Context/AbstractContextTest.php | 6 ++- test/Context/ProcessFifoTest.php | 14 +++++ 5 files changed, 132 insertions(+), 23 deletions(-) create mode 100644 test/Context/ProcessFifoTest.php diff --git a/lib/Context/Internal/ProcessHub.php b/lib/Context/Internal/ProcessHub.php index 612d5bc..fe0a9af 100644 --- a/lib/Context/Internal/ProcessHub.php +++ b/lib/Context/Internal/ProcessHub.php @@ -33,7 +33,12 @@ class ProcessHub /** @var string|null */ private $toUnlink; - public function __construct() + /** + * Constructor. + * + * @param boolean $useFIFO Whether to use FIFOs instead of the more reliable UNIX socket server (CHOSEN AUTOMATICALLY, only for testing purposes) + */ + public function __construct(bool $useFIFO = false) { $isWindows = \strncasecmp(\PHP_OS, "WIN", 3) === 0; @@ -41,32 +46,73 @@ class ProcessHub $this->uri = "tcp://127.0.0.1:0"; } else { $suffix = \bin2hex(\random_bytes(10)); - $path = \sys_get_temp_dir() . "/amp-parallel-ipc-" . $suffix . ".sock"; - $this->uri = "unix://" . $path; + $path = \sys_get_temp_dir()."/amp-parallel-ipc-".$suffix.".sock"; + $this->uri = "unix://".$path; $this->toUnlink = $path; } - $this->server = \stream_socket_server($this->uri, $errno, $errstr, \STREAM_SERVER_BIND | \STREAM_SERVER_LISTEN); + if (!$useFIFO) { + $this->server = \stream_socket_server($this->uri, $errno, $errstr, \STREAM_SERVER_BIND | \STREAM_SERVER_LISTEN); + } + $fifo = false; if (!$this->server) { - throw new \RuntimeException(\sprintf("Could not create IPC server: (Errno: %d) %s", $errno, $errstr)); + if ($isWindows) { + throw new \RuntimeException(\sprintf("Could not create IPC server: (Errno: %d) %s", $errno, $errstr)); + } + if (!\posix_mkfifo($path, 0777)) { + throw new \RuntimeException(\sprintf("Could not create the FIFO socket, and could not create IPC server: (Errno: %d) %s", $errno, $errstr)); + } + if (!$this->server = \fopen($path, 'r+')) { + throw new \RuntimeException(\sprintf("Could not connect to the FIFO socket, and could not create IPC server: (Errno: %d) %s", $errno, $errstr)); + } + \stream_set_blocking($this->server, false); + $fifo = true; + $this->uri = $path; } if ($isWindows) { $name = \stream_socket_get_name($this->server, false); $port = \substr($name, \strrpos($name, ":") + 1); - $this->uri = "tcp://127.0.0.1:" . $port; + $this->uri = "tcp://127.0.0.1:".$port; } $keys = &$this->keys; $acceptor = &$this->acceptor; - $this->watcher = Loop::onReadable($this->server, static function (string $watcher, $server) use (&$keys, &$acceptor): \Generator { - // Error reporting suppressed since stream_socket_accept() emits E_WARNING on client accept failure. - if (!$client = @\stream_socket_accept($server, 0)) { // Timeout of 0 to be non-blocking. - return; // Accepting client failed. + $this->watcher = Loop::onReadable($this->server, static function (string $watcher, $server) use (&$keys, &$acceptor, &$fifo): \Generator { + if ($fifo) { + $length = \ord(\fread($server, 1)); + if (!$length) { + return; // Could not accept, wrong length read + } + $prefix = \fread($server, $length); + $sockets = [ + $prefix."1", + $prefix."2", + ]; + foreach ($sockets as $k => &$socket) { + if (@\filetype($socket) !== 'fifo') { + if ($k) { + \fclose($sockets[0]); + } + return; // Is not a FIFO + } + if (!$socket = \fopen($socket, $k ? 'w' : 'r')) { + if ($k) { + \fclose($sockets[0]); + } + return; // Could not open fifo + } + } + $channel = new ChannelledSocket(...$sockets); + } else { + // Error reporting suppressed since stream_socket_accept() emits E_WARNING on client accept failure. + if (!$client = @\stream_socket_accept($server, 0)) { // Timeout of 0 to be non-blocking. + return; // Accepting client failed. + } + $channel = new ChannelledSocket($client, $client); } - $channel = new ChannelledSocket($client, $client); try { $received = yield Promise\timeout($channel->receive(), self::KEY_RECEIVE_TIMEOUT); diff --git a/lib/Context/Internal/process-runner.php b/lib/Context/Internal/process-runner.php index f83e6f8..92d3ffd 100644 --- a/lib/Context/Internal/process-runner.php +++ b/lib/Context/Internal/process-runner.php @@ -17,8 +17,8 @@ if (\function_exists("cli_set_process_title")) { (function (): void { $paths = [ - \dirname(__DIR__, 5) . "/autoload.php", - \dirname(__DIR__, 3) . "/vendor/autoload.php", + \dirname(__DIR__, 5)."/autoload.php", + \dirname(__DIR__, 3)."/vendor/autoload.php", ]; foreach ($paths as $path) { @@ -29,7 +29,7 @@ if (\function_exists("cli_set_process_title")) { } if (!isset($autoloadPath)) { - \trigger_error("Could not locate autoload.php in any of the following files: " . \implode(", ", $paths), E_USER_ERROR); + \trigger_error("Could not locate autoload.php in any of the following files: ".\implode(", ", $paths), E_USER_ERROR); exit(1); } @@ -61,12 +61,58 @@ if (\function_exists("cli_set_process_title")) { $key .= $chunk; } while (\strlen($key) < Process::KEY_LENGTH); - if (!$socket = \stream_socket_client($uri, $errno, $errstr, 5, \STREAM_CLIENT_CONNECT)) { - \trigger_error("Could not connect to IPC socket", E_USER_ERROR); - exit(1); + if (\strpos($uri, 'tcp://') === false && \strpos($uri, 'unix://') === false) { + $suffix = \bin2hex(\random_bytes(10)); + $prefix = \sys_get_temp_dir()."/amp-".$suffix.".fifo"; + + if (\strlen($prefix) > 0xFF) { + \trigger_error("Prefix is too long!", E_USER_ERROR); + exit(1); + } + + $sockets = [ + $prefix."2", + $prefix."1", + ]; + foreach ($sockets as $k => &$socket) { + if (!\posix_mkfifo($socket, 0777)) { + \trigger_error("Could not create FIFO client socket", E_USER_ERROR); + exit(1); + } + + \register_shutdown_function(static function () use ($socket): void { + @\unlink($socket); + }); + + if (!$socket = \fopen($socket, 'r+')) { // Open in either read or write mode to send a close signal when done + \trigger_error("Could not open FIFO client socket", E_USER_ERROR); + exit(1); + } + } + + if (!$tempSocket = \fopen($uri, 'r+')) { + \trigger_error("Could not connect to FIFO server", E_USER_ERROR); + exit(1); + } + \stream_set_blocking($tempSocket, false); + \stream_set_write_buffer($tempSocket, 0); + + if (!\fwrite($tempSocket, \chr(\strlen($prefix)).$prefix)) { + \trigger_error("Failure sending request to FIFO server", E_USER_ERROR); + exit(1); + } + \fclose($tempSocket); + $tempSocket = null; + + $channel = new Sync\ChannelledSocket(...$sockets); + } else { + if (!$socket = \stream_socket_client($uri, $errno, $errstr, 5, \STREAM_CLIENT_CONNECT)) { + \trigger_error("Could not connect to IPC socket", E_USER_ERROR); + exit(1); + } + $channel = new Sync\ChannelledSocket($socket, $socket); } - $channel = new Sync\ChannelledSocket($socket, $socket); try { Promise\wait($channel->send($key)); @@ -92,7 +138,7 @@ if (\function_exists("cli_set_process_title")) { } catch (\TypeError $exception) { throw new \Error(\sprintf("Script '%s' did not return a callable function", $argv[0]), 0, $exception); } catch (\ParseError $exception) { - throw new \Error(\sprintf("Script '%s' contains a parse error: " . $exception->getMessage(), $argv[0]), 0, $exception); + throw new \Error(\sprintf("Script '%s' contains a parse error: ".$exception->getMessage(), $argv[0]), 0, $exception); } $result = new Sync\ExitSuccess(Promise\wait(call($callable, $channel))); diff --git a/lib/Context/Process.php b/lib/Context/Process.php index 55fdbb4..498ffe2 100644 --- a/lib/Context/Process.php +++ b/lib/Context/Process.php @@ -61,14 +61,15 @@ final class Process implements Context * @param string|null $cwd Working directory. * @param mixed[] $env Array of environment variables. * @param string $binary Path to PHP binary. Null will attempt to automatically locate the binary. + * @param boolean $useFIFO Whether to use FIFOs instead of the more reliable UNIX socket server (CHOSEN AUTOMATICALLY, only for testing purposes) * * @throws \Error If the PHP binary path given cannot be found or is not executable. */ - public function __construct($script, string $cwd = null, array $env = [], string $binary = null) + public function __construct($script, string $cwd = null, array $env = [], string $binary = null, bool $useFIFO = false) { $this->hub = Loop::getState(self::class); if (!$this->hub instanceof Internal\ProcessHub) { - $this->hub = new Internal\ProcessHub; + $this->hub = new Internal\ProcessHub($useFIFO); Loop::setState(self::class, $this->hub); } diff --git a/test/Context/AbstractContextTest.php b/test/Context/AbstractContextTest.php index bca578e..a538b8e 100644 --- a/test/Context/AbstractContextTest.php +++ b/test/Context/AbstractContextTest.php @@ -94,10 +94,12 @@ abstract class AbstractContextTest extends AsyncTestCase $this->expectException(ContextException::class); $this->expectExceptionMessage('Failed to receive result'); - $context = $this->createContext([ + $context = $this->createContext( + [ __DIR__ . "/Fixtures/sleep-process.php", 5, - ]); + ] + ); yield $context->start(); yield new Delayed(100); $promise = $context->join(); diff --git a/test/Context/ProcessFifoTest.php b/test/Context/ProcessFifoTest.php new file mode 100644 index 0000000..91ee661 --- /dev/null +++ b/test/Context/ProcessFifoTest.php @@ -0,0 +1,14 @@ +