1
0
mirror of https://github.com/danog/amp.git synced 2025-01-22 21:31:18 +01:00

Combine Future/Suspension arrays

This commit is contained in:
Aaron Piotrowski 2021-09-03 18:00:25 -05:00
parent 73fb73614e
commit a236223eac
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
2 changed files with 22 additions and 50 deletions

View File

@ -22,20 +22,17 @@ final class EmitSource
{
private bool $completed = false;
private \Throwable $exception;
private ?\Throwable $exception = null;
/** @var mixed[] */
/** @var array<int, mixed> */
private array $emittedValues = [];
/** @var [?\Throwable, mixed][] */
/** @var array<int, array{?\Throwable, mixed}> */
private array $sendValues = [];
/** @var Deferred[] */
/** @var array<int, Deferred|Suspension> */
private array $backPressure = [];
/** @var Suspension[] */
private array $yielding = [];
/** @var Suspension[] */
private array $waiting = [];
@ -94,21 +91,17 @@ final class EmitSource
$position = $this->consumePosition++ - 1;
// Relieve backpressure from prior emit.
if (isset($this->yielding[$position])) {
$suspension = $this->yielding[$position];
unset($this->yielding[$position]);
if ($exception) {
$suspension->throw($exception);
} else {
$suspension->resume($value);
}
} elseif (isset($this->backPressure[$position])) {
$deferred = $this->backPressure[$position];
if (isset($this->backPressure[$position])) {
$placeholder = $this->backPressure[$position];
unset($this->backPressure[$position]);
if ($exception) {
$deferred->error($exception);
$placeholder instanceof Suspension
? $placeholder->throw($exception)
: $placeholder->error($exception);
} else {
$deferred->complete($value);
$placeholder instanceof Suspension
? $placeholder->resume($value)
: $placeholder->complete($value);
}
} elseif ($position >= 0) {
// Send-values are indexed as $this->consumePosition - 1.
@ -124,7 +117,7 @@ final class EmitSource
}
if ($this->completed || $this->disposed) {
if (isset($this->exception)) {
if ($this->exception) {
throw $this->exception;
}
return null;
@ -294,7 +287,7 @@ final class EmitSource
++$this->emitPosition;
if ($pair === null) {
$this->yielding[$position] = $suspension = Loop::createSuspension();
$this->backPressure[$position] = $suspension = Loop::createSuspension();
return $suspension->suspend();
}
@ -408,27 +401,22 @@ final class EmitSource
*/
private function resolvePending(): void
{
$backPressure = \array_merge($this->backPressure, $this->yielding);
$backPressure = $this->backPressure;
$waiting = $this->waiting;
unset($this->waiting, $this->backPressure, $this->yielding);
unset($this->waiting, $this->backPressure);
$exception = $this->exception ?? null;
foreach ($backPressure as $placeholder) {
if ($placeholder instanceof Deferred) {
if ($exception) {
$placeholder->error($this->exception);
} else {
$placeholder->complete(null);
}
continue;
}
if ($exception) {
$placeholder->throw($exception);
$placeholder instanceof Suspension
? $placeholder->throw($exception)
: $placeholder->error($exception);
} else {
$placeholder->resume(null);
$placeholder instanceof Suspension
? $placeholder->resume(null)
: $placeholder->complete(null);
}
}

View File

@ -1,16 +0,0 @@
<?xml version="1.0"?>
<psalm
totallyTyped="true"
phpVersion="7.0"
resolveFromConfigFile="true"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="https://getpsalm.org/schema/config"
xsi:schemaLocation="https://getpsalm.org/schema/config vendor/vimeo/psalm/config.xsd"
>
<projectFiles>
<directory name="examples/psalm"/>
<ignoreFiles>
<directory name="vendor"/>
</ignoreFiles>
</projectFiles>
</psalm>