1
0
mirror of https://github.com/danog/amp.git synced 2024-12-02 17:37:50 +01:00

Rename yield() to emit()

This commit is contained in:
Aaron Piotrowski 2020-05-28 12:59:55 -05:00
parent 66f0deb563
commit b64b3affc9
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
11 changed files with 122 additions and 122 deletions

View File

@ -10,25 +10,25 @@ use Amp\Loop;
Loop::run(function () { Loop::run(function () {
try { try {
/** @psalm-var AsyncGenerator<int, int, int> $generator */ /** @psalm-var AsyncGenerator<int, int, int> $generator */
$generator = new AsyncGenerator(function (callable $yield): \Generator { $generator = new AsyncGenerator(function (callable $emit): \Generator {
$value = yield $yield(0); $value = yield $emit(0);
$value = yield $yield(yield new Delayed(500, $value)); $value = yield $emit(yield new Delayed(500, $value));
$value = yield $yield($value); $value = yield $emit($value);
$value = yield $yield(yield new Delayed(300, $value)); $value = yield $emit(yield new Delayed(300, $value));
$value = yield $yield($value); $value = yield $emit($value);
$value = yield $yield($value); $value = yield $emit($value);
$value = yield $yield(yield new Delayed(1000, $value)); $value = yield $emit(yield new Delayed(1000, $value));
$value = yield $yield($value); $value = yield $emit($value);
$value = yield $yield($value); $value = yield $emit($value);
$value = yield $yield(yield new Delayed(600, $value)); $value = yield $emit(yield new Delayed(600, $value));
return $value; return $value;
}); });
// Use AsyncGenerator::continue() to get the first yielded value. // Use AsyncGenerator::continue() to get the first emitted value.
if (null !== $value = yield $generator->continue()) { if (null !== $value = yield $generator->continue()) {
\printf("Async Generator yielded %d\n", $value); \printf("Async Generator yielded %d\n", $value);
// Use AsyncGenerator::send() to send values into the generator and get the next yielded value. // Use AsyncGenerator::send() to send values into the generator and get the next emitted value.
while (null !== $value = yield $generator->send($value + 1)) { while (null !== $value = yield $generator->send($value + 1)) {
\printf("Async Generator yielded %d\n", $value); \printf("Async Generator yielded %d\n", $value);
yield new Delayed(100); // Listener consumption takes 100 ms. yield new Delayed(100); // Listener consumption takes 100 ms.

View File

@ -15,16 +15,16 @@ Loop::run(function () {
$stream = $source->stream(); $stream = $source->stream();
asyncCall(function (StreamSource $source): \Generator { asyncCall(function (StreamSource $source): \Generator {
yield $source->yield(yield new Delayed(500, 1)); yield $source->emit(yield new Delayed(500, 1));
yield $source->yield(yield new Delayed(1500, 2)); yield $source->emit(yield new Delayed(1500, 2));
yield $source->yield(yield new Delayed(1000, 3)); yield $source->emit(yield new Delayed(1000, 3));
yield $source->yield(yield new Delayed(2000, 4)); yield $source->emit(yield new Delayed(2000, 4));
yield $source->yield(5); yield $source->emit(5);
yield $source->yield(6); yield $source->emit(6);
yield $source->yield(7); yield $source->emit(7);
yield $source->yield(yield new Delayed(2000, 8)); yield $source->emit(yield new Delayed(2000, 8));
yield $source->yield(9); yield $source->emit(9);
yield $source->yield(10); yield $source->emit(10);
$source->complete(); $source->complete();
}, $source); }, $source);

View File

@ -10,20 +10,20 @@ use Amp\Loop;
Loop::run(function () { Loop::run(function () {
try { try {
/** @psalm-var AsyncGenerator<int, void, void> $stream */ /** @psalm-var AsyncGenerator<int, void, void> $stream */
$stream = new AsyncGenerator(function (callable $yield): \Generator { $stream = new AsyncGenerator(function (callable $emit): \Generator {
yield $yield(1); yield $emit(1);
yield $yield(yield new Delayed(500, 2)); yield $emit(yield new Delayed(500, 2));
yield $yield(3); yield $emit(3);
yield $yield(yield new Delayed(300, 4)); yield $emit(yield new Delayed(300, 4));
yield $yield(5); yield $emit(5);
yield $yield(6); yield $emit(6);
yield $yield(yield new Delayed(1000, 7)); yield $emit(yield new Delayed(1000, 7));
yield $yield(8); yield $emit(8);
yield $yield(9); yield $emit(9);
yield $yield(yield new Delayed(600, 10)); yield $emit(yield new Delayed(600, 10));
}); });
// Flow listener attempts to consume 11 values at once. Only 10 will be yielded. // Flow listener attempts to consume 11 values at once. Only 10 will be emitted.
$promises = []; $promises = [];
for ($i = 0; $i < 11 && ($promises[] = $stream->continue()); ++$i); for ($i = 0; $i < 11 && ($promises[] = $stream->continue()); ++$i);

View File

@ -13,17 +13,17 @@ Loop::run(function () {
$source = new StreamSource; $source = new StreamSource;
Loop::defer(function () use ($source) { Loop::defer(function () use ($source) {
// Source yields all values at once without awaiting back-pressure. // Source emits all values at once without awaiting back-pressure.
$source->yield(1); $source->emit(1);
$source->yield(2); $source->emit(2);
$source->yield(3); $source->emit(3);
$source->yield(4); $source->emit(4);
$source->yield(5); $source->emit(5);
$source->yield(6); $source->emit(6);
$source->yield(7); $source->emit(7);
$source->yield(8); $source->emit(8);
$source->yield(9); $source->emit(9);
$source->yield(10); $source->emit(10);
$source->complete(); $source->complete();
}); });

View File

@ -9,7 +9,7 @@ namespace Amp;
*/ */
final class AsyncGenerator implements Stream final class AsyncGenerator implements Stream
{ {
/** @var Internal\YieldSource<TValue, TSend> */ /** @var Internal\EmitSource<TValue, TSend> */
private $source; private $source;
/** @var Promise<TReturn> */ /** @var Promise<TReturn> */
@ -23,18 +23,18 @@ final class AsyncGenerator implements Stream
*/ */
public function __construct(callable $callable) public function __construct(callable $callable)
{ {
$this->source = $source = new Internal\YieldSource; $this->source = $source = new Internal\EmitSource;
if (\PHP_VERSION_ID < 70100) { if (\PHP_VERSION_ID < 70100) {
$yield = static function ($value) use ($source): Promise { $emit = static function ($value) use ($source): Promise {
return $source->yield($value); return $source->emit($value);
}; };
} else { } else {
$yield = \Closure::fromCallable([$source, "yield"]); $emit = \Closure::fromCallable([$source, "emit"]);
} }
try { try {
$generator = $callable($yield); $generator = $callable($emit);
} catch (\Throwable $exception) { } catch (\Throwable $exception) {
throw new \Error("The callable threw an exception", 0, $exception); throw new \Error("The callable threw an exception", 0, $exception);
} }
@ -69,7 +69,7 @@ final class AsyncGenerator implements Stream
/** /**
* Sends a value to the async generator, resolving the back-pressure promise with the given value. * Sends a value to the async generator, resolving the back-pressure promise with the given value.
* The first yielded value must be retrieved using {@see continue()}. * The first emitted value must be retrieved using {@see continue()}.
* *
* @param mixed $value Value to send to the async generator. * @param mixed $value Value to send to the async generator.
* *
@ -79,7 +79,7 @@ final class AsyncGenerator implements Stream
* *
* @psalm-return Promise<TValue|null> * @psalm-return Promise<TValue|null>
* *
* @throws \Error If the first yielded value has not been retrieved using {@see continue()}. * @throws \Error If the first emitted value has not been retrieved using {@see continue()}.
*/ */
public function send($value): Promise public function send($value): Promise
{ {
@ -88,7 +88,7 @@ final class AsyncGenerator implements Stream
/** /**
* Throws an exception into the async generator, failing the back-pressure promise with the given exception. * Throws an exception into the async generator, failing the back-pressure promise with the given exception.
* The first yielded value must be retrieved using {@see continue()}. * The first emitted value must be retrieved using {@see continue()}.
* *
* @param \Throwable $exception Exception to throw into the async generator. * @param \Throwable $exception Exception to throw into the async generator.
* *
@ -96,7 +96,7 @@ final class AsyncGenerator implements Stream
* *
* @psalm-return Promise<TValue|null> * @psalm-return Promise<TValue|null>
* *
* @throws \Error If the first yielded value has not been retrieved using {@see continue()}. * @throws \Error If the first emitted value has not been retrieved using {@see continue()}.
*/ */
public function throw(\Throwable $exception): Promise public function throw(\Throwable $exception): Promise
{ {

View File

@ -3,7 +3,7 @@
namespace Amp; namespace Amp;
/** /**
* Will be thrown from {@see StreamSource::yield()} or the yield callable provided by {@see AsyncGenerator} if the * Will be thrown from {@see StreamSource::emit()} or the emit callable provided by {@see AsyncGenerator} if the
* associated stream is destroyed. * associated stream is destroyed.
*/ */
final class DisposedException extends \Exception final class DisposedException extends \Exception

View File

@ -6,7 +6,7 @@ use Amp\Promise;
use Amp\Stream; use Amp\Stream;
/** /**
* Wraps a Stream instance that has public methods to yield, complete, and fail into an object that only allows * Wraps a Stream instance that has public methods to emit, complete, and fail into an object that only allows
* access to the public API methods and sets $disposed to true when the object is destroyed. * access to the public API methods and sets $disposed to true when the object is destroyed.
* *
* @internal * @internal
@ -16,10 +16,10 @@ use Amp\Stream;
*/ */
final class AutoDisposingStream implements Stream final class AutoDisposingStream implements Stream
{ {
/** @var YieldSource<TValue, null> */ /** @var EmitSource<TValue, null> */
private $source; private $source;
public function __construct(YieldSource $source) public function __construct(EmitSource $source)
{ {
$this->source = $source; $this->source = $source;
} }

View File

@ -19,7 +19,7 @@ use React\Promise\PromiseInterface as ReactPromise;
* @template TValue * @template TValue
* @template TSend * @template TSend
*/ */
final class YieldSource final class EmitSource
{ {
/** @var Promise|null */ /** @var Promise|null */
private $result; private $result;
@ -28,7 +28,7 @@ final class YieldSource
private $completed = false; private $completed = false;
/** @var mixed[] */ /** @var mixed[] */
private $yieldedValues = []; private $emittedValues = [];
/** @var Promise[] */ /** @var Promise[] */
private $sendValues = []; private $sendValues = [];
@ -43,7 +43,7 @@ final class YieldSource
private $consumePosition = 0; private $consumePosition = 0;
/** @var int */ /** @var int */
private $yieldPosition = 0; private $emitPosition = 0;
/** @var array|null */ /** @var array|null */
private $resolutionTrace; private $resolutionTrace;
@ -120,9 +120,9 @@ final class YieldSource
$this->sendValues[$position - 1] = $promise; $this->sendValues[$position - 1] = $promise;
} }
if (\array_key_exists($position, $this->yieldedValues)) { if (\array_key_exists($position, $this->emittedValues)) {
$value = $this->yieldedValues[$position]; $value = $this->emittedValues[$position];
unset($this->yieldedValues[$position]); unset($this->emittedValues[$position]);
return new Success($value); return new Success($value);
} }
@ -160,7 +160,7 @@ final class YieldSource
} }
/** /**
* Yields a value from the stream. The returned promise is resolved once the yielded value has been consumed or * Emits a value from the stream. The returned promise is resolved once the emitted value has been consumed or
* if the stream is completed, failed, or disposed. * if the stream is completed, failed, or disposed.
* *
* @param mixed $value * @param mixed $value
@ -175,25 +175,25 @@ final class YieldSource
* *
* @throws \Error If the stream has completed. * @throws \Error If the stream has completed.
*/ */
public function yield($value): Promise public function emit($value): Promise
{ {
if ($this->result) { if ($this->result) {
if ($this->disposed) { if ($this->disposed) {
return $this->result; // Promise failed with an instance of DisposedException. return $this->result; // Promise failed with an instance of DisposedException.
} }
throw new \Error("Streams cannot yield values after calling complete"); throw new \Error("Streams cannot emit values after calling complete");
} }
if ($value === null) { if ($value === null) {
throw new \TypeError("Streams cannot yield NULL"); throw new \TypeError("Streams cannot emit NULL");
} }
if ($value instanceof Promise || $value instanceof ReactPromise) { if ($value instanceof Promise || $value instanceof ReactPromise) {
throw new \TypeError("Streams cannot yield promises"); throw new \TypeError("Streams cannot emit promises");
} }
$position = $this->yieldPosition++; $position = $this->emitPosition++;
if (isset($this->waiting[$position])) { if (isset($this->waiting[$position])) {
$deferred = $this->waiting[$position]; $deferred = $this->waiting[$position];
@ -207,7 +207,7 @@ final class YieldSource
return $promise; return $promise;
} }
} else { } else {
$this->yieldedValues[$position] = $value; $this->emittedValues[$position] = $value;
} }
$this->backPressure[$position] = $deferred = new Deferred; $this->backPressure[$position] = $deferred = new Deferred;

View File

@ -3,20 +3,20 @@
namespace Amp; namespace Amp;
/** /**
* StreamSource is a container for a Stream that can yield values using the yield() method and completed using the * StreamSource is a container for a Stream that can emit values using the emit() method and completed using the
* complete() and fail() methods. The contained Stream may be accessed using the stream() method. This object should * complete() and fail() methods. The contained Stream may be accessed using the stream() method. This object should
* not be returned as part of a public API, but used internally to create and yield values to a Stream. * not be returned as part of a public API, but used internally to create and emit values to a Stream.
* *
* @template TValue * @template TValue
*/ */
final class StreamSource final class StreamSource
{ {
/** @var Internal\YieldSource<TValue, null> Has public yield, complete, and fail methods. */ /** @var Internal\EmitSource<TValue, null> Has public emit, complete, and fail methods. */
private $source; private $source;
public function __construct() public function __construct()
{ {
$this->source = new Internal\YieldSource; $this->source = new Internal\EmitSource;
} }
/** /**
@ -34,18 +34,18 @@ final class StreamSource
} }
/** /**
* Yields a value to the stream. * Emits a value to the stream.
* *
* @param mixed $value * @param mixed $value
* *
* @psalm-param TValue $value * @psalm-param TValue $value
* *
* @return Promise<null> Resolves with null when the yielded value has been consumed or fails with * @return Promise<null> Resolves with null when the emitted value has been consumed or fails with
* {@see DisposedException} if the stream has been destroyed. * {@see DisposedException} if the stream has been destroyed.
*/ */
public function yield($value): Promise public function emit($value): Promise
{ {
return $this->source->yield($value); return $this->source->emit($value);
} }
/** /**

View File

@ -850,8 +850,8 @@ namespace Amp\Stream
* *
* @template TValue * @template TValue
* *
* @param array|\Traversable $iterable Elements to yield. * @param array|\Traversable $iterable Elements to emit.
* @param int $delay Delay between elements yielded in milliseconds. * @param int $delay Delay between elements emitted in milliseconds.
* *
* @psalm-param iterable<TValue> $iterable * @psalm-param iterable<TValue> $iterable
* *
@ -890,7 +890,7 @@ namespace Amp\Stream
* @template TReturn * @template TReturn
* *
* @param Stream $stream * @param Stream $stream
* @param callable(TValue $value):TReturn $onYield * @param callable(TValue $value):TReturn $onEmit
* *
* @psalm-param Stream<TValue> $stream * @psalm-param Stream<TValue> $stream
* *
@ -898,11 +898,11 @@ namespace Amp\Stream
* *
* @psalm-return Stream<TReturn> * @psalm-return Stream<TReturn>
*/ */
function map(Stream $stream, callable $onYield): Stream function map(Stream $stream, callable $onEmit): Stream
{ {
return new AsyncGenerator(static function (callable $yield) use ($stream, $onYield) { return new AsyncGenerator(static function (callable $yield) use ($stream, $onEmit) {
while (null !== $value = yield $stream->continue()) { while (null !== $value = yield $stream->continue()) {
yield $yield(yield call($onYield, $value)); yield $yield(yield call($onEmit, $value));
} }
}); });
} }
@ -931,7 +931,7 @@ namespace Amp\Stream
} }
/** /**
* Creates a stream that yields values emitted from any stream in the array of streams. * Creates a stream that emits values emitted from any stream in the array of streams.
* *
* @param Stream[] $streams * @param Stream[] $streams
* *
@ -944,7 +944,7 @@ namespace Amp\Stream
$coroutine = coroutine(static function (Stream $stream) use (&$source) { $coroutine = coroutine(static function (Stream $stream) use (&$source) {
while ((null !== $value = yield $stream->continue()) && $source !== null) { while ((null !== $value = yield $stream->continue()) && $source !== null) {
yield $source->yield($value); yield $source->emit($value);
} }
}); });
@ -972,8 +972,8 @@ namespace Amp\Stream
} }
/** /**
* Concatenates the given streams into a single stream, yielding values from a single stream at a time. The * Concatenates the given streams into a single stream, emitting from a single stream at a time. The
* prior stream must complete before values are yielded from any subsequent streams. Streams are concatenated * prior stream must complete before values are emitted from any subsequent streams. Streams are concatenated
* in the order given (iteration order of the array). * in the order given (iteration order of the array).
* *
* @param Stream[] $streams * @param Stream[] $streams
@ -1016,7 +1016,7 @@ namespace Amp\Stream
} }
} }
yield $source->yield($value); yield $source->emit($value);
}); });
$previous[] = $coroutine($iterator, $emit); $previous[] = $coroutine($iterator, $emit);
$promise = Promise\all($previous); $promise = Promise\all($previous);

View File

@ -3,32 +3,32 @@
namespace Amp\Test; namespace Amp\Test;
use Amp\DisposedException; use Amp\DisposedException;
use Amp\Internal\YieldSource; use Amp\Internal\EmitSource;
use Amp\PHPUnit\AsyncTestCase; use Amp\PHPUnit\AsyncTestCase;
use Amp\Promise; use Amp\Promise;
use Amp\Success; use Amp\Success;
class YieldSourceTest extends AsyncTestCase class EmitSourceTest extends AsyncTestCase
{ {
/** @var YieldSource */ /** @var EmitSource */
private $source; private $source;
public function setUp() public function setUp()
{ {
parent::setUp(); parent::setUp();
$this->source = new YieldSource; $this->source = new EmitSource;
} }
public function testYield() public function testEmit()
{ {
$value = 'Yielded Value'; $value = 'Emited Value';
$promise = $this->source->yield($value); $promise = $this->source->emit($value);
$stream = $this->source->stream(); $stream = $this->source->stream();
$this->assertSame($value, yield $stream->continue()); $this->assertSame($value, yield $stream->continue());
$continue = $stream->continue(); // Promise will not resolve until another value is yielded or stream completed. $continue = $stream->continue(); // Promise will not resolve until another value is emitted or stream completed.
$this->assertInstanceOf(Promise::class, $promise); $this->assertInstanceOf(Promise::class, $promise);
$this->assertNull(yield $promise); $this->assertNull(yield $promise);
@ -39,37 +39,37 @@ class YieldSourceTest extends AsyncTestCase
} }
/** /**
* @depends testYield * @depends testEmit
*/ */
public function testYieldAfterComplete() public function testEmitAfterComplete()
{ {
$this->expectException(\Error::class); $this->expectException(\Error::class);
$this->expectExceptionMessage('Streams cannot yield values after calling complete'); $this->expectExceptionMessage('Streams cannot emit values after calling complete');
$this->source->complete(); $this->source->complete();
$this->source->yield(1); $this->source->emit(1);
} }
/** /**
* @depends testYield * @depends testEmit
*/ */
public function testYieldingNull() public function testEmitingNull()
{ {
$this->expectException(\TypeError::class); $this->expectException(\TypeError::class);
$this->expectExceptionMessage('Streams cannot yield NULL'); $this->expectExceptionMessage('Streams cannot emit NULL');
$this->source->yield(null); $this->source->emit(null);
} }
/** /**
* @depends testYield * @depends testEmit
*/ */
public function testYieldingPromise() public function testEmitingPromise()
{ {
$this->expectException(\TypeError::class); $this->expectException(\TypeError::class);
$this->expectExceptionMessage('Streams cannot yield promises'); $this->expectExceptionMessage('Streams cannot emit promises');
$this->source->yield(new Success); $this->source->emit(new Success);
} }
public function testDoubleComplete() public function testDoubleComplete()
@ -100,16 +100,16 @@ class YieldSourceTest extends AsyncTestCase
$stream = $this->source->stream(); $stream = $this->source->stream();
} }
public function testYieldAfterContinue() public function testEmitAfterContinue()
{ {
$value = 'Yielded Value'; $value = 'Emited Value';
$stream = $this->source->stream(); $stream = $this->source->stream();
$promise = $stream->continue(); $promise = $stream->continue();
$this->assertInstanceOf(Promise::class, $promise); $this->assertInstanceOf(Promise::class, $promise);
$backPressure = $this->source->yield($value); $backPressure = $this->source->emit($value);
$this->assertSame($value, yield $promise); $this->assertSame($value, yield $promise);
@ -168,7 +168,7 @@ class YieldSourceTest extends AsyncTestCase
}; };
foreach (\range(1, 5) as $value) { foreach (\range(1, 5) as $value) {
$promise = $this->source->yield($value); $promise = $this->source->emit($value);
$promise->onResolve($onResolved); $promise->onResolve($onResolved);
} }
@ -186,28 +186,28 @@ class YieldSourceTest extends AsyncTestCase
$this->source->complete(); // Should throw. $this->source->complete(); // Should throw.
} }
public function testYieldAfterDisposal() public function testEmitAfterDisposal()
{ {
$this->expectException(DisposedException::class); $this->expectException(DisposedException::class);
$this->expectExceptionMessage('The stream has been disposed'); $this->expectExceptionMessage('The stream has been disposed');
$stream = $this->source->stream(); $stream = $this->source->stream();
$promise = $this->source->yield(1); $promise = $this->source->emit(1);
$stream->dispose(); $stream->dispose();
$this->assertNull(yield $promise); $this->assertNull(yield $promise);
yield $this->source->yield(1); yield $this->source->emit(1);
} }
public function testYieldAfterDestruct() public function testEmitAfterDestruct()
{ {
$this->expectException(DisposedException::class); $this->expectException(DisposedException::class);
$this->expectExceptionMessage('The stream has been disposed'); $this->expectExceptionMessage('The stream has been disposed');
$stream = $this->source->stream(); $stream = $this->source->stream();
$promise = $this->source->yield(1); $promise = $this->source->emit(1);
unset($stream); unset($stream);
$this->assertNull(yield $promise); $this->assertNull(yield $promise);
yield $this->source->yield(1); yield $this->source->emit(1);
} }
} }