diff --git a/lib/AsyncGenerator.php b/lib/AsyncGenerator.php index 1086373..2285b53 100644 --- a/lib/AsyncGenerator.php +++ b/lib/AsyncGenerator.php @@ -71,6 +71,8 @@ final class AsyncGenerator implements Stream * @psalm-param TSend $value * * @return Promise + * + * @psalm-return Promise> */ public function send($value): Promise { @@ -83,6 +85,8 @@ final class AsyncGenerator implements Stream * @param \Throwable $exception Exception to throw into the async generator. * * @return Promise + * + * @psalm-return Promise> */ public function throw(\Throwable $exception): Promise { diff --git a/lib/Internal/Yielder.php b/lib/Internal/Yielder.php index ab0ffa0..273099f 100644 --- a/lib/Internal/Yielder.php +++ b/lib/Internal/Yielder.php @@ -39,9 +39,6 @@ trait Yielder /** @var Deferred[] */ private $waiting = []; - /** @var int */ - private $nextKey = 0; - /** @var int */ private $consumePosition = 0; @@ -58,9 +55,9 @@ trait Yielder private $used = false; /** - * @return Promise + * @return Promise * - * @psalm-return Promise|null> + * @psalm-return Promise> */ public function continue(): Promise { @@ -72,9 +69,9 @@ trait Yielder * * @psalm-param TSend $value * - * @return Promise + * @return Promise * - * @psalm-return Promise|null> + * @psalm-return Promise> */ public function send($value): Promise { @@ -88,9 +85,9 @@ trait Yielder /** * @param \Throwable $exception * - * @return Promise + * @return Promise * - * @psalm-return Promise|null> + * @psalm-return Promise> */ public function throw(\Throwable $exception): Promise { @@ -104,11 +101,11 @@ trait Yielder /** * @param Promise $promise * - * @psalm-param Promise + * @psalm-param Promise $promise * - * @return Promise + * @return Promise * - * @psalm-return Promise|null> + * @psalm-return Promise> */ private function next(Promise $promise): Promise { @@ -123,11 +120,11 @@ trait Yielder $this->sendValues[$position - 1] = $promise; } - if (isset($this->yieldedValues[$position])) { - $tuple = $this->yieldedValues[$position]; + if (\array_key_exists($position, $this->yieldedValues)) { + $value = $this->yieldedValues[$position]; unset($this->yieldedValues[$position]); - return new Success($tuple); + return new Success([$value]); } if ($this->result) { @@ -190,11 +187,11 @@ trait Yielder * * @psalm-param TValue $value * - * @return Promise Resolves with the key of the yielded value once the value has been consumed. Fails with - * the failure reason if the {@see fail()} is called, or with {@see DisposedException} if the - * stream is destroyed. + * @return Promise Resolves with the sent value once the value has been consumed. Fails with the failure + * reason if the {@see fail()} is called, or with {@see DisposedException} if the stream + * is destroyed. * - * @psalm-return Promise + * @psalm-return Promise * * @throws \Error If the stream has completed. */ @@ -212,14 +209,12 @@ trait Yielder throw new \TypeError("Streams cannot yield promises"); } - $key = $this->nextKey++; - $tuple = [$value, $key]; $position = $this->yieldPosition++; if (isset($this->waiting[$position])) { $deferred = $this->waiting[$position]; unset($this->waiting[$position]); - $deferred->resolve($tuple); + $deferred->resolve([$value]); // Send-values are indexed as $this->consumePosition - 1, so use $position for the next value. if (isset($this->sendValues[$position])) { @@ -228,7 +223,7 @@ trait Yielder return $promise; } } else { - $this->yieldedValues[$position] = $tuple; + $this->yieldedValues[$position] = $value; } $this->backPressure[$position] = $deferred = new Deferred; diff --git a/lib/Stream.php b/lib/Stream.php index f36f807..e2f5f25 100644 --- a/lib/Stream.php +++ b/lib/Stream.php @@ -10,12 +10,13 @@ namespace Amp; interface Stream { /** - * Succeeds with a tuple of the yielded value and key or null if the stream has completed. If the stream fails, - * the returned promise will fail with the same exception. + * Succeeds with a single element array containing the yielded value if the stream has yielded a value. If the + * stream completes the promise resolves with null. If the stream fails, the returned promise will fail with the + * same exception. * - * @return Promise + * @return Promise Resolves with null if the stream has completed. * - * @psalm-return Promise|null> + * @psalm-return Promise> * * @throws \Throwable The exception used to fail the stream. */ diff --git a/lib/functions.php b/lib/functions.php index 6168f5b..d41d99b 100644 --- a/lib/functions.php +++ b/lib/functions.php @@ -811,10 +811,21 @@ namespace Amp\Iterator }); } + /** + * @template TValue + * + * @param Stream $stream + * + * @psalm-param Stream $stream + * + * @return Iterator + * + * @psalm-return Iterator + */ function fromStream(Stream $stream): Iterator { return new Producer(function (callable $emit) use ($stream): \Generator { - while (null !== $value = $stream->continue()) { + while (null !== $value = yield $stream->continue()) { yield $emit($value); } }); @@ -837,11 +848,17 @@ namespace Amp\Stream * Creates a stream from the given iterable, emitting the each value. The iterable may contain promises. If any * promise fails, the returned stream will fail with the same reason. * + * @template TValue + * * @param array|\Traversable $iterable Elements to yield. * @param int $delay Delay between elements yielded in milliseconds. * + * @psalm-param iterable $iterable + * * @return Stream * + * @psalm-return Stream + * * @throws \TypeError If the argument is not an array or instance of \Traversable. */ function fromIterable(/* iterable */ @@ -873,7 +890,7 @@ namespace Amp\Stream * @template TReturn * * @param Stream $stream - * @param callable (TValue $value, int $key): TReturn $onYield + * @param callable(TValue $value):TReturn $onYield * * @psalm-param Stream $stream * @@ -884,8 +901,8 @@ namespace Amp\Stream function map(Stream $stream, callable $onYield): Stream { return new AsyncGenerator(static function (callable $yield) use ($stream, $onYield) { - while (list($value, $key) = yield $stream->continue()) { - yield $yield($onYield($value, $key)); + while (list($value) = yield $stream->continue()) { + yield $yield($onYield($value)); } }); } @@ -894,7 +911,7 @@ namespace Amp\Stream * @template TValue * * @param Stream $stream - * @param callable(TValue $value, int $key):bool $filter + * @param callable(TValue $value):bool $filter * * @psalm-param Stream $stream * @@ -905,9 +922,9 @@ namespace Amp\Stream function filter(Stream $stream, callable $filter): Stream { return new AsyncGenerator(static function (callable $yield) use ($stream, $filter) { - while (list($value, $key) = yield $stream->continue()) { - if ($filter($value, $key)) { - yield $yield($value, $key); + while (list($value) = yield $stream->continue()) { + if ($filter($value)) { + yield $yield($value); } } }); @@ -1027,8 +1044,6 @@ namespace Amp\Stream * @psalm-param Stream $stream * * @return Promise - * - * @psalm-return Promise */ function discard(Stream $stream): Promise { @@ -1059,7 +1074,7 @@ namespace Amp\Stream function toArray(Stream $stream): Promise { return call(static function () use ($stream): \Generator { - /** @psalm-var list $array */ + /** @psalm-var list $array */ $array = []; while (list($value) = yield $stream->continue()) { @@ -1070,6 +1085,19 @@ namespace Amp\Stream }); } + /** + * Converts an instance of the deprecated {@see Iterator} into an instance of {@see Stream}. + * + * @template TValue + * + * @param Iterator $iterator + * + * @psalm-param Iterator $iterator + * + * @return Stream + * + * @psalm-return Stream + */ function fromIterator(Iterator $iterator): Stream { return new AsyncGenerator(function (callable $yield) use ($iterator): \Generator { diff --git a/test/AsyncGeneratorTest.php b/test/AsyncGeneratorTest.php index eef33cc..4edf04b 100644 --- a/test/AsyncGeneratorTest.php +++ b/test/AsyncGeneratorTest.php @@ -30,7 +30,7 @@ class AsyncGeneratorTest extends BaseTest yield $yield($value); }); - $this->assertSame([$value, 0], yield $generator->continue()); + $this->assertSame([$value], yield $generator->continue()); }); } @@ -43,7 +43,7 @@ class AsyncGeneratorTest extends BaseTest $result = yield $yield($value); }); - $this->assertSame([$value, 0], yield $generator->continue()); + $this->assertSame([$value], yield $generator->continue()); $this->assertNull(yield $generator->send($send)); $this->assertSame($result, $send); }); @@ -62,7 +62,7 @@ class AsyncGeneratorTest extends BaseTest $promise1 = $generator->continue(); $promise2 = $generator->send($send); - $this->assertSame([$value, 0], yield $promise1); + $this->assertSame([$value], yield $promise1); $this->assertNull(yield $promise2); $this->assertSame($result, $send); }); @@ -84,7 +84,7 @@ class AsyncGeneratorTest extends BaseTest $promise1 = $generator->continue(); $promise2 = $generator->throw($exception); - $this->assertSame([$value, 0], yield $promise1); + $this->assertSame([$value], yield $promise1); $this->assertNull(yield $promise2); $this->assertSame($result, $exception); }); @@ -104,7 +104,7 @@ class AsyncGeneratorTest extends BaseTest } }); - $this->assertSame([$value, 0], yield $generator->continue()); + $this->assertSame([$value], yield $generator->continue()); $this->assertNull(yield $generator->throw($exception)); $this->assertSame($result, $exception); }); @@ -147,7 +147,7 @@ class AsyncGeneratorTest extends BaseTest return $value; }); - $this->assertSame([null, 0], yield $generator->continue()); + $this->assertSame([null], yield $generator->continue()); $this->assertNull(yield $generator->continue()); $this->assertSame($value, yield $generator->getReturn()); }); diff --git a/test/Stream/FilterTest.php b/test/Stream/FilterTest.php index dd6dfbd..0bc58e6 100644 --- a/test/Stream/FilterTest.php +++ b/test/Stream/FilterTest.php @@ -3,12 +3,12 @@ namespace Amp\Test\Stream; use Amp\AsyncGenerator; +use Amp\PHPUnit\AsyncTestCase; use Amp\PHPUnit\TestException; use Amp\Stream; use Amp\StreamSource; -use Amp\Test\BaseTest; -class FilterTest extends BaseTest +class FilterTest extends AsyncTestCase { public function testNoValuesEmitted() { diff --git a/test/YielderTraitTest.php b/test/YielderTraitTest.php index 4aa23c5..ac9eb00 100644 --- a/test/YielderTraitTest.php +++ b/test/YielderTraitTest.php @@ -34,7 +34,7 @@ class YielderTraitTest extends TestCase $promise = $this->source->yield($value); $stream = $this->source->createStream(); - $this->assertSame([$value, 0], yield $stream->continue()); + $this->assertSame([$value], yield $stream->continue()); $this->assertInstanceOf(Promise::class, $promise); $this->assertNull(yield $promise); @@ -60,7 +60,7 @@ class YielderTraitTest extends TestCase { Loop::run(function () { $this->source->yield(null); - $this->assertSame([null, 0], yield $this->source->createStream()->continue()); + $this->assertSame([null], yield $this->source->createStream()->continue()); }); } @@ -115,7 +115,7 @@ class YielderTraitTest extends TestCase $this->assertNull(yield $this->source->yield($value)); - $this->assertSame([$value, 0], yield $promise); + $this->assertSame([$value], yield $promise); }); }