diff --git a/lib/AsyncGenerator.php b/lib/AsyncGenerator.php index 0f6ca82..b6aeb58 100644 --- a/lib/AsyncGenerator.php +++ b/lib/AsyncGenerator.php @@ -56,17 +56,15 @@ final class AsyncGenerator implements Stream } /** - * @return TransformationStream + * @inheritDoc */ - public function transform(): TransformationStream + public function transform(callable $operator = null): TransformationStream { - return $this->generator->transform(); + return $this->generator->transform($operator); } /** - * Continues the async generator, resolving the back-pressure promise with null. - * - * @return Promise + * @inheritDoc */ public function continue(): Promise { diff --git a/lib/Internal/AutoDisposingGenerator.php b/lib/Internal/AutoDisposingGenerator.php index 8e92b8d..76c6567 100644 --- a/lib/Internal/AutoDisposingGenerator.php +++ b/lib/Internal/AutoDisposingGenerator.php @@ -27,11 +27,7 @@ class AutoDisposingGenerator extends AutoDisposingStream implements GeneratorStr } /** - * @param mixed $value - * - * @psalm-param TSend $value - * - * @return Promise + * @inheritDoc */ public function send($value): Promise { @@ -39,9 +35,7 @@ class AutoDisposingGenerator extends AutoDisposingStream implements GeneratorStr } /** - * @param \Throwable $exception - * - * @return Promise + * @inheritDoc */ public function throw(\Throwable $exception): Promise { diff --git a/lib/Internal/AutoDisposingStream.php b/lib/Internal/AutoDisposingStream.php index 6152f4a..eba5c52 100644 --- a/lib/Internal/AutoDisposingStream.php +++ b/lib/Internal/AutoDisposingStream.php @@ -31,7 +31,7 @@ class AutoDisposingStream implements Stream } /** - * @return Promise + * @inheritDoc */ public function continue(): Promise { @@ -39,7 +39,7 @@ class AutoDisposingStream implements Stream } /** - * @return void + * @inheritDoc */ public function dispose() { @@ -47,10 +47,10 @@ class AutoDisposingStream implements Stream } /** - * @return TransformationStream + * @inheritDoc */ - public function transform(): TransformationStream + public function transform(callable $operator = null): TransformationStream { - return $this->stream->transform(); + return $this->stream->transform($operator); } } diff --git a/lib/Internal/Yielder.php b/lib/Internal/Yielder.php index c336fac..65e3b89 100644 --- a/lib/Internal/Yielder.php +++ b/lib/Internal/Yielder.php @@ -131,9 +131,9 @@ trait Yielder return $deferred->promise(); } - public function transform(): TransformationStream + public function transform(callable $operator = null): TransformationStream { - return new TransformationStream($this); + return new TransformationStream($this, $operator); } private function createStream(): Stream diff --git a/lib/Stream.php b/lib/Stream.php index 82e1c8c..9f75516 100644 --- a/lib/Stream.php +++ b/lib/Stream.php @@ -29,7 +29,9 @@ interface Stream /** * Returns a stream object with fluent transformation methods. * + * @param callable(TransformationStream):Stream $operator + * * @return TransformationStream */ - public function transform(): TransformationStream; + public function transform(callable $operator = null): TransformationStream; } diff --git a/lib/TransformationStream.php b/lib/TransformationStream.php index 10b5a46..96eeef7 100644 --- a/lib/TransformationStream.php +++ b/lib/TransformationStream.php @@ -11,9 +11,13 @@ final class TransformationStream implements Stream /** @var Stream */ private $stream; - public function __construct(Stream $stream) + public function __construct(Stream $stream, callable $operator = null) { - $this->stream = $stream; + $this->stream = $stream instanceof self ? $stream->stream : $stream; + + if ($operator !== null) { + $this->stream = $this->apply($operator); + } } public function continue(): Promise @@ -26,9 +30,24 @@ final class TransformationStream implements Stream $this->stream->dispose(); } - public function transform(): self + public function transform(callable $operator = null): self { - return $this; + if ($operator === null) { + return $this; + } + + return new self($this->apply($operator)); + } + + private function apply(callable $operator): self + { + $stream = $operator($this); + + if ($stream instanceof Stream) { + throw new \TypeError('$operator must return an instance of ' . Stream::class); + } + + return $stream; } /** @@ -87,7 +106,7 @@ final class TransformationStream implements Stream return new self(new AsyncGenerator(function (callable $yield) use ($count) { $skipped = 0; while (list($value) = yield $this->stream->continue()) { - if (++$skipped < $count) { + if (++$skipped <= $count) { continue; }