mirror of
https://github.com/danog/amp.git
synced 2024-12-03 09:57:51 +01:00
Fix backpressure release
Previously, backpressure was only released once advance() was called again. This means the last backpressure item was never free'd in case the caller knows there are no more items to consume and didn't call advance() the last time.
This commit is contained in:
parent
27ea578649
commit
c45cd5a3a9
@ -54,6 +54,12 @@ trait Producer
|
||||
++$this->position;
|
||||
|
||||
if (\array_key_exists($this->position, $this->values)) {
|
||||
if (isset($this->backPressure[$this->position])) {
|
||||
$future = $this->backPressure[$this->position];
|
||||
unset($this->backPressure[$this->position]);
|
||||
$future->resolve();
|
||||
}
|
||||
|
||||
return new Success(true);
|
||||
}
|
||||
|
||||
@ -62,6 +68,14 @@ trait Producer
|
||||
}
|
||||
|
||||
$this->waiting = new Deferred;
|
||||
$this->waiting->promise()->onResolve(function () {
|
||||
if (isset($this->backPressure[$this->position])) {
|
||||
$future = $this->backPressure[$this->position];
|
||||
unset($this->backPressure[$this->position]);
|
||||
$future->resolve();
|
||||
}
|
||||
});
|
||||
|
||||
return $this->waiting->promise();
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user