1
0
mirror of https://github.com/danog/amp.git synced 2024-12-03 09:57:51 +01:00

Intial stream transformations

This commit is contained in:
Aaron Piotrowski 2020-05-13 16:48:38 -05:00
parent 12c97b2561
commit 6283d9bbb7
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
7 changed files with 183 additions and 18 deletions

View File

@ -24,7 +24,7 @@ final class AsyncGenerator implements Stream
{
$source = new class implements Internal\GeneratorStream {
use Internal\Yielder {
generate as public;
createGenerator as public;
}
};
@ -52,7 +52,15 @@ final class AsyncGenerator implements Stream
$source->complete();
});
$this->generator = $source->generate();
$this->generator = $source->createGenerator();
}
/**
* @return TransformationStream<TValue>
*/
public function transform(): TransformationStream
{
return new TransformationStream($this);
}
/**

View File

@ -2,6 +2,7 @@
namespace Amp\Internal;
use Amp\TransformationStream;
use Amp\Promise;
use Amp\Stream;
@ -44,4 +45,12 @@ class AutoDisposingStream implements Stream
{
$this->stream->dispose();
}
/**
* @return TransformationStream
*/
public function transform(): TransformationStream
{
return new TransformationStream($this);
}
}

View File

@ -5,6 +5,7 @@ namespace Amp\Internal;
use Amp\Deferred;
use Amp\DisposedException;
use Amp\Failure;
use Amp\TransformationStream;
use Amp\Promise;
use Amp\Stream;
use Amp\Success;
@ -130,7 +131,12 @@ trait Yielder
return $deferred->promise();
}
private function stream(): Stream
public function transform(): TransformationStream
{
return new TransformationStream($this);
}
private function createStream(): Stream
{
\assert($this instanceof Stream, \sprintf("Users of this trait must implement %s to call %s", Stream::class, __METHOD__));
@ -147,7 +153,7 @@ trait Yielder
return new AutoDisposingStream($this);
}
private function generate(): GeneratorStream
private function createGenerator(): GeneratorStream
{
\assert($this instanceof GeneratorStream, \sprintf("Users of this trait must implement %s to call %s", GeneratorStream::class, __METHOD__));

View File

@ -25,4 +25,11 @@ interface Stream
* @return void
*/
public function dispose();
/**
* Returns a stream object with fluent transformation methods.
*
* @return TransformationStream
*/
public function transform(): TransformationStream;
}

View File

@ -18,7 +18,7 @@ final class StreamSource
{
$this->stream = new class implements Stream {
use Internal\Yielder {
stream as public;
createStream as public;
}
};
}
@ -33,7 +33,7 @@ final class StreamSource
public function stream(): Stream
{
/** @psalm-suppress UndefinedInterfaceMethod */
return $this->stream->stream();
return $this->stream->createStream();
}
/**

View File

@ -0,0 +1,135 @@
<?php
namespace Amp;
/**
* @template TValue
* @template-implements Stream<TValue>
*/
final class TransformationStream implements Stream
{
/** @var Stream<TValue> */
private $stream;
public function __construct(Stream $stream)
{
$this->stream = $stream;
}
public function continue(): Promise
{
return $this->stream->continue();
}
public function dispose()
{
$this->stream->dispose();
}
public function transform(): self
{
return $this;
}
/**
* @template TMap
*
* @param callable(TValue, int):TMap $onYield
*
* @return self<TMap>
*/
public function map(callable $onYield): self
{
return new self(new AsyncGenerator(function (callable $yield) use ($onYield): \Generator {
while (list($value, $key) = yield $this->stream->continue()) {
yield $yield(yield call($onYield, $value, $key));
}
}));
}
/**
* @param callable(TValue, int):bool $filter
*
* @return self<TValue>
*/
public function filter(callable $filter): self
{
return new self(new AsyncGenerator(function (callable $yield) use ($filter) {
while (list($value, $key) = yield $this->stream->continue()) {
if (yield call($filter, $value, $key)) {
yield $yield($value, $key);
}
}
}));
}
/**
* @param callable(TValue, int):void $onYield
*
* @return Promise<void>
*/
public function each(callable $onYield): Promise
{
return call(function () use ($onYield) {
while (list($value, $key) = yield $this->stream->continue()) {
yield call($onYield, $value, $key);
}
});
}
/**
* @param int $count
*
* @return self<TValue>
*/
public function drop(int $count): self
{
return new self(new AsyncGenerator(function (callable $yield) use ($count) {
$skipped = 0;
while (list($value) = yield $this->stream->continue()) {
if (++$skipped < $count) {
continue;
}
yield $yield($value);
}
}));
}
/**
* @param int $limit
*
* @return self<TValue>
*/
public function limit(int $limit): Stream
{
return new self(new AsyncGenerator(function (callable $yield) use ($limit) {
$yielded = 0;
while (list($value) = yield $this->stream->continue()) {
if (++$yielded > $limit) {
$this->stream->dispose();
return;
}
yield $yield($value);
}
}));
}
/**
* @return Promise<list<TValue>>
*/
public function toArray(): Promise
{
return call(static function (): \Generator {
/** @psalm-var list $array */
$array = [];
while (list($value) = yield $this->stream->continue()) {
$array[] = $value;
}
return $array;
});
}
}

