1
0
mirror of https://github.com/danog/amp.git synced 2024-12-04 18:38:17 +01:00

Dispose after all pending promises are fulfilled

Allows continue() to be called, then dispose(), but the pipeline is not actually disposed until those pending promises are fulfilled.
This commit is contained in:
Aaron Piotrowski 2020-08-24 16:28:08 -05:00
parent 5e521daa16
commit 657614c036
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
2 changed files with 62 additions and 28 deletions

View File

@ -208,14 +208,6 @@ final class EmitSource
*/ */
public function emit($value): Promise public function emit($value): Promise
{ {
if ($this->result) {
if ($this->completed) {
throw new \Error("Pipelines cannot emit values after calling complete");
}
return $this->result;
}
if ($value === null) { if ($value === null) {
throw new \TypeError("Pipelines cannot emit NULL"); throw new \TypeError("Pipelines cannot emit NULL");
} }
@ -237,10 +229,24 @@ final class EmitSource
unset($this->sendValues[$position]); unset($this->sendValues[$position]);
return $promise; return $promise;
} }
} elseif ($this->result) {
if ($this->completed) {
throw new \Error("Pipelines cannot emit values after calling complete");
}
return $this->result;
} else { } else {
$this->emittedValues[$position] = $value; $this->emittedValues[$position] = $value;
} }
if ($this->disposed) {
if (empty($this->waiting)) {
$this->triggerDisposal();
}
return Promise\succeed();
}
$this->backPressure[$position] = $deferred = new Deferred; $this->backPressure[$position] = $deferred = new Deferred;
return $deferred->promise(); return $deferred->promise();
@ -259,7 +265,7 @@ final class EmitSource
*/ */
public function isDisposed(): bool public function isDisposed(): bool
{ {
return $this->disposed; return $this->disposed && empty($this->waiting);
} }
/** /**
@ -335,33 +341,44 @@ final class EmitSource
$this->result = $result; $this->result = $result;
$waiting = $this->waiting; if ($disposed) {
$this->waiting = []; if (empty($this->waiting)) {
$this->triggerDisposal();
}
} else {
$waiting = $this->waiting;
$this->waiting = [];
foreach ($waiting as $deferred) { foreach ($waiting as $deferred) {
$deferred->resolve($result); $deferred->resolve($result);
}
}
}
private function triggerDisposal()
{
if ($this->onDisposal === null) {
return;
} }
$onDisposal = $this->onDisposal; $onDisposal = $this->onDisposal;
$this->onDisposal = null; $this->onDisposal = null;
if ($disposed) { $backPressure = $this->backPressure;
$backPressure = $this->backPressure; $this->backPressure = [];
$this->backPressure = [];
foreach ($backPressure as $deferred) { foreach ($backPressure as $deferred) {
$deferred->resolve($result); $deferred->resolve($this->result);
} }
/** @psalm-suppress PossiblyNullIterator $alreadyDisposed is a guard against $this->onDisposal being null */ /** @psalm-suppress PossiblyNullIterator $alreadyDisposed is a guard against $this->onDisposal being null */
foreach ($onDisposal as $callback) { foreach ($onDisposal as $callback) {
try { try {
$callback(); $callback();
} catch (\Throwable $e) { } catch (\Throwable $e) {
Loop::defer(static function () use ($e) { Loop::defer(static function () use ($e) {
throw $e; throw $e;
}); });
}
} }
} }
} }

View File

@ -256,6 +256,23 @@ class PipelineSourceTest extends AsyncTestCase
yield $this->source->emit(1); yield $this->source->emit(1);
} }
public function testEmitAfterDisposalWithPendingContinuePromise()
{
$pipeline = $this->source->pipe();
$promise = $pipeline->continue();
$this->source->onDisposal($this->createCallback(1));
$pipeline->dispose();
$this->source->onDisposal($this->createCallback(1));
$this->assertFalse($this->source->isDisposed());
yield $this->source->emit(1);
$this->assertSame(1, yield $promise);
$this->expectException(DisposedException::class);
$this->expectExceptionMessage('The pipeline has been disposed');
$this->assertTrue($this->source->isDisposed());
yield $this->source->emit(2);
}
public function testEmitAfterDestruct() public function testEmitAfterDestruct()
{ {