diff --git a/examples/streams/backpressure.php b/examples/streams/backpressure.php index 800b978..9d25c83 100644 --- a/examples/streams/backpressure.php +++ b/examples/streams/backpressure.php @@ -14,7 +14,7 @@ Loop::run(function () { $stream = $emitter->stream(); - $stream->listen(function ($value) { + $stream->onEmit(function ($value) { printf("Stream emitted %d\n", $value); return new Pause(500); // Artificial back-pressure on stream. }); diff --git a/lib/Failure.php b/lib/Failure.php index 0a8a3d7..33ca931 100644 --- a/lib/Failure.php +++ b/lib/Failure.php @@ -32,6 +32,6 @@ final class Failure implements Stream { /** * {@inheritdoc} */ - public function listen(callable $onNext) { + public function onEmit(callable $onEmit) { } } diff --git a/lib/Internal/Producer.php b/lib/Internal/Producer.php index a383fb1..b0fbae1 100644 --- a/lib/Internal/Producer.php +++ b/lib/Internal/Producer.php @@ -26,14 +26,14 @@ trait Producer { private $listeners = []; /** - * @param callable $onNext + * @param callable $onEmit */ - public function listen(callable $onNext) { + public function onEmit(callable $onEmit) { if ($this->resolved) { return; } - $this->listeners[] = $onNext; + $this->listeners[] = $onEmit; } /** @@ -79,9 +79,9 @@ trait Producer { $promises = []; - foreach ($this->listeners as $onNext) { + foreach ($this->listeners as $onEmit) { try { - $result = $onNext($value); + $result = $onEmit($value); if ($result instanceof ReactPromise) { $result = adapt($result); } diff --git a/lib/Listener.php b/lib/Listener.php index 1376f8e..969b75b 100644 --- a/lib/Listener.php +++ b/lib/Listener.php @@ -48,7 +48,7 @@ class Listener implements Iterator { $backPressure = &$this->backPressure; $resolved = &$this->resolved; - $this->stream->listen(static function ($value) use (&$waiting, &$values, &$backPressure, &$resolved) { + $this->stream->onEmit(static function ($value) use (&$waiting, &$values, &$backPressure, &$resolved) { $values[] = $value; $backPressure[] = $pressure = new Deferred; diff --git a/lib/Stream.php b/lib/Stream.php index c45cb85..fa4d557 100644 --- a/lib/Stream.php +++ b/lib/Stream.php @@ -12,10 +12,10 @@ interface Stream extends Promise { * Registers a callback to be invoked each time value is emitted from the stream. If the function returns an * promise, back-pressure is applied to the promise until the returned promise is resolved. * - * Exceptions thrown from $onNext (or failures of promises returned from $onNext) will fail the returned + * Exceptions thrown from $onEmit (or failures of promises returned from $onNext) will fail the returned * Subscriber with the thrown exception. * - * @param callable $onNext Function invoked each time a value is emitted from the stream. + * @param callable $onEmit Function invoked each time a value is emitted from the stream. */ - public function listen(callable $onNext); + public function onEmit(callable $onEmit); } diff --git a/lib/Success.php b/lib/Success.php index dc33c6a..68740af 100644 --- a/lib/Success.php +++ b/lib/Success.php @@ -41,6 +41,6 @@ final class Success implements Stream { /** * {@inheritdoc} */ - public function listen(callable $onNext) { + public function onEmit(callable $onEmit) { } } diff --git a/lib/functions.php b/lib/functions.php index 031f046..ea2a8bf 100644 --- a/lib/functions.php +++ b/lib/functions.php @@ -609,21 +609,21 @@ namespace Amp\Stream { /** * @param \Amp\Stream $stream - * @param callable (mixed $value): mixed $onNext - * @param callable (mixed $value): mixed|null $onComplete + * @param callable (mixed $value): mixed $onEmit + * @param callable (mixed $value): mixed|null $onResolve * * @return \Amp\Stream */ - function map(Stream $stream, callable $onNext, callable $onComplete = null): Stream { + function map(Stream $stream, callable $onEmit, callable $onResolve = null): Stream { $listener = new Listener($stream); - return new Producer(function (callable $emit) use ($listener, $onNext, $onComplete) { + return new Producer(function (callable $emit) use ($listener, $onEmit, $onResolve) { while (yield $listener->advance()) { - yield $emit($onNext($listener->getCurrent())); + yield $emit($onEmit($listener->getCurrent())); } - if ($onComplete === null) { + if ($onResolve === null) { return $listener->getResult(); } - return $onComplete($listener->getResult()); + return $onResolve($listener->getResult()); }); } @@ -660,7 +660,7 @@ namespace Amp\Stream { if (!$stream instanceof Stream) { throw new UnionTypeError([Stream::class], $stream); } - $stream->listen(function ($value) use (&$pending, $emitter) { + $stream->onEmit(function ($value) use (&$pending, $emitter) { if ($pending) { return $emitter->emit($value); } @@ -723,7 +723,7 @@ namespace Amp\Stream { yield $emitter->emit($value); }; - $subscriptions[] = $stream->listen(function ($value) use ($generator) { + $subscriptions[] = $stream->onEmit(function ($value) use ($generator) { return new Coroutine($generator($value)); }); $previous[] = $stream; diff --git a/test/ConcatTest.php b/test/ConcatTest.php index e943ea0..9f86b6d 100644 --- a/test/ConcatTest.php +++ b/test/ConcatTest.php @@ -50,7 +50,7 @@ class ConcatTest extends \PHPUnit\Framework\TestCase { $stream = Stream\concat([Stream\fromIterable(\range(1, 5)), $producer, Stream\fromIterable(\range(7, 10))]); - $stream->listen(function ($value) use (&$results) { + $stream->onEmit(function ($value) use (&$results) { $results[] = $value; }); diff --git a/test/FilterTest.php b/test/FilterTest.php index f84cf5d..257cebf 100644 --- a/test/FilterTest.php +++ b/test/FilterTest.php @@ -42,7 +42,7 @@ class FilterTest extends \PHPUnit\Framework\TestCase { return $value & 1; }); - $stream->listen(function ($value) use (&$results) { + $stream->onEmit(function ($value) use (&$results) { $results[] = $value; }); @@ -72,7 +72,7 @@ class FilterTest extends \PHPUnit\Framework\TestCase { throw $exception; }); - $stream->listen(function ($value) use (&$results) { + $stream->onEmit(function ($value) use (&$results) { $results[] = $value; }); diff --git a/test/IntervalTest.php b/test/IntervalTest.php index 95f3cfc..3e793d0 100644 --- a/test/IntervalTest.php +++ b/test/IntervalTest.php @@ -33,7 +33,7 @@ class IntervalTest extends \PHPUnit\Framework\TestCase { Loop::run(function () use (&$invoked, $count) { $stream = Stream\interval(self::TIMEOUT, $count); - $stream->listen(function () use (&$invoked) { + $stream->onEmit(function () use (&$invoked) { ++$invoked; return new Pause(self::TIMEOUT * 2); }); diff --git a/test/ProducerTest.php b/test/ProducerTest.php index 63b38e7..348d188 100644 --- a/test/ProducerTest.php +++ b/test/ProducerTest.php @@ -37,7 +37,7 @@ class ProducerTest extends TestCase { $this->assertSame($emitted, $value); }; - $producer->listen($callback); + $producer->onEmit($callback); $producer->onResolve(function ($exception, $result) use ($value) { $this->assertSame($result, $value); @@ -66,7 +66,7 @@ class ProducerTest extends TestCase { $this->assertSame($emitted, $value); }; - $producer->listen($callback); + $producer->onEmit($callback); $deferred->resolve($value); }); @@ -108,7 +108,7 @@ class ProducerTest extends TestCase { $time = microtime(true) - $time; }); - $producer->listen(function () { + $producer->onEmit(function () { return new Pause(self::TIMEOUT); }); }); @@ -130,7 +130,7 @@ class ProducerTest extends TestCase { $time = microtime(true) - $time; }); - $producer->listen(function () { + $producer->onEmit(function () { return new ReactPromise(function ($resolve) { Loop::delay(self::TIMEOUT, $resolve); }); @@ -153,7 +153,7 @@ class ProducerTest extends TestCase { yield $emit(2); }); - $producer->listen(function () use ($exception) { + $producer->onEmit(function () use ($exception) { throw $exception; }); }); @@ -192,7 +192,7 @@ class ProducerTest extends TestCase { yield $producer; - $producer->listen(function () use (&$invoked) { + $producer->onEmit(function () use (&$invoked) { $invoked = true; }); }); diff --git a/test/ProducerTraitTest.php b/test/ProducerTraitTest.php index fce8427..c6cd75c 100644 --- a/test/ProducerTraitTest.php +++ b/test/ProducerTraitTest.php @@ -35,7 +35,7 @@ class ProducerTraitTest extends TestCase { $this->assertSame($emitted, $value); }; - $this->producer->listen($callback); + $this->producer->onEmit($callback); $promise = $this->producer->emit($value); $this->assertInstanceOf(Promise::class, $promise); @@ -55,7 +55,7 @@ class ProducerTraitTest extends TestCase { $this->assertSame($emitted, $value); }; - $this->producer->listen($callback); + $this->producer->onEmit($callback); $this->producer->emit($promise); $this->assertTrue($invoked); @@ -73,7 +73,7 @@ class ProducerTraitTest extends TestCase { $invoked = true; }; - $this->producer->listen($callback); + $this->producer->onEmit($callback); $this->producer->emit($promise); $this->assertFalse($invoked); @@ -100,7 +100,7 @@ class ProducerTraitTest extends TestCase { $this->assertSame($emitted, $value); }; - $this->producer->listen($callback); + $this->producer->onEmit($callback); $this->producer->emit($deferred->promise()); $this->assertFalse($invoked); @@ -123,7 +123,7 @@ class ProducerTraitTest extends TestCase { $this->assertSame($emitted, $value); }; - $this->producer->listen($callback); + $this->producer->onEmit($callback); $this->producer->emit($promise); $this->assertTrue($invoked); @@ -141,7 +141,7 @@ class ProducerTraitTest extends TestCase { $result = $emitted; }; - $this->producer->listen($callback); + $this->producer->onEmit($callback); $this->producer->emit($deferred->promise()); $this->assertFalse($invoked); @@ -215,7 +215,7 @@ class ProducerTraitTest extends TestCase { try { Loop::run(function () use ($exception) { - $this->producer->listen(function () use ($exception) { + $this->producer->onEmit(function () use ($exception) { throw $exception; }); @@ -231,7 +231,7 @@ class ProducerTraitTest extends TestCase { $value = 1; $promise = new Success($value); - $this->producer->listen(function () use ($promise) { + $this->producer->onEmit(function () use ($promise) { return $promise; }); @@ -249,7 +249,7 @@ class ProducerTraitTest extends TestCase { try { Loop::run(function () use ($exception, $promise) { - $this->producer->listen(function () use ($promise) { + $this->producer->onEmit(function () use ($promise) { return $promise; }); diff --git a/test/StreamFromIterableTest.php b/test/StreamFromIterableTest.php index df498d5..112e5be 100644 --- a/test/StreamFromIterableTest.php +++ b/test/StreamFromIterableTest.php @@ -14,7 +14,7 @@ class StreamFromIterableTest extends \PHPUnit\Framework\TestCase { Loop::run(function () use (&$results) { $stream = Stream\fromIterable([new Success(1), new Success(2), new Success(3)]); - $stream->listen(function ($value) use (&$results) { + $stream->onEmit(function ($value) use (&$results) { $results[] = $value; }); }); @@ -43,7 +43,7 @@ class StreamFromIterableTest extends \PHPUnit\Framework\TestCase { Loop::run(function () use (&$results, &$reason, $exception) { $stream = Stream\fromIterable([new Success(1), new Success(2), new Failure($exception), new Success(4)]); - $stream->listen(function ($value) use (&$results) { + $stream->onEmit(function ($value) use (&$results) { $results[] = $value; }); @@ -63,7 +63,7 @@ class StreamFromIterableTest extends \PHPUnit\Framework\TestCase { Loop::run(function () use (&$results) { $stream = Stream\fromIterable([new Pause(30, 1), new Pause(10, 2), new Pause(20, 3), new Success(4)]); - $stream->listen(function ($value) use (&$results) { + $stream->onEmit(function ($value) use (&$results) { $results[] = $value; }); }); @@ -82,7 +82,7 @@ class StreamFromIterableTest extends \PHPUnit\Framework\TestCase { $stream = Stream\fromIterable($generator); - $stream->listen(function ($value) use (&$results) { + $stream->onEmit(function ($value) use (&$results) { $results[] = $value; }); }); diff --git a/test/StreamMapTest.php b/test/StreamMapTest.php index b7e53bc..6505c4d 100644 --- a/test/StreamMapTest.php +++ b/test/StreamMapTest.php @@ -46,7 +46,7 @@ class StreamMapTest extends \PHPUnit\Framework\TestCase { return $value + 1; }); - $stream->listen(function ($value) use (&$results) { + $stream->onEmit(function ($value) use (&$results) { $results[] = $value; }); @@ -77,7 +77,7 @@ class StreamMapTest extends \PHPUnit\Framework\TestCase { throw $exception; }); - $stream->listen(function ($value) use (&$results) { + $stream->onEmit(function ($value) use (&$results) { $results[] = $value; }); @@ -113,7 +113,7 @@ class StreamMapTest extends \PHPUnit\Framework\TestCase { throw $exception; }); - $stream->listen(function ($value) use (&$results) { + $stream->onEmit(function ($value) use (&$results) { $results[] = $value; });