diff --git a/example/backpressure.php b/example/backpressure.php index 0f9cb54..ca77d62 100644 --- a/example/backpressure.php +++ b/example/backpressure.php @@ -15,12 +15,12 @@ Loop::execute(Amp\coroutine(function () { $observable = $postponed->getObservable(); - $subscriber = $observable->subscribe(function ($value) { + $observable->subscribe(function ($value) { printf("Observable emitted %d\n", $value); return new Pause(500); // Artificial back-pressure on observable. }); - $subscriber->when(function ($exception, $value) { + $observable->when(function ($exception, $value) { if ($exception) { printf("Observable failed: %s\n", $exception->getMessage()); return; diff --git a/lib/Internal/Producer.php b/lib/Internal/Producer.php index a311463..ffec347 100644 --- a/lib/Internal/Producer.php +++ b/lib/Internal/Producer.php @@ -67,13 +67,11 @@ trait Producer { if ($this->result !== null) { return new Subscriber( $this->nextId++, - $this->result instanceof Awaitable ? $this->result : new Success($this->result), $this->unsubscribe ); } $id = $this->nextId++; - $this->futures[$id] = $future = new Future; $this->subscribers[$id] = $onNext; if ($this->waiting !== null) { @@ -82,26 +80,23 @@ trait Producer { $waiting->resolve(); } - return new Subscriber($id, $future, $this->unsubscribe); + return new Subscriber($id, $this->unsubscribe); } /** * @param string $id * @param \Throwable|\Exception|null $exception */ - if (!isset($this->futures[$id])) { private function unsubscribe($id, $exception = null) { + if (!isset($this->subscribers[$id])) { return; } - $future = $this->futures[$id]; - unset($this->subscribers[$id], $this->futures[$id]); + unset($this->subscribers[$id]); if (empty($this->subscribers)) { $this->waiting = new Future; } - - $future->fail($exception ?: new UnsubscribedException); } /** @@ -196,9 +191,8 @@ trait Producer { * * @throws \LogicException If the observable has already been resolved. */ - $futures = $this->futures; - $this->subscribers = $this->futures = []; private function resolve($value = null) { + $this->subscribers = []; if ($this->waiting !== null) { $waiting = $this->waiting; @@ -207,9 +201,5 @@ trait Producer { } $this->complete($value); - - foreach ($futures as $future) { - $future->resolve($value); - } } } diff --git a/lib/Observer.php b/lib/Observer.php index e758914..8c524e6 100644 --- a/lib/Observer.php +++ b/lib/Observer.php @@ -78,7 +78,7 @@ class Observer { $result = &$this->result; $error = &$this->exception; - $this->subscriber->when(static function ($exception, $value) use (&$deferred, &$result, &$error, &$resolved) { + $observable->when(static function ($exception, $value) use (&$deferred, &$result, &$error, &$resolved) { $resolved = true; if ($exception) { diff --git a/lib/Subscriber.php b/lib/Subscriber.php index 1918240..824b72a 100644 --- a/lib/Subscriber.php +++ b/lib/Subscriber.php @@ -2,22 +2,15 @@ namespace Amp; -use Interop\Async\Awaitable; - /** * Subscriber implementation returned from implementors of \Amp\Observable. */ -class Subscriber implements Awaitable { +class Subscriber { /** * @var string */ private $id; - /** - * @var \Interop\Async\Awaitable - */ - private $awaitable; - /** * @var callable */ @@ -25,24 +18,15 @@ class Subscriber implements Awaitable { /** * @param string $id - * @param \Interop\Async\Awaitable $awaitable * @param callable $unsubscribe */ - public function __construct($id, Awaitable $awaitable, callable $unsubscribe) { + public function __construct($id, callable $unsubscribe) { $this->id = $id; - $this->awaitable = $awaitable; $this->unsubscribe = $unsubscribe; } /** - * {@inheritdoc} - */ - public function when(callable $onResolved) { - $this->awaitable->when($onResolved); - } - - /** - * {@inheritdoc} + * Unsubscribes from the Observable. No future values emitted by the Observable will be received. */ public function unsubscribe() { $unsubscribe = $this->unsubscribe;