From c45cd5a3a9eb8bd1d1b53e3e82a9b2853a7f095f Mon Sep 17 00:00:00 2001 From: Niklas Keller Date: Fri, 20 Sep 2019 22:41:20 +0200 Subject: [PATCH] Fix backpressure release Previously, backpressure was only released once advance() was called again. This means the last backpressure item was never free'd in case the caller knows there are no more items to consume and didn't call advance() the last time. --- lib/Internal/Producer.php | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/lib/Internal/Producer.php b/lib/Internal/Producer.php index 417ce6f..c96dbbd 100644 --- a/lib/Internal/Producer.php +++ b/lib/Internal/Producer.php @@ -54,6 +54,12 @@ trait Producer ++$this->position; if (\array_key_exists($this->position, $this->values)) { + if (isset($this->backPressure[$this->position])) { + $future = $this->backPressure[$this->position]; + unset($this->backPressure[$this->position]); + $future->resolve(); + } + return new Success(true); } @@ -62,6 +68,14 @@ trait Producer } $this->waiting = new Deferred; + $this->waiting->promise()->onResolve(function () { + if (isset($this->backPressure[$this->position])) { + $future = $this->backPressure[$this->position]; + unset($this->backPressure[$this->position]); + $future->resolve(); + } + }); + return $this->waiting->promise(); }