diff --git a/lib/AsyncGenerator.php b/lib/AsyncGenerator.php index b6aeb58..1086373 100644 --- a/lib/AsyncGenerator.php +++ b/lib/AsyncGenerator.php @@ -55,14 +55,6 @@ final class AsyncGenerator implements Stream $this->generator = $source->createGenerator(); } - /** - * @inheritDoc - */ - public function transform(callable $operator = null): TransformationStream - { - return $this->generator->transform($operator); - } - /** * @inheritDoc */ @@ -114,5 +106,4 @@ final class AsyncGenerator implements Stream { return $this->coroutine; } - } diff --git a/lib/Internal/AutoDisposingStream.php b/lib/Internal/AutoDisposingStream.php index eba5c52..79252af 100644 --- a/lib/Internal/AutoDisposingStream.php +++ b/lib/Internal/AutoDisposingStream.php @@ -2,7 +2,6 @@ namespace Amp\Internal; -use Amp\TransformationStream; use Amp\Promise; use Amp\Stream; @@ -45,12 +44,4 @@ class AutoDisposingStream implements Stream { $this->stream->dispose(); } - - /** - * @inheritDoc - */ - public function transform(callable $operator = null): TransformationStream - { - return $this->stream->transform($operator); - } } diff --git a/lib/Internal/Yielder.php b/lib/Internal/Yielder.php index 65e3b89..ab0ffa0 100644 --- a/lib/Internal/Yielder.php +++ b/lib/Internal/Yielder.php @@ -5,7 +5,6 @@ namespace Amp\Internal; use Amp\Deferred; use Amp\DisposedException; use Amp\Failure; -use Amp\TransformationStream; use Amp\Promise; use Amp\Stream; use Amp\Success; @@ -19,7 +18,6 @@ use React\Promise\PromiseInterface as ReactPromise; * * @template TValue * @template TSend - * @template TReturn */ trait Yielder { @@ -60,7 +58,9 @@ trait Yielder private $used = false; /** - * @return Promise + * @return Promise + * + * @psalm-return Promise|null> */ public function continue(): Promise { @@ -72,7 +72,9 @@ trait Yielder * * @psalm-param TSend $value * - * @return Promise + * @return Promise + * + * @psalm-return Promise|null> */ public function send($value): Promise { @@ -86,7 +88,9 @@ trait Yielder /** * @param \Throwable $exception * - * @return Promise + * @return Promise + * + * @psalm-return Promise|null> */ public function throw(\Throwable $exception): Promise { @@ -98,9 +102,13 @@ trait Yielder } /** - * @param Promise $promise + * @param Promise $promise * - * @return Promise + * @psalm-param Promise + * + * @return Promise + * + * @psalm-return Promise|null> */ private function next(Promise $promise): Promise { @@ -131,11 +139,6 @@ trait Yielder return $deferred->promise(); } - public function transform(callable $operator = null): TransformationStream - { - return new TransformationStream($this, $operator); - } - private function createStream(): Stream { \assert($this instanceof Stream, \sprintf("Users of this trait must implement %s to call %s", Stream::class, __METHOD__)); diff --git a/lib/Stream.php b/lib/Stream.php index 9f75516..f36f807 100644 --- a/lib/Stream.php +++ b/lib/Stream.php @@ -13,7 +13,9 @@ interface Stream * Succeeds with a tuple of the yielded value and key or null if the stream has completed. If the stream fails, * the returned promise will fail with the same exception. * - * @return Promise + * @return Promise + * + * @psalm-return Promise|null> * * @throws \Throwable The exception used to fail the stream. */ @@ -25,13 +27,4 @@ interface Stream * @return void */ public function dispose(); - - /** - * Returns a stream object with fluent transformation methods. - * - * @param callable(TransformationStream):Stream $operator - * - * @return TransformationStream - */ - public function transform(callable $operator = null): TransformationStream; } diff --git a/lib/Stream/ApplyStream.php b/lib/Stream/ApplyStream.php new file mode 100644 index 0000000..acae754 --- /dev/null +++ b/lib/Stream/ApplyStream.php @@ -0,0 +1,44 @@ + */ + private $stream; + + /** + * @param Stream $stream + * @param callable(Stream):Stream $operator + */ + public function __construct(Stream $stream, callable $operator) + { + $stream = $operator($stream); + + if (!$stream instanceof Stream) { + throw new \TypeError('$operator callback must return an instance of ' . Stream::class); + } + + $this->stream = $stream; + } + + /** + * @psalm-return Promise|null> + */ + public function continue(): Promise + { + return $this->stream->continue(); + } + + public function dispose() + { + $this->stream->dispose(); + } +} diff --git a/lib/Stream/DropStream.php b/lib/Stream/DropStream.php new file mode 100644 index 0000000..cea00c1 --- /dev/null +++ b/lib/Stream/DropStream.php @@ -0,0 +1,46 @@ + */ + private $stream; + + /** @var int */ + private $count; + + /** @var int */ + private $dropped = 0; + + public function __construct(Stream $stream, int $count) + { + $this->stream = $stream; + $this->count = $count; + } + + public function continue(): Promise + { + return call(function () { + while (++$this->dropped <= $this->count) { + if (yield $this->stream->continue() === null) { + return null; + } + } + + return yield $this->stream->continue(); + }); + } + + public function dispose() + { + $this->stream->dispose(); + } +} diff --git a/lib/Stream/EachOperator.php b/lib/Stream/EachOperator.php new file mode 100644 index 0000000..98e519b --- /dev/null +++ b/lib/Stream/EachOperator.php @@ -0,0 +1,37 @@ +> */ + private $promise; + + /** + * @param Stream $stream + * @param callable(TValue):void $each + */ + public function __construct(Stream $stream, callable $each) + { + $this->promise = call(function () use ($stream, $each) { + while (list($value, $key) = yield $stream->continue()) { + yield call($each, $value, $key); + } + }); + } + + /** + * @return Promise> + */ + public function promise(): Promise + { + return $this->promise; + } +} diff --git a/lib/Stream/FilterStream.php b/lib/Stream/FilterStream.php new file mode 100644 index 0000000..26326b0 --- /dev/null +++ b/lib/Stream/FilterStream.php @@ -0,0 +1,43 @@ + */ + private $stream; + + /** @var callable(TValue):Promise */ + private $filter; + + public function __construct(Stream $stream, callable $filter) + { + $this->stream = $stream; + $this->filter = $filter; + } + + public function continue(): Promise + { + return call(function () { + while (list($value) = yield $this->stream->continue()) { + if (!yield call($this->filter, $value)) { + return $value; + } + } + + return null; + }); + } + + public function dispose() + { + $this->stream->dispose(); + } +} diff --git a/lib/Stream/LimitStream.php b/lib/Stream/LimitStream.php new file mode 100644 index 0000000..665fbf5 --- /dev/null +++ b/lib/Stream/LimitStream.php @@ -0,0 +1,46 @@ + */ + private $stream; + + /** @var int */ + private $limit; + + /** @var int */ + private $yielded = 0; + + public function __construct(Stream $stream, int $limit) + { + $this->stream = $stream; + $this->limit = $limit; + } + + public function continue(): Promise + { + return call(function () { + $value = yield $this->stream->continue(); + + if (++$this->yielded > $this->limit) { + $this->stream->dispose(); + } + + return $value; + }); + } + + public function dispose() + { + $this->stream->dispose(); + } +} diff --git a/lib/Stream/MapStream.php b/lib/Stream/MapStream.php new file mode 100644 index 0000000..255d1e7 --- /dev/null +++ b/lib/Stream/MapStream.php @@ -0,0 +1,42 @@ + */ + private $stream; + + /** @var callable(TValue):Promise */ + private $mapper; + + public function __construct(Stream $stream, callable $mapper) + { + $this->stream = $stream; + $this->mapper = $mapper; + } + + public function continue(): Promise + { + return call(function () { + if (list($value) = yield $this->stream->continue()) { + return yield call($this->mapper, $value); + } + + return null; + }); + } + + public function dispose() + { + $this->stream->dispose(); + } +} diff --git a/lib/Stream/ToArrayOperator.php b/lib/Stream/ToArrayOperator.php new file mode 100644 index 0000000..b6fdb98 --- /dev/null +++ b/lib/Stream/ToArrayOperator.php @@ -0,0 +1,41 @@ +> */ + private $promise; + + /** + * @param Stream $stream + */ + public function __construct(Stream $stream) + { + $this->promise = call(function () use ($stream) { + /** @psalm-var list $array */ + $array = []; + + while (list($value) = yield $stream->continue()) { + $array[] = $value; + } + + return $array; + }); + } + + /** + * @return Promise> + */ + public function promise(): Promise + { + return $this->promise; + } +} diff --git a/lib/StreamModifier.php b/lib/StreamModifier.php new file mode 100644 index 0000000..b3288af --- /dev/null +++ b/lib/StreamModifier.php @@ -0,0 +1,105 @@ + */ + private $stream; + + /** + * @param Stream $stream + */ + public function __construct(Stream $stream) + { + $this->stream = $stream; + } + + /** + * @return Stream + */ + public function stream(): Stream + { + return $this->stream; + } + + public function apply(callable $operator): self + { + $clone = clone $this; + $clone->stream = new Stream\ApplyStream($clone->stream, $operator); + return $clone; + } + + /** + * @template TMap + * + * @param callable(TValue, int):TMap $onYield + * + * @return self + */ + public function map(callable $onYield): self + { + $clone = clone $this; + $clone->stream = new Stream\MapStream($clone->stream, $onYield); + return $clone; + } + + /** + * @param callable(TValue, int):bool $filter + * + * @return self + */ + public function filter(callable $filter): self + { + $clone = clone $this; + $clone->stream = new Stream\FilterStream($clone->stream, $filter); + return $clone; + } + + /** + * @param callable(TValue, int):void $onYield + * + * @return Promise + */ + public function each(callable $onYield): Promise + { + return (new EachOperator($this->stream, $onYield))->promise(); + } + + /** + * @param int $count + * + * @return self + */ + public function drop(int $count): self + { + $clone = clone $this; + $clone->stream = new Stream\DropStream($clone->stream, $count); + return $clone; + } + + /** + * @param int $limit + * + * @return self + */ + public function limit(int $limit): self + { + $clone = clone $this; + $clone->stream = new Stream\LimitStream($clone->stream, $limit); + return $clone; + } + + /** + * @return Promise> + */ + public function toArray(): Promise + { + return (new Stream\ToArrayOperator($this->stream))->promise(); + } +} diff --git a/lib/TransformationStream.php b/lib/TransformationStream.php deleted file mode 100644 index cca27c7..0000000 --- a/lib/TransformationStream.php +++ /dev/null @@ -1,154 +0,0 @@ - - */ -final class TransformationStream implements Stream -{ - /** @var Stream */ - private $stream; - - public function __construct(Stream $stream, callable $operator = null) - { - $this->stream = $stream instanceof self ? $stream->stream : $stream; - - if ($operator !== null) { - $this->stream = $this->apply($operator); - } - } - - public function continue(): Promise - { - return $this->stream->continue(); - } - - public function dispose() - { - $this->stream->dispose(); - } - - public function transform(callable $operator = null): self - { - if ($operator === null) { - return $this; - } - - return new self($this->apply($operator)); - } - - private function apply(callable $operator): Stream - { - $stream = $operator($this); - - if (!$stream instanceof Stream) { - throw new \TypeError('$operator callback must return an instance of ' . Stream::class); - } - - return $stream; - } - - /** - * @template TMap - * - * @param callable(TValue, int):TMap $onYield - * - * @return self - */ - public function map(callable $onYield): self - { - return new self(new AsyncGenerator(function (callable $yield) use ($onYield): \Generator { - while (list($value, $key) = yield $this->stream->continue()) { - yield $yield(yield call($onYield, $value, $key)); - } - })); - } - - /** - * @param callable(TValue, int):bool $filter - * - * @return self - */ - public function filter(callable $filter): self - { - return new self(new AsyncGenerator(function (callable $yield) use ($filter) { - while (list($value, $key) = yield $this->stream->continue()) { - if (yield call($filter, $value, $key)) { - yield $yield($value, $key); - } - } - })); - } - - /** - * @param callable(TValue, int):void $onYield - * - * @return Promise - */ - public function each(callable $onYield): Promise - { - return call(function () use ($onYield) { - while (list($value, $key) = yield $this->stream->continue()) { - yield call($onYield, $value, $key); - } - }); - } - - /** - * @param int $count - * - * @return self - */ - public function drop(int $count): self - { - return new self(new AsyncGenerator(function (callable $yield) use ($count) { - $skipped = 0; - while (list($value) = yield $this->stream->continue()) { - if (++$skipped <= $count) { - continue; - } - - yield $yield($value); - } - })); - } - - /** - * @param int $limit - * - * @return self - */ - public function limit(int $limit): Stream - { - return new self(new AsyncGenerator(function (callable $yield) use ($limit) { - $yielded = 0; - while (list($value) = yield $this->stream->continue()) { - if (++$yielded > $limit) { - $this->stream->dispose(); - return; - } - - yield $yield($value); - } - })); - } - - /** - * @return Promise> - */ - public function toArray(): Promise - { - return call(static function (): \Generator { - /** @psalm-var list $array */ - $array = []; - - while (list($value) = yield $this->stream->continue()) { - $array[] = $value; - } - - return $array; - }); - } -}