1
0
mirror of https://github.com/danog/parallel.git synced 2024-11-26 12:24:40 +01:00

Implement web forking

This commit is contained in:
Daniil Gentili 2020-02-11 23:18:06 +01:00
parent bde5900826
commit c851a13dbf
Signed by: danog
GPG Key ID: 8C1BE3B34B230CA7
7 changed files with 327 additions and 85 deletions

View File

@ -29,6 +29,7 @@
"amphp/sync": "^1.0.1"
},
"require-dev": {
"jelix/fakeserverconf": "^1.0",
"phpunit/phpunit": "^8 || ^7",
"amphp/phpunit-util": "^1.1",
"amphp/php-cs-fixer-config": "dev-master"

View File

@ -16,7 +16,7 @@ final class ProcessRunner extends RunnerAbstract
/** @var \Amp\Process\Process */
private $process;
/**
* Constructor
* Constructor.
*
* @param string|array $script Path to PHP script or array with first element as path and following elements options
* to the PHP script (e.g.: ['bin/worker', 'Option1Value', 'Option2Value'].
@ -25,7 +25,7 @@ final class ProcessRunner extends RunnerAbstract
* @param array $env Environment variables
* @param string $binary PHP binary path
*/
public function __construct($script, string $runPath, ProcessHub $hub, string $cwd = null, array $env = [], string $binary = null)
public function __construct($script, ProcessHub $hub, string $cwd = null, array $env = [], string $binary = null)
{
if ($binary === null) {
if (\PHP_SAPI === "cli") {
@ -37,22 +37,30 @@ final class ProcessRunner extends RunnerAbstract
throw new \Error(\sprintf("The PHP binary path '%s' was not found or is not executable", $binary));
}
if (\is_array($script)) {
$script = \implode(" ", \array_map("escapeshellarg", $script));
} else {
$script = \escapeshellarg($script);
}
$options = [
"html_errors" => "0",
"display_errors" => "0",
"log_errors" => "1",
];
$runner = self::getScriptPath();
// Monkey-patch the script path in the same way, only supported if the command is given as array.
if (isset(self::$pharCopy) && \is_array($script) && isset($script[0])) {
$script[0] = "phar://".self::$pharCopy.\substr($script[0], \strlen(\Phar::running(true)));
}
if (\is_array($script)) {
$script = \implode(" ", \array_map("escapeshellarg", $script));
} else {
$script = \escapeshellarg($script);
}
$command = \implode(" ", [
\escapeshellarg($binary),
self::formatOptions($options),
\escapeshellarg($runPath),
\escapeshellarg($runner),
$hub->getUri(),
$script,
]);
@ -90,10 +98,10 @@ final class ProcessRunner extends RunnerAbstract
/**
* Set process key
* Set process key.
*
* @param string $key Process key
*
*
* @return Promise
*/
public function setProcessKey(string $key): Promise
@ -205,5 +213,4 @@ final class ProcessRunner extends RunnerAbstract
{
return $this->process->getStderr();
}
}

View File

@ -10,6 +10,52 @@ use Amp\Promise;
abstract class RunnerAbstract
{
const SCRIPT_PATH = __DIR__ . "/process-runner.php";
/** @var string|null External version of SCRIPT_PATH if inside a PHAR. */
protected static $pharScriptPath;
/** @var string|null PHAR path with a '.phar' extension. */
protected static $pharCopy;
protected static function getScriptPath(string $alternateTmpDir = '')
{
// Write process runner to external file if inside a PHAR,
// because PHP can't open files inside a PHAR directly except for the stub.
if (\strpos(self::SCRIPT_PATH, "phar://") === 0) {
$alternateTmpDir = $alternateTmpDir ?: \sys_get_temp_dir();
if (self::$pharScriptPath) {
$scriptPath = self::$pharScriptPath;
} else {
$path = \dirname(self::SCRIPT_PATH);
if (\substr(\Phar::running(false), -5) !== ".phar") {
self::$pharCopy = $alternateTmpDir . "/phar-" . \bin2hex(\random_bytes(10)) . ".phar";
\copy(\Phar::running(false), self::$pharCopy);
\register_shutdown_function(static function (): void {
@\unlink(self::$pharCopy);
});
$path = "phar://" . self::$pharCopy . "/" . \substr($path, \strlen(\Phar::running(true)));
}
$contents = \file_get_contents(self::SCRIPT_PATH);
$contents = \str_replace("__DIR__", \var_export($path, true), $contents);
$suffix = \bin2hex(\random_bytes(10));
self::$pharScriptPath = $scriptPath = $alternateTmpDir . "/amp-process-runner-" . $suffix . ".php";
\file_put_contents($scriptPath, $contents);
\register_shutdown_function(static function (): void {
@\unlink(self::$pharScriptPath);
});
}
} else {
$scriptPath = self::SCRIPT_PATH;
}
return $scriptPath;
}
/**
* Constructor.
*
@ -20,7 +66,7 @@ abstract class RunnerAbstract
* @param array $env Environment variables
* @param string $binary PHP binary path
*/
abstract public function __construct($script, string $runPath, ProcessHub $hub, string $cwd = null, array $env = [], string $binary = null);
abstract public function __construct($script, ProcessHub $hub, string $cwd = null, array $env = [], string $binary = null);
/**
* Set process key.

View File

@ -1,7 +1,9 @@
<?php
namespace Amp\Parallel\Context;
namespace Amp\Parallel\Context\Internal\Runner;
use Amp\ByteStream\ResourceOutputStream;
use Amp\Parallel\Context\ContextException;
use Amp\Parallel\Context\Internal\ProcessHub;
use Amp\Parallel\Context\Internal\Runner\RunnerAbstract;
use Amp\Promise;
@ -9,6 +11,8 @@ use Amp\Success;
final class WebRunner extends RunnerAbstract
{
/** @var string|null Cached path to the runner script. */
private static $runPath;
/**
* PID.
*
@ -27,6 +31,13 @@ final class WebRunner extends RunnerAbstract
* @var boolean
*/
private $running = false;
/**
* Socket
*
* @var ResourceOutputStream
*/
private $res;
/**
* Constructor.
*
@ -37,21 +48,67 @@ final class WebRunner extends RunnerAbstract
* @param array $env Environment variables
* @param string $binary PHP binary path
*/
public function __construct($script, string $runPath, ProcessHub $hub, string $cwd = null, array $env = [], string $binary = null)
public function __construct($script, ProcessHub $hub, string $cwd = null, array $env = [], string $binary = null)
{
if (!isset($_SERVER['SERVER_NAME'])) {
throw new ContextException("Could not initialize web runner!");
}
if (!self::$runPath) {
$uri = \parse_url('tcp://'.$_SERVER['SERVER_NAME'].$_SERVER['REQUEST_URI'], PHP_URL_PATH);
if (\substr($uri, -1) === '/') { // http://example.com/path/ (assumed index.php)
$uri .= 'index'; // Add fake file name
}
$uri = str_replace('//', '/', $uri);
$rootDir = \debug_backtrace(DEBUG_BACKTRACE_IGNORE_ARGS);
$rootDir = \end($rootDir)['file'] ?? '';
if (!$rootDir) {
throw new ContextException('Could not get entry file!');
}
$rootDir = \dirname($rootDir);
$uriDir = \dirname($uri);
if (\substr($rootDir, -\strlen($uriDir)) !== $uriDir) {
throw new ContextException("Mismatch between absolute root dir ($rootDir) and URI dir ($uriDir)");
}
// Absolute root of (presumably) readable document root
$localRootDir = \substr($rootDir, 0, \strlen($rootDir)-\strlen($uriDir)).DIRECTORY_SEPARATOR;
$runPath = self::getScriptPath($localRootDir);
if (\substr($runPath, 0, \strlen($localRootDir)) === $localRootDir) { // Process runner is within readable document root
self::$runPath = \substr($runPath, \strlen($localRootDir)-1);
} else {
$contents = \file_get_contents(self::SCRIPT_PATH);
$contents = \str_replace("__DIR__", \var_export($localRootDir, true), $contents);
$suffix = \bin2hex(\random_bytes(10));
$runPath = $localRootDir."/amp-process-runner-".$suffix.".php";
\file_put_contents($runPath, $contents);
self::$runPath = \substr($runPath, \strlen($localRootDir)-1);
\register_shutdown_function(static function () use ($runPath): void {
@\unlink($runPath);
});
}
self::$runPath = \str_replace(DIRECTORY_SEPARATOR, '/', self::$runPath);
self::$runPath = \str_replace('//', '/', self::$runPath);
}
// Monkey-patch the script path in the same way, only supported if the command is given as array.
if (isset(self::$pharCopy) && \is_array($script) && isset($script[0])) {
$script[0] = "phar://".self::$pharCopy.\substr($script[0], \strlen(\Phar::running(true)));
}
if (!\is_array($script)) {
$script = [$script];
}
$this->params = [
'options' => [
"html_errors" => "0",
"display_errors" => "0",
"log_errors" => "1",
],
'cwd' => $cwd,
'env' => $env,
'argv' => [
$hub->getUri(),
...$script
@ -85,23 +142,20 @@ final class WebRunner extends RunnerAbstract
public function setProcessKey(string $key): Promise
{
$this->params['key'] = $key;
$params = \http_build_query($params);
$params = \http_build_query($this->params);
$address = ($_SERVER['HTTPS'] ?? false ? 'tls' : 'tcp').'://'.$_SERVER['SERVER_NAME'];
$port = $_SERVER['SERVER_PORT'];
$uri = $_SERVER['REQUEST_URI'];
$params = $_GET;
$url = \explode('?', $uri, 2)[0] ?? '';
$query = \http_build_query($params);
$uri = \implode('?', [$url, $query]);
$uri = self::$runPath.'?'.$params;
$this->payload = "GET $uri HTTP/1.1\r\nHost: ${_SERVER['SERVER_NAME']}\r\n\r\n";
$payload = "GET $uri HTTP/1.1\r\nHost: ${_SERVER['SERVER_NAME']}\r\n\r\n";
$a = \fsockopen($address, $port);
\fwrite($a, $payload);
$this->running =true;
// We don't care for results or timeouts here, PHP doesn't count IOwait time as execution time anyway
// Technically should use amphp/socket, but I guess it's OK to not introduce another dependency just for a socket that will be used once.
$this->res = new ResourceOutputStream(\fsockopen($address, $port));
$this->running = true;
return $this->res->write($payload);
}
/**
@ -119,7 +173,7 @@ final class WebRunner extends RunnerAbstract
*/
public function start(): Promise
{
return new Success();
return new Success($this->pid);
}
/**
@ -127,7 +181,10 @@ final class WebRunner extends RunnerAbstract
*/
public function kill(): void
{
$this->process->kill();
if (isset($this->res)) {
unset($this->res);
}
$this->isRunning = false;
}
/**
@ -135,6 +192,6 @@ final class WebRunner extends RunnerAbstract
*/
public function join(): Promise
{
return new Success();
return new Success(0);
}
}

View File

@ -17,8 +17,8 @@ if (\function_exists("cli_set_process_title")) {
(function (): void {
$paths = [
\dirname(__DIR__, 5)."/autoload.php",
\dirname(__DIR__, 3)."/vendor/autoload.php",
\dirname(__DIR__, 6)."/autoload.php",
\dirname(__DIR__, 4)."/vendor/autoload.php",
];
foreach ($paths as $path) {
@ -36,7 +36,19 @@ if (\function_exists("cli_set_process_title")) {
require $autoloadPath;
})();
(function () use ($argc, $argv): void {
$fromWeb = false;
if (!isset($argv)) { // Running from web
$argv = $_REQUEST['argv'] ?? [];
array_unshift($argv, __DIR__);
$argc = count($argv);
$fromWeb = true;
@\ini_set('html_errors', 0);
@\ini_set('display_errors', 0);
@\ini_set('log_errors', 1);
}
(function () use ($argc, $argv, $fromWeb): void {
// Remove this scripts path from process arguments.
--$argc;
\array_shift($argv);
@ -50,16 +62,16 @@ if (\function_exists("cli_set_process_title")) {
--$argc;
$uri = \array_shift($argv);
$key = "";
$key = $fromWeb ? $_REQUEST['key'] : "";
// Read random key from STDIN and send back to parent over IPC socket to authenticate.
do {
while (\strlen($key) < Process::KEY_LENGTH) {
if (($chunk = \fread(\STDIN, Process::KEY_LENGTH)) === false || \feof(\STDIN)) {
\trigger_error("Could not read key from parent", E_USER_ERROR);
exit(1);
}
$key .= $chunk;
} while (\strlen($key) < Process::KEY_LENGTH);
}
if (\strpos($uri, 'tcp://') === false && \strpos($uri, 'unix://') === false) {
$suffix = \bin2hex(\random_bytes(10));
@ -121,6 +133,17 @@ if (\function_exists("cli_set_process_title")) {
exit(1);
}
if ($fromWeb) { // Set environment variables only after auth
if (isset($_REQUEST['cwd'])) {
chdir($_REQUEST['cwd']);
}
if (isset($_REQUEST['env']) && is_array($_REQUEST['env'])) {
foreach ($_REQUEST['env'] as $key => $value) {
@\putenv("$key=$value");
}
}
}
try {
if (!isset($argv[0])) {
throw new \Error("No script path given");

View File

@ -4,6 +4,7 @@ namespace Amp\Parallel\Context;
use Amp\Loop;
use Amp\Parallel\Context\Internal\Runner\ProcessRunner;
use Amp\Parallel\Context\Internal\Runner\WebRunner;
use Amp\Parallel\Sync\ChannelException;
use Amp\Parallel\Sync\ExitResult;
use Amp\Parallel\Sync\SynchronizationError;
@ -15,15 +16,8 @@ use function Amp\call;
final class Process implements Context
{
const SCRIPT_PATH = __DIR__ . "/Internal/process-runner.php";
const KEY_LENGTH = 32;
/** @var string|null External version of SCRIPT_PATH if inside a PHAR. */
private static $pharScriptPath;
/** @var string|null PHAR path with a '.phar' extension. */
private static $pharCopy;
/** @var Internal\ProcessHub */
private $hub;
@ -59,10 +53,11 @@ final class Process implements Context
* @param string|null $cwd Working directory.
* @param mixed[] $env Array of environment variables.
* @param string $binary Path to PHP binary. Null will attempt to automatically locate the binary.
* @param bool $useWeb Whether to use the WebRunner by default
*
* @throws \Error If the PHP binary path given cannot be found or is not executable.
*/
public function __construct($script, string $cwd = null, array $env = [], string $binary = null)
public function __construct($script, string $cwd = null, array $env = [], string $binary = null, bool $useWeb = false)
{
$this->hub = Loop::getState(self::class);
if (!$this->hub instanceof Internal\ProcessHub) {
@ -70,45 +65,15 @@ final class Process implements Context
Loop::setState(self::class, $this->hub);
}
// Write process runner to external file if inside a PHAR,
// because PHP can't open files inside a PHAR directly except for the stub.
if (\strpos(self::SCRIPT_PATH, "phar://") === 0) {
if (self::$pharScriptPath) {
$scriptPath = self::$pharScriptPath;
} else {
$path = \dirname(self::SCRIPT_PATH);
if (\substr(\Phar::running(false), -5) !== ".phar") {
self::$pharCopy = \sys_get_temp_dir() . "/phar-" . \bin2hex(\random_bytes(10)) . ".phar";
\copy(\Phar::running(false), self::$pharCopy);
\register_shutdown_function(static function (): void {
@\unlink(self::$pharCopy);
});
$path = "phar://" . self::$pharCopy . "/" . \substr($path, \strlen(\Phar::running(true)));
}
$contents = \file_get_contents(self::SCRIPT_PATH);
$contents = \str_replace("__DIR__", \var_export($path, true), $contents);
$suffix = \bin2hex(\random_bytes(10));
self::$pharScriptPath = $scriptPath = \sys_get_temp_dir() . "/amp-process-runner-" . $suffix . ".php";
\file_put_contents($scriptPath, $contents);
\register_shutdown_function(static function (): void {
@\unlink(self::$pharScriptPath);
});
try {
if (!$useWeb) {
$this->process = new ProcessRunner($script, $this->hub, $cwd, $env, $binary);
}
// Monkey-patch the script path in the same way, only supported if the command is given as array.
if (isset(self::$pharCopy) && \is_array($script) && isset($script[0])) {
$script[0] = "phar://" . self::$pharCopy . \substr($script[0], \strlen(\Phar::running(true)));
}
} else {
$scriptPath = self::SCRIPT_PATH;
} catch (\Throwable $e) {
}
if (!$this->process) {
$this->process = new WebRunner($script, $this->hub, $cwd, $env);
}
$this->process = new ProcessRunner($script, $scriptPath, $this->hub, $cwd, $env, $binary);
}

View File

@ -0,0 +1,143 @@
<?php
namespace Amp\Parallel\Test\Context;
use Amp\Delayed;
use Amp\Loop;
use Amp\Parallel\Context\Context;
use Amp\Parallel\Context\ContextException;
use Amp\Parallel\Context\Internal\ProcessHub;
use Amp\Parallel\Context\Process;
use Amp\Parallel\Sync\PanicError;
use Amp\PHPUnit\AsyncTestCase;
use Jelix\FakeServerConf\ApacheCGI;
class ProcessWebTest extends AsyncTestCase
{
private static $proc;
public static function setUpBeforeClass(): void
{
self::$proc = proc_open(self::locateBinary()." -S localhost:8080", [2 => ["pipe", "w"]], $pipes, $root = realpath(__DIR__.'/../../'));
fgets($pipes[2]);
$server = new ApacheCGI($root);
$file = debug_backtrace(DEBUG_BACKTRACE_IGNORE_ARGS);
$file = end($file)['file'];
$file = substr($file, strlen($root));
// now simulate an HTTP request
$server->setHttpRequest("http://localhost:8080/$file?baz=2");
}
public static function tearDownAfterClass(): void
{
proc_terminate(self::$proc);
}
private static function locateBinary(): string
{
$executable = \strncasecmp(\PHP_OS, "WIN", 3) === 0 ? "php.exe" : "php";
$paths = \array_filter(\explode(\PATH_SEPARATOR, \getenv("PATH")));
$paths[] = \PHP_BINDIR;
$paths = \array_unique($paths);
foreach ($paths as $path) {
$path .= \DIRECTORY_SEPARATOR.$executable;
if (\is_executable($path)) {
return $path;
}
}
throw new \Error("Could not locate PHP executable binary");
}
public function createContext($script): Context
{
Loop::setState(Process::class, new ProcessHub(false)); // Manually set ProcessHub using socket server.
return new Process($script, null, [], null, true);
}
public function testBasicProcess()
{
$context = $this->createContext([
__DIR__ . "/Fixtures/test-process.php",
"Test"
]);
yield $context->start();
$this->assertSame("Test", yield $context->join());
}
public function testFailingProcess()
{
$this->expectException(PanicError::class);
$this->expectExceptionMessage('No string provided');
$context = $this->createContext(__DIR__ . "/Fixtures/test-process.php");
yield $context->start();
yield $context->join();
}
public function testThrowingProcessOnReceive()
{
$this->expectException(PanicError::class);
$this->expectExceptionMessage('Test message');
$context = $this->createContext(__DIR__ . "/Fixtures/throwing-process.php");
yield $context->start();
yield $context->receive();
}
public function testThrowingProcessOnSend()
{
$this->expectException(PanicError::class);
$this->expectExceptionMessage('Test message');
$context = $this->createContext(__DIR__ . "/Fixtures/throwing-process.php");
yield $context->start();
yield new Delayed(100);
yield $context->send(1);
}
public function testInvalidScriptPath()
{
$this->expectException(PanicError::class);
$this->expectExceptionMessage("No script found at '../test-process.php'");
$context = $this->createContext("../test-process.php");
yield $context->start();
yield $context->join();
}
public function testInvalidResult()
{
$this->expectException(PanicError::class);
$this->expectExceptionMessage('The given data cannot be sent because it is not serializable');
$context = $this->createContext(__DIR__ . "/Fixtures/invalid-result-process.php");
yield $context->start();
\var_dump(yield $context->join());
}
public function testNoCallbackReturned()
{
$this->expectException(PanicError::class);
$this->expectExceptionMessage('did not return a callable function');
$context = $this->createContext(__DIR__ . "/Fixtures/no-callback-process.php");
yield $context->start();
\var_dump(yield $context->join());
}
public function testParseError()
{
$this->expectException(PanicError::class);
$this->expectExceptionMessage('contains a parse error');
$context = $this->createContext(__DIR__ . "/Fixtures/parse-error-process.inc");
yield $context->start();
yield $context->join();
}
}