diff --git a/lib/AsyncGenerator.php b/lib/AsyncGenerator.php index 1178677..e307e95 100644 --- a/lib/AsyncGenerator.php +++ b/lib/AsyncGenerator.php @@ -24,7 +24,7 @@ final class AsyncGenerator implements Stream { $source = new class implements Internal\GeneratorStream { use Internal\Yielder { - generate as public; + createGenerator as public; } }; @@ -52,7 +52,15 @@ final class AsyncGenerator implements Stream $source->complete(); }); - $this->generator = $source->generate(); + $this->generator = $source->createGenerator(); + } + + /** + * @return TransformationStream + */ + public function transform(): TransformationStream + { + return new TransformationStream($this); } /** diff --git a/lib/Internal/AutoDisposingStream.php b/lib/Internal/AutoDisposingStream.php index 8895cd5..6effab4 100644 --- a/lib/Internal/AutoDisposingStream.php +++ b/lib/Internal/AutoDisposingStream.php @@ -2,6 +2,7 @@ namespace Amp\Internal; +use Amp\TransformationStream; use Amp\Promise; use Amp\Stream; @@ -44,4 +45,12 @@ class AutoDisposingStream implements Stream { $this->stream->dispose(); } + + /** + * @return TransformationStream + */ + public function transform(): TransformationStream + { + return new TransformationStream($this); + } } diff --git a/lib/Internal/Yielder.php b/lib/Internal/Yielder.php index 422ef1f..c336fac 100644 --- a/lib/Internal/Yielder.php +++ b/lib/Internal/Yielder.php @@ -5,6 +5,7 @@ namespace Amp\Internal; use Amp\Deferred; use Amp\DisposedException; use Amp\Failure; +use Amp\TransformationStream; use Amp\Promise; use Amp\Stream; use Amp\Success; @@ -130,7 +131,12 @@ trait Yielder return $deferred->promise(); } - private function stream(): Stream + public function transform(): TransformationStream + { + return new TransformationStream($this); + } + + private function createStream(): Stream { \assert($this instanceof Stream, \sprintf("Users of this trait must implement %s to call %s", Stream::class, __METHOD__)); @@ -147,7 +153,7 @@ trait Yielder return new AutoDisposingStream($this); } - private function generate(): GeneratorStream + private function createGenerator(): GeneratorStream { \assert($this instanceof GeneratorStream, \sprintf("Users of this trait must implement %s to call %s", GeneratorStream::class, __METHOD__)); diff --git a/lib/Stream.php b/lib/Stream.php index 2705725..82e1c8c 100644 --- a/lib/Stream.php +++ b/lib/Stream.php @@ -25,4 +25,11 @@ interface Stream * @return void */ public function dispose(); + + /** + * Returns a stream object with fluent transformation methods. + * + * @return TransformationStream + */ + public function transform(): TransformationStream; } diff --git a/lib/StreamSource.php b/lib/StreamSource.php index c1682a7..dff158d 100644 --- a/lib/StreamSource.php +++ b/lib/StreamSource.php @@ -18,7 +18,7 @@ final class StreamSource { $this->stream = new class implements Stream { use Internal\Yielder { - stream as public; + createStream as public; } }; } @@ -33,7 +33,7 @@ final class StreamSource public function stream(): Stream { /** @psalm-suppress UndefinedInterfaceMethod */ - return $this->stream->stream(); + return $this->stream->createStream(); } /** diff --git a/lib/TransformationStream.php b/lib/TransformationStream.php new file mode 100644 index 0000000..10b5a46 --- /dev/null +++ b/lib/TransformationStream.php @@ -0,0 +1,135 @@ + + */ +final class TransformationStream implements Stream +{ + /** @var Stream */ + private $stream; + + public function __construct(Stream $stream) + { + $this->stream = $stream; + } + + public function continue(): Promise + { + return $this->stream->continue(); + } + + public function dispose() + { + $this->stream->dispose(); + } + + public function transform(): self + { + return $this; + } + + /** + * @template TMap + * + * @param callable(TValue, int):TMap $onYield + * + * @return self + */ + public function map(callable $onYield): self + { + return new self(new AsyncGenerator(function (callable $yield) use ($onYield): \Generator { + while (list($value, $key) = yield $this->stream->continue()) { + yield $yield(yield call($onYield, $value, $key)); + } + })); + } + + /** + * @param callable(TValue, int):bool $filter + * + * @return self + */ + public function filter(callable $filter): self + { + return new self(new AsyncGenerator(function (callable $yield) use ($filter) { + while (list($value, $key) = yield $this->stream->continue()) { + if (yield call($filter, $value, $key)) { + yield $yield($value, $key); + } + } + })); + } + + /** + * @param callable(TValue, int):void $onYield + * + * @return Promise + */ + public function each(callable $onYield): Promise + { + return call(function () use ($onYield) { + while (list($value, $key) = yield $this->stream->continue()) { + yield call($onYield, $value, $key); + } + }); + } + + /** + * @param int $count + * + * @return self + */ + public function drop(int $count): self + { + return new self(new AsyncGenerator(function (callable $yield) use ($count) { + $skipped = 0; + while (list($value) = yield $this->stream->continue()) { + if (++$skipped < $count) { + continue; + } + + yield $yield($value); + } + })); + } + + /** + * @param int $limit + * + * @return self + */ + public function limit(int $limit): Stream + { + return new self(new AsyncGenerator(function (callable $yield) use ($limit) { + $yielded = 0; + while (list($value) = yield $this->stream->continue()) { + if (++$yielded > $limit) { + $this->stream->dispose(); + return; + } + + yield $yield($value); + } + })); + } + + /** + * @return Promise> + */ + public function toArray(): Promise + { + return call(static function (): \Generator { + /** @psalm-var list $array */ + $array = []; + + while (list($value) = yield $this->stream->continue()) { + $array[] = $value; + } + + return $array; + }); + } +} diff --git a/test/YielderTraitTest.php b/test/YielderTraitTest.php index f9a9b7c..4aa23c5 100644 --- a/test/YielderTraitTest.php +++ b/test/YielderTraitTest.php @@ -12,7 +12,7 @@ use PHPUnit\Framework\TestCase; class Yielder implements Stream { use \Amp\Internal\Yielder { - stream as public; + createStream as public; } } @@ -32,7 +32,7 @@ class YielderTraitTest extends TestCase $value = 'Yielded Value'; $promise = $this->source->yield($value); - $stream = $this->source->stream(); + $stream = $this->source->createStream(); $this->assertSame([$value, 0], yield $stream->continue()); @@ -60,7 +60,7 @@ class YielderTraitTest extends TestCase { Loop::run(function () { $this->source->yield(null); - $this->assertSame([null, 0], yield $this->source->stream()->continue()); + $this->assertSame([null, 0], yield $this->source->createStream()->continue()); }); } @@ -95,12 +95,12 @@ class YielderTraitTest extends TestCase public function testDoubleStart() { - $stream = $this->source->stream(); + $stream = $this->source->createStream(); $this->expectException(\Error::class); $this->expectExceptionMessage('A stream may be started only once'); - $stream = $this->source->stream(); + $stream = $this->source->createStream(); } public function testYieldAfterContinue() @@ -108,7 +108,7 @@ class YielderTraitTest extends TestCase Loop::run(function () { $value = 'Yielded Value'; - $stream = $this->source->stream(); + $stream = $this->source->createStream(); $promise = $stream->continue(); $this->assertInstanceOf(Promise::class, $promise); @@ -122,7 +122,7 @@ class YielderTraitTest extends TestCase public function testContinueAfterComplete() { Loop::run(function () { - $stream = $this->source->stream(); + $stream = $this->source->createStream(); $this->source->complete(); @@ -136,7 +136,7 @@ class YielderTraitTest extends TestCase public function testContinueAfterFail() { Loop::run(function () { - $stream = $this->source->stream(); + $stream = $this->source->createStream(); $this->source->fail(new \Exception('Stream failed')); @@ -154,7 +154,7 @@ class YielderTraitTest extends TestCase public function testCompleteAfterContinue() { Loop::run(function () { - $stream = $this->source->stream(); + $stream = $this->source->createStream(); $promise = $stream->continue(); $this->assertInstanceOf(Promise::class, $promise); @@ -167,7 +167,7 @@ class YielderTraitTest extends TestCase public function testDestroyingStreamRelievesBackPressure() { - $stream = $this->source->stream(); + $stream = $this->source->createStream(); $invoked = 0; $onResolved = function () use (&$invoked) { @@ -199,7 +199,7 @@ class YielderTraitTest extends TestCase $this->expectExceptionMessage('The stream has been disposed'); Loop::run(function () { - $stream = $this->source->stream(); + $stream = $this->source->createStream(); $promise = $this->source->yield(1); $stream->dispose(); $this->assertNull(yield $promise); @@ -214,7 +214,7 @@ class YielderTraitTest extends TestCase $this->expectExceptionMessage('The stream has been disposed'); Loop::run(function () { - $stream = $this->source->stream(); + $stream = $this->source->createStream(); $promise = $this->source->yield(1); unset($stream); $this->assertNull(yield $promise);