mirror of
https://github.com/danog/parallel.git
synced 2024-11-30 04:39:01 +01:00
Add ability to set Environment class name
This commit is contained in:
parent
faf1555b9c
commit
cab8bbe8f6
30
bin/worker
30
bin/worker
@ -40,10 +40,36 @@ ob_start(function ($data) {
|
||||
|
||||
Amp\Loop::run(function () {
|
||||
$channel = new Sync\ChannelledSocket(STDIN, STDOUT);
|
||||
$environment = new Worker\BasicEnvironment;
|
||||
$runner = new Worker\TaskRunner($channel, $environment);
|
||||
|
||||
try {
|
||||
$environment = (function (): Worker\Environment {
|
||||
$options = getopt("e:");
|
||||
|
||||
if (!isset($options["e"])) {
|
||||
throw new Error("No environment class name provided");
|
||||
}
|
||||
|
||||
$className = $options["e"];
|
||||
|
||||
try {
|
||||
$reflection = new ReflectionClass($className);
|
||||
} catch (ReflectionException $e) {
|
||||
throw new Error(sprintf("Invalid class name '%s'", $className));
|
||||
}
|
||||
|
||||
if (!$reflection->isInstantiable()) {
|
||||
throw new Error(sprintf("'%s' is not instatiable class", $className));
|
||||
}
|
||||
|
||||
if (!$reflection->implementsInterface(Worker\Environment::class)) {
|
||||
throw new Error(sprintf("The class '%s' does not implement '%s'", $className,Worker\Environment::class));
|
||||
}
|
||||
|
||||
return $reflection->newInstance();
|
||||
})();
|
||||
|
||||
$runner = new Worker\TaskRunner($channel, $environment);
|
||||
|
||||
$result = new Sync\Internal\ExitSuccess(yield $runner->run());
|
||||
} catch (Sync\ChannelException $exception) {
|
||||
exit(1); // Parent context died, simply exit.
|
||||
|
@ -25,20 +25,27 @@ class ChannelledProcess implements ProcessContext, Strand {
|
||||
private $channel;
|
||||
|
||||
/**
|
||||
* @param string $path Path to PHP script.
|
||||
* @param string $cwd Working directory.
|
||||
* @param string|array $script Path to PHP script or array with first element as path and following elements options
|
||||
* to the PHP script (e.g.: ['bin/worker', '-eOptionValue', '-nOptionValue'].
|
||||
* @param string $cwd Working directory.
|
||||
* @param mixed[] $env Array of environment variables.
|
||||
*/
|
||||
public function __construct(string $path, string $cwd = "", array $env = []) {
|
||||
public function __construct($script, string $cwd = "", array $env = []) {
|
||||
$options = [
|
||||
"html_errors" => "0",
|
||||
"display_errors" => "0",
|
||||
"log_errors" => "1",
|
||||
];
|
||||
|
||||
if (\is_array($script)) {
|
||||
$script = \implode(" ", \array_map("escapeshellarg", $script));
|
||||
} else {
|
||||
$script = \escapeshellarg($script);
|
||||
}
|
||||
|
||||
$options = (\PHP_SAPI === "phpdbg" ? " -b -qrr " : " ") . $this->formatOptions($options);
|
||||
$separator = \PHP_SAPI === "phpdbg" ? " -- " : " ";
|
||||
$command = \escapeshellarg(\PHP_BINARY) . $options . $separator . \escapeshellarg($path);
|
||||
$command = \escapeshellarg(\PHP_BINARY) . $options . $separator . $script;
|
||||
|
||||
$processOptions = [];
|
||||
|
||||
|
@ -8,8 +8,18 @@ use Amp\Parallel\Process\ChannelledProcess;
|
||||
* A worker thread that executes task objects.
|
||||
*/
|
||||
class WorkerProcess extends AbstractWorker {
|
||||
public function __construct() {
|
||||
/**
|
||||
* @param string $envClassName Name of class implementing \Amp\Parallel\Worker\Environment to instigate.
|
||||
* Defaults to \Amp\Parallel\Worker\BasicEnvironment.
|
||||
* @param mixed[] $env Array of environment variables to pass to the worker. Empty array inherits from the current
|
||||
* PHP process. See the $env parameter of \Amp\Process\Process::__construct().
|
||||
*/
|
||||
public function __construct(string $envClassName = BasicEnvironment::class, array $env = []) {
|
||||
$dir = \dirname(__DIR__, 2) . '/bin';
|
||||
parent::__construct(new ChannelledProcess($dir . '/worker', $dir));
|
||||
$script = [
|
||||
$dir . "/worker",
|
||||
"-e" . $envClassName,
|
||||
];
|
||||
parent::__construct(new ChannelledProcess($script, $dir, $env));
|
||||
}
|
||||
}
|
||||
|
@ -9,10 +9,34 @@ use Amp\Promise;
|
||||
* A worker thread that executes task objects.
|
||||
*/
|
||||
class WorkerThread extends AbstractWorker {
|
||||
public function __construct() {
|
||||
parent::__construct(new Thread(function (): Promise {
|
||||
$runner = new TaskRunner($this, new BasicEnvironment);
|
||||
/**
|
||||
* @param string $envClassName Name of class implementing \Amp\Parallel\Worker\Environment to instigate.
|
||||
* Defaults to \Amp\Parallel\Worker\BasicEnvironment.
|
||||
*/
|
||||
public function __construct(string $envClassName = BasicEnvironment::class) {
|
||||
parent::__construct(new Thread(function (string $className): Promise {
|
||||
try {
|
||||
$reflection = new \ReflectionClass($className);
|
||||
} catch (\ReflectionException $e) {
|
||||
throw new \Error(\sprintf("Invalid class name '%s'", $className));
|
||||
}
|
||||
|
||||
if (!$reflection->isInstantiable()) {
|
||||
throw new \Error(\sprintf("'%s' is not instatiable class", $className));
|
||||
}
|
||||
|
||||
if (!$reflection->implementsInterface(Environment::class)) {
|
||||
throw new \Error(\sprintf("The class '%s' does not implement '%s'", $className, Environment::class));
|
||||
}
|
||||
|
||||
$environment = $reflection->newInstance();
|
||||
|
||||
if (!\defined("AMP_WORKER")) {
|
||||
\define("AMP_WORKER", "amp-worker");
|
||||
}
|
||||
|
||||
$runner = new TaskRunner($this, $environment);
|
||||
return $runner->run();
|
||||
}));
|
||||
}, $envClassName));
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user