1
0
mirror of https://github.com/danog/amp.git synced 2024-12-02 17:37:50 +01:00

Remove key

This commit is contained in:
Aaron Piotrowski 2020-05-18 13:49:56 -05:00
parent e1402f8484
commit 57924690fe
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
7 changed files with 77 additions and 49 deletions

View File

@ -71,6 +71,8 @@ final class AsyncGenerator implements Stream
* @psalm-param TSend $value * @psalm-param TSend $value
* *
* @return Promise<array> * @return Promise<array>
*
* @psalm-return Promise<list<TValue>>
*/ */
public function send($value): Promise public function send($value): Promise
{ {
@ -83,6 +85,8 @@ final class AsyncGenerator implements Stream
* @param \Throwable $exception Exception to throw into the async generator. * @param \Throwable $exception Exception to throw into the async generator.
* *
* @return Promise<array> * @return Promise<array>
*
* @psalm-return Promise<list<TValue>>
*/ */
public function throw(\Throwable $exception): Promise public function throw(\Throwable $exception): Promise
{ {

View File

@ -39,9 +39,6 @@ trait Yielder
/** @var Deferred[] */ /** @var Deferred[] */
private $waiting = []; private $waiting = [];
/** @var int */
private $nextKey = 0;
/** @var int */ /** @var int */
private $consumePosition = 0; private $consumePosition = 0;
@ -58,9 +55,9 @@ trait Yielder
private $used = false; private $used = false;
/** /**
* @return Promise<array|null> * @return Promise<array>
* *
* @psalm-return Promise<list<TValue>|null> * @psalm-return Promise<list<TValue>>
*/ */
public function continue(): Promise public function continue(): Promise
{ {
@ -72,9 +69,9 @@ trait Yielder
* *
* @psalm-param TSend $value * @psalm-param TSend $value
* *
* @return Promise<array|null> * @return Promise<array>
* *
* @psalm-return Promise<list<TValue>|null> * @psalm-return Promise<list<TValue>>
*/ */
public function send($value): Promise public function send($value): Promise
{ {
@ -88,9 +85,9 @@ trait Yielder
/** /**
* @param \Throwable $exception * @param \Throwable $exception
* *
* @return Promise<array|null> * @return Promise<array>
* *
* @psalm-return Promise<list<TValue>|null> * @psalm-return Promise<list<TValue>>
*/ */
public function throw(\Throwable $exception): Promise public function throw(\Throwable $exception): Promise
{ {
@ -104,11 +101,11 @@ trait Yielder
/** /**
* @param Promise<mixed> $promise * @param Promise<mixed> $promise
* *
* @psalm-param Promise<TSend|null> * @psalm-param Promise<TSend|null> $promise
* *
* @return Promise<array|null> * @return Promise<array>
* *
* @psalm-return Promise<list<TValue>|null> * @psalm-return Promise<list<TValue>>
*/ */
private function next(Promise $promise): Promise private function next(Promise $promise): Promise
{ {
@ -123,11 +120,11 @@ trait Yielder
$this->sendValues[$position - 1] = $promise; $this->sendValues[$position - 1] = $promise;
} }
if (isset($this->yieldedValues[$position])) { if (\array_key_exists($position, $this->yieldedValues)) {
$tuple = $this->yieldedValues[$position]; $value = $this->yieldedValues[$position];
unset($this->yieldedValues[$position]); unset($this->yieldedValues[$position]);
return new Success($tuple); return new Success([$value]);
} }
if ($this->result) { if ($this->result) {
@ -190,11 +187,11 @@ trait Yielder
* *
* @psalm-param TValue $value * @psalm-param TValue $value
* *
* @return Promise<TSend> Resolves with the key of the yielded value once the value has been consumed. Fails with * @return Promise<mixed> Resolves with the sent value once the value has been consumed. Fails with the failure
* the failure reason if the {@see fail()} is called, or with {@see DisposedException} if the * reason if the {@see fail()} is called, or with {@see DisposedException} if the stream
* stream is destroyed. * is destroyed.
* *
* @psalm-return Promise<TSend> * @psalm-return Promise<TSend|null>
* *
* @throws \Error If the stream has completed. * @throws \Error If the stream has completed.
*/ */
@ -212,14 +209,12 @@ trait Yielder
throw new \TypeError("Streams cannot yield promises"); throw new \TypeError("Streams cannot yield promises");
} }
$key = $this->nextKey++;
$tuple = [$value, $key];
$position = $this->yieldPosition++; $position = $this->yieldPosition++;
if (isset($this->waiting[$position])) { if (isset($this->waiting[$position])) {
$deferred = $this->waiting[$position]; $deferred = $this->waiting[$position];
unset($this->waiting[$position]); unset($this->waiting[$position]);
$deferred->resolve($tuple); $deferred->resolve([$value]);
// Send-values are indexed as $this->consumePosition - 1, so use $position for the next value. // Send-values are indexed as $this->consumePosition - 1, so use $position for the next value.
if (isset($this->sendValues[$position])) { if (isset($this->sendValues[$position])) {
@ -228,7 +223,7 @@ trait Yielder
return $promise; return $promise;
} }
} else { } else {
$this->yieldedValues[$position] = $tuple; $this->yieldedValues[$position] = $value;
} }
$this->backPressure[$position] = $deferred = new Deferred; $this->backPressure[$position] = $deferred = new Deferred;

View File

@ -10,12 +10,13 @@ namespace Amp;
interface Stream interface Stream
{ {
/** /**
* Succeeds with a tuple of the yielded value and key or null if the stream has completed. If the stream fails, * Succeeds with a single element array containing the yielded value if the stream has yielded a value. If the
* the returned promise will fail with the same exception. * stream completes the promise resolves with null. If the stream fails, the returned promise will fail with the
* same exception.
* *
* @return Promise<array|null> * @return Promise<array> Resolves with null if the stream has completed.
* *
* @psalm-return Promise<list<TValue>|null> * @psalm-return Promise<list<TValue>>
* *
* @throws \Throwable The exception used to fail the stream. * @throws \Throwable The exception used to fail the stream.
*/ */

View File

@ -811,10 +811,21 @@ namespace Amp\Iterator
}); });
} }
/**
* @template TValue
*
* @param Stream $stream
*
* @psalm-param Stream<TValue> $stream
*
* @return Iterator
*
* @psalm-return Iterator<TValue>
*/
function fromStream(Stream $stream): Iterator function fromStream(Stream $stream): Iterator
{ {
return new Producer(function (callable $emit) use ($stream): \Generator { return new Producer(function (callable $emit) use ($stream): \Generator {
while (null !== $value = $stream->continue()) { while (null !== $value = yield $stream->continue()) {
yield $emit($value); yield $emit($value);
} }
}); });
@ -837,11 +848,17 @@ namespace Amp\Stream
* Creates a stream from the given iterable, emitting the each value. The iterable may contain promises. If any * Creates a stream from the given iterable, emitting the each value. The iterable may contain promises. If any
* promise fails, the returned stream will fail with the same reason. * promise fails, the returned stream will fail with the same reason.
* *
* @template TValue
*
* @param array|\Traversable $iterable Elements to yield. * @param array|\Traversable $iterable Elements to yield.
* @param int $delay Delay between elements yielded in milliseconds. * @param int $delay Delay between elements yielded in milliseconds.
* *
* @psalm-param iterable<TValue> $iterable
*
* @return Stream * @return Stream
* *
* @psalm-return Stream<TValue>
*
* @throws \TypeError If the argument is not an array or instance of \Traversable. * @throws \TypeError If the argument is not an array or instance of \Traversable.
*/ */
function fromIterable(/* iterable */ function fromIterable(/* iterable */
@ -873,7 +890,7 @@ namespace Amp\Stream
* @template TReturn * @template TReturn
* *
* @param Stream $stream * @param Stream $stream
* @param callable (TValue $value, int $key): TReturn $onYield * @param callable(TValue $value):TReturn $onYield
* *
* @psalm-param Stream<TValue> $stream * @psalm-param Stream<TValue> $stream
* *
@ -884,8 +901,8 @@ namespace Amp\Stream
function map(Stream $stream, callable $onYield): Stream function map(Stream $stream, callable $onYield): Stream
{ {
return new AsyncGenerator(static function (callable $yield) use ($stream, $onYield) { return new AsyncGenerator(static function (callable $yield) use ($stream, $onYield) {
while (list($value, $key) = yield $stream->continue()) { while (list($value) = yield $stream->continue()) {
yield $yield($onYield($value, $key)); yield $yield($onYield($value));
} }
}); });
} }
@ -894,7 +911,7 @@ namespace Amp\Stream
* @template TValue * @template TValue
* *
* @param Stream $stream * @param Stream $stream
* @param callable(TValue $value, int $key):bool $filter * @param callable(TValue $value):bool $filter
* *
* @psalm-param Stream<TValue> $stream * @psalm-param Stream<TValue> $stream
* *
@ -905,9 +922,9 @@ namespace Amp\Stream
function filter(Stream $stream, callable $filter): Stream function filter(Stream $stream, callable $filter): Stream
{ {
return new AsyncGenerator(static function (callable $yield) use ($stream, $filter) { return new AsyncGenerator(static function (callable $yield) use ($stream, $filter) {
while (list($value, $key) = yield $stream->continue()) { while (list($value) = yield $stream->continue()) {
if ($filter($value, $key)) { if ($filter($value)) {
yield $yield($value, $key); yield $yield($value);
} }
} }
}); });
@ -1027,8 +1044,6 @@ namespace Amp\Stream
* @psalm-param Stream<TValue> $stream * @psalm-param Stream<TValue> $stream
* *
* @return Promise<int> * @return Promise<int>
*
* @psalm-return Promise<int>
*/ */
function discard(Stream $stream): Promise function discard(Stream $stream): Promise
{ {
@ -1059,7 +1074,7 @@ namespace Amp\Stream
function toArray(Stream $stream): Promise function toArray(Stream $stream): Promise
{ {
return call(static function () use ($stream): \Generator { return call(static function () use ($stream): \Generator {
/** @psalm-var list $array */ /** @psalm-var list<TValue> $array */
$array = []; $array = [];
while (list($value) = yield $stream->continue()) { while (list($value) = yield $stream->continue()) {
@ -1070,6 +1085,19 @@ namespace Amp\Stream
}); });
} }
/**
* Converts an instance of the deprecated {@see Iterator} into an instance of {@see Stream}.
*
* @template TValue
*
* @param Iterator $iterator
*
* @psalm-param Iterator<TValue> $iterator
*
* @return Stream
*
* @psalm-return Stream<TValue>
*/
function fromIterator(Iterator $iterator): Stream function fromIterator(Iterator $iterator): Stream
{ {
return new AsyncGenerator(function (callable $yield) use ($iterator): \Generator { return new AsyncGenerator(function (callable $yield) use ($iterator): \Generator {

View File

@ -30,7 +30,7 @@ class AsyncGeneratorTest extends BaseTest
yield $yield($value); yield $yield($value);
}); });
$this->assertSame([$value, 0], yield $generator->continue()); $this->assertSame([$value], yield $generator->continue());
}); });
} }
@ -43,7 +43,7 @@ class AsyncGeneratorTest extends BaseTest
$result = yield $yield($value); $result = yield $yield($value);
}); });
$this->assertSame([$value, 0], yield $generator->continue()); $this->assertSame([$value], yield $generator->continue());
$this->assertNull(yield $generator->send($send)); $this->assertNull(yield $generator->send($send));
$this->assertSame($result, $send); $this->assertSame($result, $send);
}); });
@ -62,7 +62,7 @@ class AsyncGeneratorTest extends BaseTest
$promise1 = $generator->continue(); $promise1 = $generator->continue();
$promise2 = $generator->send($send); $promise2 = $generator->send($send);
$this->assertSame([$value, 0], yield $promise1); $this->assertSame([$value], yield $promise1);
$this->assertNull(yield $promise2); $this->assertNull(yield $promise2);
$this->assertSame($result, $send); $this->assertSame($result, $send);
}); });
@ -84,7 +84,7 @@ class AsyncGeneratorTest extends BaseTest
$promise1 = $generator->continue(); $promise1 = $generator->continue();
$promise2 = $generator->throw($exception); $promise2 = $generator->throw($exception);
$this->assertSame([$value, 0], yield $promise1); $this->assertSame([$value], yield $promise1);
$this->assertNull(yield $promise2); $this->assertNull(yield $promise2);
$this->assertSame($result, $exception); $this->assertSame($result, $exception);
}); });
@ -104,7 +104,7 @@ class AsyncGeneratorTest extends BaseTest
} }
}); });
$this->assertSame([$value, 0], yield $generator->continue()); $this->assertSame([$value], yield $generator->continue());
$this->assertNull(yield $generator->throw($exception)); $this->assertNull(yield $generator->throw($exception));
$this->assertSame($result, $exception); $this->assertSame($result, $exception);
}); });
@ -147,7 +147,7 @@ class AsyncGeneratorTest extends BaseTest
return $value; return $value;
}); });
$this->assertSame([null, 0], yield $generator->continue()); $this->assertSame([null], yield $generator->continue());
$this->assertNull(yield $generator->continue()); $this->assertNull(yield $generator->continue());
$this->assertSame($value, yield $generator->getReturn()); $this->assertSame($value, yield $generator->getReturn());
}); });

