From 72b50523a3e2e9bdb674b527c947329af1d72afd Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Fri, 25 Sep 2020 12:32:37 -0500 Subject: [PATCH] Add separate method to await back pressure --- examples/pipeline/backpressure.php | 20 ++++++++++---------- examples/pipeline/fast-consumption.php | 4 ++-- lib/PipelineSource.php | 21 ++++++++++++++++++--- 3 files changed, 30 insertions(+), 15 deletions(-) diff --git a/examples/pipeline/backpressure.php b/examples/pipeline/backpressure.php index 68c21a9..1f82ffc 100644 --- a/examples/pipeline/backpressure.php +++ b/examples/pipeline/backpressure.php @@ -16,16 +16,16 @@ try { $pipeline = $source->pipe(); Promise\rethrow(async(function (PipelineSource $source): void { - await($source->emit(await(new Delayed(500, 1)))); - await($source->emit(await(new Delayed(1500, 2)))); - await($source->emit(await(new Delayed(1000, 3)))); - await($source->emit(await(new Delayed(2000, 4)))); - await($source->emit(5)); - await($source->emit(6)); - await($source->emit(7)); - await($source->emit(await(new Delayed(2000, 8)))); - await($source->emit(9)); - await($source->emit(10)); + $source->yield(await(new Delayed(500, 1))); + $source->yield(await(new Delayed(1500, 2))); + $source->yield(await(new Delayed(1000, 3))); + $source->yield(await(new Delayed(2000, 4))); + $source->yield(5); + $source->yield(6); + $source->yield(7); + $source->yield(await(new Delayed(2000, 8))); + $source->yield(9); + $source->yield(10); $source->complete(); }, $source)); diff --git a/examples/pipeline/fast-consumption.php b/examples/pipeline/fast-consumption.php index 1d9feea..87e9e39 100644 --- a/examples/pipeline/fast-consumption.php +++ b/examples/pipeline/fast-consumption.php @@ -23,9 +23,9 @@ try { yield await(new Delayed(600, 10)); }); - // Flow listener attempts to consume 11 values at once. Only 10 will be emitted. + // Pipeline consumer attempts to consume 11 values at once. Only 10 will be emitted. $promises = []; - for ($i = 0; $i < 11 && ($promises[] = async(fn () => $pipeline->continue())); ++$i); + for ($i = 0; $i < 11 && ($promises[] = async(fn (): ?int => $pipeline->continue())); ++$i); foreach ($promises as $key => $promise) { if (null === $yielded = await($promise)) { diff --git a/lib/PipelineSource.php b/lib/PipelineSource.php index 0649e6a..4cd3f9c 100644 --- a/lib/PipelineSource.php +++ b/lib/PipelineSource.php @@ -34,20 +34,35 @@ final class PipelineSource } /** - * Emits a value to the pipeline. + * Emits a value to the pipeline, returning a promise that is resolved once the emitted value is consumed. + * Use {@see yield()} to wait until the value is consumed or use {@see await()} on the promise returned + * to wait at a later time. * * @param mixed $value * * @psalm-param TValue $value * * @return Promise Resolves with null when the emitted value has been consumed or fails with - * {@see DisposedException} if the pipeline has been destroyed. + * {@see DisposedException} if the pipeline has been disposed. */ - public function emit($value): Promise + public function emit(mixed $value): Promise { return $this->source->emit($value); } + /** + * Emits a value to the pipeline and does not return until the emitted value is consumed. + * Use {@see emit()} to emit a value without waiting for the value to be consumed. + * + * @param mixed $value + * + * @throws DisposedException Thrown if the pipeline is disposed. + */ + public function yield(mixed $value): void + { + await($this->source->emit($value)); + } + /** * @return bool True if the pipeline has been completed or failed. */