1
0
mirror of https://github.com/danog/parallel.git synced 2024-11-26 12:24:40 +01:00

Merge branch 'fifo'

This commit is contained in:
Daniil Gentili 2020-02-11 17:57:01 +01:00
commit bde5900826
Signed by: danog
GPG Key ID: 8C1BE3B34B230CA7
5 changed files with 29 additions and 22 deletions

View File

@ -34,8 +34,6 @@ class ProcessHub
private $toUnlink;
/**
* Constructor.
*
* @param boolean $useFIFO Whether to use FIFOs instead of the more reliable UNIX socket server (CHOSEN AUTOMATICALLY, only for testing purposes)
*/
public function __construct(bool $useFIFO = false)
@ -49,8 +47,8 @@ class ProcessHub
$this->uri = "tcp://127.0.0.1:0";
} else {
$suffix = \bin2hex(\random_bytes(10));
$path = \sys_get_temp_dir()."/amp-parallel-ipc-".$suffix.".sock";
$this->uri = "unix://".$path;
$path = \sys_get_temp_dir() . "/amp-parallel-ipc-" . $suffix . ".sock";
$this->uri = "unix://" . $path;
$this->toUnlink = $path;
}
@ -66,7 +64,7 @@ class ProcessHub
if (!\posix_mkfifo($path, 0777)) {
throw new \RuntimeException(\sprintf("Could not create the FIFO socket, and could not create IPC server: (Errno: %d) %s", $errno, $errstr));
}
if (!$this->server = \fopen($path, 'r+')) {
if (!$this->server = \fopen($path, 'r+')) { // Open in r+w mode to prevent blocking if there is no reader
throw new \RuntimeException(\sprintf("Could not connect to the FIFO socket, and could not create IPC server: (Errno: %d) %s", $errno, $errstr));
}
\stream_set_blocking($this->server, false);
@ -77,22 +75,24 @@ class ProcessHub
if ($isWindows) {
$name = \stream_socket_get_name($this->server, false);
$port = \substr($name, \strrpos($name, ":") + 1);
$this->uri = "tcp://127.0.0.1:".$port;
$this->uri = "tcp://127.0.0.1:" . $port;
}
$keys = &$this->keys;
$acceptor = &$this->acceptor;
$this->watcher = Loop::onReadable($this->server, static function (string $watcher, $server) use (&$keys, &$acceptor, &$fifo): \Generator {
$this->watcher = Loop::onReadable($this->server, static function (string $watcher, $server) use (&$keys, &$acceptor, $fifo): \Generator {
if ($fifo) {
$length = \ord(\fread($server, 1));
$length = \unpack('v', \fread($server, 2))[1];
if (!$length) {
return; // Could not accept, wrong length read
}
$prefix = \fread($server, $length);
$sockets = [
$prefix."1",
$prefix."2",
$prefix . '1',
$prefix . '2',
];
foreach ($sockets as $k => &$socket) {
if (@\filetype($socket) !== 'fifo') {
if ($k) {
@ -100,6 +100,8 @@ class ProcessHub
}
return; // Is not a FIFO
}
// Open in either read or write mode to send a close signal when done
if (!$socket = \fopen($socket, $k ? 'w' : 'r')) {
if ($k) {
\fclose($sockets[0]);
@ -116,7 +118,6 @@ class ProcessHub
$channel = new ChannelledSocket($client, $client);
}
try {
$received = yield Promise\timeout($channel->receive(), self::KEY_RECEIVE_TIMEOUT);
} catch (\Throwable $exception) {

View File

@ -65,7 +65,7 @@ if (\function_exists("cli_set_process_title")) {
$suffix = \bin2hex(\random_bytes(10));
$prefix = \sys_get_temp_dir()."/amp-".$suffix.".fifo";
if (\strlen($prefix) > 0xFF) {
if (\strlen($prefix) > 0xFFFF) {
\trigger_error("Prefix is too long!", E_USER_ERROR);
exit(1);
}
@ -84,20 +84,20 @@ if (\function_exists("cli_set_process_title")) {
@\unlink($socket);
});
if (!$socket = \fopen($socket, 'r+')) { // Open in either read or write mode to send a close signal when done
if (!$socket = \fopen($socket, 'r+')) { // Open in r+w mode to prevent blocking if there is no reader
\trigger_error("Could not open FIFO client socket", E_USER_ERROR);
exit(1);
}
}
if (!$tempSocket = \fopen($uri, 'r+')) {
if (!$tempSocket = \fopen($uri, 'r+')) { // Open in r+w mode to prevent blocking if there is no reader
\trigger_error("Could not connect to FIFO server", E_USER_ERROR);
exit(1);
}
\stream_set_blocking($tempSocket, false);
\stream_set_write_buffer($tempSocket, 0);
if (!\fwrite($tempSocket, \chr(\strlen($prefix)).$prefix)) {
if (!\fwrite($tempSocket, \pack('v', \strlen($prefix)).$prefix)) {
\trigger_error("Failure sending request to FIFO server", E_USER_ERROR);
exit(1);
}

View File

@ -59,15 +59,14 @@ final class Process implements Context
* @param string|null $cwd Working directory.
* @param mixed[] $env Array of environment variables.
* @param string $binary Path to PHP binary. Null will attempt to automatically locate the binary.
* @param boolean $useFIFO Whether to use FIFOs instead of the more reliable UNIX socket server (CHOSEN AUTOMATICALLY, only for testing purposes)
*
* @throws \Error If the PHP binary path given cannot be found or is not executable.
*/
public function __construct($script, string $cwd = null, array $env = [], string $binary = null, bool $useFIFO = false)
public function __construct($script, string $cwd = null, array $env = [], string $binary = null)
{
$this->hub = Loop::getState(self::class);
if (!$this->hub instanceof Internal\ProcessHub) {
$this->hub = new Internal\ProcessHub($useFIFO);
$this->hub = new Internal\ProcessHub;
Loop::setState(self::class, $this->hub);
}

View File

@ -2,16 +2,20 @@
namespace Amp\Parallel\Test\Context;
use Amp\Loop;
use Amp\Parallel\Context\Context;
use Amp\Parallel\Context\Internal\ProcessHub;
use Amp\Parallel\Context\Process;
/**
* @requires OS Linux
*/
class ProcessFifoTest extends AbstractContextTest
{
public function createContext($script): Context
{
return new Process($script, null, [], null, true);
if (\strncasecmp(\PHP_OS, "WIN", 3) === 0) {
$this->markTestSkipped('FIFO pipes do not work on Windows');
}
Loop::setState(Process::class, new ProcessHub(true)); // Manually set ProcessHub using FIFO pipes.
return new Process($script);
}
}

View File

@ -2,13 +2,16 @@
namespace Amp\Parallel\Test\Context;
use Amp\Loop;
use Amp\Parallel\Context\Context;
use Amp\Parallel\Context\Internal\ProcessHub;
use Amp\Parallel\Context\Process;
class ProcessTest extends AbstractContextTest
{
public function createContext($script): Context
{
Loop::setState(Process::class, new ProcessHub(false)); // Manually set ProcessHub using socket server.
return new Process($script);
}
}