1
0
mirror of https://github.com/danog/ipc.git synced 2024-11-26 20:15:05 +01:00

Better error handling and fallbacks

This commit is contained in:
Daniil Gentili 2021-04-21 15:27:22 +02:00
parent 9450af5ca3
commit be76cf8125
Signed by: danog
GPG Key ID: 8C1BE3B34B230CA7
7 changed files with 131 additions and 68 deletions

1
.gitignore vendored
View File

@ -5,3 +5,4 @@ phpunit.xml
vendor
.php_cs.cache
coverage
.phpunit.result.cache

View File

@ -10,6 +10,10 @@ use Amp\Success;
class IpcServer
{
public const TYPE_AUTO = 0;
public const TYPE_UNIX = 1 << 0;
public const TYPE_FIFO = 1 << 1;
public const TYPE_TCP = 1 << 2;
/** @var resource|null */
private $server;
@ -23,10 +27,10 @@ class IpcServer
private $uri;
/**
* @param string $uri Local endpoint on which to listen for requests
* @param boolean $useFIFO Whether to use FIFOs instead of the more reliable UNIX socket server (CHOSEN AUTOMATICALLY, only for testing purposes)
* @param string $uri Local endpoint on which to listen for requests
* @param self::TYPE_* $type Server type
*/
public function __construct(string $uri = '', bool $useFIFO = false)
public function __construct(string $uri = '', int $type = self::TYPE_AUTO)
{
if (!$uri) {
$suffix = \bin2hex(\random_bytes(10));
@ -39,47 +43,86 @@ class IpcServer
$isWindows = \strncasecmp(\PHP_OS, "WIN", 3) === 0;
if ($isWindows) {
if ($useFIFO) {
throw new \RuntimeException("Cannot use FIFOs on windows");
if ($type === self::TYPE_AUTO || $type === self::TYPE_TCP) {
$types = [self::TYPE_TCP];
} else {
throw new \RuntimeException("Cannot use FIFOs and UNIX sockets on windows");
}
$listenUri = "tcp://127.0.0.1:0";
} elseif ($type === self::TYPE_AUTO) {
$types = [self::TYPE_UNIX, self::TYPE_TCP, self::TYPE_FIFO];
} else {
$types = [];
if ($type & self::TYPE_UNIX) {
$types []= self::TYPE_UNIX;
}
if ($type & self::TYPE_TCP) {
$types []= self::TYPE_TCP;
}
if ($type & self::TYPE_FIFO) {
$types []= self::TYPE_FIFO;
}
$listenUri = "unix://".$uri;
}
if (!$useFIFO) {
try {
$this->server = \stream_socket_server($listenUri, $errno, $errstr, \STREAM_SERVER_BIND | \STREAM_SERVER_LISTEN);
} catch (\Throwable $e) {
$errors = [];
foreach ($types as $type) {
if ($type === self::TYPE_FIFO) {
if (!\posix_mkfifo($uri, 0777)) {
$errors[$type] = "could not create the FIFO socket";
continue;
}
$error = '';
try {
// Open in r+w mode to prevent blocking if there is no reader
$this->server = \fopen($uri, 'r+');
} catch (\Throwable $e) {
$error = "$e";
}
if ($this->server) {
\stream_set_blocking($this->server, false);
break;
}
$errors[$type] = "could not connect to the FIFO socket: $error";
} else {
$listenUri = $type === self::TYPE_TCP ? "tcp://127.0.0.1:0" : "unix://".$uri;
try {
$this->server = \stream_socket_server($listenUri, $errno, $errstr, \STREAM_SERVER_BIND | \STREAM_SERVER_LISTEN);
} catch (\Throwable $e) {
$errno = -1;
$errstr = "exception: $e";
}
if ($this->server) {
if ($type === self::TYPE_TCP) {
$name = \stream_socket_get_name($this->server, false);
$port = \substr($name, \strrpos($name, ":") + 1);
try {
if (!\file_put_contents($this->uri, "tcp://127.0.0.1:".$port)) {
$errors[$type] = 'could not create URI file';
$this->server = null;
}
} catch (\Throwable $e) {
$errors[$type] = "could not create URI file: $e";
$this->server = null;
}
if (!$this->server) {
continue;
}
}
break;
}
$errors[$type] = "(errno: $errno) $errstr";
}
}
$fifo = false;
if (!$this->server) {
if ($isWindows) {
throw new \RuntimeException(\sprintf("Could not create IPC server: (Errno: %d) %s", $errno, $errstr));
}
if (!\posix_mkfifo($uri, 0777)) {
throw new \RuntimeException(\sprintf("Could not create the FIFO socket, and could not create IPC server: (Errno: %d) %s", $errno, $errstr));
}
if (!$this->server = \fopen($uri, 'r+')) { // Open in r+w mode to prevent blocking if there is no reader
throw new \RuntimeException(\sprintf("Could not connect to the FIFO socket, and could not create IPC server: (Errno: %d) %s", $errno, $errstr));
}
\stream_set_blocking($this->server, false);
$fifo = true;
}
if ($isWindows) {
$name = \stream_socket_get_name($this->server, false);
$port = \substr($name, \strrpos($name, ":") + 1);
\file_put_contents($this->uri, "tcp://127.0.0.1:".$port);
throw new IpcServerException($errors);
}
$acceptor = &$this->acceptor;
$this->watcher = Loop::onReadable($this->server, static function (string $watcher, $server) use (&$acceptor, $fifo): void {
if ($fifo) {
$this->watcher = Loop::onReadable($this->server, static function (string $watcher, $server) use (&$acceptor, $type): void {
if ($type === self::TYPE_FIFO) {
$length = \unpack('v', \fread($server, 2))[1];
if (!$length) {
return; // Could not accept, wrong length read

View File

@ -0,0 +1,26 @@
<?php
namespace Amp\Ipc;
/**
* Thrown in case server connection fails.
*/
final class IpcServerException extends \Exception
{
private const TYPE_MAP = [
IpcServer::TYPE_UNIX => 'UNIX',
IpcServer::TYPE_TCP => 'TCP',
IpcServer::TYPE_FIFO => 'FIFO',
];
public function __construct(
array $messages,
int $code = 0,
\Throwable $previous = null
) {
$message = "Could not create IPC server: ";
foreach ($messages as $type => $error) {
$message .= self::TYPE_MAP[$type].": $error; ";
}
parent::__construct($message, $code, $previous);
}
}

View File

@ -1,28 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<phpunit
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="http://schema.phpunit.de/4.1/phpunit.xsd"
backupGlobals="false"
backupStaticAttributes="false"
bootstrap="vendor/autoload.php"
colors="true"
convertErrorsToExceptions="true"
convertNoticesToExceptions="true"
convertWarningsToExceptions="true"
processIsolation="false"
stopOnFailure="false"
>
<testsuites>
<testsuite name="Amp Concurrent">
<directory>test</directory>
</testsuite>
</testsuites>
<filter>
<whitelist>
<directory suffix=".php">lib</directory>
</whitelist>
</filter>
<logging>
<log type="coverage-html" target="build/coverage"/>
</logging>
<phpunit xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="https://schema.phpunit.de/9.3/phpunit.xsd" backupGlobals="false" backupStaticAttributes="false" bootstrap="vendor/autoload.php" colors="true" convertErrorsToExceptions="true" convertNoticesToExceptions="true" convertWarningsToExceptions="true" processIsolation="false" stopOnFailure="false">
<coverage>
<include>
<directory suffix=".php">lib</directory>
</include>
<report>
<html outputDirectory="build/coverage" />
</report>
</coverage>
<testsuites>
<testsuite name="Amp Concurrent">
<directory>test</directory>
</testsuite>
</testsuites>
<logging />
</phpunit>

View File

@ -11,7 +11,7 @@ use Amp\Parallel\Sync\Channel;
use function Amp\delay;
return function (Channel $channel) use ($argv) {
$server = new IpcServer($argv[1], $argv[2] === "1" ? true : false);
$server = new IpcServer($argv[1], (int) $argv[2]);
yield $channel->send($server->getUri());

View File

@ -11,7 +11,7 @@ use Amp\Parallel\Sync\Channel;
use function Amp\delay;
return function (Channel $channel) use ($argv) {
$server = new IpcServer($argv[1], $argv[2] === "1" ? true : false);
$server = new IpcServer($argv[1], (int) $argv[2]);
yield $channel->send($server->getUri());

View File

@ -2,6 +2,7 @@
namespace Amp\Ipc\Test;
use Amp\Ipc\IpcServer;
use Amp\Ipc\Sync\ChannelledSocket;
use Amp\Parallel\Context\Process;
use Amp\PHPUnit\AsyncTestCase;
@ -11,10 +12,10 @@ use function Amp\Ipc\connect;
class IpcTest extends AsyncTestCase
{
/** @dataProvider provideUriFifo */
public function testBasicIPC(string $uri, bool $fifo)
/** @dataProvider provideUriType */
public function testBasicIPC(string $uri, int $type)
{
$process = new Process([__DIR__.'/Fixtures/server.php', $uri, $fifo]);
$process = new Process([__DIR__.'/Fixtures/server.php', $uri, $type]);
yield $process->start();
$recvUri = yield $process->receive();
@ -33,10 +34,10 @@ class IpcTest extends AsyncTestCase
$this->assertNull(yield $process->join());
}
/** @dataProvider provideUriFifo */
public function testIPCDisconectWhileReading(string $uri, bool $fifo)
/** @dataProvider provideUriType */
public function testIPCDisconectWhileReading(string $uri, int $type)
{
$process = new Process([__DIR__.'/Fixtures/echoServer.php', $uri, $fifo]);
$process = new Process([__DIR__.'/Fixtures/echoServer.php', $uri, $type]);
yield $process->start();
$recvUri = yield $process->receive();
@ -57,14 +58,17 @@ class IpcTest extends AsyncTestCase
$this->assertNull(yield $process->join());
}
public function provideUriFifo(): \Generator
public function provideUriType(): \Generator
{
foreach (['', \sys_get_temp_dir().'/pony'] as $uri) {
if (\strncasecmp(\PHP_OS, "WIN", 3) === 0) {
yield [$uri, false];
yield [$uri, IpcServer::TYPE_AUTO];
yield [$uri, IpcServer::TYPE_TCP];
} else {
yield [$uri, true];
yield [$uri, false];
yield [$uri, IpcServer::TYPE_AUTO];
yield [$uri, IpcServer::TYPE_TCP];
yield [$uri, IpcServer::TYPE_UNIX];
yield [$uri, IpcServer::TYPE_FIFO];
}
}
}