1
0
mirror of https://github.com/danog/amp.git synced 2024-12-02 17:37:50 +01:00

Only create deferred for backpressure if necessary

This commit is contained in:
Aaron Piotrowski 2019-09-20 23:39:28 -05:00 committed by Niklas Keller
parent c45cd5a3a9
commit a726db92a5
3 changed files with 21 additions and 28 deletions

View File

@ -23,7 +23,6 @@ Loop::run(function () {
$iterator = $emitter->iterate(); $iterator = $emitter->iterate();
yield $iterator->advance(); yield $iterator->advance();
yield $iterator->advance();
yield new Amp\Delayed(0); yield new Amp\Delayed(0);
unset($emitter, $iterator); unset($emitter, $iterator);

View File

@ -28,7 +28,10 @@ trait Producer
private $backPressure = []; private $backPressure = [];
/** @var int */ /** @var int */
private $position = -1; private $consumePosition = -1;
/** @var int */
private $emitPosition = -1;
/** @var \Amp\Deferred|null */ /** @var \Amp\Deferred|null */
private $waiting; private $waiting;
@ -45,20 +48,15 @@ trait Producer
throw new \Error("The prior promise returned must resolve before invoking this method again"); throw new \Error("The prior promise returned must resolve before invoking this method again");
} }
if (isset($this->backPressure[$this->position])) { unset($this->values[$this->consumePosition]);
$future = $this->backPressure[$this->position];
unset($this->values[$this->position], $this->backPressure[$this->position]);
$future->resolve();
}
++$this->position; $position = ++$this->consumePosition;
if (\array_key_exists($this->position, $this->values)) { if (\array_key_exists($position, $this->values)) {
if (isset($this->backPressure[$this->position])) { \assert(isset($this->backPressure[$position]));
$future = $this->backPressure[$this->position]; $deferred = $this->backPressure[$position];
unset($this->backPressure[$this->position]); unset($this->backPressure[$position]);
$future->resolve(); $deferred->resolve();
}
return new Success(true); return new Success(true);
} }
@ -68,13 +66,6 @@ trait Producer
} }
$this->waiting = new Deferred; $this->waiting = new Deferred;
$this->waiting->promise()->onResolve(function () {
if (isset($this->backPressure[$this->position])) {
$future = $this->backPressure[$this->position];
unset($this->backPressure[$this->position]);
$future->resolve();
}
});
return $this->waiting->promise(); return $this->waiting->promise();
} }
@ -88,16 +79,15 @@ trait Producer
throw new \Error("The iterator has completed"); throw new \Error("The iterator has completed");
} }
if (!\array_key_exists($this->position, $this->values)) { if (!\array_key_exists($this->consumePosition, $this->values)) {
throw new \Error("Promise returned from advance() must resolve before calling this method"); throw new \Error("Promise returned from advance() must resolve before calling this method");
} }
return $this->values[$this->position]; return $this->values[$this->consumePosition];
} }
/** /**
* Emits a value from the iterator. The returned promise is resolved with the emitted value once all listeners * Emits a value from the iterator. The returned promise is resolved once the emitted value has been consumed.
* have been invoked.
* *
* @param mixed $value * @param mixed $value
* *
@ -137,15 +127,19 @@ trait Producer
return $deferred->promise(); return $deferred->promise();
} }
$this->values[] = $value; $position = ++$this->emitPosition;
$this->backPressure[] = $pressure = new Deferred;
$this->values[$position] = $value;
if ($this->waiting !== null) { if ($this->waiting !== null) {
$waiting = $this->waiting; $waiting = $this->waiting;
$this->waiting = null; $this->waiting = null;
$waiting->resolve(true); $waiting->resolve(true);
return new Success; // Consumer was already waiting for a new value, so back-pressure is unnecessary.
} }
$this->backPressure[$position] = $pressure = new Deferred;
return $pressure->promise(); return $pressure->promise();
} }

View File

@ -100,7 +100,7 @@ class ProducerTest extends BaseTest
} }
}); });
$this->assertGreaterThan(self::TIMEOUT * $emits - 1 /* 1ms grace period */, $time * 1000); $this->assertGreaterThan(self::TIMEOUT * ($emits - 1), $time * 1000);
} }
/** /**