1
0
mirror of https://github.com/danog/parallel.git synced 2025-01-22 14:01:14 +01:00

Move static parallel extension properties into internal class

This commit is contained in:
Aaron Piotrowski 2019-04-29 10:19:32 -05:00
parent d6e7fed763
commit 66ca530f0d
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
3 changed files with 77 additions and 42 deletions

View File

@ -0,0 +1,68 @@
<?php
namespace Amp\Parallel\Context\Internal;
use Amp\Loop;
use Amp\Parallel\Sync\ChannelledSocket;
use parallel\Events;
use parellel\Events\Error\Timeout;
use parallel\Future;
class ParallelHub extends ProcessHub
{
const EXIT_CHECK_FREQUENCY = 250;
/** @var ChannelledSocket[] */
private $channels;
/** @var string */
private $watcher;
/** @var Events */
private $events;
public function __construct()
{
parent::__construct();
$events = $this->events = new Events;
$this->events->setTimeout(0);
$channels = &$this->channels;
$this->watcher = Loop::repeat(self::EXIT_CHECK_FREQUENCY, static function () use (&$channels, $events): void {
try {
while ($event = $events->poll()) {
$id = (int) $event->source;
\assert(isset($channels[$id]), 'Channel for context ID not found');
$channels[$id]->close();
}
} catch (Timeout $exception) {
// Ignore poll timeout.
}
});
Loop::disable($this->watcher);
Loop::unreference($this->watcher);
}
public function add(int $id, ChannelledSocket $channel, Future $future): void
{
$this->channels[$id] = $channel;
$this->events->addFuture((string) $id, $future);
Loop::enable($this->watcher);
}
public function remove(int $id): void
{
if (!isset($this->channels[$id])) {
return;
}
unset($this->channels[$id]);
$this->events->remove((string) $id);
if (empty($this->channels)) {
Loop::disable($this->watcher);
}
}
}

View File

@ -10,7 +10,7 @@ use Amp\Promise;
use Amp\TimeoutException;
use function Amp\call;
final class ProcessHub
class ProcessHub
{
const PROCESS_START_TIMEOUT = 5000;
const KEY_RECEIVE_TIMEOUT = 1000;

View File

@ -7,7 +7,6 @@ use Amp\Parallel\Sync\ChannelledSocket;
use Amp\Parallel\Sync\ExitResult;
use Amp\Parallel\Sync\SynchronizationError;
use Amp\Promise;
use parallel\Events;
use parallel\Runtime;
use function Amp\call;
@ -25,15 +24,6 @@ final class Parallel implements Context
/** @var int Next thread ID. */
private static $nextId = 1;
/** @var Events|null */
private static $events;
/** @var ChannelledSocket[] */
private static $channels = [];
/** @var string|null */
private static $watcher;
/** @var Internal\ProcessHub */
private $hub;
@ -94,21 +84,16 @@ final class Parallel implements Context
*/
public function __construct($script)
{
$this->hub = Loop::getState(self::class);
if (!$this->hub instanceof Internal\ProcessHub) {
$this->hub = new Internal\ProcessHub;
Loop::setState(self::class, $this->hub);
}
if (self::$events === null) {
self::$events = new Events;
self::$events->setTimeout(0);
}
if (!self::isSupported()) {
throw new \Error("The parallel extension is required to create parallel threads.");
}
$this->hub = Loop::getState(self::class);
if (!$this->hub instanceof Internal\ParallelHub) {
$this->hub = new Internal\ParallelHub();
Loop::setState(self::class, $this->hub);
}
if (\is_array($script)) {
$this->script = (string) \array_shift($script);
$this->args = \array_values(\array_map("strval", $script));
@ -184,15 +169,6 @@ final class Parallel implements Context
throw new StatusError('The thread has already been started.');
}
if (self::$watcher === null) {
self::$watcher = Loop::repeat(self::EXIT_CHECK_FREQUENCY, static function () {
while ($event = self::$events->poll()) {
self::$channels[(int) $event->source]->close();
}
});
Loop::unreference(self::$watcher);
}
$this->oid = \getmypid();
$this->runtime = new Runtime(self::$autoloadPath);
@ -241,8 +217,7 @@ final class Parallel implements Context
return call(function () use ($future) {
try {
$this->channel = yield $this->hub->accept($this->id);
self::$channels[$this->id] = $this->channel;
self::$events->addFuture((string) $this->id, $future);
$this->hub->add($this->id, $this->channel, $future);
} catch (\Throwable $exception) {
$this->kill();
throw new ContextException("Starting the parallel runtime failed", 0, $exception);
@ -285,15 +260,7 @@ final class Parallel implements Context
$this->channel = null;
if (isset(self::$channels[$this->id])) {
unset(self::$channels[$this->id]);
self::$events->remove((string) $this->id);
}
if (empty(self::$channels) && self::$watcher !== null) {
Loop::cancel(self::$watcher);
self::$watcher = null;
}
$this->hub->remove($this->id);
}
/**