1
0
mirror of https://github.com/danog/parallel.git synced 2024-12-11 08:39:48 +01:00
parallel/lib/Context/Internal/ProcessHub.php

101 lines
2.8 KiB
PHP
Raw Normal View History

<?php
namespace Amp\Parallel\Context\Internal;
use Amp\Deferred;
use Amp\Loop;
2018-10-09 16:12:23 +02:00
use Amp\Parallel\Context\ContextException;
use Amp\Parallel\Sync\ChannelledSocket;
use Amp\Promise;
2018-10-09 16:12:23 +02:00
use Amp\TimeoutException;
use function Amp\call;
2018-10-07 16:50:45 +02:00
/**
 * IPC hub that listens on a local socket and hands out accepted client connections.
 *
 * On Windows the server is bound to a TCP port on localhost chosen by the OS; on every other
 * platform it is bound to a unix domain socket inside the system temp directory. Each accepted
 * connection is wrapped in a ChannelledSocket and delivered through the promise returned by accept().
 */
class ProcessHub
{
    /** Timeout handed to Amp\Promise\timeout() in accept() (milliseconds per Amp's API). */
    const PROCESS_START_TIMEOUT = 5000;

    /** @var resource|null */
    private $server;

    /** @var string|null */
    private $uri;

    /** @var string|null */
    private $watcher;

    /** @var Deferred|null */
    private $acceptor;

    /** @var string|null Filesystem path of the unix socket to remove on destruction; null when TCP is used. */
    private $toUnlink;

    /**
     * Creates the listening socket and registers a (initially disabled) readable watcher for it.
     *
     * @throws \RuntimeException If the server socket could not be created.
     */
    public function __construct()
    {
        $isWindows = \strncasecmp(\PHP_OS, "WIN", 3) === 0;
        $socketPath = null;

        if ($isWindows) {
            // Port 0 lets the OS pick a free port; the real port is read back below.
            $this->uri = "tcp://localhost:0";
        } else {
            // Build the path by hand instead of using tempnam(): tempnam() creates an empty placeholder
            // file which was never removed, because the socket itself lives at "<name>.sock".
            $socketPath = \sys_get_temp_dir() . "/amp-parallel-ipc-" . \bin2hex(\random_bytes(10)) . ".sock";
            $this->uri = "unix://" . $socketPath;
        }

        $this->server = \stream_socket_server(
            $this->uri,
            $errno,
            $errstr,
            \STREAM_SERVER_BIND | \STREAM_SERVER_LISTEN
        );

        if (!$this->server) {
            throw new \RuntimeException(\sprintf("Could not create IPC server: (Errno: %d) %s", $errno, $errstr));
        }

        // Only remember the path once the socket file actually exists.
        $this->toUnlink = $socketPath;

        if ($isWindows) {
            // Replace the ":0" placeholder with the port that was actually assigned.
            $name = \stream_socket_get_name($this->server, false);
            $port = \substr($name, \strrpos($name, ":") + 1);
            $this->uri = "tcp://localhost:" . $port;
        }

        // The closure shares the property by reference instead of capturing $this
        // (presumably so the watcher does not keep this object alive and __destruct() can run).
        $acceptor = &$this->acceptor;

        $this->watcher = Loop::onReadable(
            $this->server,
            static function (string $watcher, $server) use (&$acceptor) {
                // Error reporting suppressed since stream_socket_accept() emits E_WARNING on client accept failure.
                if (!$client = @\stream_socket_accept($server, 0)) { // Timeout of 0 to be non-blocking.
                    return; // Accepting client failed.
                }

                // Clear the slot before resolving so that code resumed by resolve() can install a new acceptor.
                $deferred = $acceptor;
                $acceptor = null;

                $deferred->resolve(new ChannelledSocket($client, $client));

                // Nobody asked for another connection while resolving: stop watching the server.
                if (!$acceptor) {
                    Loop::disable($watcher);
                }
            }
        );

        // The watcher is only enabled while an accept() call is pending.
        Loop::disable($this->watcher);
    }

    /**
     * Cancels the watcher, closes the server socket and removes the unix socket file, if any.
     */
    public function __destruct()
    {
        Loop::cancel($this->watcher);
        \fclose($this->server);

        // Closing a unix socket server does not remove its filesystem entry.
        if ($this->toUnlink !== null && \file_exists($this->toUnlink)) {
            \unlink($this->toUnlink);
        }
    }

    /**
     * @return string URI the server is listening on ("tcp://localhost:<port>" or "unix://<path>").
     */
    public function getUri(): string
    {
        return $this->uri;
    }

    /**
     * Waits for the next client connection.
     *
     * Only one accept operation is pending at a time; concurrent calls wait for the current one to finish.
     *
     * @return Promise Resolves with the ChannelledSocket created for the accepted client.
     *
     * @throws ContextException (through the promise) If no client connected within PROCESS_START_TIMEOUT.
     */
    public function accept(): Promise
    {
        return call(function () {
            // Wait until any accept operation already in progress has finished.
            while ($this->acceptor) {
                yield $this->acceptor->promise();
            }

            $deferred = new Deferred;
            $this->acceptor = $deferred;
            Loop::enable($this->watcher);

            try {
                return yield Promise\timeout($deferred->promise(), self::PROCESS_START_TIMEOUT);
            } catch (TimeoutException $exception) {
                if ($this->acceptor === $deferred) {
                    // Disable before resolving: a waiter resumed by resolve() may re-enable the watcher.
                    Loop::disable($this->watcher);

                    // Release the slot and wake queued accept() calls. Leaving the unresolved Deferred in
                    // place would make every later accept() wait forever on a promise nobody resolves.
                    $this->acceptor = null;
                    $deferred->resolve();
                }

                throw new ContextException("Starting the process timed out", 0, $exception);
            }
        });
    }
}