View File

@ -3,12 +3,12 @@
namespace Amp\Test\Stream; namespace Amp\Test\Stream;
use Amp\AsyncGenerator; use Amp\AsyncGenerator;
use Amp\PHPUnit\AsyncTestCase;
use Amp\PHPUnit\TestException; use Amp\PHPUnit\TestException;
use Amp\Stream; use Amp\Stream;
use Amp\StreamSource; use Amp\StreamSource;
use Amp\Test\BaseTest;
class FilterTest extends BaseTest class FilterTest extends AsyncTestCase
{ {
public function testNoValuesEmitted() public function testNoValuesEmitted()
{ {

View File

@ -34,7 +34,7 @@ class YielderTraitTest extends TestCase
$promise = $this->source->yield($value); $promise = $this->source->yield($value);
$stream = $this->source->createStream(); $stream = $this->source->createStream();
$this->assertSame([$value, 0], yield $stream->continue()); $this->assertSame([$value], yield $stream->continue());
$this->assertInstanceOf(Promise::class, $promise); $this->assertInstanceOf(Promise::class, $promise);
$this->assertNull(yield $promise); $this->assertNull(yield $promise);
@ -60,7 +60,7 @@ class YielderTraitTest extends TestCase
{ {
Loop::run(function () { Loop::run(function () {
$this->source->yield(null); $this->source->yield(null);
$this->assertSame([null, 0], yield $this->source->createStream()->continue()); $this->assertSame([null], yield $this->source->createStream()->continue());
}); });
} }
@ -115,7 +115,7 @@ class YielderTraitTest extends TestCase
$this->assertNull(yield $this->source->yield($value)); $this->assertNull(yield $this->source->yield($value));
$this->assertSame([$value, 0], yield $promise); $this->assertSame([$value], yield $promise);
}); });
} }