1
0
mirror of https://github.com/danog/parallel.git synced 2024-11-30 04:39:01 +01:00

Use Future::select()

This commit is contained in:
Aaron Piotrowski 2019-02-18 13:05:43 -06:00
parent 36d3a3d963
commit b8b73cdea3
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB

View File

@ -7,8 +7,8 @@ use Amp\Parallel\Sync\ChannelledSocket;
use Amp\Parallel\Sync\ExitResult;
use Amp\Parallel\Sync\SynchronizationError;
use Amp\Promise;
use parallel\Future;
use parallel\Runtime;
use parallel\TimeoutException as ParallelTimeoutException;
use function Amp\call;
/**
@ -29,9 +29,12 @@ final class Parallel implements Context
/** @var int Next ID to be used for IPC hub. */
private static $nextId = 1;
/** @var array Array of [\parallel\Future, ChannelledSocket] pairs. */
/** @var Future[] */
private static $futures = [];
/** @var ChannelledSocket[] */
private static $channels = [];
/** @var string|null */
private static $watcher;
@ -182,14 +185,13 @@ final class Parallel implements Context
if (self::$watcher === null) {
self::$watcher = Loop::repeat(self::EXIT_CHECK_FREQUENCY, static function () {
foreach (self::$futures as list($future, $channel)) {
try {
$future->value(0);
} catch (ParallelTimeoutException $exception) {
// Ignore timeout that just means the thread is still running.
} catch (\Throwable $exception) {
$channel->close();
}
$futures = self::$futures;
$resolved = $errored = $timedout = [];
Future::select($futures, $resolved, $errored, $timedout, 0);
foreach ($errored as $id => $future) {
self::$channels[$id]->close();
}
});
Loop::unreference(self::$watcher);
@ -235,7 +237,8 @@ final class Parallel implements Context
return call(function () use ($future) {
try {
$this->channel = $channel = yield $this->hub->accept($this->id);
self::$futures[$this->id] = [$future, $channel];
self::$futures[$this->id] = $future;
self::$channels[$this->id] = $channel;
} catch (\Throwable $exception) {
$this->kill();
throw new ContextException("Starting the parallel runtime failed", 0, $exception);
@ -268,7 +271,7 @@ final class Parallel implements Context
*/
private function close()
{
unset(self::$futures[$this->id]);
unset(self::$futures[$this->id], self::$channels[$this->id]);
$this->runtime = null;