1
0
mirror of https://github.com/danog/amp.git synced 2024-11-26 20:15:00 +01:00

Subscriber model

This commit is contained in:
Aaron Piotrowski 2016-05-27 15:44:01 -05:00
parent 99eecc1a3f
commit 4a2baa670e
13 changed files with 361 additions and 273 deletions

View File

@ -4,7 +4,6 @@
require dirname(__DIR__) . '/vendor/autoload.php';
use Amp\Coroutine;
use Amp\Observable;
use Amp\Pause;
use Amp\Postponed;
use Amp\Loop\NativeLoop;
@ -12,10 +11,24 @@ use Interop\Async\Loop;
Loop::execute(Amp\coroutine(function () {
try {
$coroutines = [];
$postponed = new Postponed;
$observable = $postponed->getObservable();
$disposable = $observable->subscribe(function ($value) {
printf("Observable emitted %d\n", $value);
return new Pause(500); // Artificial back-pressure on observable.
});
$disposable->when(function ($exception, $value) {
if ($exception) {
printf("Observable failed: %s\n", $exception->getMessage());
return;
}
printf("Observable result %d\n", $value);
});
$generator = function (Postponed $postponed) {
yield $postponed->emit(new Pause(500, 1));
yield $postponed->emit(new Pause(1500, 2));
@ -27,26 +40,10 @@ Loop::execute(Amp\coroutine(function () {
yield $postponed->emit(new Pause(2000, 8));
yield $postponed->emit(9);
yield $postponed->emit(10);
yield $postponed->complete(11);
$postponed->complete(11);
};
$coroutines[] = new Coroutine($generator($postponed));
$generator = function (Observable $observable) {
$observer = $observable->getObserver();
while (yield $observer->isValid()) {
printf("Observable emitted %d\n", $observer->getCurrent());
yield new Pause(500); // Artificial back-pressure on observer.
}
printf("Observable result %d\n", $observer->getReturn());
};
$coroutines[] = new Coroutine($generator($postponed->getObservable()));
yield Amp\all($coroutines);
yield new Coroutine($generator($postponed));
} catch (\Exception $exception) {
printf("Exception: %s\n", $exception);

View File

@ -3,43 +3,45 @@
require dirname(__DIR__) . '/vendor/autoload.php';
use Amp\Coroutine;
use Amp\Observable;
use Amp\Pause;
use Amp\Postponed;
use Amp\Loop\NativeLoop;
use Interop\Async\Loop;
Loop::execute(Amp\coroutine(function () {
Loop::execute(function () {
try {
$postponed = new Postponed;
$postponed->emit(new Pause(500, 1));
$postponed->emit(new Pause(1500, 2));
$postponed->emit(new Pause(1000, 3));
$postponed->emit(new Pause(2000, 4));
$postponed->emit(5);
$postponed->emit(6);
$postponed->emit(7);
$postponed->emit(new Pause(2000, 8));
$postponed->emit(9);
$postponed->emit(10);
$postponed->complete(11);
Loop::defer(function () use ($postponed) {
$postponed->emit(new Pause(500, 1));
$postponed->emit(new Pause(1500, 2));
$postponed->emit(new Pause(1000, 3));
$postponed->emit(new Pause(2000, 4));
$postponed->emit(5);
$postponed->emit(6);
$postponed->emit(7);
$postponed->emit(new Pause(2000, 8));
$postponed->emit(9);
$postponed->emit(10);
$postponed->complete(11);
});
$generator = function (Observable $observable) {
$observer = $observable->getObserver();
$observable = $postponed->getObservable();
while (yield $observer->isValid()) {
printf("Observable emitted %d\n", $observer->getCurrent());
yield new Pause(500); // Artificial back-pressure on observer.
$disposable = $observable->subscribe(function ($value) {
printf("Observable emitted %d\n", $value);
return new Pause(500); // Artificial back-pressure on observable, but is ignored.
});
$disposable->when(function ($exception, $value) {
if ($exception) {
printf("Exception: %s\n", $exception->getMessage());
return;
}
printf("Observable result %d\n", $observer->getReturn());
};
yield new Coroutine($generator($postponed->getObservable()));
printf("Observable result %d\n", $value);
});
} catch (\Exception $exception) {
printf("Exception: %s\n", $exception);
}
}), $loop = new NativeLoop());
}, $loop = new NativeLoop());

12
lib/Disposable.php Normal file
View File

@ -0,0 +1,12 @@
<?php
namespace Amp;
use Interop\Async\Awaitable;
interface Disposable extends Awaitable {
/**
* Disposes of the observable subscriber, failing with an instance of \Amp\DisposedException
*/
public function dispose();
}

View File

@ -0,0 +1,5 @@
<?php
namespace Amp;
class DisposedException extends \RuntimeException {}

View File

@ -3,14 +3,12 @@
namespace Amp\Internal;
use Amp\Future;
use Amp\Success;
use Interop\Async\Awaitable;
final class Emitted {
/**
* @var \Interop\Async\Awaitable
* @var mixed
*/
private $awaitable;
private $value;
/**
* @var int
@ -20,7 +18,7 @@ final class Emitted {
/**
* @var \Amp\Future
*/
private $future;
private $ready;
/**
* @var bool
@ -33,17 +31,21 @@ final class Emitted {
* @param bool $complete
*/
public function __construct($value, $waiting, $complete) {
$this->awaitable = $value instanceof Awaitable ? $value : new Success($value);
$this->value = $value;
$this->waiting = (int) $waiting;
$this->complete = (bool) $complete;
$this->future = new Future;
$this->ready = new Future;
if ($this->waiting === 0) {
$this->ready->resolve($this->value);
}
}
/**
* @return \Interop\Async\Awaitable|mixed
* @return mixed
*/
public function getAwaitable() {
return $this->awaitable;
public function getValue() {
return $this->value;
}
/**
@ -58,7 +60,7 @@ final class Emitted {
*/
public function ready() {
if (--$this->waiting === 0) {
$this->future->resolve($this->awaitable);
$this->ready->resolve($this->value);
}
}
@ -66,6 +68,6 @@ final class Emitted {
* @return \Interop\Async\Awaitable
*/
public function wait() {
return $this->future;
return $this->ready;
}
}

View File

@ -0,0 +1,130 @@
<?php
namespace Amp\Internal;
use Amp\CompletedException;
use Amp\Deferred;
use Amp\Future;
use Amp\IncompleteException;
use Amp\Observable;
use Amp\Success;
final class ObserverSubscriber {
/**
* @var \Amp\Disposable
*/
private $disposable;
/**
* @var \Amp\Deferred
*/
private $deferred;
/**
* @var \Amp\Future
*/
private $future;
/**
* @var bool
*/
private $complete = false;
/**
* @var mixed
*/
private $current;
/**
* @param \Amp\Observable $observable
*/
public function __construct(Observable $observable) {
$this->deferred = new Deferred;
$this->disposable = $observable->subscribe([$this, 'onNext']);
$this->disposable->when([$this, 'onComplete']);
}
/**
* @param mixed $value
*
* @return \Amp\Future
*/
public function onNext($value) {
$this->current = $value;
$this->future = new Future;
$this->deferred->resolve(true);
return $this->future;
}
/**
* @param \Throwable|\Exception|null $exception
* @param mixed $value
*/
public function onComplete($exception, $value) {
$this->complete = true;
if ($exception) {
$this->current = null;
$this->deferred->fail($exception);
return;
}
$this->current = $value;
$this->deferred->resolve(false);
}
/**
* @return \Interop\Async\Awaitable
*/
public function getAwaitable() {
if ($this->complete) {
return new Success(false);
}
if ($this->future !== null) {
$future = $this->future;
$this->future = null;
$this->deferred = new Deferred;
$future->resolve();
}
return $this->deferred->getAwaitable();
}
/**
* @return mixed
*
* @throws \Amp\CompletedException
*/
public function getCurrent() {
if ($this->future === null) {
throw new \LogicException("Awaitable returned from isValid() must resolve before calling this method");
}
if ($this->complete) {
throw new CompletedException("The observable has completed");
}
return $this->current;
}
/**
* @return mixed
*
* @throws \Amp\IncompleteException
*/
public function getReturn() {
if (!$this->complete) {
throw new IncompleteException("The observable has not completed");
}
return $this->current;
}
public function dispose() {
$this->disposable->dispose();
}
}

View File

@ -8,16 +8,12 @@ use Amp\Observable;
* An observable that cannot externally emit values. Used by Postponed in development mode.
*/
final class PrivateObservable implements Observable {
use Producer {
__construct as init;
}
use Producer;
/**
* @param callable(callable $emit, callable $complete, callable $fail): void $emitter
*/
public function __construct(callable $emitter) {
$this->init();
/**
* Emits a value from the observable.
*

View File

@ -5,7 +5,7 @@ namespace Amp\Internal;
use Amp\CompletedException;
use Amp\Coroutine;
use Amp\Failure;
use Amp\Future;
use Amp\Subscriber;
trait Producer {
/**
@ -13,11 +13,6 @@ trait Producer {
*/
private $subscriptions = [];
/**
* @var \Amp\Future|null
*/
private $waiting;
/**
* @var bool
*/
@ -33,43 +28,33 @@ trait Producer {
*/
private $unsubscribe;
public function __construct() {
$this->waiting = new Future;
$this->unsubscribe = function (Subscription $subscription) {
unset($this->subscriptions[\spl_object_hash($subscription)]);
if (empty($this->subscriptions) && !$this->complete) {
$this->waiting = new Future; // Wait for another subscriber.
}
};
}
/**
* @return \Amp\Observer
*
*/
public function getObserver() {
public function subscribe(callable $onNext) {
if ($this->unsubscribe === null) {
$this->unsubscribe = function (Subscription $subscription) {
unset($this->subscriptions[\spl_object_hash($subscription)]);
};
}
$subscription = new Subscription($this->unsubscribe);
$this->subscriptions[\spl_object_hash($subscription)] = $subscription;
if ($this->waiting !== null) {
$waiting = $this->waiting;
$this->waiting = null;
$waiting->resolve();
}
return new Subscriber($subscription);
return new Subscriber($onNext, $subscription);
}
/**
* {@inheritdoc}
* Emits a value from the observable. If the value is an awaitable, the success value will be emitted. If the
* awaitable fails, the observable will fail with the same exception. The returned awaitable is resolved with the
* emitted value once all subscribers have been invoked.
*
* @param mixed $value
*
* @return \Interop\Async\Awaitable
*/
protected function emit($value = null) {
if ($this->complete) {
if ($this->exception) {
throw $this->exception;
}
throw new CompletedException("The observable has completed");
}
@ -88,10 +73,6 @@ trait Producer {
* @throws \Throwable|\Exception
*/
private function push($value, $complete = false) {
if ($this->waiting !== null) {
yield $this->waiting; // Wait for at least one observer to be registered.
}
$emitted = new Emitted($value, \count($this->subscriptions), $complete);
foreach ($this->subscriptions as $subscription) {
@ -102,11 +83,9 @@ trait Producer {
$value = (yield $emitted->wait());
} catch (\Throwable $exception) {
$this->complete = true;
$this->exception = $exception;
throw $exception;
} catch (\Exception $exception) {
$this->complete = true;
$this->exception = $exception;
throw $exception;
}
@ -114,14 +93,16 @@ trait Producer {
}
/**
* {@inheritdoc}
* Completes the observable with the given value. If the value is an awaitable, the success value will be emitted.
* If the awaitable fails, the observable will fail with the same exception. The returned awaitable is resolved
* with the completion value once all subscribers have received all prior emitted values.
*
* @param mixed $value
*
* @return \Interop\Async\Awaitable
*/
protected function complete($value = null) {
if ($this->complete) {
if ($this->exception) {
throw $this->exception;
}
throw new CompletedException("The observable has completed");
}
@ -131,14 +112,15 @@ trait Producer {
}
/**
* {@inheritdoc}
* Fails the observable with the given exception. The returned awaitable fails with the given exception once all
* subscribers have been received all prior emitted values.
*
* @param \Throwable|\Exception $exception
*
* @return \Interop\Async\Awaitable
*/
protected function fail($exception) {
if ($this->complete) {
if ($this->exception) {
throw $this->exception;
}
throw new CompletedException("The observable has completed");
}

View File

@ -1,139 +0,0 @@
<?php
namespace Amp\Internal;
use Amp\CompletedException;
use Amp\Coroutine;
use Amp\IncompleteException;
use Amp\Observer;
final class Subscriber implements Observer {
/**
* @var \Amp\Internal\Subscription
*/
private $subscription;
/**
* @var \Amp\Internal\Emitted
*/
private $emitted;
/**
* @var mixed
*/
private $current;
/**
* @var \Interop\Async\Awaitable
*/
private $awaitable;
/**
* @var bool
*/
private $complete = false;
/**
* @var \Throwable|\Exception|null
*/
private $exception;
/**
* @param \Amp\Internal\Subscription $subscription
*/
public function __construct(Subscription $subscription) {
$this->subscription = $subscription;
}
/**
* Removes queue from collection.
*/
public function __destruct() {
if ($this->emitted !== null) {
$this->emitted->ready();
}
$this->subscription->unsubscribe();
}
/**
* {@inheritdoc}
*/
public function isValid() {
return new Coroutine($this->valid());
}
/**
* @coroutine
*
* @return \Generator
*
* @resolve bool
*
* @throws \Throwable|\Exception
*/
private function valid() {
while ($this->awaitable !== null) {
yield $this->awaitable; // Wait for previous calls to resolve.
}
if ($this->emitted !== null) {
$this->emitted->ready();
}
$this->emitted = (yield $this->subscription->pull());
try {
$this->current = (yield $this->awaitable = $this->emitted->getAwaitable());
} catch (\Throwable $exception) {
$this->exception = $exception;
throw $exception;
} catch (\Exception $exception) {
$this->exception = $exception;
throw $exception;
} finally {
$this->complete = $this->emitted->isComplete();
$this->awaitable = null;
}
yield Coroutine::result(!$this->complete);
}
/**
* {@inheritdoc}
*/
public function getCurrent() {
if ($this->emitted === null || $this->awaitable !== null) {
throw new \LogicException("isValid() must be called before calling this method");
}
if ($this->complete) {
if ($this->exception) {
throw $this->exception;
}
throw new CompletedException("The observable has completed and the iterator is invalid");
}
return $this->current;
}
/**
* {@inheritdoc}
*/
public function getReturn() {
if ($this->emitted === null || $this->awaitable !== null) {
throw new \LogicException("isValid() must be called before calling this method");
}
if (!$this->complete) {
throw new IncompleteException("The observable has not completed");
}
if ($this->exception) {
throw $this->exception;
}
return $this->current;
}
}

View File

@ -4,9 +4,15 @@ namespace Amp;
interface Observable {
/**
* Returns an observer of the observable.
* Registers a callback to be invoked each time value is emitted from the observable. If the function returns an
* awaitable, backpressure is applied to the awaitable until the returned awaitable is resolved.
*
* @return \Amp\Observer
* Exceptions thrown from $onNext (or failures of awaitables returned from $onNext) will fail the returned
* Disposable with the thrown exception.
*
* @param callable $onNext Function invoked each time a value is emitted from the observable.
*
* @return \Amp\Disposable
*/
public function getObserver();
public function subscribe(callable $onNext);
}

View File

@ -2,7 +2,26 @@
namespace Amp;
interface Observer {
final class Observer {
/**
* @var \Amp\Internal\ObserverSubscriber
*/
private $subscriber;
/**
* @param \Amp\Observable $observable
*/
public function __construct(Observable $observable) {
$this->subscriber = new Internal\ObserverSubscriber($observable);
}
/**
* Disposes of the subscription.
*/
public function __destruct() {
$this->subscriber->dispose();
}
/**
* Succeeds with true if a new value is available by calling getCurrent() or false if the observable has completed.
* Calling getCurrent() will throw an exception if the observable completed. If an error occurs with the observable,
@ -14,7 +33,9 @@ interface Observer {
*
* @throws \Throwable|\Exception Exception used to fail the observable.
*/
public function isValid();
public function isValid() {
return $this->subscriber->getAwaitable();
}
/**
* Gets the last emitted value or throws an exception if the observable has completed.
@ -23,9 +44,10 @@ interface Observer {
*
* @throws \Amp\CompletedException If the observable has successfully completed.
* @throws \LogicException If isValid() was not called before calling this method.
* @throws \Throwable|\Exception Exception used to fail the observable.
*/
public function getCurrent();
public function getCurrent() {
return $this->subscriber->getCurrent();
}
/**
* Gets the return value of the observable or throws the failure reason. Also throws an exception if the
@ -34,7 +56,8 @@ interface Observer {
* @return mixed Final return value of the observable.
*
* @throws \Amp\IncompleteException If the observable has not completed.
* @throws \Throwable|\Exception Exception used to fail the observable.
*/
public function getReturn();
public function getReturn() {
return $this->subscriber->getReturn();
}
}

83
lib/Subscriber.php Normal file
View File

@ -0,0 +1,83 @@
<?php
namespace Amp;
use Interop\Async\Awaitable;
final class Subscriber implements Disposable {
/**
* @var \Amp\Coroutine
*/
private $coroutine;
/**
* @var bool
*/
private $subscribed = true;
/**
* @param callable $onNext
* @param \Amp\Internal\Subscription $subscription
*/
public function __construct(callable $onNext, Internal\Subscription $subscription) {
$this->coroutine = new Coroutine($this->run($onNext, $subscription));
}
/**
* @coroutine
*
* @param callable $onNext
* @param \Amp\Internal\Subscription $subscription
*
* @return \Generator
*
* @throws \Throwable|\Exception
*/
private function run(callable $onNext, Internal\Subscription $subscription) {
try {
while ($this->subscribed) {
/** @var \Amp\Internal\Emitted $emitted */
$emitted = (yield $subscription->pull());
try {
$value = $emitted->getValue();
if ($value instanceof Awaitable) {
$value = (yield $value);
}
if ($emitted->isComplete()) {
yield Coroutine::result($value);
return;
}
$result = $onNext($value);
if ($result instanceof Awaitable) {
yield $result;
}
} finally {
$emitted->ready();
}
}
} finally {
$subscription->unsubscribe();
}
throw new DisposedException("The subscriber was disposed");
}
/**
* {@inheritdoc}
*/
public function when(callable $onResolved) {
$this->coroutine->when($onResolved);
}
/**
* {@inheritdoc}
*/
public function dispose() {
$this->subscribed = false;
}
}

View File

@ -22,24 +22,13 @@ function merge(array $observables) {
$postponed = new Postponed;
$generator = function (Observable $observable) use ($postponed) {
$observer = $observable->getObserver();
while (yield $observer->isValid()) {
yield $postponed->emit($observer->getCurrent());
}
yield Coroutine::result($observer->getReturn());
};
/** @var \Amp\Coroutine[] $coroutines */
$coroutines = [];
$subscriptions = [];
foreach ($observables as $observable) {
$coroutines[] = new Coroutine($generator($observable));
$subscriptions[] = $observable->subscribe([$postponed, 'emit']);
}
all($coroutines)->when(function ($exception, $value) use ($postponed) {
all($subscriptions)->when(function ($exception, $value) use ($postponed) {
if ($exception) {
$postponed->fail($exception);
return;