mirror of
https://github.com/danog/amp.git
synced 2024-11-27 04:24:42 +01:00
Throw on consecutive calls to Listener::advance(); better property names
This commit is contained in:
parent
cf5ebb1408
commit
2976ebdbb5
@ -22,13 +22,13 @@ class Listener {
|
||||
private $values = [];
|
||||
|
||||
/** @var \Amp\Deferred[] */
|
||||
private $deferreds = [];
|
||||
private $backPressure = [];
|
||||
|
||||
/** @var int */
|
||||
private $position = -1;
|
||||
|
||||
/** @var \Amp\Deferred|null */
|
||||
private $deferred;
|
||||
private $waiting;
|
||||
|
||||
/** @var bool */
|
||||
private $resolved = false;
|
||||
@ -45,19 +45,19 @@ class Listener {
|
||||
public function __construct(Stream $stream) {
|
||||
$this->stream = $stream;
|
||||
|
||||
$deferred = &$this->deferred;
|
||||
$waiting = &$this->waiting;
|
||||
$values = &$this->values;
|
||||
$deferreds = &$this->deferreds;
|
||||
$backPressure = &$this->backPressure;
|
||||
$resolved = &$this->resolved;
|
||||
|
||||
$this->stream->listen(static function ($value) use (&$deferred, &$values, &$deferreds, &$resolved) {
|
||||
$this->stream->listen(static function ($value) use (&$waiting, &$values, &$backPressure, &$resolved) {
|
||||
$values[] = $value;
|
||||
$deferreds[] = $pressure = new Deferred;
|
||||
$backPressure[] = $pressure = new Deferred;
|
||||
|
||||
if ($deferred !== null) {
|
||||
$temp = $deferred;
|
||||
$deferred = null;
|
||||
$temp->resolve(true);
|
||||
if ($waiting !== null) {
|
||||
$deferred = $waiting;
|
||||
$waiting = null;
|
||||
$deferred->resolve(true);
|
||||
}
|
||||
|
||||
if ($resolved) {
|
||||
@ -70,21 +70,21 @@ class Listener {
|
||||
$result = &$this->result;
|
||||
$error = &$this->exception;
|
||||
|
||||
$this->stream->when(static function ($exception, $value) use (&$deferred, &$result, &$error, &$resolved) {
|
||||
$this->stream->when(static function ($exception, $value) use (&$waiting, &$result, &$error, &$resolved) {
|
||||
$resolved = true;
|
||||
|
||||
if ($exception) {
|
||||
$result = null;
|
||||
$error = $exception;
|
||||
if ($deferred !== null) {
|
||||
$deferred->fail($exception);
|
||||
if ($waiting !== null) {
|
||||
$waiting->fail($exception);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
$result = $value;
|
||||
if ($deferred !== null) {
|
||||
$deferred->resolve(false);
|
||||
if ($waiting !== null) {
|
||||
$waiting->resolve(false);
|
||||
}
|
||||
});
|
||||
}
|
||||
@ -95,7 +95,7 @@ class Listener {
|
||||
public function __destruct() {
|
||||
$this->resolved = true;
|
||||
|
||||
foreach ($this->deferreds as $deferred) {
|
||||
foreach ($this->backPressure as $deferred) {
|
||||
$deferred->resolve();
|
||||
}
|
||||
}
|
||||
@ -112,11 +112,17 @@ class Listener {
|
||||
* resolved. If the stream fails, the returned promise will fail with the same exception.
|
||||
*
|
||||
* @return \AsyncInterop\Promise<bool>
|
||||
*
|
||||
* @throws \Error If the prior promise returned from this method has not resolved.
|
||||
*/
|
||||
public function advance(): Promise {
|
||||
if (isset($this->deferreds[$this->position])) {
|
||||
$future = $this->deferreds[$this->position];
|
||||
unset($this->values[$this->position], $this->deferreds[$this->position]);
|
||||
if ($this->waiting !== null) {
|
||||
throw new \Error("The prior promise returned must resolve before invoking this method again");
|
||||
}
|
||||
|
||||
if (isset($this->backPressure[$this->position])) {
|
||||
$future = $this->backPressure[$this->position];
|
||||
unset($this->values[$this->position], $this->backPressure[$this->position]);
|
||||
$future->resolve();
|
||||
}
|
||||
|
||||
@ -136,8 +142,8 @@ class Listener {
|
||||
return new Success(false);
|
||||
}
|
||||
|
||||
$this->deferred = new Deferred;
|
||||
return $this->deferred->promise();
|
||||
$this->waiting = new Deferred;
|
||||
return $this->waiting->promise();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -196,8 +202,8 @@ class Listener {
|
||||
$this->values = [];
|
||||
$this->position = -1;
|
||||
|
||||
$deferreds = $this->deferreds;
|
||||
$this->deferreds = [];
|
||||
$deferreds = $this->backPressure;
|
||||
$this->backPressure = [];
|
||||
foreach ($deferreds as $deferred) {
|
||||
$deferred->resolve();
|
||||
}
|
||||
|
@ -180,4 +180,15 @@ class ListenerTest extends \PHPUnit_Framework_TestCase {
|
||||
$listener->getResult();
|
||||
}));
|
||||
}
|
||||
|
||||
/**
|
||||
* @expectedException \Error
|
||||
* @expectedExceptionMessage The prior promise returned must resolve before invoking this method again
|
||||
*/
|
||||
public function testConsecutiveAdvanceCalls() {
|
||||
$emitter = new Emitter;
|
||||
$listener = new Listener($emitter->stream());
|
||||
$listener->advance();
|
||||
$listener->advance();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user