1
0
mirror of https://github.com/danog/amp.git synced 2025-01-22 13:21:16 +01:00

Use separate objects for stream ops

This commit is contained in:
Aaron Piotrowski 2020-05-16 10:39:34 -05:00
parent 94e01e7e49
commit 704f87ccc8
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
13 changed files with 422 additions and 194 deletions

View File

@ -55,14 +55,6 @@ final class AsyncGenerator implements Stream
$this->generator = $source->createGenerator();
}
/**
* @inheritDoc
*/
public function transform(callable $operator = null): TransformationStream
{
return $this->generator->transform($operator);
}
/**
* @inheritDoc
*/
@ -114,5 +106,4 @@ final class AsyncGenerator implements Stream
{
return $this->coroutine;
}
}

View File

@ -2,7 +2,6 @@
namespace Amp\Internal;
use Amp\TransformationStream;
use Amp\Promise;
use Amp\Stream;
@ -45,12 +44,4 @@ class AutoDisposingStream implements Stream
{
$this->stream->dispose();
}
/**
* @inheritDoc
*/
public function transform(callable $operator = null): TransformationStream
{
return $this->stream->transform($operator);
}
}

View File

@ -5,7 +5,6 @@ namespace Amp\Internal;
use Amp\Deferred;
use Amp\DisposedException;
use Amp\Failure;
use Amp\TransformationStream;
use Amp\Promise;
use Amp\Stream;
use Amp\Success;
@ -19,7 +18,6 @@ use React\Promise\PromiseInterface as ReactPromise;
*
* @template TValue
* @template TSend
* @template TReturn
*/
trait Yielder
{
@ -60,7 +58,9 @@ trait Yielder
private $used = false;
/**
* @return Promise<array>
* @return Promise<array|null>
*
* @psalm-return Promise<list<TValue>|null>
*/
public function continue(): Promise
{
@ -72,7 +72,9 @@ trait Yielder
*
* @psalm-param TSend $value
*
* @return Promise<array>
* @return Promise<array|null>
*
* @psalm-return Promise<list<TValue>|null>
*/
public function send($value): Promise
{
@ -86,7 +88,9 @@ trait Yielder
/**
* @param \Throwable $exception
*
* @return Promise<array>
* @return Promise<array|null>
*
* @psalm-return Promise<list<TValue>|null>
*/
public function throw(\Throwable $exception): Promise
{
@ -98,9 +102,13 @@ trait Yielder
}
/**
* @param Promise<TSend|null> $promise
* @param Promise<mixed> $promise
*
* @return Promise<array>
* @psalm-param Promise<TSend|null>
*
* @return Promise<array|null>
*
* @psalm-return Promise<list<TValue>|null>
*/
private function next(Promise $promise): Promise
{
@ -131,11 +139,6 @@ trait Yielder
return $deferred->promise();
}
public function transform(callable $operator = null): TransformationStream
{
return new TransformationStream($this, $operator);
}
private function createStream(): Stream
{
\assert($this instanceof Stream, \sprintf("Users of this trait must implement %s to call %s", Stream::class, __METHOD__));

View File

@ -13,7 +13,9 @@ interface Stream
* Succeeds with a tuple of the yielded value and key or null if the stream has completed. If the stream fails,
* the returned promise will fail with the same exception.
*
* @return Promise<array>
* @return Promise<array|null>
*
* @psalm-return Promise<list<TValue>|null>
*
* @throws \Throwable The exception used to fail the stream.
*/
@ -25,13 +27,4 @@ interface Stream
* @return void
*/
public function dispose();
/**
* Returns a stream object with fluent transformation methods.
*
* @param callable(TransformationStream):Stream $operator
*
* @return TransformationStream
*/
public function transform(callable $operator = null): TransformationStream;
}

View File

@ -0,0 +1,44 @@
<?php
namespace Amp\Stream;
use Amp\Promise;
use Amp\Stream;
/**
* @template TValue
* @template TApplied
*/
final class ApplyStream implements Stream
{
/** @var Stream<TApplied> */
private $stream;
/**
* @param Stream $stream
* @param callable(Stream<TValue>):Stream<TApplied> $operator
*/
public function __construct(Stream $stream, callable $operator)
{
$stream = $operator($stream);
if (!$stream instanceof Stream) {
throw new \TypeError('$operator callback must return an instance of ' . Stream::class);
}
$this->stream = $stream;
}
/**
* @psalm-return Promise<list<TApplied>|null>
*/
public function continue(): Promise
{
return $this->stream->continue();
}
public function dispose()
{
$this->stream->dispose();
}
}

46
lib/Stream/DropStream.php Normal file
View File

@ -0,0 +1,46 @@
<?php
namespace Amp\Stream;
use Amp\Promise;
use Amp\Stream;
use function Amp\call;
/**
* @template TValue
*/
final class DropStream implements Stream
{
/** @var Stream<TValue> */
private $stream;
/** @var int */
private $count;
/** @var int */
private $dropped = 0;
public function __construct(Stream $stream, int $count)
{
$this->stream = $stream;
$this->count = $count;
}
public function continue(): Promise
{
return call(function () {
while (++$this->dropped <= $this->count) {
if (yield $this->stream->continue() === null) {
return null;
}
}
return yield $this->stream->continue();
});
}
public function dispose()
{
$this->stream->dispose();
}
}

View File

@ -0,0 +1,37 @@
<?php
namespace Amp\Stream;
use Amp\Promise;
use Amp\Stream;
use function Amp\call;
/**
* @template TValue
*/
final class EachOperator
{
/** @var Promise<list<TValue>> */
private $promise;
/**
* @param Stream<TValue> $stream
* @param callable(TValue):void $each
*/
public function __construct(Stream $stream, callable $each)
{
$this->promise = call(function () use ($stream, $each) {
while (list($value, $key) = yield $stream->continue()) {
yield call($each, $value, $key);
}
});
}
/**
* @return Promise<list<TValue>>
*/
public function promise(): Promise
{
return $this->promise;
}
}

View File

@ -0,0 +1,43 @@
<?php
namespace Amp\Stream;
use Amp\Promise;
use Amp\Stream;
use function Amp\call;
/**
* @template TValue
*/
final class FilterStream implements Stream
{
/** @var Stream<TValue> */
private $stream;
/** @var callable(TValue):Promise<bool> */
private $filter;
public function __construct(Stream $stream, callable $filter)
{
$this->stream = $stream;
$this->filter = $filter;
}
public function continue(): Promise
{
return call(function () {
while (list($value) = yield $this->stream->continue()) {
if (!yield call($this->filter, $value)) {
return $value;
}
}
return null;
});
}
public function dispose()
{
$this->stream->dispose();
}
}

View File

@ -0,0 +1,46 @@
<?php
namespace Amp\Stream;
use Amp\Promise;
use Amp\Stream;
use function Amp\call;
/**
* @template TValue
*/
final class LimitStream implements Stream
{
/** @var Stream<TValue> */
private $stream;
/** @var int */
private $limit;
/** @var int */
private $yielded = 0;
public function __construct(Stream $stream, int $limit)
{
$this->stream = $stream;
$this->limit = $limit;
}
public function continue(): Promise
{
return call(function () {
$value = yield $this->stream->continue();
if (++$this->yielded > $this->limit) {
$this->stream->dispose();
}
return $value;
});
}
public function dispose()
{
$this->stream->dispose();
}
}

42
lib/Stream/MapStream.php Normal file
View File

@ -0,0 +1,42 @@
<?php
namespace Amp\Stream;
use Amp\Promise;
use Amp\Stream;
use function Amp\call;
/**
* @template TValue
* @template TMap
*/
final class MapStream implements Stream
{
/** @var Stream<TValue> */
private $stream;
/** @var callable(TValue):Promise<TMap> */
private $mapper;
public function __construct(Stream $stream, callable $mapper)
{
$this->stream = $stream;
$this->mapper = $mapper;
}
public function continue(): Promise
{
return call(function () {
if (list($value) = yield $this->stream->continue()) {
return yield call($this->mapper, $value);
}
return null;
});
}
public function dispose()
{
$this->stream->dispose();
}
}

View File

@ -0,0 +1,41 @@
<?php
namespace Amp\Stream;
use Amp\Promise;
use Amp\Stream;
use function Amp\call;
/**
* @template TValue
*/
final class ToArrayOperator
{
/** @var Promise<list<TValue>> */
private $promise;
/**
* @param Stream<TValue> $stream
*/
public function __construct(Stream $stream)
{
$this->promise = call(function () use ($stream) {
/** @psalm-var list $array */
$array = [];
while (list($value) = yield $stream->continue()) {
$array[] = $value;
}
return $array;
});
}
/**
* @return Promise<list<TValue>>
*/
public function promise(): Promise
{
return $this->promise;
}
}

105
lib/StreamModifier.php Normal file
View File

@ -0,0 +1,105 @@
<?php
namespace Amp;
use Amp\Stream\EachOperator;
/**
* @template TValue
*/
final class StreamModifier
{
/** @var Stream<TValue> */
private $stream;
/**
* @param Stream<TValue> $stream
*/
public function __construct(Stream $stream)
{
$this->stream = $stream;
}
/**
* @return Stream<TValue>
*/
public function stream(): Stream
{
return $this->stream;
}
public function apply(callable $operator): self
{
$clone = clone $this;
$clone->stream = new Stream\ApplyStream($clone->stream, $operator);
return $clone;
}
/**
* @template TMap
*
* @param callable(TValue, int):TMap $onYield
*
* @return self<TMap>
*/
public function map(callable $onYield): self
{
$clone = clone $this;
$clone->stream = new Stream\MapStream($clone->stream, $onYield);
return $clone;
}
/**
* @param callable(TValue, int):bool $filter
*
* @return self<TValue>
*/
public function filter(callable $filter): self
{
$clone = clone $this;
$clone->stream = new Stream\FilterStream($clone->stream, $filter);
return $clone;
}
/**
* @param callable(TValue, int):void $onYield
*
* @return Promise<void>
*/
public function each(callable $onYield): Promise
{
return (new EachOperator($this->stream, $onYield))->promise();
}
/**
* @param int $count
*
* @return self<TValue>
*/
public function drop(int $count): self
{
$clone = clone $this;
$clone->stream = new Stream\DropStream($clone->stream, $count);
return $clone;
}
/**
* @param int $limit
*
* @return self<TValue>
*/
public function limit(int $limit): self
{
$clone = clone $this;
$clone->stream = new Stream\LimitStream($clone->stream, $limit);
return $clone;
}
/**
* @return Promise<list<TValue>>
*/
public function toArray(): Promise
{
return (new Stream\ToArrayOperator($this->stream))->promise();
}
}

View File

@ -1,154 +0,0 @@
<?php
namespace Amp;
/**
* @template TValue
* @template-implements Stream<TValue>
*/
final class TransformationStream implements Stream
{
/** @var Stream<TValue> */
private $stream;
public function __construct(Stream $stream, callable $operator = null)
{
$this->stream = $stream instanceof self ? $stream->stream : $stream;
if ($operator !== null) {
$this->stream = $this->apply($operator);
}
}
public function continue(): Promise
{
return $this->stream->continue();
}
public function dispose()
{
$this->stream->dispose();
}
public function transform(callable $operator = null): self
{
if ($operator === null) {
return $this;
}
return new self($this->apply($operator));
}
private function apply(callable $operator): Stream
{
$stream = $operator($this);
if (!$stream instanceof Stream) {
throw new \TypeError('$operator callback must return an instance of ' . Stream::class);
}
return $stream;
}
/**
* @template TMap
*
* @param callable(TValue, int):TMap $onYield
*
* @return self<TMap>
*/
public function map(callable $onYield): self
{
return new self(new AsyncGenerator(function (callable $yield) use ($onYield): \Generator {
while (list($value, $key) = yield $this->stream->continue()) {
yield $yield(yield call($onYield, $value, $key));
}
}));
}
/**
* @param callable(TValue, int):bool $filter
*
* @return self<TValue>
*/
public function filter(callable $filter): self
{
return new self(new AsyncGenerator(function (callable $yield) use ($filter) {
while (list($value, $key) = yield $this->stream->continue()) {
if (yield call($filter, $value, $key)) {
yield $yield($value, $key);
}
}
}));
}
/**
* @param callable(TValue, int):void $onYield
*
* @return Promise<void>
*/
public function each(callable $onYield): Promise
{
return call(function () use ($onYield) {
while (list($value, $key) = yield $this->stream->continue()) {
yield call($onYield, $value, $key);
}
});
}
/**
* @param int $count
*
* @return self<TValue>
*/
public function drop(int $count): self
{
return new self(new AsyncGenerator(function (callable $yield) use ($count) {
$skipped = 0;
while (list($value) = yield $this->stream->continue()) {
if (++$skipped <= $count) {
continue;
}
yield $yield($value);
}
}));
}
/**
* @param int $limit
*
* @return self<TValue>
*/
public function limit(int $limit): Stream
{
return new self(new AsyncGenerator(function (callable $yield) use ($limit) {
$yielded = 0;
while (list($value) = yield $this->stream->continue()) {
if (++$yielded > $limit) {
$this->stream->dispose();
return;
}
yield $yield($value);
}
}));
}
/**
* @return Promise<list<TValue>>
*/
public function toArray(): Promise
{
return call(static function (): \Generator {
/** @psalm-var list $array */
$array = [];
while (list($value) = yield $this->stream->continue()) {
$array[] = $value;
}
return $array;
});
}
}