1
0
mirror of https://github.com/danog/amp.git synced 2024-12-04 10:28:01 +01:00

Add separate method to await back pressure

This commit is contained in:
Aaron Piotrowski 2020-09-25 12:32:37 -05:00
parent d48e6bd5d2
commit 72b50523a3
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
3 changed files with 30 additions and 15 deletions

View File

@ -16,16 +16,16 @@ try {
$pipeline = $source->pipe(); $pipeline = $source->pipe();
Promise\rethrow(async(function (PipelineSource $source): void { Promise\rethrow(async(function (PipelineSource $source): void {
await($source->emit(await(new Delayed(500, 1)))); $source->yield(await(new Delayed(500, 1)));
await($source->emit(await(new Delayed(1500, 2)))); $source->yield(await(new Delayed(1500, 2)));
await($source->emit(await(new Delayed(1000, 3)))); $source->yield(await(new Delayed(1000, 3)));
await($source->emit(await(new Delayed(2000, 4)))); $source->yield(await(new Delayed(2000, 4)));
await($source->emit(5)); $source->yield(5);
await($source->emit(6)); $source->yield(6);
await($source->emit(7)); $source->yield(7);
await($source->emit(await(new Delayed(2000, 8)))); $source->yield(await(new Delayed(2000, 8)));
await($source->emit(9)); $source->yield(9);
await($source->emit(10)); $source->yield(10);
$source->complete(); $source->complete();
}, $source)); }, $source));

View File

@ -23,9 +23,9 @@ try {
yield await(new Delayed(600, 10)); yield await(new Delayed(600, 10));
}); });
// Flow listener attempts to consume 11 values at once. Only 10 will be emitted. // Pipeline consumer attempts to consume 11 values at once. Only 10 will be emitted.
$promises = []; $promises = [];
for ($i = 0; $i < 11 && ($promises[] = async(fn () => $pipeline->continue())); ++$i); for ($i = 0; $i < 11 && ($promises[] = async(fn (): ?int => $pipeline->continue())); ++$i);
foreach ($promises as $key => $promise) { foreach ($promises as $key => $promise) {
if (null === $yielded = await($promise)) { if (null === $yielded = await($promise)) {

View File

@ -34,20 +34,35 @@ final class PipelineSource
} }
/** /**
* Emits a value to the pipeline. * Emits a value to the pipeline, returning a promise that is resolved once the emitted value is consumed.
* Use {@see yield()} to wait until the value is consumed or use {@see await()} on the promise returned
* to wait at a later time.
* *
* @param mixed $value * @param mixed $value
* *
* @psalm-param TValue $value * @psalm-param TValue $value
* *
* @return Promise<null> Resolves with null when the emitted value has been consumed or fails with * @return Promise<null> Resolves with null when the emitted value has been consumed or fails with
* {@see DisposedException} if the pipeline has been destroyed. * {@see DisposedException} if the pipeline has been disposed.
*/ */
public function emit($value): Promise public function emit(mixed $value): Promise
{ {
return $this->source->emit($value); return $this->source->emit($value);
} }
/**
* Emits a value to the pipeline and does not return until the emitted value is consumed.
* Use {@see emit()} to emit a value without waiting for the value to be consumed.
*
* @param mixed $value
*
* @throws DisposedException Thrown if the pipeline is disposed.
*/
public function yield(mixed $value): void
{
await($this->source->emit($value));
}
/** /**
* @return bool True if the pipeline has been completed or failed. * @return bool True if the pipeline has been completed or failed.
*/ */