1
0
mirror of https://github.com/danog/parallel.git synced 2024-11-26 12:24:40 +01:00

Begin adding web forking

This commit is contained in:
Daniil Gentili 2020-02-11 14:45:11 +01:00
parent 76853f0b78
commit 6f78af6d09
Signed by: danog
GPG Key ID: 8C1BE3B34B230CA7
4 changed files with 476 additions and 65 deletions

View File

@ -0,0 +1,209 @@
<?php
namespace Amp\Parallel\Context\Internal\Runner;
use Amp\Parallel\Context\Internal\ProcessHub;
use Amp\Process\Process as BaseProcess;
use Amp\Process\ProcessInputStream;
use Amp\Process\ProcessOutputStream;
use Amp\Promise;
final class ProcessRunner extends RunnerAbstract
{
/** @var string|null Cached path to located PHP binary. */
private static $binaryPath;
/** @var \Amp\Process\Process */
private $process;
/**
* Constructor
*
* @param string|array $script Path to PHP script or array with first element as path and following elements options
* to the PHP script (e.g.: ['bin/worker', 'Option1Value', 'Option2Value'].
* @param string $runPath Path to process runner script
* @param string $cwd Current working directory
* @param array $env Environment variables
* @param string $binary PHP binary path
*/
public function __construct($script, string $runPath, ProcessHub $hub, string $cwd = null, array $env = [], string $binary = null)
{
if ($binary === null) {
if (\PHP_SAPI === "cli") {
$binary = \PHP_BINARY;
} else {
$binary = self::$binaryPath ?? self::locateBinary();
}
} elseif (!\is_executable($binary)) {
throw new \Error(\sprintf("The PHP binary path '%s' was not found or is not executable", $binary));
}
if (\is_array($script)) {
$script = \implode(" ", \array_map("escapeshellarg", $script));
} else {
$script = \escapeshellarg($script);
}
$options = [
"html_errors" => "0",
"display_errors" => "0",
"log_errors" => "1",
];
$command = \implode(" ", [
\escapeshellarg($binary),
self::formatOptions($options),
\escapeshellarg($runPath),
$hub->getUri(),
$script,
]);
$this->process = new BaseProcess($command, $cwd, $env);
}
private static function locateBinary(): string
{
$executable = \strncasecmp(\PHP_OS, "WIN", 3) === 0 ? "php.exe" : "php";
$paths = \array_filter(\explode(\PATH_SEPARATOR, \getenv("PATH")));
$paths[] = \PHP_BINDIR;
$paths = \array_unique($paths);
foreach ($paths as $path) {
$path .= \DIRECTORY_SEPARATOR.$executable;
if (\is_executable($path)) {
return self::$binaryPath = $path;
}
}
throw new \Error("Could not locate PHP executable binary");
}
private static function formatOptions(array $options): string
{
$result = [];
foreach ($options as $option => $value) {
$result[] = \sprintf("-d%s=%s", $option, $value);
}
return \implode(" ", $result);
}
/**
* Set process key
*
* @param string $key Process key
*
* @return Promise
*/
public function setProcessKey(string $key): Promise
{
return $this->process->getStdin()->write($key);
}
/**
* @return bool
*/
public function isRunning(): bool
{
return $this->process->isRunning();
}
/**
* Starts the process.
*
* @return Promise<int> Resolved with the PID
*/
public function start(): Promise
{
return $this->process->start();
}
/**
* Immediately kills the process.
*/
public function kill(): void
{
$this->process->kill();
}
/**
* @return \Amp\Promise<mixed> Resolves with the returned from the process.
*/
public function join(): Promise
{
return $this->process->join();
}
/**
* Send a signal to the process.
*
* @see \Amp\Process\Process::signal()
*
* @param int $signo
*
* @throws \Amp\Process\ProcessException
* @throws \Amp\Process\StatusError
*/
public function signal(int $signo): void
{
$this->process->signal($signo);
}
/**
* Returns the PID of the process.
*
* @see \Amp\Process\Process::getPid()
*
* @return int
*
* @throws \Amp\Process\StatusError
*/
public function getPid(): int
{
return $this->process->getPid();
}
/**
* Returns the STDIN stream of the process.
*
* @see \Amp\Process\Process::getStdin()
*
* @return ProcessOutputStream
*
* @throws \Amp\Process\StatusError
*/
public function getStdin(): ProcessOutputStream
{
return $this->process->getStdin();
}
/**
* Returns the STDOUT stream of the process.
*
* @see \Amp\Process\Process::getStdout()
*
* @return ProcessInputStream
*
* @throws \Amp\Process\StatusError
*/
public function getStdout(): ProcessInputStream
{
return $this->process->getStdout();
}
/**
* Returns the STDOUT stream of the process.
*
* @see \Amp\Process\Process::getStderr()
*
* @return ProcessInputStream
*
* @throws \Amp\Process\StatusError
*/
public function getStderr(): ProcessInputStream
{
return $this->process->getStderr();
}
}

