1
0
mirror of https://github.com/danog/parallel.git synced 2024-11-30 04:39:01 +01:00

Authenticate worker when connecting to IPC

This commit is contained in:
Aaron Piotrowski 2018-10-10 18:31:34 -05:00
parent 9e40d3aa0c
commit 4cf26d4dfb
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
3 changed files with 67 additions and 18 deletions

View File

@ -13,6 +13,7 @@ use function Amp\call;
class ProcessHub
{
const PROCESS_START_TIMEOUT = 5000;
const KEY_RECEIVE_TIMEOUT = 1000;
/** @var resource|null */
private $server;
@ -20,11 +21,14 @@ class ProcessHub
/** @var string|null */
private $uri;
/** @var string[] */
private $keys;
/** @var string|null */
private $watcher;
/** @var Deferred|null */
private $acceptor;
/** @var Deferred[] */
private $acceptor = [];
public function __construct()
{
@ -48,18 +52,33 @@ class ProcessHub
$this->uri = "tcp://localhost:" . $port;
}
$keys = &$this->keys;
$acceptor = &$this->acceptor;
$this->watcher = Loop::onReadable($this->server, static function (string $watcher, $server) use (&$acceptor) {
$this->watcher = Loop::onReadable($this->server, static function (string $watcher, $server) use (&$keys, &$acceptor) {
// Error reporting suppressed since stream_socket_accept() emits E_WARNING on client accept failure.
if (!$client = @\stream_socket_accept($server, 0)) { // Timeout of 0 to be non-blocking.
return; // Accepting client failed.
}
$deferred = $acceptor;
$acceptor = null;
$deferred->resolve(new ChannelledSocket($client, $client));
$channel = new ChannelledSocket($client, $client);
if (!$acceptor) {
try {
$received = yield Promise\timeout($channel->receive(), self::KEY_RECEIVE_TIMEOUT);
} catch (TimeoutException $exception) {
return; // Ignore possible foreign connection attempt.
}
if (!\is_string($received) || !isset($keys[$received])) {
return; // Ignore possible foreign connection attempt.
}
$pid = $keys[$received];
$deferred = $acceptor[$pid];
unset($acceptor[$pid], $keys[$received]);
$deferred->resolve($channel);
if (empty($acceptor)) {
Loop::disable($watcher);
}
});
@ -78,21 +97,29 @@ class ProcessHub
return $this->uri;
}
public function accept(): Promise
public function generateKey(int $pid, int $length): string
{
return call(function () {
while ($this->acceptor) {
yield $this->acceptor->promise();
}
$key = \random_bytes($length);
$this->keys[$key] = $pid;
return $key;
}
$this->acceptor = new Deferred;
public function accept(int $pid): Promise
{
return call(function () use ($pid) {
$this->acceptor[$pid] = new Deferred;
Loop::enable($this->watcher);
try {
return yield Promise\timeout($this->acceptor->promise(), self::PROCESS_START_TIMEOUT);
return yield Promise\timeout($this->acceptor[$pid]->promise(), self::PROCESS_START_TIMEOUT);
} catch (TimeoutException $exception) {
Loop::disable($this->watcher);
unset($this->acceptor[$pid], $this->keys[$pid]);
if (empty($this->acceptor)) {
Loop::disable($this->watcher);
}
throw new ContextException("Starting the process timed out", 0, $exception);
}
});

View File

@ -3,6 +3,7 @@
namespace Amp\Parallel\Context\Internal;
use Amp\Loop;
use Amp\Parallel\Context\Process;
use Amp\Parallel\Sync;
use function Amp\call;
@ -51,12 +52,21 @@ Loop::run(function () use ($argc, $argv) {
--$argc;
$uri = \array_shift($argv);
// Read random key from STDIN and send back to parent over IPC socket to authenticate.
$key = \fread(\STDIN, Process::KEY_LENGTH);
if (!$socket = \stream_socket_client($uri, $errno, $errstr, 5, \STREAM_CLIENT_CONNECT)) {
throw new \RuntimeException("Could not connect to IPC socket");
exit(1); // Parent context died, simply exit.
}
$channel = new Sync\ChannelledSocket($socket, $socket);
try {
yield $channel->send($key);
} catch (\Throwable $exception) {
exit(1); // Parent context died, simply exit.
}
try {
// Protect current scope by requiring script within another function.
$callable = (function () use ($argc, $argv): callable {

View File

@ -15,6 +15,7 @@ use function Amp\call;
class Process implements Context
{
const SCRIPT_PATH = __DIR__ . "/Internal/process-runner.php";
const KEY_LENGTH = 16;
/** @var ByteStream\ResourceOutputStream */
private static $stderr;
@ -188,9 +189,20 @@ class Process implements Context
return call(function () {
$this->process->start();
$this->channel = yield $this->hub->accept();
$pid = yield $this->process->getPid();
yield $this->process->getStdin()->write($this->hub->generateKey($pid, self::KEY_LENGTH));
$this->channel = yield $this->hub->accept($pid);
$childStdout = $this->process->getStdout();
$childStdout->unreference();
asyncCall(static function () use ($childStdout) {
$stdout = new ByteStream\ResourceOutputStream(\STDOUT);
yield ByteStream\pipe($childStdout, $stdout);
});
/** @var ByteStream\ResourceInputStream $childStderr */
$childStderr = $this->process->getStderr();
$childStderr->unreference();