1
0
mirror of https://github.com/danog/parallel.git synced 2024-11-26 20:34:40 +01:00

Implement FIFO process hub

This commit is contained in:
Daniil Gentili 2020-02-10 23:33:17 +01:00
parent 72dd3a495f
commit 76853f0b78
Signed by: danog
GPG Key ID: 8C1BE3B34B230CA7
5 changed files with 132 additions and 23 deletions

View File

@ -33,7 +33,12 @@ class ProcessHub
/** @var string|null */
private $toUnlink;
public function __construct()
/**
* Constructor.
*
* @param boolean $useFIFO Whether to use FIFOs instead of the more reliable UNIX socket server (CHOSEN AUTOMATICALLY, only for testing purposes)
*/
public function __construct(bool $useFIFO = false)
{
$isWindows = \strncasecmp(\PHP_OS, "WIN", 3) === 0;
@ -41,32 +46,73 @@ class ProcessHub
$this->uri = "tcp://127.0.0.1:0";
} else {
$suffix = \bin2hex(\random_bytes(10));
$path = \sys_get_temp_dir() . "/amp-parallel-ipc-" . $suffix . ".sock";
$this->uri = "unix://" . $path;
$path = \sys_get_temp_dir()."/amp-parallel-ipc-".$suffix.".sock";
$this->uri = "unix://".$path;
$this->toUnlink = $path;
}
$this->server = \stream_socket_server($this->uri, $errno, $errstr, \STREAM_SERVER_BIND | \STREAM_SERVER_LISTEN);
if (!$useFIFO) {
$this->server = \stream_socket_server($this->uri, $errno, $errstr, \STREAM_SERVER_BIND | \STREAM_SERVER_LISTEN);
}
$fifo = false;
if (!$this->server) {
throw new \RuntimeException(\sprintf("Could not create IPC server: (Errno: %d) %s", $errno, $errstr));
if ($isWindows) {
throw new \RuntimeException(\sprintf("Could not create IPC server: (Errno: %d) %s", $errno, $errstr));
}
if (!\posix_mkfifo($path, 0777)) {
throw new \RuntimeException(\sprintf("Could not create the FIFO socket, and could not create IPC server: (Errno: %d) %s", $errno, $errstr));
}
if (!$this->server = \fopen($path, 'r+')) {
throw new \RuntimeException(\sprintf("Could not connect to the FIFO socket, and could not create IPC server: (Errno: %d) %s", $errno, $errstr));
}
\stream_set_blocking($this->server, false);
$fifo = true;
$this->uri = $path;
}
if ($isWindows) {
$name = \stream_socket_get_name($this->server, false);
$port = \substr($name, \strrpos($name, ":") + 1);
$this->uri = "tcp://127.0.0.1:" . $port;
$this->uri = "tcp://127.0.0.1:".$port;
}
$keys = &$this->keys;
$acceptor = &$this->acceptor;
$this->watcher = Loop::onReadable($this->server, static function (string $watcher, $server) use (&$keys, &$acceptor): \Generator {
// Error reporting suppressed since stream_socket_accept() emits E_WARNING on client accept failure.
if (!$client = @\stream_socket_accept($server, 0)) { // Timeout of 0 to be non-blocking.
return; // Accepting client failed.
$this->watcher = Loop::onReadable($this->server, static function (string $watcher, $server) use (&$keys, &$acceptor, &$fifo): \Generator {
if ($fifo) {
$length = \ord(\fread($server, 1));
if (!$length) {
return; // Could not accept, wrong length read
}
$prefix = \fread($server, $length);
$sockets = [
$prefix."1",
$prefix."2",
];
foreach ($sockets as $k => &$socket) {
if (@\filetype($socket) !== 'fifo') {
if ($k) {
\fclose($sockets[0]);
}
return; // Is not a FIFO
}
if (!$socket = \fopen($socket, $k ? 'w' : 'r')) {
if ($k) {
\fclose($sockets[0]);
}
return; // Could not open fifo
}
}
$channel = new ChannelledSocket(...$sockets);
} else {
// Error reporting suppressed since stream_socket_accept() emits E_WARNING on client accept failure.
if (!$client = @\stream_socket_accept($server, 0)) { // Timeout of 0 to be non-blocking.
return; // Accepting client failed.
}
$channel = new ChannelledSocket($client, $client);
}
$channel = new ChannelledSocket($client, $client);
try {
$received = yield Promise\timeout($channel->receive(), self::KEY_RECEIVE_TIMEOUT);

View File

@ -17,8 +17,8 @@ if (\function_exists("cli_set_process_title")) {
(function (): void {
$paths = [
\dirname(__DIR__, 5) . "/autoload.php",
\dirname(__DIR__, 3) . "/vendor/autoload.php",
\dirname(__DIR__, 5)."/autoload.php",
\dirname(__DIR__, 3)."/vendor/autoload.php",
];
foreach ($paths as $path) {
@ -29,7 +29,7 @@ if (\function_exists("cli_set_process_title")) {
}
if (!isset($autoloadPath)) {
\trigger_error("Could not locate autoload.php in any of the following files: " . \implode(", ", $paths), E_USER_ERROR);
\trigger_error("Could not locate autoload.php in any of the following files: ".\implode(", ", $paths), E_USER_ERROR);
exit(1);
}
@ -61,12 +61,58 @@ if (\function_exists("cli_set_process_title")) {
$key .= $chunk;
} while (\strlen($key) < Process::KEY_LENGTH);
if (!$socket = \stream_socket_client($uri, $errno, $errstr, 5, \STREAM_CLIENT_CONNECT)) {
\trigger_error("Could not connect to IPC socket", E_USER_ERROR);
exit(1);
if (\strpos($uri, 'tcp://') === false && \strpos($uri, 'unix://') === false) {
$suffix = \bin2hex(\random_bytes(10));
$prefix = \sys_get_temp_dir()."/amp-".$suffix.".fifo";
if (\strlen($prefix) > 0xFF) {
\trigger_error("Prefix is too long!", E_USER_ERROR);
exit(1);
}
$sockets = [
$prefix."2",
$prefix."1",
];
foreach ($sockets as $k => &$socket) {
if (!\posix_mkfifo($socket, 0777)) {
\trigger_error("Could not create FIFO client socket", E_USER_ERROR);
exit(1);
}
\register_shutdown_function(static function () use ($socket): void {
@\unlink($socket);
});
if (!$socket = \fopen($socket, 'r+')) { // Open in either read or write mode to send a close signal when done
\trigger_error("Could not open FIFO client socket", E_USER_ERROR);
exit(1);
}
}
if (!$tempSocket = \fopen($uri, 'r+')) {
\trigger_error("Could not connect to FIFO server", E_USER_ERROR);
exit(1);
}
\stream_set_blocking($tempSocket, false);
\stream_set_write_buffer($tempSocket, 0);
if (!\fwrite($tempSocket, \chr(\strlen($prefix)).$prefix)) {
\trigger_error("Failure sending request to FIFO server", E_USER_ERROR);
exit(1);
}
\fclose($tempSocket);
$tempSocket = null;
$channel = new Sync\ChannelledSocket(...$sockets);
} else {
if (!$socket = \stream_socket_client($uri, $errno, $errstr, 5, \STREAM_CLIENT_CONNECT)) {
\trigger_error("Could not connect to IPC socket", E_USER_ERROR);
exit(1);
}
$channel = new Sync\ChannelledSocket($socket, $socket);
}
$channel = new Sync\ChannelledSocket($socket, $socket);
try {
Promise\wait($channel->send($key));
@ -92,7 +138,7 @@ if (\function_exists("cli_set_process_title")) {
} catch (\TypeError $exception) {
throw new \Error(\sprintf("Script '%s' did not return a callable function", $argv[0]), 0, $exception);
} catch (\ParseError $exception) {
throw new \Error(\sprintf("Script '%s' contains a parse error: " . $exception->getMessage(), $argv[0]), 0, $exception);
throw new \Error(\sprintf("Script '%s' contains a parse error: ".$exception->getMessage(), $argv[0]), 0, $exception);
}
$result = new Sync\ExitSuccess(Promise\wait(call($callable, $channel)));

View File

@ -61,14 +61,15 @@ final class Process implements Context
* @param string|null $cwd Working directory.
* @param mixed[] $env Array of environment variables.
* @param string $binary Path to PHP binary. Null will attempt to automatically locate the binary.
* @param boolean $useFIFO Whether to use FIFOs instead of the more reliable UNIX socket server (CHOSEN AUTOMATICALLY, only for testing purposes)
*
* @throws \Error If the PHP binary path given cannot be found or is not executable.
*/
public function __construct($script, string $cwd = null, array $env = [], string $binary = null)
public function __construct($script, string $cwd = null, array $env = [], string $binary = null, bool $useFIFO = false)
{
$this->hub = Loop::getState(self::class);
if (!$this->hub instanceof Internal\ProcessHub) {
$this->hub = new Internal\ProcessHub;
$this->hub = new Internal\ProcessHub($useFIFO);
Loop::setState(self::class, $this->hub);
}

View File

@ -94,10 +94,12 @@ abstract class AbstractContextTest extends AsyncTestCase
$this->expectException(ContextException::class);
$this->expectExceptionMessage('Failed to receive result');
$context = $this->createContext([
$context = $this->createContext(
[
__DIR__ . "/Fixtures/sleep-process.php",
5,
]);
]
);
yield $context->start();
yield new Delayed(100);
$promise = $context->join();

View File

@ -0,0 +1,14 @@
<?php
namespace Amp\Parallel\Test\Context;
use Amp\Parallel\Context\Context;
use Amp\Parallel\Context\Process;
class ProcessFifoTest extends AbstractContextTest
{
public function createContext($script): Context
{
return new Process($script, null, [], null, true);
}
}