View File

@ -0,0 +1,123 @@
<?php
namespace Amp\Parallel\Context\Internal\Runner;
use Amp\Parallel\Context\Internal\ProcessHub;
use Amp\Process\ProcessException;
use Amp\Process\ProcessInputStream;
use Amp\Process\ProcessOutputStream;
use Amp\Promise;
abstract class RunnerAbstract
{
/**
* Constructor.
*
* @param string|array $script Path to PHP script or array with first element as path and following elements options
* to the PHP script (e.g.: ['bin/worker', 'Option1Value', 'Option2Value'].
* @param string $runPath Path to process runner script
* @param string $cwd Current working directory
* @param array $env Environment variables
* @param string $binary PHP binary path
*/
abstract public function __construct($script, string $runPath, ProcessHub $hub, string $cwd = null, array $env = [], string $binary = null);
/**
* Set process key.
*
* @param string $key Process key
*
* @return Promise
*/
abstract public function setProcessKey(string $key): Promise;
/**
* @return bool
*/
abstract public function isRunning(): bool;
/**
* Starts the execution process.
*
* @return Promise<int> Resolves with the PID
*/
abstract public function start(): Promise;
/**
* Immediately kills the process.
*/
abstract public function kill(): void;
/**
* @return \Amp\Promise<mixed> Resolves with the returned from the process.
*/
abstract public function join(): Promise;
/**
* Returns the PID of the process.
*
* @see \Amp\Process\Process::getPid()
*
* @return int
*
* @throws \Amp\Process\StatusError
*/
abstract public function getPid(): int;
/**
* Send a signal to the process.
*
* @see \Amp\Process\Process::signal()
*
* @param int $signo
*
* @throws \Amp\Process\ProcessException
* @throws \Amp\Process\StatusError
*/
public function signal(int $signo): void
{
throw new ProcessException("Not supported!");
}
/**
* Returns the STDIN stream of the process.
*
* @see \Amp\Process\Process::getStdin()
*
* @return ProcessOutputStream
*
* @throws \Amp\Process\StatusError
*/
public function getStdin(): ProcessOutputStream
{
throw new ProcessException("Not supported!");
}
/**
* Returns the STDOUT stream of the process.
*
* @see \Amp\Process\Process::getStdout()
*
* @return ProcessInputStream
*
* @throws \Amp\Process\StatusError
*/
public function getStdout(): ProcessInputStream
{
throw new ProcessException("Not supported!");
}
/**
* Returns the STDOUT stream of the process.
*
* @see \Amp\Process\Process::getStderr()
*
* @return ProcessInputStream
*
* @throws \Amp\Process\StatusError
*/
public function getStderr(): ProcessInputStream
{
throw new ProcessException("Not supported!");
}
}

View File

@ -0,0 +1,140 @@
<?php
namespace Amp\Parallel\Context;
use Amp\Parallel\Context\Internal\ProcessHub;
use Amp\Parallel\Context\Internal\Runner\RunnerAbstract;
use Amp\Promise;
use Amp\Success;
final class WebRunner extends RunnerAbstract
{
/**
* PID.
*
* @var int
*/
private $pid;
/**
* Initialization payload.
*
* @var array
*/
private $params;
/**
* Whether the process is running.
*
* @var boolean
*/
private $running = false;
/**
* Constructor.
*
* @param string|array $script Path to PHP script or array with first element as path and following elements options
* to the PHP script (e.g.: ['bin/worker', 'Option1Value', 'Option2Value'].
* @param string $runPath Path to process runner script
* @param string $cwd Current working directory
* @param array $env Environment variables
* @param string $binary PHP binary path
*/
public function __construct($script, string $runPath, ProcessHub $hub, string $cwd = null, array $env = [], string $binary = null)
{
if (!isset($_SERVER['SERVER_NAME'])) {
throw new ContextException("Could not initialize web runner!");
}
if (!\is_array($script)) {
$script = [$script];
}
$this->params = [
'options' => [
"html_errors" => "0",
"display_errors" => "0",
"log_errors" => "1",
],
'argv' => [
$hub->getUri(),
...$script
]
];
$this->pid = \random_int(0, PHP_INT_MAX);
}
/**
* Returns the PID of the process.
*
* @see \Amp\Process\Process::getPid()
*
* @return int
*
* @throws \Amp\Process\StatusError
*/
public function getPid(): int
{
return $this->pid;
}
/**
* Set process key.
*
* @param string $key Process key
*
* @return Promise
*/
public function setProcessKey(string $key): Promise
{
$this->params['key'] = $key;
$params = \http_build_query($params);
$address = ($_SERVER['HTTPS'] ?? false ? 'tls' : 'tcp').'://'.$_SERVER['SERVER_NAME'];
$port = $_SERVER['SERVER_PORT'];
$uri = $_SERVER['REQUEST_URI'];
$params = $_GET;
$url = \explode('?', $uri, 2)[0] ?? '';
$query = \http_build_query($params);
$uri = \implode('?', [$url, $query]);
$this->payload = "GET $uri HTTP/1.1\r\nHost: ${_SERVER['SERVER_NAME']}\r\n\r\n";
$a = \fsockopen($address, $port);
\fwrite($a, $payload);
$this->running =true;
}
/**
* @return bool
*/
public function isRunning(): bool
{
return $this->running;
}
/**
* Starts the process.
*
* @return Promise<int> Resolved with the PID
*/
public function start(): Promise
{
return new Success();
}
/**
* Immediately kills the process.
*/
public function kill(): void
{
$this->process->kill();
}
/**
* @return \Amp\Promise<mixed> Resolves with the returned from the process.
*/
public function join(): Promise
{
return new Success();
}
}

