From a9362780ed9f9cf9b7412a2ccb9c28ac22438775 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Tue, 24 May 2016 11:47:14 -0500 Subject: [PATCH 01/17] Initial commit --- .gitattributes | 6 + .gitignore | 4 + LICENSE | 22 +++ composer.json | 34 +++++ example/emitter.php | 36 +++++ lib/CompletedException.php | 5 + lib/DisposedException.php | 5 + lib/Emitter.php | 50 +++++++ lib/IncompleteException.php | 5 + lib/Internal/EmitQueue.php | 240 +++++++++++++++++++++++++++++++ lib/Internal/Emitted.php | 57 ++++++++ lib/Internal/EmitterIterator.php | 136 ++++++++++++++++++ lib/Observable.php | 17 +++ lib/ObservableIterator.php | 41 ++++++ lib/functions.php | 146 +++++++++++++++++++ 15 files changed, 804 insertions(+) create mode 100644 .gitattributes create mode 100644 .gitignore create mode 100644 LICENSE create mode 100644 composer.json create mode 100644 example/emitter.php create mode 100644 lib/CompletedException.php create mode 100644 lib/DisposedException.php create mode 100644 lib/Emitter.php create mode 100644 lib/IncompleteException.php create mode 100644 lib/Internal/EmitQueue.php create mode 100644 lib/Internal/Emitted.php create mode 100644 lib/Internal/EmitterIterator.php create mode 100644 lib/Observable.php create mode 100644 lib/ObservableIterator.php create mode 100644 lib/functions.php diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..29a850a --- /dev/null +++ b/.gitattributes @@ -0,0 +1,6 @@ +example export-ignore +test export-ignore +.gitattributes export-ignore +.gitignore export-ignore +.travis.yml export-ignore +phpunit.xml.dist export-ignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..d92a4ec --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +build +composer.lock +phpunit.xml +vendor diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..c9bb310 --- /dev/null +++ b/LICENSE @@ -0,0 +1,22 @@ + +The MIT License (MIT) + +Copyright (c) 2016 amphp + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/composer.json b/composer.json new file mode 100644 index 0000000..77bb4b2 --- /dev/null +++ b/composer.json @@ -0,0 +1,34 @@ +{ + "name": "amphp/observable", + "description": "", + "keywords": [ + "asynchronous", + "async", + "observable", + "emitter" + ], + "homepage": "http://amphp.org", + "license": "MIT", + "require": { + "amphp/awaitable": "dev-master", + "async-interop/event-loop": "dev-master", + "async-interop/event-loop-implementation": "dev-master" + }, + "require-dev": { + "amphp/loop": "dev-master" + }, + "minimum-stability": "dev", + "autoload": { + "psr-4": { + "Amp\\": "lib" + }, + "files": [ + "lib/functions.php" + ] + }, + "autoload-dev": { + "psr-4": { + "Amp\\Test\\": "test" + } + } +} diff --git a/example/emitter.php b/example/emitter.php new file mode 100644 index 0000000..965259d --- /dev/null +++ b/example/emitter.php @@ -0,0 +1,36 @@ +#!/usr/bin/env php +getIterator(); + + while (yield $iterator->isValid()) { + printf("Observable emitted %d\n", $iterator->getCurrent()); + yield new Pause(500); // Artificial back-pressure on observable. + } + + } catch (\Throwable $exception) { + printf("Exception: %s\n", $exception); + } +}), $loop = new NativeLoop()); diff --git a/lib/CompletedException.php b/lib/CompletedException.php new file mode 100644 index 0000000..b56013e --- /dev/null +++ b/lib/CompletedException.php @@ -0,0 +1,5 @@ +emitter = $emitter; + $this->queue = new Internal\EmitQueue; + } + + /** + * {@inheritdoc} + */ + public function dispose() { + $this->emitter = null; + $this->queue->dispose(); + } + + /** + * {@inheritdoc} + */ + public function getIterator() { + if ($this->emitter !== null) { + $emitter = $this->emitter; + $this->emitter = null; + + // Asynchronously start the emitter. + Loop::defer(function () use ($emitter) { + $this->queue->start($emitter); + }); + } + + return new Internal\EmitterIterator($this->queue); + } +} diff --git a/lib/IncompleteException.php b/lib/IncompleteException.php new file mode 100644 index 0000000..9d56dd1 --- /dev/null +++ b/lib/IncompleteException.php @@ -0,0 +1,5 @@ +future = new Future; + $this->emitted = new Emitted($this->future); + } + + /** + * @param callable $emitter + */ + public function start(callable $emitter) { + /** + * Emits a value from the observable. + * + * @param mixed $value If $value is an instance of \Interop\Async\Awaitable, the success value is used as the + * value to emit or the failure reason is used to fail the awaitable returned from this function. + * + * @return \Interop\Async\Awaitable + * + * @resolve mixed The emitted value (the resolution value of $value) + * + * @throws \Amp\CompletedException If the observable has been completed. + * @throws \Amp\DisposedException If the observable has been disposed. + */ + $emit = function ($value = null) { + return new Coroutine($this->push($value)); + }; + + try { + $generator = $emitter($emit); + + if (!$generator instanceof \Generator) { + throw new \LogicException("Callable must be a coroutine"); + } + + $coroutine = new Coroutine($generator); + $coroutine->when([$this, 'done']); + } catch (\Throwable $exception) { + $this->done($exception); + } catch (\Exception $exception) { + $this->done($exception); + } + } + + /** + * @coroutine + * + * @param mixed $value + * + * @return \Generator + * + * @throws \InvalidArgumentException + * @throws \Throwable|\Exception + */ + public function push($value) { + if ($this->complete) { + throw $this->reason ?: new CompletedException("The observable has completed"); + } + + if ($this->busy) { + throw new \LogicException("Cannot emit values simultaneously"); + } + + $this->busy = true; + + try { + if ($value instanceof Observable) { + if ($value === $this->observable) { + throw new \InvalidArgumentException("Cannot emit an observable within itself"); + } + + $iterator = $value->getIterator(); + + while (yield $iterator->isValid()) { + yield $this->emit($iterator->getCurrent()); + } + + yield Coroutine::result($iterator->getReturn()); + return; + } + + if ($value instanceof Awaitable) { + $value = (yield $value); + } + + yield $this->emit($value); + } catch (\Throwable $exception) { + $this->done($exception); + throw $exception; + } catch (\Exception $exception) { + $this->done($exception); + throw $exception; + } finally { + $this->busy = false; + } + + yield Coroutine::result($value); + } + + /** + * @param mixed $value Value to emit. + * + * @return \Interop\Async\Awaitable + */ + private function emit($value) { + $future = $this->future; + $emitted = $this->emitted; + + $this->future = new Future; + $this->emitted = new Emitted($this->future); + + $future->resolve($value); + + return $emitted->wait(); + } + + /** + * Increments the number of listening iterators. + */ + public function increment() { + ++$this->listeners; + } + + /** + * Decrements the number of listening iterators. Marks the queue as disposed if the count reaches 0. + */ + public function decrement() { + if (--$this->listeners <= 0 && !$this->complete) { + $this->dispose(new DisposedException("The observable was automatically disposed")); + } + } + + /** + * @return \Amp\Internal\Emitted + */ + public function pull() { + return $this->emitted; + } + + /** + * Marks the observable as complete, failing with the given exception or completing with the given value. + * + * @param \Throwable|\Exception|null $exception + * @param mixed $value + */ + public function done($exception, $value = null) { + if ($this->complete) { + return; + } + + $this->complete = true; + + if ($exception) { + $this->reason = $exception; + $this->future->fail($exception); + return; + } + + $this->future->resolve($value); + } + + /** + * Disposes the observable. + * + * @param \Exception|null $exception + */ + public function dispose(\Exception $exception = null) { + $this->done($exception ?: new DisposedException("The observable was disposed")); + } + + /** + * @return bool + */ + public function isComplete() { + return $this->complete; + } + + /** + * @return bool + */ + public function isFailed() { + return $this->reason !== null; + } + + /** + * @return \Exception|\Throwable + */ + public function getReason() { + if ($this->reason === null) { + throw new \LogicException("The observable has not failed"); + } + + return $this->reason; + } +} diff --git a/lib/Internal/Emitted.php b/lib/Internal/Emitted.php new file mode 100644 index 0000000..6638b9b --- /dev/null +++ b/lib/Internal/Emitted.php @@ -0,0 +1,57 @@ +awaitable = $awaitable; + $this->future = new Future; + } + + /** + * @return \Interop\Async\Awaitable + */ + public function getAwaitable() { + ++$this->waiting; + return $this->awaitable; + } + + /** + * Notifies the placeholder that the consumer is ready. + */ + public function ready() { + if (0 === --$this->waiting) { + $this->future->resolve(); + } + } + + /** + * Returns an awaitable that is fulfilled once all consumers are ready. + * + * @return \Interop\Async\Awaitable + */ + public function wait() { + return $this->future; + } +} diff --git a/lib/Internal/EmitterIterator.php b/lib/Internal/EmitterIterator.php new file mode 100644 index 0000000..fc05628 --- /dev/null +++ b/lib/Internal/EmitterIterator.php @@ -0,0 +1,136 @@ +queue = $queue; + $this->queue->increment(); + } + + /** + * Removes queue from collection. + */ + public function __destruct() { + if ($this->emitted !== null) { + $this->emitted->ready(); + } + + $this->queue->decrement(); + } + + /** + * {@inheritdoc} + */ + public function isValid() { + return new Coroutine($this->doValid()); + } + + /** + * @coroutine + * + * @return \Generator + * + * @resolve bool + * + * @throws \Throwable|\Exception + */ + private function doValid() { + if ($this->awaitable !== null) { + throw new \LogicException("Simultaneous calls to isValid() are not allowed"); + } + + try { + $emitted = $this->queue->pull(); + + if ($this->emitted !== null) { + $this->emitted->ready(); + } + + $this->emitted = $emitted; + $this->current = (yield $this->awaitable = $this->emitted->getAwaitable()); + } catch (\Throwable $exception) { + $this->current = null; + throw $exception; + } catch (\Exception $exception) { + $this->current = null; + throw $exception; + } finally { + $this->complete = $this->queue->isComplete(); + $this->awaitable = null; + } + + yield Coroutine::result(!$this->complete); + } + + /** + * {@inheritdoc} + */ + public function getCurrent() { + if ($this->emitted === null || $this->awaitable !== null) { + throw new \LogicException("isValid() must be called before calling this method"); + } + + if ($this->complete) { + if ($this->queue->isFailed()) { + throw $this->queue->getReason(); + } + + throw new CompletedException("The observable has completed and the iterator is invalid"); + } + + return $this->current; + } + + /** + * {@inheritdoc} + */ + public function getReturn() { + if ($this->emitted === null || $this->awaitable !== null) { + throw new \LogicException("isValid() must be called before calling this method"); + } + + if (!$this->complete) { + throw new IncompleteException("The observable has not completed"); + } + + if ($this->queue->isFailed()) { + throw $this->queue->getReason(); + } + + return $this->current; + } +} diff --git a/lib/Observable.php b/lib/Observable.php new file mode 100644 index 0000000..856785f --- /dev/null +++ b/lib/Observable.php @@ -0,0 +1,17 @@ += $interval) { + throw new \InvalidArgumentException("The interval should be greater than 0"); + } + + return new Emitter(function (callable $emit) use ($observable, $interval) { + $iterator = $observable->getIterator(); + $start = (int) (\microtime(true) - $interval); + + while (yield $iterator->isValid()) { + $diff = $interval + $start - (int) (\microtime(true) * 1e3); + + if (0 < $diff) { + yield new Pause($diff); + } + + $start = (int) (\microtime(true) * 1e3); + + yield $emit($iterator->getCurrent()); + } + + yield Coroutine::result($iterator->getReturn()); + }); +} + +/** + * Creates an observable that emits values emitted from any observable in the array of observables. Values in the + * array are passed through the from() function, so they may be observables, arrays of values to emit, awaitables, + * or any other value. + * + * @param \Amp\Observable[] $observables + * + * @return \Amp\Observable + */ +function merge(array $observables) { + foreach ($observables as $observable) { + if (!$observable instanceof Observable) { + throw new \InvalidArgumentException("Non-observable provided"); + } + } + + return new Emitter(function (callable $emit) use ($observables) { + $generator = function (Observable $observable) use (&$emitting, $emit) { + $iterator = $observable->getIterator(); + + while (yield $iterator->isValid()) { + while ($emitting !== null) { + yield $emitting; // Prevent simultaneous emit. + } + + yield $emitting = $emit($iterator->getCurrent()); + $emitting = null; + } + + yield Coroutine::result($iterator->getReturn()); + }; + + /** @var \Amp\Coroutine[] $coroutines */ + $coroutines = []; + + foreach ($observables as $observable) { + $coroutines[] = new Coroutine($generator($observable)); + } + + yield Coroutine::result(yield all($coroutines)); + }); +} + +/** + * Returns an observable that emits a value every $interval milliseconds after the previous value has been consumed + * (up to $count times (or indefinitely if $count is 0). The value emitted is an integer of the number of times the + * observable emitted a value. + * + * @param int $interval Time interval between emitted values in milliseconds. + * @param int $count Use 0 to emit values indefinitely. + * + * @return \Amp\Observable + */ +function interval($interval, $count = 0) { + $count = (int) $count; + if (0 > $count) { + throw new \InvalidArgumentException("The number of times to emit must be a non-negative value"); + } + + return new Emitter(function (callable $emit) use ($interval, $count) { + $i = 0; + $future = new Future; + + $watcher = Loop::repeat($interval, function ($watcher) use (&$future, &$i) { + Loop::disable($watcher); + $awaitable = $future; + $future = new Future; + $awaitable->resolve(++$i); + }); + + try { + while (0 === $count || $i < $count) { + yield $emit($future); + Loop::enable($watcher); + } + } finally { + Loop::cancel($watcher); + } + }); +} + +/** + * @param int $start + * @param int $end + * @param int $step + * + * @return \Amp\Observable + */ +function range($start, $end, $step = 1) { + $start = (int) $start; + $end = (int) $end; + $step = (int) $step; + + if (0 === $step) { + throw new \InvalidArgumentException("Step must be a non-zero integer"); + } + + if ((($end - $start) ^ $step) < 0) { + throw new \InvalidArgumentException("Step is not of the correct sign"); + } + + return new Emitter(function (callable $emit) use ($start, $end, $step) { + for ($i = $start; $i <= $end; $i += $step) { + yield $emit($i); + } + }); +} From b436a568695c34f4f1cc6b2306c564f3897230d9 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Tue, 24 May 2016 12:09:52 -0500 Subject: [PATCH 02/17] Remove unused property --- lib/Internal/EmitQueue.php | 9 --------- 1 file changed, 9 deletions(-) diff --git a/lib/Internal/EmitQueue.php b/lib/Internal/EmitQueue.php index f45007b..6a87157 100644 --- a/lib/Internal/EmitQueue.php +++ b/lib/Internal/EmitQueue.php @@ -10,11 +10,6 @@ use Amp\Observable; use Interop\Async\Awaitable; class EmitQueue { - /** - * @var \Amp\Observable|null - */ - private $observable; - /** * @var bool */ @@ -110,10 +105,6 @@ class EmitQueue { try { if ($value instanceof Observable) { - if ($value === $this->observable) { - throw new \InvalidArgumentException("Cannot emit an observable within itself"); - } - $iterator = $value->getIterator(); while (yield $iterator->isValid()) { From 99eecc1a3fe625f325e4def1bef46d10639a974e Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Thu, 26 May 2016 18:20:05 -0500 Subject: [PATCH 03/17] Refactor --- example/backpressure.php | 54 ++++ example/emitter.php | 49 ++-- lib/DisposedException.php | 5 - lib/Emitter.php | 49 +--- lib/Internal/EmitQueue.php | 231 ------------------ lib/Internal/Emitted.php | 48 ++-- lib/Internal/PrivateObservable.php | 62 +++++ lib/Internal/Producer.php | 149 +++++++++++ .../{EmitterIterator.php => Subscriber.php} | 73 +++--- lib/Internal/Subscription.php | 78 ++++++ lib/Observable.php | 15 +- lib/{ObservableIterator.php => Observer.php} | 3 +- lib/Postponed.php | 101 ++++++++ lib/functions.php | 117 ++++----- 14 files changed, 600 insertions(+), 434 deletions(-) create mode 100644 example/backpressure.php delete mode 100644 lib/DisposedException.php delete mode 100644 lib/Internal/EmitQueue.php create mode 100644 lib/Internal/PrivateObservable.php create mode 100644 lib/Internal/Producer.php rename lib/Internal/{EmitterIterator.php => Subscriber.php} (66%) create mode 100644 lib/Internal/Subscription.php rename lib/{ObservableIterator.php => Observer.php} (97%) create mode 100644 lib/Postponed.php diff --git a/example/backpressure.php b/example/backpressure.php new file mode 100644 index 0000000..1e663df --- /dev/null +++ b/example/backpressure.php @@ -0,0 +1,54 @@ +#!/usr/bin/env php +emit(new Pause(500, 1)); + yield $postponed->emit(new Pause(1500, 2)); + yield $postponed->emit(new Pause(1000, 3)); + yield $postponed->emit(new Pause(2000, 4)); + yield $postponed->emit(5); + yield $postponed->emit(6); + yield $postponed->emit(7); + yield $postponed->emit(new Pause(2000, 8)); + yield $postponed->emit(9); + yield $postponed->emit(10); + yield $postponed->complete(11); + }; + + $coroutines[] = new Coroutine($generator($postponed)); + + $generator = function (Observable $observable) { + $observer = $observable->getObserver(); + + while (yield $observer->isValid()) { + printf("Observable emitted %d\n", $observer->getCurrent()); + yield new Pause(500); // Artificial back-pressure on observer. + } + + printf("Observable result %d\n", $observer->getReturn()); + }; + + + $coroutines[] = new Coroutine($generator($postponed->getObservable())); + + yield Amp\all($coroutines); + + } catch (\Exception $exception) { + printf("Exception: %s\n", $exception); + } +}), $loop = new NativeLoop()); diff --git a/example/emitter.php b/example/emitter.php index 965259d..a2a00c3 100644 --- a/example/emitter.php +++ b/example/emitter.php @@ -3,34 +3,43 @@ require dirname(__DIR__) . '/vendor/autoload.php'; -use Interop\Async\Loop; +use Amp\Coroutine; +use Amp\Observable; use Amp\Pause; -use Amp\Emitter; +use Amp\Postponed; use Amp\Loop\NativeLoop; +use Interop\Async\Loop; Loop::execute(Amp\coroutine(function () { try { - $observable = new Emitter(function (callable $emit) { - yield $emit(new Pause(500, 1)); - yield $emit(new Pause(1500, 2)); - yield $emit(new Pause(1000, 3)); - yield $emit(new Pause(1000, 4)); - yield $emit(5); // The values starting here will be emitted in 0.5 second intervals because the coroutine - yield $emit(6); // consuming values below takes 0.5 seconds per iteration. This behavior occurs because - yield $emit(7); // observables respect back-pressure from consumers, waiting to emit a value until all - yield $emit(8); // consumers have finished processing (if desired, see the docs on using and avoiding - yield $emit(9); // back-pressure). - yield $emit(10); - }); + $postponed = new Postponed; - $iterator = $observable->getIterator(); + $postponed->emit(new Pause(500, 1)); + $postponed->emit(new Pause(1500, 2)); + $postponed->emit(new Pause(1000, 3)); + $postponed->emit(new Pause(2000, 4)); + $postponed->emit(5); + $postponed->emit(6); + $postponed->emit(7); + $postponed->emit(new Pause(2000, 8)); + $postponed->emit(9); + $postponed->emit(10); + $postponed->complete(11); - while (yield $iterator->isValid()) { - printf("Observable emitted %d\n", $iterator->getCurrent()); - yield new Pause(500); // Artificial back-pressure on observable. - } + $generator = function (Observable $observable) { + $observer = $observable->getObserver(); - } catch (\Throwable $exception) { + while (yield $observer->isValid()) { + printf("Observable emitted %d\n", $observer->getCurrent()); + yield new Pause(500); // Artificial back-pressure on observer. + } + + printf("Observable result %d\n", $observer->getReturn()); + }; + + yield new Coroutine($generator($postponed->getObservable())); + + } catch (\Exception $exception) { printf("Exception: %s\n", $exception); } }), $loop = new NativeLoop()); diff --git a/lib/DisposedException.php b/lib/DisposedException.php deleted file mode 100644 index 565269c..0000000 --- a/lib/DisposedException.php +++ /dev/null @@ -1,5 +0,0 @@ -emitter = $emitter; - $this->queue = new Internal\EmitQueue; - } - - /** - * {@inheritdoc} - */ - public function dispose() { - $this->emitter = null; - $this->queue->dispose(); - } - - /** - * {@inheritdoc} - */ - public function getIterator() { - if ($this->emitter !== null) { - $emitter = $this->emitter; - $this->emitter = null; - - // Asynchronously start the emitter. - Loop::defer(function () use ($emitter) { - $this->queue->start($emitter); - }); - } - - return new Internal\EmitterIterator($this->queue); +final class Emitter implements Observable { + use Internal\Producer { + emit as public; + complete as public; + fail as public; } } diff --git a/lib/Internal/EmitQueue.php b/lib/Internal/EmitQueue.php deleted file mode 100644 index 6a87157..0000000 --- a/lib/Internal/EmitQueue.php +++ /dev/null @@ -1,231 +0,0 @@ -future = new Future; - $this->emitted = new Emitted($this->future); - } - - /** - * @param callable $emitter - */ - public function start(callable $emitter) { - /** - * Emits a value from the observable. - * - * @param mixed $value If $value is an instance of \Interop\Async\Awaitable, the success value is used as the - * value to emit or the failure reason is used to fail the awaitable returned from this function. - * - * @return \Interop\Async\Awaitable - * - * @resolve mixed The emitted value (the resolution value of $value) - * - * @throws \Amp\CompletedException If the observable has been completed. - * @throws \Amp\DisposedException If the observable has been disposed. - */ - $emit = function ($value = null) { - return new Coroutine($this->push($value)); - }; - - try { - $generator = $emitter($emit); - - if (!$generator instanceof \Generator) { - throw new \LogicException("Callable must be a coroutine"); - } - - $coroutine = new Coroutine($generator); - $coroutine->when([$this, 'done']); - } catch (\Throwable $exception) { - $this->done($exception); - } catch (\Exception $exception) { - $this->done($exception); - } - } - - /** - * @coroutine - * - * @param mixed $value - * - * @return \Generator - * - * @throws \InvalidArgumentException - * @throws \Throwable|\Exception - */ - public function push($value) { - if ($this->complete) { - throw $this->reason ?: new CompletedException("The observable has completed"); - } - - if ($this->busy) { - throw new \LogicException("Cannot emit values simultaneously"); - } - - $this->busy = true; - - try { - if ($value instanceof Observable) { - $iterator = $value->getIterator(); - - while (yield $iterator->isValid()) { - yield $this->emit($iterator->getCurrent()); - } - - yield Coroutine::result($iterator->getReturn()); - return; - } - - if ($value instanceof Awaitable) { - $value = (yield $value); - } - - yield $this->emit($value); - } catch (\Throwable $exception) { - $this->done($exception); - throw $exception; - } catch (\Exception $exception) { - $this->done($exception); - throw $exception; - } finally { - $this->busy = false; - } - - yield Coroutine::result($value); - } - - /** - * @param mixed $value Value to emit. - * - * @return \Interop\Async\Awaitable - */ - private function emit($value) { - $future = $this->future; - $emitted = $this->emitted; - - $this->future = new Future; - $this->emitted = new Emitted($this->future); - - $future->resolve($value); - - return $emitted->wait(); - } - - /** - * Increments the number of listening iterators. - */ - public function increment() { - ++$this->listeners; - } - - /** - * Decrements the number of listening iterators. Marks the queue as disposed if the count reaches 0. - */ - public function decrement() { - if (--$this->listeners <= 0 && !$this->complete) { - $this->dispose(new DisposedException("The observable was automatically disposed")); - } - } - - /** - * @return \Amp\Internal\Emitted - */ - public function pull() { - return $this->emitted; - } - - /** - * Marks the observable as complete, failing with the given exception or completing with the given value. - * - * @param \Throwable|\Exception|null $exception - * @param mixed $value - */ - public function done($exception, $value = null) { - if ($this->complete) { - return; - } - - $this->complete = true; - - if ($exception) { - $this->reason = $exception; - $this->future->fail($exception); - return; - } - - $this->future->resolve($value); - } - - /** - * Disposes the observable. - * - * @param \Exception|null $exception - */ - public function dispose(\Exception $exception = null) { - $this->done($exception ?: new DisposedException("The observable was disposed")); - } - - /** - * @return bool - */ - public function isComplete() { - return $this->complete; - } - - /** - * @return bool - */ - public function isFailed() { - return $this->reason !== null; - } - - /** - * @return \Exception|\Throwable - */ - public function getReason() { - if ($this->reason === null) { - throw new \LogicException("The observable has not failed"); - } - - return $this->reason; - } -} diff --git a/lib/Internal/Emitted.php b/lib/Internal/Emitted.php index 6638b9b..2aa1ebd 100644 --- a/lib/Internal/Emitted.php +++ b/lib/Internal/Emitted.php @@ -3,14 +3,10 @@ namespace Amp\Internal; use Amp\Future; +use Amp\Success; use Interop\Async\Awaitable; final class Emitted { - /** - * @var \Amp\Future - */ - private $future; - /** * @var \Interop\Async\Awaitable */ @@ -19,39 +15,57 @@ final class Emitted { /** * @var int */ - private $waiting = 0; + private $waiting; /** - * @param \Interop\Async\Awaitable $awaitable + * @var \Amp\Future */ - public function __construct(Awaitable $awaitable) { - $this->awaitable = $awaitable; + private $future; + + /** + * @var bool + */ + private $complete; + + /** + * @param mixed $value + * @param int $waiting + * @param bool $complete + */ + public function __construct($value, $waiting, $complete) { + $this->awaitable = $value instanceof Awaitable ? $value : new Success($value); + $this->waiting = (int) $waiting; + $this->complete = (bool) $complete; $this->future = new Future; } /** - * @return \Interop\Async\Awaitable + * @return \Interop\Async\Awaitable|mixed */ public function getAwaitable() { - ++$this->waiting; return $this->awaitable; } /** - * Notifies the placeholder that the consumer is ready. + * @return bool + */ + public function isComplete() { + return $this->complete; + } + + /** + * Indicates that a subscriber has consumed the value represented by this object. */ public function ready() { - if (0 === --$this->waiting) { - $this->future->resolve(); + if (--$this->waiting === 0) { + $this->future->resolve($this->awaitable); } } /** - * Returns an awaitable that is fulfilled once all consumers are ready. - * * @return \Interop\Async\Awaitable */ public function wait() { return $this->future; } -} +} \ No newline at end of file diff --git a/lib/Internal/PrivateObservable.php b/lib/Internal/PrivateObservable.php new file mode 100644 index 0000000..1b32ef6 --- /dev/null +++ b/lib/Internal/PrivateObservable.php @@ -0,0 +1,62 @@ +init(); + + /** + * Emits a value from the observable. + * + * @param mixed $value + * + * @return \Interop\Async\Awaitable + */ + $emit = function ($value = null) { + return $this->emit($value); + }; + + /** + * Completes the observable with the given value. + * + * @param mixed $value + * + * @return \Interop\Async\Awaitable + */ + $complete = function ($value = null) { + return $this->complete($value); + }; + + /** + * Fails the observable with the given exception. + * + * @param \Exception $reason + * + * @return \Interop\Async\Awaitable + */ + $fail = function ($reason) { + return $this->fail($reason); + }; + + try { + $emitter($emit, $complete, $fail); + } catch (\Throwable $exception) { + $this->fail($exception); + } catch (\Exception $exception) { + $this->fail($exception); + } + } +} diff --git a/lib/Internal/Producer.php b/lib/Internal/Producer.php new file mode 100644 index 0000000..1ab4089 --- /dev/null +++ b/lib/Internal/Producer.php @@ -0,0 +1,149 @@ +waiting = new Future; + + $this->unsubscribe = function (Subscription $subscription) { + unset($this->subscriptions[\spl_object_hash($subscription)]); + + if (empty($this->subscriptions) && !$this->complete) { + $this->waiting = new Future; // Wait for another subscriber. + } + }; + } + + /** + * @return \Amp\Observer + */ + public function getObserver() { + $subscription = new Subscription($this->unsubscribe); + $this->subscriptions[\spl_object_hash($subscription)] = $subscription; + + if ($this->waiting !== null) { + $waiting = $this->waiting; + $this->waiting = null; + $waiting->resolve(); + } + + return new Subscriber($subscription); + } + + /** + * {@inheritdoc} + */ + protected function emit($value = null) { + if ($this->complete) { + if ($this->exception) { + throw $this->exception; + } + + throw new CompletedException("The observable has completed"); + } + + return new Coroutine($this->push($value)); + } + + /** + * @coroutine + * + * @param mixed $value + * @param bool $complete + * + * @return \Generator + * + * @throws \InvalidArgumentException + * @throws \Throwable|\Exception + */ + private function push($value, $complete = false) { + if ($this->waiting !== null) { + yield $this->waiting; // Wait for at least one observer to be registered. + } + + $emitted = new Emitted($value, \count($this->subscriptions), $complete); + + foreach ($this->subscriptions as $subscription) { + $subscription->push($emitted); + } + + try { + $value = (yield $emitted->wait()); + } catch (\Throwable $exception) { + $this->complete = true; + $this->exception = $exception; + throw $exception; + } catch (\Exception $exception) { + $this->complete = true; + $this->exception = $exception; + throw $exception; + } + + yield Coroutine::result($value); + } + + /** + * {@inheritdoc} + */ + protected function complete($value = null) { + if ($this->complete) { + if ($this->exception) { + throw $this->exception; + } + + throw new CompletedException("The observable has completed"); + } + + $this->complete = true; + + return new Coroutine($this->push($value, true)); + } + + /** + * {@inheritdoc} + */ + protected function fail($exception) { + if ($this->complete) { + if ($this->exception) { + throw $this->exception; + } + + throw new CompletedException("The observable has completed"); + } + + $this->complete = true; + + return new Coroutine($this->push(new Failure($exception), true)); + } +} diff --git a/lib/Internal/EmitterIterator.php b/lib/Internal/Subscriber.php similarity index 66% rename from lib/Internal/EmitterIterator.php rename to lib/Internal/Subscriber.php index fc05628..ceca0ce 100644 --- a/lib/Internal/EmitterIterator.php +++ b/lib/Internal/Subscriber.php @@ -5,9 +5,14 @@ namespace Amp\Internal; use Amp\CompletedException; use Amp\Coroutine; use Amp\IncompleteException; -use Amp\ObservableIterator; +use Amp\Observer; + +final class Subscriber implements Observer { + /** + * @var \Amp\Internal\Subscription + */ + private $subscription; -class EmitterIterator implements ObservableIterator { /** * @var \Amp\Internal\Emitted */ @@ -17,28 +22,27 @@ class EmitterIterator implements ObservableIterator { * @var mixed */ private $current; - - /** - * @var \Amp\Internal\EmitQueue - */ - private $queue; - + /** * @var \Interop\Async\Awaitable */ private $awaitable; - + /** * @var bool */ private $complete = false; - + /** - * @param \Amp\Internal\EmitQueue $queue + * @var \Throwable|\Exception|null */ - public function __construct(EmitQueue $queue) { - $this->queue = $queue; - $this->queue->increment(); + private $exception; + + /** + * @param \Amp\Internal\Subscription $subscription + */ + public function __construct(Subscription $subscription) { + $this->subscription = $subscription; } /** @@ -48,17 +52,17 @@ class EmitterIterator implements ObservableIterator { if ($this->emitted !== null) { $this->emitted->ready(); } - - $this->queue->decrement(); + + $this->subscription->unsubscribe(); } /** * {@inheritdoc} */ public function isValid() { - return new Coroutine($this->doValid()); + return new Coroutine($this->valid()); } - + /** * @coroutine * @@ -68,28 +72,27 @@ class EmitterIterator implements ObservableIterator { * * @throws \Throwable|\Exception */ - private function doValid() { - if ($this->awaitable !== null) { - throw new \LogicException("Simultaneous calls to isValid() are not allowed"); + private function valid() { + while ($this->awaitable !== null) { + yield $this->awaitable; // Wait for previous calls to resolve. } + if ($this->emitted !== null) { + $this->emitted->ready(); + } + + $this->emitted = (yield $this->subscription->pull()); + try { - $emitted = $this->queue->pull(); - - if ($this->emitted !== null) { - $this->emitted->ready(); - } - - $this->emitted = $emitted; $this->current = (yield $this->awaitable = $this->emitted->getAwaitable()); } catch (\Throwable $exception) { - $this->current = null; + $this->exception = $exception; throw $exception; } catch (\Exception $exception) { - $this->current = null; + $this->exception = $exception; throw $exception; } finally { - $this->complete = $this->queue->isComplete(); + $this->complete = $this->emitted->isComplete(); $this->awaitable = null; } @@ -105,8 +108,8 @@ class EmitterIterator implements ObservableIterator { } if ($this->complete) { - if ($this->queue->isFailed()) { - throw $this->queue->getReason(); + if ($this->exception) { + throw $this->exception; } throw new CompletedException("The observable has completed and the iterator is invalid"); @@ -127,8 +130,8 @@ class EmitterIterator implements ObservableIterator { throw new IncompleteException("The observable has not completed"); } - if ($this->queue->isFailed()) { - throw $this->queue->getReason(); + if ($this->exception) { + throw $this->exception; } return $this->current; diff --git a/lib/Internal/Subscription.php b/lib/Internal/Subscription.php new file mode 100644 index 0000000..5fbc08d --- /dev/null +++ b/lib/Internal/Subscription.php @@ -0,0 +1,78 @@ +unsubscribe = $unsubscribe; + } + + /** + * Removes the subscription from the emit queue. + */ + public function unsubscribe() { + foreach ($this->valueQueue as $emitted) { + $emitted->ready(); + } + + $this->valueQueue = []; + + $unsubscribe = $this->unsubscribe; + $unsubscribe($this); + } + + /** + * @param \Amp\Internal\Emitted $emitted + */ + public function push(Emitted $emitted) { + if ($this->future !== null) { + $future = $this->future; + $this->future = null; + $future->resolve($emitted); + } else { + $this->valueQueue[] = $emitted; + } + } + + /** + * @return \Interop\Async\Awaitable + */ + public function pull() { + if (empty($this->valueQueue)) { + $this->future = new Future; + return $this->future; + } + + $emitted = $this->valueQueue[$this->current]; + unset($this->valueQueue[$this->current]); + ++$this->current; + + return new Success($emitted); + } +} \ No newline at end of file diff --git a/lib/Observable.php b/lib/Observable.php index 856785f..3cdcb71 100644 --- a/lib/Observable.php +++ b/lib/Observable.php @@ -2,16 +2,11 @@ namespace Amp; -interface Observable -{ +interface Observable { /** - * @return \Amp\ObservableIterator + * Returns an observer of the observable. + * + * @return \Amp\Observer */ - public function getIterator(); - - /** - * Disposes of the observable, halting emission of values and failing the observable with an instance of - * \Amp\DisposedException. - */ - public function dispose(); + public function getObserver(); } diff --git a/lib/ObservableIterator.php b/lib/Observer.php similarity index 97% rename from lib/ObservableIterator.php rename to lib/Observer.php index 5cbdb1e..c48d10a 100644 --- a/lib/ObservableIterator.php +++ b/lib/Observer.php @@ -2,8 +2,7 @@ namespace Amp; -interface ObservableIterator -{ +interface Observer { /** * Succeeds with true if a new value is available by calling getCurrent() or false if the observable has completed. * Calling getCurrent() will throw an exception if the observable completed. If an error occurs with the observable, diff --git a/lib/Postponed.php b/lib/Postponed.php new file mode 100644 index 0000000..cb5967b --- /dev/null +++ b/lib/Postponed.php @@ -0,0 +1,101 @@ +observable = new Internal\PrivateObservable( + function (callable $emit, callable $complete, callable $fail) { + $this->emit = $emit; + $this->complete = $complete; + $this->fail = $fail; + } + ); + } + + /** + * @return \Amp\Observable + */ + public function getObservable() { + return $this->observable; + } + + /** + * Emits a value from the observable. + * + * @param mixed $value + * + * @return \Interop\Async\Awaitable + */ + public function emit($value = null) { + $emit = $this->emit; + return $emit($value); + } + + /** + * Completes the observable with the given value. + * + * @param mixed $value + * + * @return \Interop\Async\Awaitable + */ + public function complete($value = null) { + $complete = $this->complete; + return $complete($value); + } + + /** + * Fails the observable with the given reason. + * + * @param \Throwable|\Exception $reason + * + * @return \Interop\Async\Awaitable + */ + public function fail($reason) { + $fail = $this->fail; + return $fail($reason); + } + } + } +} catch (\AssertionError $exception) { + goto development; // zend.assertions=1 and assert.exception=1, use development definition. +} diff --git a/lib/functions.php b/lib/functions.php index a04adf4..36b4630 100644 --- a/lib/functions.php +++ b/lib/functions.php @@ -4,39 +4,6 @@ namespace Amp; use Interop\Async\Loop; -/** - * Throttles an observable to only emit a value every $interval milliseconds. - * - * @param \Amp\Observable $observable - * @param int $interval - * - * @return \Amp\Observable - */ -function throttle(Observable $observable, $interval) { - if (0 >= $interval) { - throw new \InvalidArgumentException("The interval should be greater than 0"); - } - - return new Emitter(function (callable $emit) use ($observable, $interval) { - $iterator = $observable->getIterator(); - $start = (int) (\microtime(true) - $interval); - - while (yield $iterator->isValid()) { - $diff = $interval + $start - (int) (\microtime(true) * 1e3); - - if (0 < $diff) { - yield new Pause($diff); - } - - $start = (int) (\microtime(true) * 1e3); - - yield $emit($iterator->getCurrent()); - } - - yield Coroutine::result($iterator->getReturn()); - }); -} - /** * Creates an observable that emits values emitted from any observable in the array of observables. Values in the * array are passed through the from() function, so they may be observables, arrays of values to emit, awaitables, @@ -53,31 +20,35 @@ function merge(array $observables) { } } - return new Emitter(function (callable $emit) use ($observables) { - $generator = function (Observable $observable) use (&$emitting, $emit) { - $iterator = $observable->getIterator(); + $postponed = new Postponed; - while (yield $iterator->isValid()) { - while ($emitting !== null) { - yield $emitting; // Prevent simultaneous emit. - } + $generator = function (Observable $observable) use ($postponed) { + $observer = $observable->getObserver(); - yield $emitting = $emit($iterator->getCurrent()); - $emitting = null; - } - - yield Coroutine::result($iterator->getReturn()); - }; - - /** @var \Amp\Coroutine[] $coroutines */ - $coroutines = []; - - foreach ($observables as $observable) { - $coroutines[] = new Coroutine($generator($observable)); + while (yield $observer->isValid()) { + yield $postponed->emit($observer->getCurrent()); } - yield Coroutine::result(yield all($coroutines)); + yield Coroutine::result($observer->getReturn()); + }; + + /** @var \Amp\Coroutine[] $coroutines */ + $coroutines = []; + + foreach ($observables as $observable) { + $coroutines[] = new Coroutine($generator($observable)); + } + + all($coroutines)->when(function ($exception, $value) use ($postponed) { + if ($exception) { + $postponed->fail($exception); + return; + } + + $postponed->complete($value); }); + + return $postponed->getObservable(); } /** @@ -96,26 +67,18 @@ function interval($interval, $count = 0) { throw new \InvalidArgumentException("The number of times to emit must be a non-negative value"); } - return new Emitter(function (callable $emit) use ($interval, $count) { - $i = 0; - $future = new Future; + $postponed = new Postponed; - $watcher = Loop::repeat($interval, function ($watcher) use (&$future, &$i) { - Loop::disable($watcher); - $awaitable = $future; - $future = new Future; - $awaitable->resolve(++$i); - }); + Loop::repeat($interval, function ($watcher) use (&$i, $postponed, $count) { + $postponed->emit(++$i); - try { - while (0 === $count || $i < $count) { - yield $emit($future); - Loop::enable($watcher); - } - } finally { + if ($i === $count) { Loop::cancel($watcher); + $postponed->complete(); } }); + + return $postponed->getObservable(); } /** @@ -138,9 +101,23 @@ function range($start, $end, $step = 1) { throw new \InvalidArgumentException("Step is not of the correct sign"); } - return new Emitter(function (callable $emit) use ($start, $end, $step) { + $postponed = new Postponed; + + $generator = function (Postponed $postponed, $start, $end, $step) { for ($i = $start; $i <= $end; $i += $step) { - yield $emit($i); + yield $postponed->emit($i); } + }; + + $coroutine = new Coroutine($generator($postponed, $start, $end, $step)); + $coroutine->when(function ($exception) use ($postponed) { + if ($exception) { + $postponed->fail($exception); + return; + } + + $postponed->complete(); }); + + return $postponed->getObservable(); } From 4a2baa670e976460092d4603af0a637597ccf18a Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Fri, 27 May 2016 15:44:01 -0500 Subject: [PATCH 04/17] Subscriber model --- example/backpressure.php | 39 ++++---- example/emitter.php | 52 ++++++----- lib/Disposable.php | 12 +++ lib/DisposedException.php | 5 + lib/Internal/Emitted.php | 26 +++--- lib/Internal/ObserverSubscriber.php | 130 ++++++++++++++++++++++++++ lib/Internal/PrivateObservable.php | 6 +- lib/Internal/Producer.php | 78 ++++++---------- lib/Internal/Subscriber.php | 139 ---------------------------- lib/Observable.php | 12 ++- lib/Observer.php | 35 +++++-- lib/Subscriber.php | 83 +++++++++++++++++ lib/functions.php | 17 +--- 13 files changed, 361 insertions(+), 273 deletions(-) create mode 100644 lib/Disposable.php create mode 100644 lib/DisposedException.php create mode 100644 lib/Internal/ObserverSubscriber.php delete mode 100644 lib/Internal/Subscriber.php create mode 100644 lib/Subscriber.php diff --git a/example/backpressure.php b/example/backpressure.php index 1e663df..16c7a54 100644 --- a/example/backpressure.php +++ b/example/backpressure.php @@ -4,7 +4,6 @@ require dirname(__DIR__) . '/vendor/autoload.php'; use Amp\Coroutine; -use Amp\Observable; use Amp\Pause; use Amp\Postponed; use Amp\Loop\NativeLoop; @@ -12,10 +11,24 @@ use Interop\Async\Loop; Loop::execute(Amp\coroutine(function () { try { - $coroutines = []; - $postponed = new Postponed; + $observable = $postponed->getObservable(); + + $disposable = $observable->subscribe(function ($value) { + printf("Observable emitted %d\n", $value); + return new Pause(500); // Artificial back-pressure on observable. + }); + + $disposable->when(function ($exception, $value) { + if ($exception) { + printf("Observable failed: %s\n", $exception->getMessage()); + return; + } + + printf("Observable result %d\n", $value); + }); + $generator = function (Postponed $postponed) { yield $postponed->emit(new Pause(500, 1)); yield $postponed->emit(new Pause(1500, 2)); @@ -27,26 +40,10 @@ Loop::execute(Amp\coroutine(function () { yield $postponed->emit(new Pause(2000, 8)); yield $postponed->emit(9); yield $postponed->emit(10); - yield $postponed->complete(11); + $postponed->complete(11); }; - $coroutines[] = new Coroutine($generator($postponed)); - - $generator = function (Observable $observable) { - $observer = $observable->getObserver(); - - while (yield $observer->isValid()) { - printf("Observable emitted %d\n", $observer->getCurrent()); - yield new Pause(500); // Artificial back-pressure on observer. - } - - printf("Observable result %d\n", $observer->getReturn()); - }; - - - $coroutines[] = new Coroutine($generator($postponed->getObservable())); - - yield Amp\all($coroutines); + yield new Coroutine($generator($postponed)); } catch (\Exception $exception) { printf("Exception: %s\n", $exception); diff --git a/example/emitter.php b/example/emitter.php index a2a00c3..9504756 100644 --- a/example/emitter.php +++ b/example/emitter.php @@ -3,43 +3,45 @@ require dirname(__DIR__) . '/vendor/autoload.php'; -use Amp\Coroutine; -use Amp\Observable; use Amp\Pause; use Amp\Postponed; use Amp\Loop\NativeLoop; use Interop\Async\Loop; -Loop::execute(Amp\coroutine(function () { +Loop::execute(function () { try { $postponed = new Postponed; - $postponed->emit(new Pause(500, 1)); - $postponed->emit(new Pause(1500, 2)); - $postponed->emit(new Pause(1000, 3)); - $postponed->emit(new Pause(2000, 4)); - $postponed->emit(5); - $postponed->emit(6); - $postponed->emit(7); - $postponed->emit(new Pause(2000, 8)); - $postponed->emit(9); - $postponed->emit(10); - $postponed->complete(11); + Loop::defer(function () use ($postponed) { + $postponed->emit(new Pause(500, 1)); + $postponed->emit(new Pause(1500, 2)); + $postponed->emit(new Pause(1000, 3)); + $postponed->emit(new Pause(2000, 4)); + $postponed->emit(5); + $postponed->emit(6); + $postponed->emit(7); + $postponed->emit(new Pause(2000, 8)); + $postponed->emit(9); + $postponed->emit(10); + $postponed->complete(11); + }); - $generator = function (Observable $observable) { - $observer = $observable->getObserver(); + $observable = $postponed->getObservable(); - while (yield $observer->isValid()) { - printf("Observable emitted %d\n", $observer->getCurrent()); - yield new Pause(500); // Artificial back-pressure on observer. + $disposable = $observable->subscribe(function ($value) { + printf("Observable emitted %d\n", $value); + return new Pause(500); // Artificial back-pressure on observable, but is ignored. + }); + + $disposable->when(function ($exception, $value) { + if ($exception) { + printf("Exception: %s\n", $exception->getMessage()); + return; } - printf("Observable result %d\n", $observer->getReturn()); - }; - - yield new Coroutine($generator($postponed->getObservable())); - + printf("Observable result %d\n", $value); + }); } catch (\Exception $exception) { printf("Exception: %s\n", $exception); } -}), $loop = new NativeLoop()); +}, $loop = new NativeLoop()); diff --git a/lib/Disposable.php b/lib/Disposable.php new file mode 100644 index 0000000..8492ada --- /dev/null +++ b/lib/Disposable.php @@ -0,0 +1,12 @@ +awaitable = $value instanceof Awaitable ? $value : new Success($value); + $this->value = $value; $this->waiting = (int) $waiting; $this->complete = (bool) $complete; - $this->future = new Future; + $this->ready = new Future; + + if ($this->waiting === 0) { + $this->ready->resolve($this->value); + } } /** - * @return \Interop\Async\Awaitable|mixed + * @return mixed */ - public function getAwaitable() { - return $this->awaitable; + public function getValue() { + return $this->value; } /** @@ -58,7 +60,7 @@ final class Emitted { */ public function ready() { if (--$this->waiting === 0) { - $this->future->resolve($this->awaitable); + $this->ready->resolve($this->value); } } @@ -66,6 +68,6 @@ final class Emitted { * @return \Interop\Async\Awaitable */ public function wait() { - return $this->future; + return $this->ready; } } \ No newline at end of file diff --git a/lib/Internal/ObserverSubscriber.php b/lib/Internal/ObserverSubscriber.php new file mode 100644 index 0000000..cf8ae14 --- /dev/null +++ b/lib/Internal/ObserverSubscriber.php @@ -0,0 +1,130 @@ +deferred = new Deferred; + + $this->disposable = $observable->subscribe([$this, 'onNext']); + $this->disposable->when([$this, 'onComplete']); + } + + /** + * @param mixed $value + * + * @return \Amp\Future + */ + public function onNext($value) { + $this->current = $value; + + $this->future = new Future; + $this->deferred->resolve(true); + + return $this->future; + } + + /** + * @param \Throwable|\Exception|null $exception + * @param mixed $value + */ + public function onComplete($exception, $value) { + $this->complete = true; + + if ($exception) { + $this->current = null; + $this->deferred->fail($exception); + return; + } + + $this->current = $value; + $this->deferred->resolve(false); + } + + /** + * @return \Interop\Async\Awaitable + */ + public function getAwaitable() { + if ($this->complete) { + return new Success(false); + } + + if ($this->future !== null) { + $future = $this->future; + $this->future = null; + $this->deferred = new Deferred; + $future->resolve(); + } + + return $this->deferred->getAwaitable(); + } + + /** + * @return mixed + * + * @throws \Amp\CompletedException + */ + public function getCurrent() { + if ($this->future === null) { + throw new \LogicException("Awaitable returned from isValid() must resolve before calling this method"); + } + + if ($this->complete) { + throw new CompletedException("The observable has completed"); + } + + return $this->current; + } + + /** + * @return mixed + * + * @throws \Amp\IncompleteException + */ + public function getReturn() { + if (!$this->complete) { + throw new IncompleteException("The observable has not completed"); + } + + return $this->current; + } + + public function dispose() { + $this->disposable->dispose(); + } +} diff --git a/lib/Internal/PrivateObservable.php b/lib/Internal/PrivateObservable.php index 1b32ef6..ba0b43d 100644 --- a/lib/Internal/PrivateObservable.php +++ b/lib/Internal/PrivateObservable.php @@ -8,16 +8,12 @@ use Amp\Observable; * An observable that cannot externally emit values. Used by Postponed in development mode. */ final class PrivateObservable implements Observable { - use Producer { - __construct as init; - } + use Producer; /** * @param callable(callable $emit, callable $complete, callable $fail): void $emitter */ public function __construct(callable $emitter) { - $this->init(); - /** * Emits a value from the observable. * diff --git a/lib/Internal/Producer.php b/lib/Internal/Producer.php index 1ab4089..2d0183a 100644 --- a/lib/Internal/Producer.php +++ b/lib/Internal/Producer.php @@ -5,7 +5,7 @@ namespace Amp\Internal; use Amp\CompletedException; use Amp\Coroutine; use Amp\Failure; -use Amp\Future; +use Amp\Subscriber; trait Producer { /** @@ -13,11 +13,6 @@ trait Producer { */ private $subscriptions = []; - /** - * @var \Amp\Future|null - */ - private $waiting; - /** * @var bool */ @@ -33,43 +28,33 @@ trait Producer { */ private $unsubscribe; - public function __construct() { - $this->waiting = new Future; - - $this->unsubscribe = function (Subscription $subscription) { - unset($this->subscriptions[\spl_object_hash($subscription)]); - - if (empty($this->subscriptions) && !$this->complete) { - $this->waiting = new Future; // Wait for another subscriber. - } - }; - } - /** - * @return \Amp\Observer + * */ - public function getObserver() { + public function subscribe(callable $onNext) { + if ($this->unsubscribe === null) { + $this->unsubscribe = function (Subscription $subscription) { + unset($this->subscriptions[\spl_object_hash($subscription)]); + }; + } + $subscription = new Subscription($this->unsubscribe); $this->subscriptions[\spl_object_hash($subscription)] = $subscription; - if ($this->waiting !== null) { - $waiting = $this->waiting; - $this->waiting = null; - $waiting->resolve(); - } - - return new Subscriber($subscription); + return new Subscriber($onNext, $subscription); } /** - * {@inheritdoc} + * Emits a value from the observable. If the value is an awaitable, the success value will be emitted. If the + * awaitable fails, the observable will fail with the same exception. The returned awaitable is resolved with the + * emitted value once all subscribers have been invoked. + * + * @param mixed $value + * + * @return \Interop\Async\Awaitable */ protected function emit($value = null) { if ($this->complete) { - if ($this->exception) { - throw $this->exception; - } - throw new CompletedException("The observable has completed"); } @@ -88,10 +73,6 @@ trait Producer { * @throws \Throwable|\Exception */ private function push($value, $complete = false) { - if ($this->waiting !== null) { - yield $this->waiting; // Wait for at least one observer to be registered. - } - $emitted = new Emitted($value, \count($this->subscriptions), $complete); foreach ($this->subscriptions as $subscription) { @@ -102,11 +83,9 @@ trait Producer { $value = (yield $emitted->wait()); } catch (\Throwable $exception) { $this->complete = true; - $this->exception = $exception; throw $exception; } catch (\Exception $exception) { $this->complete = true; - $this->exception = $exception; throw $exception; } @@ -114,14 +93,16 @@ trait Producer { } /** - * {@inheritdoc} + * Completes the observable with the given value. If the value is an awaitable, the success value will be emitted. + * If the awaitable fails, the observable will fail with the same exception. The returned awaitable is resolved + * with the completion value once all subscribers have received all prior emitted values. + * + * @param mixed $value + * + * @return \Interop\Async\Awaitable */ protected function complete($value = null) { if ($this->complete) { - if ($this->exception) { - throw $this->exception; - } - throw new CompletedException("The observable has completed"); } @@ -131,14 +112,15 @@ trait Producer { } /** - * {@inheritdoc} + * Fails the observable with the given exception. The returned awaitable fails with the given exception once all + * subscribers have been received all prior emitted values. + * + * @param \Throwable|\Exception $exception + * + * @return \Interop\Async\Awaitable */ protected function fail($exception) { if ($this->complete) { - if ($this->exception) { - throw $this->exception; - } - throw new CompletedException("The observable has completed"); } diff --git a/lib/Internal/Subscriber.php b/lib/Internal/Subscriber.php deleted file mode 100644 index ceca0ce..0000000 --- a/lib/Internal/Subscriber.php +++ /dev/null @@ -1,139 +0,0 @@ -subscription = $subscription; - } - - /** - * Removes queue from collection. - */ - public function __destruct() { - if ($this->emitted !== null) { - $this->emitted->ready(); - } - - $this->subscription->unsubscribe(); - } - - /** - * {@inheritdoc} - */ - public function isValid() { - return new Coroutine($this->valid()); - } - - /** - * @coroutine - * - * @return \Generator - * - * @resolve bool - * - * @throws \Throwable|\Exception - */ - private function valid() { - while ($this->awaitable !== null) { - yield $this->awaitable; // Wait for previous calls to resolve. - } - - if ($this->emitted !== null) { - $this->emitted->ready(); - } - - $this->emitted = (yield $this->subscription->pull()); - - try { - $this->current = (yield $this->awaitable = $this->emitted->getAwaitable()); - } catch (\Throwable $exception) { - $this->exception = $exception; - throw $exception; - } catch (\Exception $exception) { - $this->exception = $exception; - throw $exception; - } finally { - $this->complete = $this->emitted->isComplete(); - $this->awaitable = null; - } - - yield Coroutine::result(!$this->complete); - } - - /** - * {@inheritdoc} - */ - public function getCurrent() { - if ($this->emitted === null || $this->awaitable !== null) { - throw new \LogicException("isValid() must be called before calling this method"); - } - - if ($this->complete) { - if ($this->exception) { - throw $this->exception; - } - - throw new CompletedException("The observable has completed and the iterator is invalid"); - } - - return $this->current; - } - - /** - * {@inheritdoc} - */ - public function getReturn() { - if ($this->emitted === null || $this->awaitable !== null) { - throw new \LogicException("isValid() must be called before calling this method"); - } - - if (!$this->complete) { - throw new IncompleteException("The observable has not completed"); - } - - if ($this->exception) { - throw $this->exception; - } - - return $this->current; - } -} diff --git a/lib/Observable.php b/lib/Observable.php index 3cdcb71..7fd96b4 100644 --- a/lib/Observable.php +++ b/lib/Observable.php @@ -4,9 +4,15 @@ namespace Amp; interface Observable { /** - * Returns an observer of the observable. + * Registers a callback to be invoked each time value is emitted from the observable. If the function returns an + * awaitable, backpressure is applied to the awaitable until the returned awaitable is resolved. * - * @return \Amp\Observer + * Exceptions thrown from $onNext (or failures of awaitables returned from $onNext) will fail the returned + * Disposable with the thrown exception. + * + * @param callable $onNext Function invoked each time a value is emitted from the observable. + * + * @return \Amp\Disposable */ - public function getObserver(); + public function subscribe(callable $onNext); } diff --git a/lib/Observer.php b/lib/Observer.php index c48d10a..299e9f0 100644 --- a/lib/Observer.php +++ b/lib/Observer.php @@ -2,7 +2,26 @@ namespace Amp; -interface Observer { +final class Observer { + /** + * @var \Amp\Internal\ObserverSubscriber + */ + private $subscriber; + + /** + * @param \Amp\Observable $observable + */ + public function __construct(Observable $observable) { + $this->subscriber = new Internal\ObserverSubscriber($observable); + } + + /** + * Disposes of the subscription. + */ + public function __destruct() { + $this->subscriber->dispose(); + } + /** * Succeeds with true if a new value is available by calling getCurrent() or false if the observable has completed. * Calling getCurrent() will throw an exception if the observable completed. If an error occurs with the observable, @@ -14,7 +33,9 @@ interface Observer { * * @throws \Throwable|\Exception Exception used to fail the observable. */ - public function isValid(); + public function isValid() { + return $this->subscriber->getAwaitable(); + } /** * Gets the last emitted value or throws an exception if the observable has completed. @@ -23,9 +44,10 @@ interface Observer { * * @throws \Amp\CompletedException If the observable has successfully completed. * @throws \LogicException If isValid() was not called before calling this method. - * @throws \Throwable|\Exception Exception used to fail the observable. */ - public function getCurrent(); + public function getCurrent() { + return $this->subscriber->getCurrent(); + } /** * Gets the return value of the observable or throws the failure reason. Also throws an exception if the @@ -34,7 +56,8 @@ interface Observer { * @return mixed Final return value of the observable. * * @throws \Amp\IncompleteException If the observable has not completed. - * @throws \Throwable|\Exception Exception used to fail the observable. */ - public function getReturn(); + public function getReturn() { + return $this->subscriber->getReturn(); + } } diff --git a/lib/Subscriber.php b/lib/Subscriber.php new file mode 100644 index 0000000..575410f --- /dev/null +++ b/lib/Subscriber.php @@ -0,0 +1,83 @@ +coroutine = new Coroutine($this->run($onNext, $subscription)); + } + + /** + * @coroutine + * + * @param callable $onNext + * @param \Amp\Internal\Subscription $subscription + * + * @return \Generator + * + * @throws \Throwable|\Exception + */ + private function run(callable $onNext, Internal\Subscription $subscription) { + try { + while ($this->subscribed) { + /** @var \Amp\Internal\Emitted $emitted */ + $emitted = (yield $subscription->pull()); + + try { + $value = $emitted->getValue(); + + if ($value instanceof Awaitable) { + $value = (yield $value); + } + + if ($emitted->isComplete()) { + yield Coroutine::result($value); + return; + } + + $result = $onNext($value); + + if ($result instanceof Awaitable) { + yield $result; + } + } finally { + $emitted->ready(); + } + } + } finally { + $subscription->unsubscribe(); + } + + throw new DisposedException("The subscriber was disposed"); + } + + /** + * {@inheritdoc} + */ + public function when(callable $onResolved) { + $this->coroutine->when($onResolved); + } + + /** + * {@inheritdoc} + */ + public function dispose() { + $this->subscribed = false; + } +} diff --git a/lib/functions.php b/lib/functions.php index 36b4630..56242bc 100644 --- a/lib/functions.php +++ b/lib/functions.php @@ -22,24 +22,13 @@ function merge(array $observables) { $postponed = new Postponed; - $generator = function (Observable $observable) use ($postponed) { - $observer = $observable->getObserver(); - - while (yield $observer->isValid()) { - yield $postponed->emit($observer->getCurrent()); - } - - yield Coroutine::result($observer->getReturn()); - }; - - /** @var \Amp\Coroutine[] $coroutines */ - $coroutines = []; + $subscriptions = []; foreach ($observables as $observable) { - $coroutines[] = new Coroutine($generator($observable)); + $subscriptions[] = $observable->subscribe([$postponed, 'emit']); } - all($coroutines)->when(function ($exception, $value) use ($postponed) { + all($subscriptions)->when(function ($exception, $value) use ($postponed) { if ($exception) { $postponed->fail($exception); return; From 57e25f935422f0288ad811de38ddcc1c1fb45c41 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Sun, 29 May 2016 11:35:09 -0500 Subject: [PATCH 05/17] Simpler implementation --- example/backpressure.php | 2 +- example/emitter.php | 2 +- lib/CompletedException.php | 5 - lib/Disposable.php | 2 +- lib/Emitter.php | 5 +- lib/IncompleteException.php | 5 - lib/Internal/Emitted.php | 73 ------------ lib/Internal/ObserverSubscriber.php | 130 --------------------- lib/Internal/PrivateObservable.php | 12 +- lib/Internal/Producer.php | 169 +++++++++++++++++----------- lib/Internal/Subscription.php | 78 ------------- lib/Observable.php | 4 +- lib/Observer.php | 116 +++++++++++++++++-- lib/Postponed.php | 22 ++-- lib/Subscriber.php | 67 +++-------- lib/functions.php | 6 +- 16 files changed, 254 insertions(+), 444 deletions(-) delete mode 100644 lib/CompletedException.php delete mode 100644 lib/IncompleteException.php delete mode 100644 lib/Internal/Emitted.php delete mode 100644 lib/Internal/ObserverSubscriber.php delete mode 100644 lib/Internal/Subscription.php diff --git a/example/backpressure.php b/example/backpressure.php index 16c7a54..ca47c72 100644 --- a/example/backpressure.php +++ b/example/backpressure.php @@ -40,7 +40,7 @@ Loop::execute(Amp\coroutine(function () { yield $postponed->emit(new Pause(2000, 8)); yield $postponed->emit(9); yield $postponed->emit(10); - $postponed->complete(11); + $postponed->resolve(11); }; yield new Coroutine($generator($postponed)); diff --git a/example/emitter.php b/example/emitter.php index 9504756..578d41a 100644 --- a/example/emitter.php +++ b/example/emitter.php @@ -23,7 +23,7 @@ Loop::execute(function () { $postponed->emit(new Pause(2000, 8)); $postponed->emit(9); $postponed->emit(10); - $postponed->complete(11); + $postponed->resolve(11); }); $observable = $postponed->getObservable(); diff --git a/lib/CompletedException.php b/lib/CompletedException.php deleted file mode 100644 index b56013e..0000000 --- a/lib/CompletedException.php +++ /dev/null @@ -1,5 +0,0 @@ -value = $value; - $this->waiting = (int) $waiting; - $this->complete = (bool) $complete; - $this->ready = new Future; - - if ($this->waiting === 0) { - $this->ready->resolve($this->value); - } - } - - /** - * @return mixed - */ - public function getValue() { - return $this->value; - } - - /** - * @return bool - */ - public function isComplete() { - return $this->complete; - } - - /** - * Indicates that a subscriber has consumed the value represented by this object. - */ - public function ready() { - if (--$this->waiting === 0) { - $this->ready->resolve($this->value); - } - } - - /** - * @return \Interop\Async\Awaitable - */ - public function wait() { - return $this->ready; - } -} \ No newline at end of file diff --git a/lib/Internal/ObserverSubscriber.php b/lib/Internal/ObserverSubscriber.php deleted file mode 100644 index cf8ae14..0000000 --- a/lib/Internal/ObserverSubscriber.php +++ /dev/null @@ -1,130 +0,0 @@ -deferred = new Deferred; - - $this->disposable = $observable->subscribe([$this, 'onNext']); - $this->disposable->when([$this, 'onComplete']); - } - - /** - * @param mixed $value - * - * @return \Amp\Future - */ - public function onNext($value) { - $this->current = $value; - - $this->future = new Future; - $this->deferred->resolve(true); - - return $this->future; - } - - /** - * @param \Throwable|\Exception|null $exception - * @param mixed $value - */ - public function onComplete($exception, $value) { - $this->complete = true; - - if ($exception) { - $this->current = null; - $this->deferred->fail($exception); - return; - } - - $this->current = $value; - $this->deferred->resolve(false); - } - - /** - * @return \Interop\Async\Awaitable - */ - public function getAwaitable() { - if ($this->complete) { - return new Success(false); - } - - if ($this->future !== null) { - $future = $this->future; - $this->future = null; - $this->deferred = new Deferred; - $future->resolve(); - } - - return $this->deferred->getAwaitable(); - } - - /** - * @return mixed - * - * @throws \Amp\CompletedException - */ - public function getCurrent() { - if ($this->future === null) { - throw new \LogicException("Awaitable returned from isValid() must resolve before calling this method"); - } - - if ($this->complete) { - throw new CompletedException("The observable has completed"); - } - - return $this->current; - } - - /** - * @return mixed - * - * @throws \Amp\IncompleteException - */ - public function getReturn() { - if (!$this->complete) { - throw new IncompleteException("The observable has not completed"); - } - - return $this->current; - } - - public function dispose() { - $this->disposable->dispose(); - } -} diff --git a/lib/Internal/PrivateObservable.php b/lib/Internal/PrivateObservable.php index ba0b43d..9b6ba03 100644 --- a/lib/Internal/PrivateObservable.php +++ b/lib/Internal/PrivateObservable.php @@ -29,26 +29,22 @@ final class PrivateObservable implements Observable { * Completes the observable with the given value. * * @param mixed $value - * - * @return \Interop\Async\Awaitable */ - $complete = function ($value = null) { - return $this->complete($value); + $resolve = function ($value = null) { + $this->resolve($value); }; /** * Fails the observable with the given exception. * * @param \Exception $reason - * - * @return \Interop\Async\Awaitable */ $fail = function ($reason) { - return $this->fail($reason); + $this->fail($reason); }; try { - $emitter($emit, $complete, $fail); + $emitter($emit, $resolve, $fail); } catch (\Throwable $exception) { $this->fail($exception); } catch (\Exception $exception) { diff --git a/lib/Internal/Producer.php b/lib/Internal/Producer.php index 2d0183a..8e794a5 100644 --- a/lib/Internal/Producer.php +++ b/lib/Internal/Producer.php @@ -2,60 +2,93 @@ namespace Amp\Internal; -use Amp\CompletedException; use Amp\Coroutine; -use Amp\Failure; +use Amp\DisposedException; +use Amp\Future; +use Amp\Observable; use Amp\Subscriber; +use Amp\Success; +use Interop\Async\Awaitable; trait Producer { - /** - * @var Subscription[] - */ - private $subscriptions = []; + use Placeholder { + resolve as complete; + } /** - * @var bool + * @var callable[] */ - private $complete = false; + private $subscribers = []; /** - * @var \Throwable|\Exception|null + * @var \Amp\Future[] */ - private $exception; + private $futures = []; + + /** + * @var string + */ + private $nextId = "a"; /** * @var callable */ - private $unsubscribe; + private $dispose; /** - * + * @param callable $onNext + * + * @return \Amp\Subscriber */ public function subscribe(callable $onNext) { - if ($this->unsubscribe === null) { - $this->unsubscribe = function (Subscription $subscription) { - unset($this->subscriptions[\spl_object_hash($subscription)]); + if ($this->dispose === null) { + $this->dispose = function ($id, $exception = null) { + $this->dispose($id, $exception); }; } - $subscription = new Subscription($this->unsubscribe); - $this->subscriptions[\spl_object_hash($subscription)] = $subscription; + if ($this->result !== null) { + return new Subscriber( + $this->nextId++, + $this->result instanceof Awaitable ? $this->result : new Success($this->result), + $this->dispose + ); + } - return new Subscriber($onNext, $subscription); + $id = $this->nextId++; + $this->futures[$id] = new Future; + $this->subscribers[$id] = $onNext; + + return new Subscriber($id, $this->futures[$id], $this->dispose); } /** - * Emits a value from the observable. If the value is an awaitable, the success value will be emitted. If the - * awaitable fails, the observable will fail with the same exception. The returned awaitable is resolved with the - * emitted value once all subscribers have been invoked. + * @param string $id + * @param \Throwable|\Exception|null $exception + */ + protected function dispose($id, $exception = null) { + if (!isset($this->futures[$id])) { + throw new \LogicException("Disposable has already been disposed"); + } + + $future = $this->futures[$id]; + unset($this->subscribers[$id], $this->futures[$id]); + $future->fail($exception ?: new DisposedException()); + } + + /** + * Emits a value from the observable. The returned awaitable is resolved with the emitted value once all subscribers + * have been invoked. * * @param mixed $value * * @return \Interop\Async\Awaitable + * + * @throws \LogicException If the observable has resolved. */ protected function emit($value = null) { - if ($this->complete) { - throw new CompletedException("The observable has completed"); + if ($this->resolved) { + throw new \LogicException("The observable has been resolved; cannot emit more values"); } return new Coroutine($this->push($value)); @@ -65,67 +98,77 @@ trait Producer { * @coroutine * * @param mixed $value - * @param bool $complete * * @return \Generator * * @throws \InvalidArgumentException * @throws \Throwable|\Exception */ - private function push($value, $complete = false) { - $emitted = new Emitted($value, \count($this->subscriptions), $complete); - - foreach ($this->subscriptions as $subscription) { - $subscription->push($emitted); - } - + private function push($value) { try { - $value = (yield $emitted->wait()); + if ($value instanceof Awaitable) { + $value = (yield $value); + } elseif ($value instanceof Observable) { + $disposable = $value->subscribe(function ($value) { + return $this->emit($value); + }); + $value = (yield $disposable); + } } catch (\Throwable $exception) { - $this->complete = true; + if (!$this->resolved) { + $this->fail($exception); + } throw $exception; } catch (\Exception $exception) { - $this->complete = true; + if (!$this->resolved) { + $this->fail($exception); + } throw $exception; } + $awaitables = []; + + foreach ($this->subscribers as $id => $onNext) { + try { + $result = $onNext($value); + if ($result instanceof Awaitable) { + $awaitables[$id] = $result; + } + } catch (\Throwable $exception) { + $this->dispose($id, $exception); + } catch (\Exception $exception) { + $this->dispose($id, $exception); + } + } + + foreach ($awaitables as $id => $awaitable) { + try { + yield $awaitable; + } catch (\Throwable $exception) { + $this->dispose($id, $exception); + } catch (\Exception $exception) { + $this->dispose($id, $exception); + } + } + yield Coroutine::result($value); } /** - * Completes the observable with the given value. If the value is an awaitable, the success value will be emitted. - * If the awaitable fails, the observable will fail with the same exception. The returned awaitable is resolved - * with the completion value once all subscribers have received all prior emitted values. + * Resolves the observable with the given value. * * @param mixed $value * - * @return \Interop\Async\Awaitable + * @throws \LogicException If the observable has already been resolved. */ - protected function complete($value = null) { - if ($this->complete) { - throw new CompletedException("The observable has completed"); + protected function resolve($value = null) { + $futures = $this->futures; + $this->subscribers = $this->futures = []; + + $this->complete($value); + + foreach ($futures as $future) { + $future->resolve($value); } - - $this->complete = true; - - return new Coroutine($this->push($value, true)); - } - - /** - * Fails the observable with the given exception. The returned awaitable fails with the given exception once all - * subscribers have been received all prior emitted values. - * - * @param \Throwable|\Exception $exception - * - * @return \Interop\Async\Awaitable - */ - protected function fail($exception) { - if ($this->complete) { - throw new CompletedException("The observable has completed"); - } - - $this->complete = true; - - return new Coroutine($this->push(new Failure($exception), true)); } } diff --git a/lib/Internal/Subscription.php b/lib/Internal/Subscription.php deleted file mode 100644 index 5fbc08d..0000000 --- a/lib/Internal/Subscription.php +++ /dev/null @@ -1,78 +0,0 @@ -unsubscribe = $unsubscribe; - } - - /** - * Removes the subscription from the emit queue. - */ - public function unsubscribe() { - foreach ($this->valueQueue as $emitted) { - $emitted->ready(); - } - - $this->valueQueue = []; - - $unsubscribe = $this->unsubscribe; - $unsubscribe($this); - } - - /** - * @param \Amp\Internal\Emitted $emitted - */ - public function push(Emitted $emitted) { - if ($this->future !== null) { - $future = $this->future; - $this->future = null; - $future->resolve($emitted); - } else { - $this->valueQueue[] = $emitted; - } - } - - /** - * @return \Interop\Async\Awaitable - */ - public function pull() { - if (empty($this->valueQueue)) { - $this->future = new Future; - return $this->future; - } - - $emitted = $this->valueQueue[$this->current]; - unset($this->valueQueue[$this->current]); - ++$this->current; - - return new Success($emitted); - } -} \ No newline at end of file diff --git a/lib/Observable.php b/lib/Observable.php index 7fd96b4..1902f54 100644 --- a/lib/Observable.php +++ b/lib/Observable.php @@ -2,7 +2,9 @@ namespace Amp; -interface Observable { +use Interop\Async\Awaitable; + +interface Observable extends Awaitable { /** * Registers a callback to be invoked each time value is emitted from the observable. If the function returns an * awaitable, backpressure is applied to the awaitable until the returned awaitable is resolved. diff --git a/lib/Observer.php b/lib/Observer.php index 299e9f0..9e4adea 100644 --- a/lib/Observer.php +++ b/lib/Observer.php @@ -4,28 +4,93 @@ namespace Amp; final class Observer { /** - * @var \Amp\Internal\ObserverSubscriber + * @var \Amp\Disposable */ private $subscriber; + /** + * @var \Amp\Deferred + */ + private $deferred; + + /** + * @var \Amp\Future + */ + private $future; + + /** + * @var bool + */ + private $complete = false; + + /** + * @var mixed + */ + private $current; + + /** + * @var \Throwable|\Exception + */ + private $exception; + /** * @param \Amp\Observable $observable */ public function __construct(Observable $observable) { - $this->subscriber = new Internal\ObserverSubscriber($observable); + $this->deferred = new Deferred; + + $deferred = &$this->deferred; + $future = &$this->future; + $current = &$this->current; + + $this->subscriber = $observable->subscribe(static function ($value) use (&$deferred, &$future, &$current) { + $current = $value; + + $future = new Future; + $deferred->resolve(true); + + return $future; + }); + + $complete = &$this->complete; + $error = &$this->exception; + + $this->subscriber->when(static function ($exception, $value) use ( + &$deferred, &$future, &$current, &$error, &$complete + ) { + $complete = true; + + if ($exception) { + $current = null; + $error = $exception; + if ($future === null) { + $deferred->fail($exception); + } + return; + } + + $current = $value; + if ($future === null) { + $deferred->resolve(false); + } + }); } /** - * Disposes of the subscription. + * Disposes of the subscriber. */ public function __destruct() { $this->subscriber->dispose(); + + if ($this->future !== null) { + $this->future->resolve(); + } } /** * Succeeds with true if a new value is available by calling getCurrent() or false if the observable has completed. * Calling getCurrent() will throw an exception if the observable completed. If an error occurs with the observable, - * this coroutine will be rejected with the exception used to fail the observable. + * the returned awaitable will fail with the exception used to fail the observable. * * @return \Interop\Async\Awaitable * @@ -34,7 +99,22 @@ final class Observer { * @throws \Throwable|\Exception Exception used to fail the observable. */ public function isValid() { - return $this->subscriber->getAwaitable(); + if ($this->complete) { + if ($this->exception) { + return new Failure($this->exception); + } + + return new Success(false); + } + + if ($this->future !== null) { + $future = $this->future; + $this->future = null; + $this->deferred = new Deferred; + $future->resolve(); + } + + return $this->deferred->getAwaitable(); } /** @@ -42,11 +122,18 @@ final class Observer { * * @return mixed Value emitted from observable. * - * @throws \Amp\CompletedException If the observable has successfully completed. - * @throws \LogicException If isValid() was not called before calling this method. + * @throws \LogicException If the observable has resolved or isValid() was not called before calling this method. */ public function getCurrent() { - return $this->subscriber->getCurrent(); + if ($this->future === null) { + throw new \LogicException("Awaitable returned from isValid() must resolve before calling this method"); + } + + if ($this->complete) { + throw new \LogicException("The observable has completed"); + } + + return $this->current; } /** @@ -55,9 +142,18 @@ final class Observer { * * @return mixed Final return value of the observable. * - * @throws \Amp\IncompleteException If the observable has not completed. + * @throws \LogicException If the observable has not completed. + * @throws \Throwable|\Exception The exception used to fail the observable. */ public function getReturn() { - return $this->subscriber->getReturn(); + if (!$this->complete) { + throw new \LogicException("The observable has not completed"); + } + + if ($this->exception) { + throw $this->exception; + } + + return $this->current; } } diff --git a/lib/Postponed.php b/lib/Postponed.php index cb5967b..391d4ba 100644 --- a/lib/Postponed.php +++ b/lib/Postponed.php @@ -8,7 +8,7 @@ try { final class Postponed implements Observable { use Internal\Producer { emit as public; - complete as public; + resolve as public; fail as public; } @@ -35,7 +35,7 @@ try { /** * @var callable */ - private $complete; + private $resolve; /** * @var callable @@ -44,9 +44,9 @@ try { public function __construct() { $this->observable = new Internal\PrivateObservable( - function (callable $emit, callable $complete, callable $fail) { + function (callable $emit, callable $resolve, callable $fail) { $this->emit = $emit; - $this->complete = $complete; + $this->resolve = $resolve; $this->fail = $fail; } ); @@ -72,27 +72,23 @@ try { } /** - * Completes the observable with the given value. + * Resolves the observable with the given value. * * @param mixed $value - * - * @return \Interop\Async\Awaitable */ - public function complete($value = null) { - $complete = $this->complete; - return $complete($value); + public function resolve($value = null) { + $resolve = $this->resolve; + $resolve($value); } /** * Fails the observable with the given reason. * * @param \Throwable|\Exception $reason - * - * @return \Interop\Async\Awaitable */ public function fail($reason) { $fail = $this->fail; - return $fail($reason); + $fail($reason); } } } diff --git a/lib/Subscriber.php b/lib/Subscriber.php index 575410f..6cc0e27 100644 --- a/lib/Subscriber.php +++ b/lib/Subscriber.php @@ -6,78 +6,43 @@ use Interop\Async\Awaitable; final class Subscriber implements Disposable { /** - * @var \Amp\Coroutine + * @var string */ - private $coroutine; + private $id; /** - * @var bool + * @var \Interop\Async\Awaitable */ - private $subscribed = true; + private $awaitable; /** - * @param callable $onNext - * @param \Amp\Internal\Subscription $subscription + * @var callable */ - public function __construct(callable $onNext, Internal\Subscription $subscription) { - $this->coroutine = new Coroutine($this->run($onNext, $subscription)); - } + private $dispose; /** - * @coroutine - * - * @param callable $onNext - * @param \Amp\Internal\Subscription $subscription - * - * @return \Generator - * - * @throws \Throwable|\Exception + * @param string $id + * @param \Interop\Async\Awaitable $awaitable + * @param callable $dispose */ - private function run(callable $onNext, Internal\Subscription $subscription) { - try { - while ($this->subscribed) { - /** @var \Amp\Internal\Emitted $emitted */ - $emitted = (yield $subscription->pull()); - - try { - $value = $emitted->getValue(); - - if ($value instanceof Awaitable) { - $value = (yield $value); - } - - if ($emitted->isComplete()) { - yield Coroutine::result($value); - return; - } - - $result = $onNext($value); - - if ($result instanceof Awaitable) { - yield $result; - } - } finally { - $emitted->ready(); - } - } - } finally { - $subscription->unsubscribe(); - } - - throw new DisposedException("The subscriber was disposed"); + public function __construct($id, Awaitable $awaitable, callable $dispose) { + $this->id = $id; + $this->awaitable = $awaitable; + $this->dispose = $dispose; } /** * {@inheritdoc} */ public function when(callable $onResolved) { - $this->coroutine->when($onResolved); + $this->awaitable->when($onResolved); } /** * {@inheritdoc} */ public function dispose() { - $this->subscribed = false; + $dispose = $this->dispose; + $dispose($this->id); } } diff --git a/lib/functions.php b/lib/functions.php index 56242bc..fef6721 100644 --- a/lib/functions.php +++ b/lib/functions.php @@ -34,7 +34,7 @@ function merge(array $observables) { return; } - $postponed->complete($value); + $postponed->resolve($value); }); return $postponed->getObservable(); @@ -63,7 +63,7 @@ function interval($interval, $count = 0) { if ($i === $count) { Loop::cancel($watcher); - $postponed->complete(); + $postponed->resolve(); } }); @@ -105,7 +105,7 @@ function range($start, $end, $step = 1) { return; } - $postponed->complete(); + $postponed->resolve(); }); return $postponed->getObservable(); From 579ab2fc96f147a1096f3a5304fe949a997defe4 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Sun, 29 May 2016 12:07:21 -0500 Subject: [PATCH 06/17] Update example --- example/emitter.php | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/example/emitter.php b/example/emitter.php index 578d41a..ff11eb7 100644 --- a/example/emitter.php +++ b/example/emitter.php @@ -13,14 +13,14 @@ Loop::execute(function () { $postponed = new Postponed; Loop::defer(function () use ($postponed) { - $postponed->emit(new Pause(500, 1)); - $postponed->emit(new Pause(1500, 2)); - $postponed->emit(new Pause(1000, 3)); - $postponed->emit(new Pause(2000, 4)); + $postponed->emit(1); + $postponed->emit(2); + $postponed->emit(3); + $postponed->emit(4); $postponed->emit(5); $postponed->emit(6); $postponed->emit(7); - $postponed->emit(new Pause(2000, 8)); + $postponed->emit(8); $postponed->emit(9); $postponed->emit(10); $postponed->resolve(11); From 1b079bb336ebbd07c80afacc91b462dbc58a6601 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Sun, 29 May 2016 23:49:44 -0500 Subject: [PATCH 07/17] Fix Observer --- example/emitter.php | 26 +++++++------ lib/Observer.php | 95 +++++++++++++++++++++++++++------------------ 2 files changed, 71 insertions(+), 50 deletions(-) diff --git a/example/emitter.php b/example/emitter.php index ff11eb7..89fcde2 100644 --- a/example/emitter.php +++ b/example/emitter.php @@ -3,12 +3,14 @@ require dirname(__DIR__) . '/vendor/autoload.php'; +use Amp\Observable; +use Amp\Observer; use Amp\Pause; use Amp\Postponed; use Amp\Loop\NativeLoop; use Interop\Async\Loop; -Loop::execute(function () { +Loop::execute(Amp\coroutine(function () { try { $postponed = new Postponed; @@ -28,20 +30,20 @@ Loop::execute(function () { $observable = $postponed->getObservable(); - $disposable = $observable->subscribe(function ($value) { - printf("Observable emitted %d\n", $value); - return new Pause(500); // Artificial back-pressure on observable, but is ignored. - }); + $generator = function (Observable $observable) { + $observer = new Observer($observable); - $disposable->when(function ($exception, $value) { - if ($exception) { - printf("Exception: %s\n", $exception->getMessage()); - return; + while (yield $observer->isValid()) { + printf("Observable emitted %d\n", $observer->getCurrent()); + yield new Pause(100); } - printf("Observable result %d\n", $value); - }); + printf("Observable result %d\n", $observer->getReturn()); + }; + + yield new \Amp\Coroutine($generator($observable)); + } catch (\Exception $exception) { printf("Exception: %s\n", $exception); } -}, $loop = new NativeLoop()); +}), $loop = new NativeLoop()); diff --git a/lib/Observer.php b/lib/Observer.php index 9e4adea..2fc509d 100644 --- a/lib/Observer.php +++ b/lib/Observer.php @@ -9,14 +9,24 @@ final class Observer { private $subscriber; /** - * @var \Amp\Deferred + * @var mixed[] */ - private $deferred; + private $values = []; /** - * @var \Amp\Future + * @var \Amp\Future[] */ - private $future; + private $futures = []; + + /** + * @var int + */ + private $position = -1; + + /** + * @var \Amp\Deferred|null + */ + private $deferred; /** * @var bool @@ -26,10 +36,10 @@ final class Observer { /** * @var mixed */ - private $current; + private $result; /** - * @var \Throwable|\Exception + * @var \Throwable|\Exception|null */ private $exception; @@ -37,40 +47,41 @@ final class Observer { * @param \Amp\Observable $observable */ public function __construct(Observable $observable) { - $this->deferred = new Deferred; - $deferred = &$this->deferred; - $future = &$this->future; - $current = &$this->current; + $values = &$this->values; + $futures = &$this->futures; - $this->subscriber = $observable->subscribe(static function ($value) use (&$deferred, &$future, &$current) { - $current = $value; + $this->subscriber = $observable->subscribe(static function ($value) use (&$deferred, &$values, &$futures) { + $values[] = $value; + $futures[] = $future = new Future; - $future = new Future; - $deferred->resolve(true); + if ($deferred !== null) { + $temp = $deferred; + $deferred = null; + $temp->resolve($value); + } return $future; }); $complete = &$this->complete; + $result = &$this->result; $error = &$this->exception; - $this->subscriber->when(static function ($exception, $value) use ( - &$deferred, &$future, &$current, &$error, &$complete - ) { + $this->subscriber->when(static function ($exception, $value) use (&$deferred, &$result, &$error, &$complete) { $complete = true; if ($exception) { - $current = null; + $result = null; $error = $exception; - if ($future === null) { + if ($deferred !== null) { $deferred->fail($exception); } return; } - $current = $value; - if ($future === null) { + $result = $value; + if ($deferred !== null) { $deferred->resolve(false); } }); @@ -80,10 +91,12 @@ final class Observer { * Disposes of the subscriber. */ public function __destruct() { - $this->subscriber->dispose(); + if (!$this->complete) { + $this->subscriber->dispose(); + } - if ($this->future !== null) { - $this->future->resolve(); + foreach ($this->futures as $future) { + $future->resolve(); } } @@ -99,6 +112,18 @@ final class Observer { * @throws \Throwable|\Exception Exception used to fail the observable. */ public function isValid() { + if (isset($this->futures[$this->position])) { + $future = $this->futures[$this->position]; + unset($this->values[$this->position], $this->futures[$this->position]); + $future->resolve(); + } + + ++$this->position; + + if (isset($this->values[$this->position])) { + return new Success(true); + } + if ($this->complete) { if ($this->exception) { return new Failure($this->exception); @@ -107,13 +132,7 @@ final class Observer { return new Success(false); } - if ($this->future !== null) { - $future = $this->future; - $this->future = null; - $this->deferred = new Deferred; - $future->resolve(); - } - + $this->deferred = new Deferred; return $this->deferred->getAwaitable(); } @@ -125,15 +144,15 @@ final class Observer { * @throws \LogicException If the observable has resolved or isValid() was not called before calling this method. */ public function getCurrent() { - if ($this->future === null) { - throw new \LogicException("Awaitable returned from isValid() must resolve before calling this method"); - } - - if ($this->complete) { + if (empty($this->values) && $this->complete) { throw new \LogicException("The observable has completed"); } - return $this->current; + if (!isset($this->values[$this->position])) { + throw new \LogicException("Awaitable returned from isValid() must resolve before calling this method"); + } + + return $this->values[$this->position]; } /** @@ -154,6 +173,6 @@ final class Observer { throw $this->exception; } - return $this->current; + return $this->result; } } From a52e1e4c3337954c3f3ccb8f3f8f700576a4e6d8 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Mon, 30 May 2016 10:24:03 -0500 Subject: [PATCH 08/17] Rename Observer methods --- example/emitter.php | 7 ++++--- lib/Observer.php | 39 +++++++++++++++++---------------------- 2 files changed, 21 insertions(+), 25 deletions(-) diff --git a/example/emitter.php b/example/emitter.php index 89fcde2..b566f1a 100644 --- a/example/emitter.php +++ b/example/emitter.php @@ -15,6 +15,7 @@ Loop::execute(Amp\coroutine(function () { $postponed = new Postponed; Loop::defer(function () use ($postponed) { + // Observer emits all values at once. $postponed->emit(1); $postponed->emit(2); $postponed->emit(3); @@ -33,12 +34,12 @@ Loop::execute(Amp\coroutine(function () { $generator = function (Observable $observable) { $observer = new Observer($observable); - while (yield $observer->isValid()) { + while (yield $observer->next()) { printf("Observable emitted %d\n", $observer->getCurrent()); - yield new Pause(100); + yield new Pause(100); // Observer consumption takes 100 ms. } - printf("Observable result %d\n", $observer->getReturn()); + printf("Observable result %d\n", $observer->getResult()); }; yield new \Amp\Coroutine($generator($observable)); diff --git a/lib/Observer.php b/lib/Observer.php index 2fc509d..e3520cf 100644 --- a/lib/Observer.php +++ b/lib/Observer.php @@ -31,7 +31,7 @@ final class Observer { /** * @var bool */ - private $complete = false; + private $resolved = false; /** * @var mixed @@ -64,12 +64,12 @@ final class Observer { return $future; }); - $complete = &$this->complete; + $resolved = &$this->resolved; $result = &$this->result; $error = &$this->exception; - $this->subscriber->when(static function ($exception, $value) use (&$deferred, &$result, &$error, &$complete) { - $complete = true; + $this->subscriber->when(static function ($exception, $value) use (&$deferred, &$result, &$error, &$resolved) { + $resolved = true; if ($exception) { $result = null; @@ -91,7 +91,7 @@ final class Observer { * Disposes of the subscriber. */ public function __destruct() { - if (!$this->complete) { + if (!$this->resolved) { $this->subscriber->dispose(); } @@ -101,17 +101,12 @@ final class Observer { } /** - * Succeeds with true if a new value is available by calling getCurrent() or false if the observable has completed. - * Calling getCurrent() will throw an exception if the observable completed. If an error occurs with the observable, - * the returned awaitable will fail with the exception used to fail the observable. + * Succeeds with true if an emitted value is available by calling getCurrent() or false if the observable has + * resolved. If the observable fails, the returned awaitable will fail with the same exception. * * @return \Interop\Async\Awaitable - * - * @resolve bool - * - * @throws \Throwable|\Exception Exception used to fail the observable. */ - public function isValid() { + public function next() { if (isset($this->futures[$this->position])) { $future = $this->futures[$this->position]; unset($this->values[$this->position], $this->futures[$this->position]); @@ -124,7 +119,7 @@ final class Observer { return new Success(true); } - if ($this->complete) { + if ($this->resolved) { if ($this->exception) { return new Failure($this->exception); } @@ -141,32 +136,32 @@ final class Observer { * * @return mixed Value emitted from observable. * - * @throws \LogicException If the observable has resolved or isValid() was not called before calling this method. + * @throws \LogicException If the observable has resolved or next() was not called before calling this method. */ public function getCurrent() { - if (empty($this->values) && $this->complete) { + if (empty($this->values) && $this->resolved) { throw new \LogicException("The observable has completed"); } if (!isset($this->values[$this->position])) { - throw new \LogicException("Awaitable returned from isValid() must resolve before calling this method"); + throw new \LogicException("Awaitable returned from next() must resolve before calling this method"); } return $this->values[$this->position]; } /** - * Gets the return value of the observable or throws the failure reason. Also throws an exception if the - * observable has not completed. + * Gets the result of the observable or throws the failure reason. Also throws an exception if the observable has + * not completed. * * @return mixed Final return value of the observable. * * @throws \LogicException If the observable has not completed. * @throws \Throwable|\Exception The exception used to fail the observable. */ - public function getReturn() { - if (!$this->complete) { - throw new \LogicException("The observable has not completed"); + public function getResult() { + if (!$this->resolved) { + throw new \LogicException("The observable has not resolved"); } if ($this->exception) { From eb49e6e8ffdafc01b1a0d2989931b0cad38b684f Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Tue, 31 May 2016 15:01:07 -0500 Subject: [PATCH 09/17] Resolve emit with observable result instead of emitting Emitting another observable will emit values from the emitted observable, then resolve the awaitable returned from emit with the observable result --- lib/Internal/Producer.php | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/lib/Internal/Producer.php b/lib/Internal/Producer.php index 8e794a5..b6eb382 100644 --- a/lib/Internal/Producer.php +++ b/lib/Internal/Producer.php @@ -106,13 +106,16 @@ trait Producer { */ private function push($value) { try { - if ($value instanceof Awaitable) { - $value = (yield $value); - } elseif ($value instanceof Observable) { + if ($value instanceof Observable) { $disposable = $value->subscribe(function ($value) { return $this->emit($value); }); - $value = (yield $disposable); + yield Coroutine::result(yield $disposable); + return; + } + + if ($value instanceof Awaitable) { + $value = (yield $value); } } catch (\Throwable $exception) { if (!$this->resolved) { From cc431a0374a9a6ab919e0721d4110ae2c5b47ffe Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Tue, 31 May 2016 23:02:59 -0500 Subject: [PATCH 10/17] Prevent emit without subscribers --- lib/Emitter.php | 1 + lib/Internal/PrivateObservable.php | 2 ++ lib/Internal/Producer.php | 46 +++++++++++++++++++++++++----- lib/Postponed.php | 1 + 4 files changed, 43 insertions(+), 7 deletions(-) diff --git a/lib/Emitter.php b/lib/Emitter.php index 34f07b1..84b5ec4 100644 --- a/lib/Emitter.php +++ b/lib/Emitter.php @@ -7,6 +7,7 @@ namespace Amp; */ final class Emitter implements Observable { use Internal\Producer { + init as __construct; emit as public; resolve as public; fail as public; diff --git a/lib/Internal/PrivateObservable.php b/lib/Internal/PrivateObservable.php index 9b6ba03..3fa51da 100644 --- a/lib/Internal/PrivateObservable.php +++ b/lib/Internal/PrivateObservable.php @@ -14,6 +14,8 @@ final class PrivateObservable implements Observable { * @param callable(callable $emit, callable $complete, callable $fail): void $emitter */ public function __construct(callable $emitter) { + $this->init(); + /** * Emits a value from the observable. * diff --git a/lib/Internal/Producer.php b/lib/Internal/Producer.php index b6eb382..f63ed1e 100644 --- a/lib/Internal/Producer.php +++ b/lib/Internal/Producer.php @@ -9,6 +9,7 @@ use Amp\Observable; use Amp\Subscriber; use Amp\Success; use Interop\Async\Awaitable; +use Interop\Async\Loop; trait Producer { use Placeholder { @@ -20,6 +21,11 @@ trait Producer { */ private $subscribers = []; + /** + * @var \Amp\Future|null + */ + private $waiting; + /** * @var \Amp\Future[] */ @@ -35,18 +41,23 @@ trait Producer { */ private $dispose; + /** + * Initializes the trait. Use as constructor or call within using class constructor. + */ + public function init() + { + $this->waiting = new Future; + $this->dispose = function ($id, $exception = null) { + $this->dispose($id, $exception); + }; + } + /** * @param callable $onNext * * @return \Amp\Subscriber */ public function subscribe(callable $onNext) { - if ($this->dispose === null) { - $this->dispose = function ($id, $exception = null) { - $this->dispose($id, $exception); - }; - } - if ($this->result !== null) { return new Subscriber( $this->nextId++, @@ -59,6 +70,12 @@ trait Producer { $this->futures[$id] = new Future; $this->subscribers[$id] = $onNext; + if ($this->waiting !== null) { + $waiting = $this->waiting; + $this->waiting = null; + $waiting->resolve(); + } + return new Subscriber($id, $this->futures[$id], $this->dispose); } @@ -73,7 +90,12 @@ trait Producer { $future = $this->futures[$id]; unset($this->subscribers[$id], $this->futures[$id]); - $future->fail($exception ?: new DisposedException()); + + if (empty($this->subscribers)) { + $this->waiting = new Future; + } + + $future->fail($exception ?: new DisposedException); } /** @@ -105,6 +127,10 @@ trait Producer { * @throws \Throwable|\Exception */ private function push($value) { + while ($this->waiting !== null) { + yield $this->waiting; + } + try { if ($value instanceof Observable) { $disposable = $value->subscribe(function ($value) { @@ -168,6 +194,12 @@ trait Producer { $futures = $this->futures; $this->subscribers = $this->futures = []; + if ($this->waiting !== null) { + $waiting = $this->waiting; + $this->waiting = null; + $waiting->resolve(); + } + $this->complete($value); foreach ($futures as $future) { diff --git a/lib/Postponed.php b/lib/Postponed.php index 391d4ba..2744915 100644 --- a/lib/Postponed.php +++ b/lib/Postponed.php @@ -7,6 +7,7 @@ try { production: // PHP 7 production environment (zend.assertions=0) final class Postponed implements Observable { use Internal\Producer { + init as __construct; emit as public; resolve as public; fail as public; From c4e9a190952d1bdc7c988055f8e9d049a02411b6 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Wed, 1 Jun 2016 11:19:19 -0500 Subject: [PATCH 11/17] Rename Emitter to Producer; add new Emitter class Emitter uses a coroutine to emit values. Updated examples. --- example/emitter.php | 34 +++++++++++++---------------- example/postponed.php | 51 +++++++++++++++++++++++++++++++++++++++++++ lib/Emitter.php | 40 ++++++++++++++++++++++++++------- lib/Producer.php | 15 +++++++++++++ 4 files changed, 113 insertions(+), 27 deletions(-) create mode 100644 example/postponed.php create mode 100644 lib/Producer.php diff --git a/example/emitter.php b/example/emitter.php index b566f1a..09f7337 100644 --- a/example/emitter.php +++ b/example/emitter.php @@ -3,34 +3,30 @@ require dirname(__DIR__) . '/vendor/autoload.php'; +use Amp\Coroutine; +use Amp\Emitter; use Amp\Observable; use Amp\Observer; use Amp\Pause; -use Amp\Postponed; use Amp\Loop\NativeLoop; use Interop\Async\Loop; Loop::execute(Amp\coroutine(function () { try { - $postponed = new Postponed; - - Loop::defer(function () use ($postponed) { - // Observer emits all values at once. - $postponed->emit(1); - $postponed->emit(2); - $postponed->emit(3); - $postponed->emit(4); - $postponed->emit(5); - $postponed->emit(6); - $postponed->emit(7); - $postponed->emit(8); - $postponed->emit(9); - $postponed->emit(10); - $postponed->resolve(11); + $emitter = new Emitter(function (callable $emit) { + yield $emit(1); + yield $emit(new Pause(500, 2)); + yield $emit(3); + yield $emit(new Pause(300, 4)); + yield $emit(5); + yield $emit(6); + yield $emit(new Pause(1000, 7)); + yield $emit(8); + yield $emit(9); + yield $emit(new Pause(600, 10)); + yield Coroutine::result(11); }); - $observable = $postponed->getObservable(); - $generator = function (Observable $observable) { $observer = new Observer($observable); @@ -42,7 +38,7 @@ Loop::execute(Amp\coroutine(function () { printf("Observable result %d\n", $observer->getResult()); }; - yield new \Amp\Coroutine($generator($observable)); + yield new Coroutine($generator($emitter)); } catch (\Exception $exception) { printf("Exception: %s\n", $exception); diff --git a/example/postponed.php b/example/postponed.php new file mode 100644 index 0000000..5677304 --- /dev/null +++ b/example/postponed.php @@ -0,0 +1,51 @@ +#!/usr/bin/env php +emit(1); + $postponed->emit(2); + $postponed->emit(3); + $postponed->emit(4); + $postponed->emit(5); + $postponed->emit(6); + $postponed->emit(7); + $postponed->emit(8); + $postponed->emit(9); + $postponed->emit(10); + $postponed->resolve(11); + }); + + $observable = $postponed->getObservable(); + + $generator = function (Observable $observable) { + $observer = new Observer($observable); + + while (yield $observer->next()) { + printf("Observable emitted %d\n", $observer->getCurrent()); + yield new Pause(100); // Observer consumption takes 100 ms. + } + + printf("Observable result %d\n", $observer->getResult()); + }; + + yield new Coroutine($generator($observable)); + + } catch (\Exception $exception) { + printf("Exception: %s\n", $exception); + } +}), $loop = new NativeLoop()); diff --git a/lib/Emitter.php b/lib/Emitter.php index 84b5ec4..8ff086d 100644 --- a/lib/Emitter.php +++ b/lib/Emitter.php @@ -2,14 +2,38 @@ namespace Amp; -/** - * Observable implementation that should not be returned from a public API, but used only internally. - */ final class Emitter implements Observable { - use Internal\Producer { - init as __construct; - emit as public; - resolve as public; - fail as public; + use Internal\Producer; + + /** + * @param callable(callable $emit): \Generator $emitter + */ + public function __construct(callable $emitter) { + $this->init(); + + /** + * @param mixed $value + * + * @return \Interop\Async\Awaitable + */ + $emit = function ($value = null) { + return $this->emit($value); + }; + + $result = $emitter($emit); + + if (!$result instanceof \Generator) { + throw new \LogicException("The callable did not return a Generator"); + } + + $coroutine = new Coroutine($result); + $coroutine->when(function ($exception, $value) { + if ($exception) { + $this->fail($exception); + return; + } + + $this->resolve($value); + }); } } diff --git a/lib/Producer.php b/lib/Producer.php new file mode 100644 index 0000000..b15c93b --- /dev/null +++ b/lib/Producer.php @@ -0,0 +1,15 @@ + Date: Wed, 1 Jun 2016 11:37:12 -0500 Subject: [PATCH 12/17] Update docblocks --- lib/Disposable.php | 3 +++ lib/Internal/PrivateObservable.php | 2 ++ lib/Internal/Producer.php | 6 ++++++ lib/Observable.php | 7 ++++++- lib/Observer.php | 10 ++++++++++ lib/Subscriber.php | 3 +++ 6 files changed, 30 insertions(+), 1 deletion(-) diff --git a/lib/Disposable.php b/lib/Disposable.php index 073e40c..131096e 100644 --- a/lib/Disposable.php +++ b/lib/Disposable.php @@ -4,6 +4,9 @@ namespace Amp; use Interop\Async\Awaitable; +/** + * Objects returned from \Amp\Observable::subscribe() implement this interface. + */ interface Disposable extends Awaitable { /** * Disposes of the subscriber, failing with an instance of \Amp\DisposedException diff --git a/lib/Internal/PrivateObservable.php b/lib/Internal/PrivateObservable.php index 3fa51da..3bcf5cc 100644 --- a/lib/Internal/PrivateObservable.php +++ b/lib/Internal/PrivateObservable.php @@ -6,6 +6,8 @@ use Amp\Observable; /** * An observable that cannot externally emit values. Used by Postponed in development mode. + * + * @internal */ final class PrivateObservable implements Observable { use Producer; diff --git a/lib/Internal/Producer.php b/lib/Internal/Producer.php index f63ed1e..92ea902 100644 --- a/lib/Internal/Producer.php +++ b/lib/Internal/Producer.php @@ -11,6 +11,12 @@ use Amp\Success; use Interop\Async\Awaitable; use Interop\Async\Loop; +/** + * Trait used by Observable implementations. Do not use this trait in your code, instead compose your class from one of + * the available classes implementing \Amp\Observable. + * + * @internal + */ trait Producer { use Placeholder { resolve as complete; diff --git a/lib/Observable.php b/lib/Observable.php index 1902f54..8f216cd 100644 --- a/lib/Observable.php +++ b/lib/Observable.php @@ -4,10 +4,15 @@ namespace Amp; use Interop\Async\Awaitable; +/** + * Represents a set of asynchronous values. An observable is analogous to an asynchronous generator, yielding (emitting) + * values when they are available, returning a value (success value) when the observable completes or throwing an + * exception (failure reason). + */ interface Observable extends Awaitable { /** * Registers a callback to be invoked each time value is emitted from the observable. If the function returns an - * awaitable, backpressure is applied to the awaitable until the returned awaitable is resolved. + * awaitable, back-pressure is applied to the awaitable until the returned awaitable is resolved. * * Exceptions thrown from $onNext (or failures of awaitables returned from $onNext) will fail the returned * Disposable with the thrown exception. diff --git a/lib/Observer.php b/lib/Observer.php index e3520cf..c0955a8 100644 --- a/lib/Observer.php +++ b/lib/Observer.php @@ -2,6 +2,16 @@ namespace Amp; +/** + * Asynchronous iterator that can be used within a coroutine to iterate over the emitted values from an Observable. + * + * Example: + * $observer = new Observer($observable); // $observable is an instance of \Amp\Observable + * while (yield $observer->next()) { + * $emitted = $observer->getCurrent(); + * } + * $result = $observer->getResult(); + */ final class Observer { /** * @var \Amp\Disposable diff --git a/lib/Subscriber.php b/lib/Subscriber.php index 6cc0e27..41711c1 100644 --- a/lib/Subscriber.php +++ b/lib/Subscriber.php @@ -4,6 +4,9 @@ namespace Amp; use Interop\Async\Awaitable; +/** + * Disposable implementation returned from implementors of \Amp\Observable. + */ final class Subscriber implements Disposable { /** * @var string From cef5c9016851ac75e8b29042e2fdb9529f6a7dae Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Thu, 2 Jun 2016 10:35:41 -0500 Subject: [PATCH 13/17] Drop disposable interface; rename dispose() to unsubscribe() --- example/backpressure.php | 4 ++-- lib/Disposable.php | 15 --------------- lib/DisposedException.php | 5 ----- lib/Internal/Producer.php | 30 +++++++++++++++--------------- lib/Observable.php | 4 ++-- lib/Observer.php | 4 ++-- lib/Subscriber.php | 16 ++++++++-------- lib/UnsubscribedException.php | 5 +++++ 8 files changed, 34 insertions(+), 49 deletions(-) delete mode 100644 lib/Disposable.php delete mode 100644 lib/DisposedException.php create mode 100644 lib/UnsubscribedException.php diff --git a/example/backpressure.php b/example/backpressure.php index ca47c72..0f9cb54 100644 --- a/example/backpressure.php +++ b/example/backpressure.php @@ -15,12 +15,12 @@ Loop::execute(Amp\coroutine(function () { $observable = $postponed->getObservable(); - $disposable = $observable->subscribe(function ($value) { + $subscriber = $observable->subscribe(function ($value) { printf("Observable emitted %d\n", $value); return new Pause(500); // Artificial back-pressure on observable. }); - $disposable->when(function ($exception, $value) { + $subscriber->when(function ($exception, $value) { if ($exception) { printf("Observable failed: %s\n", $exception->getMessage()); return; diff --git a/lib/Disposable.php b/lib/Disposable.php deleted file mode 100644 index 131096e..0000000 --- a/lib/Disposable.php +++ /dev/null @@ -1,15 +0,0 @@ -waiting = new Future; - $this->dispose = function ($id, $exception = null) { - $this->dispose($id, $exception); + $this->unsubscribe = function ($id, $exception = null) { + $this->unsubscribe($id, $exception); }; } @@ -68,7 +68,7 @@ trait Producer { return new Subscriber( $this->nextId++, $this->result instanceof Awaitable ? $this->result : new Success($this->result), - $this->dispose + $this->unsubscribe ); } @@ -82,16 +82,16 @@ trait Producer { $waiting->resolve(); } - return new Subscriber($id, $this->futures[$id], $this->dispose); + return new Subscriber($id, $this->futures[$id], $this->unsubscribe); } /** * @param string $id * @param \Throwable|\Exception|null $exception */ - protected function dispose($id, $exception = null) { + protected function unsubscribe($id, $exception = null) { if (!isset($this->futures[$id])) { - throw new \LogicException("Disposable has already been disposed"); + return; } $future = $this->futures[$id]; @@ -101,7 +101,7 @@ trait Producer { $this->waiting = new Future; } - $future->fail($exception ?: new DisposedException); + $future->fail($exception ?: new UnsubscribedException); } /** @@ -139,10 +139,10 @@ trait Producer { try { if ($value instanceof Observable) { - $disposable = $value->subscribe(function ($value) { + $subscriber = $value->subscribe(function ($value) { return $this->emit($value); }); - yield Coroutine::result(yield $disposable); + yield Coroutine::result(yield $subscriber); return; } @@ -170,9 +170,9 @@ trait Producer { $awaitables[$id] = $result; } } catch (\Throwable $exception) { - $this->dispose($id, $exception); + $this->unsubscribe($id, $exception); } catch (\Exception $exception) { - $this->dispose($id, $exception); + $this->unsubscribe($id, $exception); } } @@ -180,9 +180,9 @@ trait Producer { try { yield $awaitable; } catch (\Throwable $exception) { - $this->dispose($id, $exception); + $this->unsubscribe($id, $exception); } catch (\Exception $exception) { - $this->dispose($id, $exception); + $this->unsubscribe($id, $exception); } } diff --git a/lib/Observable.php b/lib/Observable.php index 8f216cd..39cd16a 100644 --- a/lib/Observable.php +++ b/lib/Observable.php @@ -15,11 +15,11 @@ interface Observable extends Awaitable { * awaitable, back-pressure is applied to the awaitable until the returned awaitable is resolved. * * Exceptions thrown from $onNext (or failures of awaitables returned from $onNext) will fail the returned - * Disposable with the thrown exception. + * Subscriber with the thrown exception. * * @param callable $onNext Function invoked each time a value is emitted from the observable. * - * @return \Amp\Disposable + * @return \Amp\Subscriber */ public function subscribe(callable $onNext); } diff --git a/lib/Observer.php b/lib/Observer.php index c0955a8..68130fc 100644 --- a/lib/Observer.php +++ b/lib/Observer.php @@ -14,7 +14,7 @@ namespace Amp; */ final class Observer { /** - * @var \Amp\Disposable + * @var \Amp\Subscriber */ private $subscriber; @@ -102,7 +102,7 @@ final class Observer { */ public function __destruct() { if (!$this->resolved) { - $this->subscriber->dispose(); + $this->subscriber->unsubscribe(); } foreach ($this->futures as $future) { diff --git a/lib/Subscriber.php b/lib/Subscriber.php index 41711c1..4d6d397 100644 --- a/lib/Subscriber.php +++ b/lib/Subscriber.php @@ -7,7 +7,7 @@ use Interop\Async\Awaitable; /** * Disposable implementation returned from implementors of \Amp\Observable. */ -final class Subscriber implements Disposable { +final class Subscriber implements Awaitable { /** * @var string */ @@ -21,17 +21,17 @@ final class Subscriber implements Disposable { /** * @var callable */ - private $dispose; + private $unsubscribe; /** * @param string $id * @param \Interop\Async\Awaitable $awaitable - * @param callable $dispose + * @param callable $unsubscribe */ - public function __construct($id, Awaitable $awaitable, callable $dispose) { + public function __construct($id, Awaitable $awaitable, callable $unsubscribe) { $this->id = $id; $this->awaitable = $awaitable; - $this->dispose = $dispose; + $this->unsubscribe = $unsubscribe; } /** @@ -44,8 +44,8 @@ final class Subscriber implements Disposable { /** * {@inheritdoc} */ - public function dispose() { - $dispose = $this->dispose; - $dispose($this->id); + public function unsubscribe() { + $unsubscribe = $this->unsubscribe; + $unsubscribe($this->id); } } diff --git a/lib/UnsubscribedException.php b/lib/UnsubscribedException.php new file mode 100644 index 0000000..e305cbf --- /dev/null +++ b/lib/UnsubscribedException.php @@ -0,0 +1,5 @@ + Date: Thu, 2 Jun 2016 10:43:46 -0500 Subject: [PATCH 14/17] Refactor functions with Emitter --- lib/functions.php | 45 +++++++++++++++------------------------------ 1 file changed, 15 insertions(+), 30 deletions(-) diff --git a/lib/functions.php b/lib/functions.php index fef6721..9d2d00c 100644 --- a/lib/functions.php +++ b/lib/functions.php @@ -20,24 +20,23 @@ function merge(array $observables) { } } - $postponed = new Postponed; + return new Emitter(function (callable $emit) use ($observables) { + $subscriptions = []; - $subscriptions = []; - - foreach ($observables as $observable) { - $subscriptions[] = $observable->subscribe([$postponed, 'emit']); - } - - all($subscriptions)->when(function ($exception, $value) use ($postponed) { - if ($exception) { - $postponed->fail($exception); - return; + foreach ($observables as $observable) { + $subscriptions[] = $observable->subscribe($emit); } - $postponed->resolve($value); - }); + try { + $result = (yield all($subscriptions)); + } finally { + foreach ($subscriptions as $subscription) { + $subscription->unsubscribe(); + } + } - return $postponed->getObservable(); + yield Coroutine::result($result); + }); } /** @@ -90,23 +89,9 @@ function range($start, $end, $step = 1) { throw new \InvalidArgumentException("Step is not of the correct sign"); } - $postponed = new Postponed; - - $generator = function (Postponed $postponed, $start, $end, $step) { + return new Emitter(function (callable $emit) use ($start, $end, $step) { for ($i = $start; $i <= $end; $i += $step) { - yield $postponed->emit($i); + yield $emit($i); } - }; - - $coroutine = new Coroutine($generator($postponed, $start, $end, $step)); - $coroutine->when(function ($exception) use ($postponed) { - if ($exception) { - $postponed->fail($exception); - return; - } - - $postponed->resolve(); }); - - return $postponed->getObservable(); } From 548a9fd556ecb55a41dc23b9d3e5c29d7bd7bf92 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Thu, 2 Jun 2016 17:05:22 -0500 Subject: [PATCH 15/17] Fix bug when emitting failed awaitable and waiting for a subscriber --- lib/Internal/Producer.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/Internal/Producer.php b/lib/Internal/Producer.php index 425b9fb..5176824 100644 --- a/lib/Internal/Producer.php +++ b/lib/Internal/Producer.php @@ -73,7 +73,7 @@ trait Producer { } $id = $this->nextId++; - $this->futures[$id] = new Future; + $this->futures[$id] = $future = new Future; $this->subscribers[$id] = $onNext; if ($this->waiting !== null) { @@ -82,7 +82,7 @@ trait Producer { $waiting->resolve(); } - return new Subscriber($id, $this->futures[$id], $this->unsubscribe); + return new Subscriber($id, $future, $this->unsubscribe); } /** From 808ce32e3c94e8b83c2216a571adebe89b4ca811 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Thu, 2 Jun 2016 17:11:25 -0500 Subject: [PATCH 16/17] Fix leftover mentions of Disposable --- lib/Observer.php | 2 +- lib/Subscriber.php | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/Observer.php b/lib/Observer.php index 68130fc..02345bf 100644 --- a/lib/Observer.php +++ b/lib/Observer.php @@ -98,7 +98,7 @@ final class Observer { } /** - * Disposes of the subscriber. + * Unsubscribes the internal subscriber from the observable. */ public function __destruct() { if (!$this->resolved) { diff --git a/lib/Subscriber.php b/lib/Subscriber.php index 4d6d397..5888a48 100644 --- a/lib/Subscriber.php +++ b/lib/Subscriber.php @@ -5,7 +5,7 @@ namespace Amp; use Interop\Async\Awaitable; /** - * Disposable implementation returned from implementors of \Amp\Observable. + * Subscriber implementation returned from implementors of \Amp\Observable. */ final class Subscriber implements Awaitable { /** From b9d554dd7b1c0e21d5056ed7c5c8f20dfed534a0 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Mon, 18 Jul 2016 23:23:25 -0500 Subject: [PATCH 17/17] Add more functions --- lib/functions.php | 87 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 87 insertions(+) diff --git a/lib/functions.php b/lib/functions.php index 9d2d00c..af0fc59 100644 --- a/lib/functions.php +++ b/lib/functions.php @@ -2,8 +2,48 @@ namespace Amp; +use Interop\Async\Awaitable; use Interop\Async\Loop; +/** + * @param \Amp\Observable $observable + * @param callable(mixed $value): mixed $onNext + * @param callable(mixed $value): mixed|null $onComplete + * + * @return \Amp\Observable + */ +function each(Observable $observable, callable $onNext, callable $onComplete = null) { + return new Emitter(function (callable $emit) use ($observable, $onNext, $onComplete) { + $result = (yield $observable->subscribe(function ($value) use ($emit, $onNext) { + return $emit($onNext($value)); + })); + + if ($onComplete === null) { + yield Coroutine::result($result); + return; + } + + yield Coroutine::result($onComplete($result)); + }); +} + +/** + * @param \Amp\Observable $observable + * @param callable(mixed $value): bool $filter + * + * @return \Amp\Observable + */ +function filter(Observable $observable, callable $filter) { + return new Emitter(function (callable $emit) use ($observable, $filter) { + yield Coroutine::result(yield $observable->subscribe(function ($value) use ($emit, $filter) { + if (!$filter($value)) { + return null; + } + return $emit($value); + })); + }); +} + /** * Creates an observable that emits values emitted from any observable in the array of observables. Values in the * array are passed through the from() function, so they may be observables, arrays of values to emit, awaitables, @@ -39,6 +79,53 @@ function merge(array $observables) { }); } + +/** + * Creates an observable from the given array of observables, emitting the success value of each provided awaitable or + * failing if any awaitable fails. + * + * @param \Interop\Async\Awaitable[] $awaitables + * + * @return \Amp\Observable + */ +function stream(array $awaitables) { + $postponed = new Postponed; + + if (empty($awaitables)) { + $postponed->complete(); + return $postponed; + } + + $pending = \count($awaitables); + $onResolved = function ($exception, $value) use (&$pending, $postponed) { + if ($pending <= 0) { + return; + } + + if ($exception) { + $pending = 0; + $postponed->fail($exception); + return; + } + + $postponed->emit($value); + + if (--$pending === 0) { + $postponed->complete(); + } + }; + + foreach ($awaitables as $awaitable) { + if (!$awaitable instanceof Awaitable) { + throw new \InvalidArgumentException("Non-awaitable provided"); + } + + $awaitable->when($onResolved); + } + + return $postponed; +} + /** * Returns an observable that emits a value every $interval milliseconds after the previous value has been consumed * (up to $count times (or indefinitely if $count is 0). The value emitted is an integer of the number of times the