diff --git a/lib/Emitter.php b/lib/Emitter.php index 34f07b1..84b5ec4 100644 --- a/lib/Emitter.php +++ b/lib/Emitter.php @@ -7,6 +7,7 @@ namespace Amp; */ final class Emitter implements Observable { use Internal\Producer { + init as __construct; emit as public; resolve as public; fail as public; diff --git a/lib/Internal/PrivateObservable.php b/lib/Internal/PrivateObservable.php index 9b6ba03..3fa51da 100644 --- a/lib/Internal/PrivateObservable.php +++ b/lib/Internal/PrivateObservable.php @@ -14,6 +14,8 @@ final class PrivateObservable implements Observable { * @param callable(callable $emit, callable $complete, callable $fail): void $emitter */ public function __construct(callable $emitter) { + $this->init(); + /** * Emits a value from the observable. * diff --git a/lib/Internal/Producer.php b/lib/Internal/Producer.php index b6eb382..f63ed1e 100644 --- a/lib/Internal/Producer.php +++ b/lib/Internal/Producer.php @@ -9,6 +9,7 @@ use Amp\Observable; use Amp\Subscriber; use Amp\Success; use Interop\Async\Awaitable; +use Interop\Async\Loop; trait Producer { use Placeholder { @@ -20,6 +21,11 @@ trait Producer { */ private $subscribers = []; + /** + * @var \Amp\Future|null + */ + private $waiting; + /** * @var \Amp\Future[] */ @@ -35,18 +41,23 @@ trait Producer { */ private $dispose; + /** + * Initializes the trait. Use as constructor or call within using class constructor. + */ + public function init() + { + $this->waiting = new Future; + $this->dispose = function ($id, $exception = null) { + $this->dispose($id, $exception); + }; + } + /** * @param callable $onNext * * @return \Amp\Subscriber */ public function subscribe(callable $onNext) { - if ($this->dispose === null) { - $this->dispose = function ($id, $exception = null) { - $this->dispose($id, $exception); - }; - } - if ($this->result !== null) { return new Subscriber( $this->nextId++, @@ -59,6 +70,12 @@ trait Producer { $this->futures[$id] = new Future; $this->subscribers[$id] = $onNext; + if ($this->waiting !== null) { + $waiting = $this->waiting; + $this->waiting = null; + $waiting->resolve(); + } + return new Subscriber($id, $this->futures[$id], $this->dispose); } @@ -73,7 +90,12 @@ trait Producer { $future = $this->futures[$id]; unset($this->subscribers[$id], $this->futures[$id]); - $future->fail($exception ?: new DisposedException()); + + if (empty($this->subscribers)) { + $this->waiting = new Future; + } + + $future->fail($exception ?: new DisposedException); } /** @@ -105,6 +127,10 @@ trait Producer { * @throws \Throwable|\Exception */ private function push($value) { + while ($this->waiting !== null) { + yield $this->waiting; + } + try { if ($value instanceof Observable) { $disposable = $value->subscribe(function ($value) { @@ -168,6 +194,12 @@ trait Producer { $futures = $this->futures; $this->subscribers = $this->futures = []; + if ($this->waiting !== null) { + $waiting = $this->waiting; + $this->waiting = null; + $waiting->resolve(); + } + $this->complete($value); foreach ($futures as $future) { diff --git a/lib/Postponed.php b/lib/Postponed.php index 391d4ba..2744915 100644 --- a/lib/Postponed.php +++ b/lib/Postponed.php @@ -7,6 +7,7 @@ try { production: // PHP 7 production environment (zend.assertions=0) final class Postponed implements Observable { use Internal\Producer { + init as __construct; emit as public; resolve as public; fail as public;