View File

@ -3,10 +3,10 @@
namespace Amp\Parallel\Context;
use Amp\Loop;
use Amp\Parallel\Context\Internal\Runner\ProcessRunner;
use Amp\Parallel\Sync\ChannelException;
use Amp\Parallel\Sync\ExitResult;
use Amp\Parallel\Sync\SynchronizationError;
use Amp\Process\Process as BaseProcess;
use Amp\Process\ProcessInputStream;
use Amp\Process\ProcessOutputStream;
use Amp\Promise;
@ -23,13 +23,10 @@ final class Process implements Context
/** @var string|null PHAR path with a '.phar' extension. */
private static $pharCopy;
/** @var string|null Cached path to located PHP binary. */
private static $binaryPath;
/** @var Internal\ProcessHub */
private $hub;
/** @var \Amp\Process\Process */
/** @var Internal\Runner\RunnerAbstract */
private $process;
/** @var \Amp\Parallel\Sync\ChannelledSocket */
@ -73,22 +70,6 @@ final class Process implements Context
Loop::setState(self::class, $this->hub);
}
$options = [
"html_errors" => "0",
"display_errors" => "0",
"log_errors" => "1",
];
if ($binary === null) {
if (\PHP_SAPI === "cli") {
$binary = \PHP_BINARY;
} else {
$binary = self::$binaryPath ?? self::locateBinary();
}
} elseif (!\is_executable($binary)) {
throw new \Error(\sprintf("The PHP binary path '%s' was not found or is not executable", $binary));
}
// Write process runner to external file if inside a PHAR,
// because PHP can't open files inside a PHAR directly except for the stub.
if (\strpos(self::SCRIPT_PATH, "phar://") === 0) {
@ -127,51 +108,9 @@ final class Process implements Context
$scriptPath = self::SCRIPT_PATH;
}
if (\is_array($script)) {
$script = \implode(" ", \array_map("escapeshellarg", $script));
} else {
$script = \escapeshellarg($script);
}
$command = \implode(" ", [
\escapeshellarg($binary),
$this->formatOptions($options),
\escapeshellarg($scriptPath),
$this->hub->getUri(),
$script,
]);
$this->process = new BaseProcess($command, $cwd, $env);
$this->process = new ProcessRunner($script, $scriptPath, $this->hub, $cwd, $env, $binary);
}
private static function locateBinary(): string
{
$executable = \strncasecmp(\PHP_OS, "WIN", 3) === 0 ? "php.exe" : "php";
$paths = \array_filter(\explode(\PATH_SEPARATOR, \getenv("PATH")));
$paths[] = \PHP_BINDIR;
$paths = \array_unique($paths);
foreach ($paths as $path) {
$path .= \DIRECTORY_SEPARATOR . $executable;
if (\is_executable($path)) {
return self::$binaryPath = $path;
}
}
throw new \Error("Could not locate PHP executable binary");
}
private function formatOptions(array $options): string
{
$result = [];
foreach ($options as $option => $value) {
$result[] = \sprintf("-d%s=%s", $option, $value);
}
return \implode(" ", $result);
}
/**
* Private method to prevent cloning.
@ -189,7 +128,7 @@ final class Process implements Context
try {
$pid = yield $this->process->start();
yield $this->process->getStdin()->write($this->hub->generateKey($pid, self::KEY_LENGTH));
yield $this->process->setProcessKey($this->hub->generateKey($pid, self::KEY_LENGTH));
$this->channel = yield $this->hub->accept($pid);