1
0
mirror of https://github.com/danog/amp.git synced 2025-01-22 21:31:18 +01:00

Make explicit disposal fail pending promises

Destruction of the pipeline does not fail pending promises, but calling dispose() now will.
This commit is contained in:
Aaron Piotrowski 2020-08-27 12:45:48 -05:00
parent 657614c036
commit 9a13937fef
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
4 changed files with 82 additions and 26 deletions

View File

@ -49,7 +49,7 @@ final class AsyncGenerator implements Pipeline
public function __destruct()
{
$this->source->dispose();
$this->source->destroy();
}
/**

View File

@ -6,8 +6,8 @@ use Amp\Pipeline;
use Amp\Promise;
/**
* Wraps a Pipeline instance that has public methods to emit, complete, and fail into an object that only allows
* access to the public API methods and sets $disposed to true when the object is destroyed.
* Wraps an EmitSource instance that has public methods to emit, complete, and fail into an object that only allows
* access to the public API methods and automatically calls EmitSource::destroy() when the object is destroyed.
*
* @internal
*
@ -26,7 +26,7 @@ final class AutoDisposingPipeline implements Pipeline
public function __destruct()
{
$this->source->dispose();
$this->source->destroy();
}
/**

View File

@ -156,11 +156,27 @@ final class EmitSource
*/
public function dispose()
{
$this->cancel(true);
}
public function destroy()
{
$this->cancel(false);
}
private function cancel(bool $cancelPending)
{
try {
if ($this->result) {
return; // Pipeline already completed or failed.
}
$this->finalize(Promise\fail(new DisposedException), true);
} finally {
if ($this->disposed && $cancelPending) {
$this->triggerDisposal();
}
}
}
/**
@ -341,22 +357,42 @@ final class EmitSource
$this->result = $result;
if ($disposed) {
if ($this->disposed) {
if (empty($this->waiting)) {
$this->triggerDisposal();
}
} else {
$this->resolvePending();
}
}
/**
* Resolves all pending promises returned from {@see continue()} with the result promise.
*/
private function resolvePending()
{
$backPressure = $this->backPressure;
$this->backPressure = [];
$waiting = $this->waiting;
$this->waiting = [];
foreach ($waiting as $deferred) {
$deferred->resolve($result);
foreach ($backPressure as $deferred) {
$deferred->resolve($this->result);
}
foreach ($waiting as $deferred) {
$deferred->resolve($this->result);
}
}
/**
* Invokes all pending {@see onDisposal()} callbacks and fails pending {@see continue()} promises.
*/
private function triggerDisposal()
{
\assert($this->disposed, "Pipeline was not disposed on triggering disposal");
if ($this->onDisposal === null) {
return;
}
@ -364,12 +400,7 @@ final class EmitSource
$onDisposal = $this->onDisposal;
$this->onDisposal = null;
$backPressure = $this->backPressure;
$this->backPressure = [];
foreach ($backPressure as $deferred) {
$deferred->resolve($this->result);
}
$this->resolvePending();
/** @psalm-suppress PossiblyNullIterator $alreadyDisposed is a guard against $this->onDisposal being null */
foreach ($onDisposal as $callback) {

View File

@ -244,7 +244,6 @@ class PipelineSourceTest extends AsyncTestCase
public function testEmitAfterDisposal()
{
$this->expectException(DisposedException::class);
$this->expectExceptionMessage('The pipeline has been disposed');
$pipeline = $this->source->pipe();
$promise = $this->source->emit(1);
@ -256,28 +255,54 @@ class PipelineSourceTest extends AsyncTestCase
yield $this->source->emit(1);
}
public function testEmitAfterDisposalWithPendingContinuePromise()
public function testEmitAfterAutomaticDisposal()
{
$this->expectException(DisposedException::class);
$pipeline = $this->source->pipe();
$promise = $this->source->emit(1);
$this->source->onDisposal($this->createCallback(1));
unset($pipeline); // Trigger automatic disposal.
$this->source->onDisposal($this->createCallback(1));
$this->assertTrue($this->source->isDisposed());
$this->assertNull(yield $promise);
yield $this->source->emit(1);
}
public function testEmitAfterAutomaticDisposalWithPendingContinuePromise()
{
$pipeline = $this->source->pipe();
$promise = $pipeline->continue();
$this->source->onDisposal($this->createCallback(1));
$pipeline->dispose();
unset($pipeline); // Trigger automatic disposal.
$this->source->onDisposal($this->createCallback(1));
$this->assertFalse($this->source->isDisposed());
yield $this->source->emit(1);
$this->assertSame(1, yield $promise);
$this->expectException(DisposedException::class);
$this->expectExceptionMessage('The pipeline has been disposed');
$this->assertTrue($this->source->isDisposed());
yield $this->source->emit(2);
}
public function testEmitAfterExplicitDisposalWithPendingContinuePromise()
{
$pipeline = $this->source->pipe();
$promise = $pipeline->continue();
$this->source->onDisposal($this->createCallback(1));
$pipeline->dispose();
$this->source->onDisposal($this->createCallback(1));
$this->assertTrue($this->source->isDisposed());
$this->expectException(DisposedException::class);
$this->assertSame(1, yield $promise);
}
public function testEmitAfterDestruct()
{
$this->expectException(DisposedException::class);
$this->expectExceptionMessage('The pipeline has been disposed');
$pipeline = $this->source->pipe();
$promise = $this->source->emit(1);