View File

@ -12,7 +12,7 @@ use PHPUnit\Framework\TestCase;
class Yielder implements Stream
{
use \Amp\Internal\Yielder {
stream as public;
createStream as public;
}
}
@ -32,7 +32,7 @@ class YielderTraitTest extends TestCase
$value = 'Yielded Value';
$promise = $this->source->yield($value);
$stream = $this->source->stream();
$stream = $this->source->createStream();
$this->assertSame([$value, 0], yield $stream->continue());
@ -60,7 +60,7 @@ class YielderTraitTest extends TestCase
{
Loop::run(function () {
$this->source->yield(null);
$this->assertSame([null, 0], yield $this->source->stream()->continue());
$this->assertSame([null, 0], yield $this->source->createStream()->continue());
});
}
@ -95,12 +95,12 @@ class YielderTraitTest extends TestCase
public function testDoubleStart()
{
$stream = $this->source->stream();
$stream = $this->source->createStream();
$this->expectException(\Error::class);
$this->expectExceptionMessage('A stream may be started only once');
$stream = $this->source->stream();
$stream = $this->source->createStream();
}
public function testYieldAfterContinue()
@ -108,7 +108,7 @@ class YielderTraitTest extends TestCase
Loop::run(function () {
$value = 'Yielded Value';
$stream = $this->source->stream();
$stream = $this->source->createStream();
$promise = $stream->continue();
$this->assertInstanceOf(Promise::class, $promise);
@ -122,7 +122,7 @@ class YielderTraitTest extends TestCase
public function testContinueAfterComplete()
{
Loop::run(function () {
$stream = $this->source->stream();
$stream = $this->source->createStream();
$this->source->complete();
@ -136,7 +136,7 @@ class YielderTraitTest extends TestCase
public function testContinueAfterFail()
{
Loop::run(function () {
$stream = $this->source->stream();
$stream = $this->source->createStream();
$this->source->fail(new \Exception('Stream failed'));
@ -154,7 +154,7 @@ class YielderTraitTest extends TestCase
public function testCompleteAfterContinue()
{
Loop::run(function () {
$stream = $this->source->stream();
$stream = $this->source->createStream();
$promise = $stream->continue();
$this->assertInstanceOf(Promise::class, $promise);
@ -167,7 +167,7 @@ class YielderTraitTest extends TestCase
public function testDestroyingStreamRelievesBackPressure()
{
$stream = $this->source->stream();
$stream = $this->source->createStream();
$invoked = 0;
$onResolved = function () use (&$invoked) {
@ -199,7 +199,7 @@ class YielderTraitTest extends TestCase
$this->expectExceptionMessage('The stream has been disposed');
Loop::run(function () {
$stream = $this->source->stream();
$stream = $this->source->createStream();
$promise = $this->source->yield(1);
$stream->dispose();
$this->assertNull(yield $promise);
@ -214,7 +214,7 @@ class YielderTraitTest extends TestCase
$this->expectExceptionMessage('The stream has been disposed');
Loop::run(function () {
$stream = $this->source->stream();
$stream = $this->source->createStream();
$promise = $this->source->yield(1);
unset($stream);
$this->assertNull(yield $promise);