diff --git a/lib/Listener.php b/lib/Listener.php index 1973cc3..95a3180 100644 --- a/lib/Listener.php +++ b/lib/Listener.php @@ -22,13 +22,13 @@ class Listener { private $values = []; /** @var \Amp\Deferred[] */ - private $deferreds = []; + private $backPressure = []; /** @var int */ private $position = -1; /** @var \Amp\Deferred|null */ - private $deferred; + private $waiting; /** @var bool */ private $resolved = false; @@ -45,19 +45,19 @@ class Listener { public function __construct(Stream $stream) { $this->stream = $stream; - $deferred = &$this->deferred; + $waiting = &$this->waiting; $values = &$this->values; - $deferreds = &$this->deferreds; + $backPressure = &$this->backPressure; $resolved = &$this->resolved; - $this->stream->listen(static function ($value) use (&$deferred, &$values, &$deferreds, &$resolved) { + $this->stream->listen(static function ($value) use (&$waiting, &$values, &$backPressure, &$resolved) { $values[] = $value; - $deferreds[] = $pressure = new Deferred; + $backPressure[] = $pressure = new Deferred; - if ($deferred !== null) { - $temp = $deferred; - $deferred = null; - $temp->resolve(true); + if ($waiting !== null) { + $deferred = $waiting; + $waiting = null; + $deferred->resolve(true); } if ($resolved) { @@ -70,21 +70,21 @@ class Listener { $result = &$this->result; $error = &$this->exception; - $this->stream->when(static function ($exception, $value) use (&$deferred, &$result, &$error, &$resolved) { + $this->stream->when(static function ($exception, $value) use (&$waiting, &$result, &$error, &$resolved) { $resolved = true; if ($exception) { $result = null; $error = $exception; - if ($deferred !== null) { - $deferred->fail($exception); + if ($waiting !== null) { + $waiting->fail($exception); } return; } $result = $value; - if ($deferred !== null) { - $deferred->resolve(false); + if ($waiting !== null) { + $waiting->resolve(false); } }); } @@ -95,7 +95,7 @@ class Listener { public function __destruct() { $this->resolved = true; - foreach ($this->deferreds as $deferred) { + foreach ($this->backPressure as $deferred) { $deferred->resolve(); } } @@ -112,11 +112,17 @@ class Listener { * resolved. If the stream fails, the returned promise will fail with the same exception. * * @return \AsyncInterop\Promise + * + * @throws \Error If the prior promise returned from this method has not resolved. */ public function advance(): Promise { - if (isset($this->deferreds[$this->position])) { - $future = $this->deferreds[$this->position]; - unset($this->values[$this->position], $this->deferreds[$this->position]); + if ($this->waiting !== null) { + throw new \Error("The prior promise returned must resolve before invoking this method again"); + } + + if (isset($this->backPressure[$this->position])) { + $future = $this->backPressure[$this->position]; + unset($this->values[$this->position], $this->backPressure[$this->position]); $future->resolve(); } @@ -136,8 +142,8 @@ class Listener { return new Success(false); } - $this->deferred = new Deferred; - return $this->deferred->promise(); + $this->waiting = new Deferred; + return $this->waiting->promise(); } /** @@ -196,8 +202,8 @@ class Listener { $this->values = []; $this->position = -1; - $deferreds = $this->deferreds; - $this->deferreds = []; + $deferreds = $this->backPressure; + $this->backPressure = []; foreach ($deferreds as $deferred) { $deferred->resolve(); } diff --git a/test/ObserverTest.php b/test/ListenerTest.php similarity index 93% rename from test/ObserverTest.php rename to test/ListenerTest.php index be1458e..39ae5f8 100644 --- a/test/ObserverTest.php +++ b/test/ListenerTest.php @@ -180,4 +180,15 @@ class ListenerTest extends \PHPUnit_Framework_TestCase { $listener->getResult(); })); } + + /** + * @expectedException \Error + * @expectedExceptionMessage The prior promise returned must resolve before invoking this method again + */ + public function testConsecutiveAdvanceCalls() { + $emitter = new Emitter; + $listener = new Listener($emitter->stream()); + $listener->advance(); + $listener->advance(); + } }