diff --git a/examples/iterators/emitter-gc.php b/examples/iterators/emitter-gc.php index 51b8d20..be7da5c 100644 --- a/examples/iterators/emitter-gc.php +++ b/examples/iterators/emitter-gc.php @@ -23,7 +23,6 @@ Loop::run(function () { $iterator = $emitter->iterate(); yield $iterator->advance(); - yield $iterator->advance(); yield new Amp\Delayed(0); unset($emitter, $iterator); diff --git a/lib/Internal/Producer.php b/lib/Internal/Producer.php index c96dbbd..5bb9c1f 100644 --- a/lib/Internal/Producer.php +++ b/lib/Internal/Producer.php @@ -28,7 +28,10 @@ trait Producer private $backPressure = []; /** @var int */ - private $position = -1; + private $consumePosition = -1; + + /** @var int */ + private $emitPosition = -1; /** @var \Amp\Deferred|null */ private $waiting; @@ -45,20 +48,15 @@ trait Producer throw new \Error("The prior promise returned must resolve before invoking this method again"); } - if (isset($this->backPressure[$this->position])) { - $future = $this->backPressure[$this->position]; - unset($this->values[$this->position], $this->backPressure[$this->position]); - $future->resolve(); - } + unset($this->values[$this->consumePosition]); - ++$this->position; + $position = ++$this->consumePosition; - if (\array_key_exists($this->position, $this->values)) { - if (isset($this->backPressure[$this->position])) { - $future = $this->backPressure[$this->position]; - unset($this->backPressure[$this->position]); - $future->resolve(); - } + if (\array_key_exists($position, $this->values)) { + \assert(isset($this->backPressure[$position])); + $deferred = $this->backPressure[$position]; + unset($this->backPressure[$position]); + $deferred->resolve(); return new Success(true); } @@ -68,13 +66,6 @@ trait Producer } $this->waiting = new Deferred; - $this->waiting->promise()->onResolve(function () { - if (isset($this->backPressure[$this->position])) { - $future = $this->backPressure[$this->position]; - unset($this->backPressure[$this->position]); - $future->resolve(); - } - }); return $this->waiting->promise(); } @@ -88,16 +79,15 @@ trait Producer throw new \Error("The iterator has completed"); } - if (!\array_key_exists($this->position, $this->values)) { + if (!\array_key_exists($this->consumePosition, $this->values)) { throw new \Error("Promise returned from advance() must resolve before calling this method"); } - return $this->values[$this->position]; + return $this->values[$this->consumePosition]; } /** - * Emits a value from the iterator. The returned promise is resolved with the emitted value once all listeners - * have been invoked. + * Emits a value from the iterator. The returned promise is resolved once the emitted value has been consumed. * * @param mixed $value * @@ -137,15 +127,19 @@ trait Producer return $deferred->promise(); } - $this->values[] = $value; - $this->backPressure[] = $pressure = new Deferred; + $position = ++$this->emitPosition; + + $this->values[$position] = $value; if ($this->waiting !== null) { $waiting = $this->waiting; $this->waiting = null; $waiting->resolve(true); + return new Success; // Consumer was already waiting for a new value, so back-pressure is unnecessary. } + $this->backPressure[$position] = $pressure = new Deferred; + return $pressure->promise(); } diff --git a/test/ProducerTest.php b/test/ProducerTest.php index 3451627..0ce3f72 100644 --- a/test/ProducerTest.php +++ b/test/ProducerTest.php @@ -100,7 +100,7 @@ class ProducerTest extends BaseTest } }); - $this->assertGreaterThan(self::TIMEOUT * $emits - 1 /* 1ms grace period */, $time * 1000); + $this->assertGreaterThan(self::TIMEOUT * ($emits - 1), $time * 1000); } /**