diff --git a/bin/worker b/bin/worker index 2a88e59..cc480b6 100644 --- a/bin/worker +++ b/bin/worker @@ -40,10 +40,36 @@ ob_start(function ($data) { Amp\Loop::run(function () { $channel = new Sync\ChannelledSocket(STDIN, STDOUT); - $environment = new Worker\BasicEnvironment; - $runner = new Worker\TaskRunner($channel, $environment); try { + $environment = (function (): Worker\Environment { + $options = getopt("e:"); + + if (!isset($options["e"])) { + throw new Error("No environment class name provided"); + } + + $className = $options["e"]; + + try { + $reflection = new ReflectionClass($className); + } catch (ReflectionException $e) { + throw new Error(sprintf("Invalid class name '%s'", $className)); + } + + if (!$reflection->isInstantiable()) { + throw new Error(sprintf("'%s' is not instatiable class", $className)); + } + + if (!$reflection->implementsInterface(Worker\Environment::class)) { + throw new Error(sprintf("The class '%s' does not implement '%s'", $className,Worker\Environment::class)); + } + + return $reflection->newInstance(); + })(); + + $runner = new Worker\TaskRunner($channel, $environment); + $result = new Sync\Internal\ExitSuccess(yield $runner->run()); } catch (Sync\ChannelException $exception) { exit(1); // Parent context died, simply exit. diff --git a/lib/Process/ChannelledProcess.php b/lib/Process/ChannelledProcess.php index 7485cb0..d159c83 100644 --- a/lib/Process/ChannelledProcess.php +++ b/lib/Process/ChannelledProcess.php @@ -25,20 +25,27 @@ class ChannelledProcess implements ProcessContext, Strand { private $channel; /** - * @param string $path Path to PHP script. - * @param string $cwd Working directory. + * @param string|array $script Path to PHP script or array with first element as path and following elements options + * to the PHP script (e.g.: ['bin/worker', '-eOptionValue', '-nOptionValue']. + * @param string $cwd Working directory. * @param mixed[] $env Array of environment variables. */ - public function __construct(string $path, string $cwd = "", array $env = []) { + public function __construct($script, string $cwd = "", array $env = []) { $options = [ "html_errors" => "0", "display_errors" => "0", "log_errors" => "1", ]; + if (\is_array($script)) { + $script = \implode(" ", \array_map("escapeshellarg", $script)); + } else { + $script = \escapeshellarg($script); + } + $options = (\PHP_SAPI === "phpdbg" ? " -b -qrr " : " ") . $this->formatOptions($options); $separator = \PHP_SAPI === "phpdbg" ? " -- " : " "; - $command = \escapeshellarg(\PHP_BINARY) . $options . $separator . \escapeshellarg($path); + $command = \escapeshellarg(\PHP_BINARY) . $options . $separator . $script; $processOptions = []; diff --git a/lib/Worker/WorkerProcess.php b/lib/Worker/WorkerProcess.php index 4bb0a8c..1ca54aa 100644 --- a/lib/Worker/WorkerProcess.php +++ b/lib/Worker/WorkerProcess.php @@ -8,8 +8,18 @@ use Amp\Parallel\Process\ChannelledProcess; * A worker thread that executes task objects. */ class WorkerProcess extends AbstractWorker { - public function __construct() { + /** + * @param string $envClassName Name of class implementing \Amp\Parallel\Worker\Environment to instigate. + * Defaults to \Amp\Parallel\Worker\BasicEnvironment. + * @param mixed[] $env Array of environment variables to pass to the worker. Empty array inherits from the current + * PHP process. See the $env parameter of \Amp\Process\Process::__construct(). + */ + public function __construct(string $envClassName = BasicEnvironment::class, array $env = []) { $dir = \dirname(__DIR__, 2) . '/bin'; - parent::__construct(new ChannelledProcess($dir . '/worker', $dir)); + $script = [ + $dir . "/worker", + "-e" . $envClassName, + ]; + parent::__construct(new ChannelledProcess($script, $dir, $env)); } } diff --git a/lib/Worker/WorkerThread.php b/lib/Worker/WorkerThread.php index 8140bfe..67f7693 100644 --- a/lib/Worker/WorkerThread.php +++ b/lib/Worker/WorkerThread.php @@ -9,10 +9,34 @@ use Amp\Promise; * A worker thread that executes task objects. */ class WorkerThread extends AbstractWorker { - public function __construct() { - parent::__construct(new Thread(function (): Promise { - $runner = new TaskRunner($this, new BasicEnvironment); + /** + * @param string $envClassName Name of class implementing \Amp\Parallel\Worker\Environment to instigate. + * Defaults to \Amp\Parallel\Worker\BasicEnvironment. + */ + public function __construct(string $envClassName = BasicEnvironment::class) { + parent::__construct(new Thread(function (string $className): Promise { + try { + $reflection = new \ReflectionClass($className); + } catch (\ReflectionException $e) { + throw new \Error(\sprintf("Invalid class name '%s'", $className)); + } + + if (!$reflection->isInstantiable()) { + throw new \Error(\sprintf("'%s' is not instatiable class", $className)); + } + + if (!$reflection->implementsInterface(Environment::class)) { + throw new \Error(\sprintf("The class '%s' does not implement '%s'", $className, Environment::class)); + } + + $environment = $reflection->newInstance(); + + if (!\defined("AMP_WORKER")) { + \define("AMP_WORKER", "amp-worker"); + } + + $runner = new TaskRunner($this, $environment); return $runner->run(); - })); + }, $envClassName)); } }