1
0
mirror of https://github.com/danog/amp.git synced 2025-01-22 05:11:42 +01:00

Merge branch 'observable' into v2

This commit is contained in:
Aaron Piotrowski 2016-07-18 23:29:19 -05:00
commit dad93a6da0
13 changed files with 1015 additions and 0 deletions

51
example/backpressure.php Normal file
View File

@ -0,0 +1,51 @@
#!/usr/bin/env php
<?php
require dirname(__DIR__) . '/vendor/autoload.php';
use Amp\Coroutine;
use Amp\Pause;
use Amp\Postponed;
use Amp\Loop\NativeLoop;
use Interop\Async\Loop;
Loop::execute(Amp\coroutine(function () {
try {
$postponed = new Postponed;
$observable = $postponed->getObservable();
$subscriber = $observable->subscribe(function ($value) {
printf("Observable emitted %d\n", $value);
return new Pause(500); // Artificial back-pressure on observable.
});
$subscriber->when(function ($exception, $value) {
if ($exception) {
printf("Observable failed: %s\n", $exception->getMessage());
return;
}
printf("Observable result %d\n", $value);
});
$generator = function (Postponed $postponed) {
yield $postponed->emit(new Pause(500, 1));
yield $postponed->emit(new Pause(1500, 2));
yield $postponed->emit(new Pause(1000, 3));
yield $postponed->emit(new Pause(2000, 4));
yield $postponed->emit(5);
yield $postponed->emit(6);
yield $postponed->emit(7);
yield $postponed->emit(new Pause(2000, 8));
yield $postponed->emit(9);
yield $postponed->emit(10);
$postponed->resolve(11);
};
yield new Coroutine($generator($postponed));
} catch (\Exception $exception) {
printf("Exception: %s\n", $exception);
}
}), $loop = new NativeLoop());

46
example/emitter.php Normal file
View File

@ -0,0 +1,46 @@
#!/usr/bin/env php
<?php
require dirname(__DIR__) . '/vendor/autoload.php';
use Amp\Coroutine;
use Amp\Emitter;
use Amp\Observable;
use Amp\Observer;
use Amp\Pause;
use Amp\Loop\NativeLoop;
use Interop\Async\Loop;
Loop::execute(Amp\coroutine(function () {
try {
$emitter = new Emitter(function (callable $emit) {
yield $emit(1);
yield $emit(new Pause(500, 2));
yield $emit(3);
yield $emit(new Pause(300, 4));
yield $emit(5);
yield $emit(6);
yield $emit(new Pause(1000, 7));
yield $emit(8);
yield $emit(9);
yield $emit(new Pause(600, 10));
yield Coroutine::result(11);
});
$generator = function (Observable $observable) {
$observer = new Observer($observable);
while (yield $observer->next()) {
printf("Observable emitted %d\n", $observer->getCurrent());
yield new Pause(100); // Observer consumption takes 100 ms.
}
printf("Observable result %d\n", $observer->getResult());
};
yield new Coroutine($generator($emitter));
} catch (\Exception $exception) {
printf("Exception: %s\n", $exception);
}
}), $loop = new NativeLoop());

51
example/postponed.php Normal file
View File

@ -0,0 +1,51 @@
#!/usr/bin/env php
<?php
require dirname(__DIR__) . '/vendor/autoload.php';
use Amp\Coroutine;
use Amp\Observable;
use Amp\Observer;
use Amp\Pause;
use Amp\Postponed;
use Amp\Loop\NativeLoop;
use Interop\Async\Loop;
Loop::execute(Amp\coroutine(function () {
try {
$postponed = new Postponed;
Loop::defer(function () use ($postponed) {
// Observer emits all values at once.
$postponed->emit(1);
$postponed->emit(2);
$postponed->emit(3);
$postponed->emit(4);
$postponed->emit(5);
$postponed->emit(6);
$postponed->emit(7);
$postponed->emit(8);
$postponed->emit(9);
$postponed->emit(10);
$postponed->resolve(11);
});
$observable = $postponed->getObservable();
$generator = function (Observable $observable) {
$observer = new Observer($observable);
while (yield $observer->next()) {
printf("Observable emitted %d\n", $observer->getCurrent());
yield new Pause(100); // Observer consumption takes 100 ms.
}
printf("Observable result %d\n", $observer->getResult());
};
yield new Coroutine($generator($observable));
} catch (\Exception $exception) {
printf("Exception: %s\n", $exception);
}
}), $loop = new NativeLoop());

39
lib/Emitter.php Normal file
View File

@ -0,0 +1,39 @@
<?php
namespace Amp;
final class Emitter implements Observable {
use Internal\Producer;
/**
* @param callable(callable $emit): \Generator $emitter
*/
public function __construct(callable $emitter) {
$this->init();
/**
* @param mixed $value
*
* @return \Interop\Async\Awaitable
*/
$emit = function ($value = null) {
return $this->emit($value);
};
$result = $emitter($emit);
if (!$result instanceof \Generator) {
throw new \LogicException("The callable did not return a Generator");
}
$coroutine = new Coroutine($result);
$coroutine->when(function ($exception, $value) {
if ($exception) {
$this->fail($exception);
return;
}
$this->resolve($value);
});
}
}

View File

@ -0,0 +1,58 @@
<?php
namespace Amp\Internal;
use Amp\Observable;
/**
* An observable that cannot externally emit values. Used by Postponed in development mode.
*
* @internal
*/
final class PrivateObservable implements Observable {
use Producer;
/**
* @param callable(callable $emit, callable $complete, callable $fail): void $emitter
*/
public function __construct(callable $emitter) {
$this->init();
/**
* Emits a value from the observable.
*
* @param mixed $value
*
* @return \Interop\Async\Awaitable
*/
$emit = function ($value = null) {
return $this->emit($value);
};
/**
* Completes the observable with the given value.
*
* @param mixed $value
*/
$resolve = function ($value = null) {
$this->resolve($value);
};
/**
* Fails the observable with the given exception.
*
* @param \Exception $reason
*/
$fail = function ($reason) {
$this->fail($reason);
};
try {
$emitter($emit, $resolve, $fail);
} catch (\Throwable $exception) {
$this->fail($exception);
} catch (\Exception $exception) {
$this->fail($exception);
}
}
}

215
lib/Internal/Producer.php Normal file
View File

@ -0,0 +1,215 @@
<?php
namespace Amp\Internal;
use Amp\Coroutine;
use Amp\Future;
use Amp\Observable;
use Amp\Subscriber;
use Amp\Success;
use Amp\UnsubscribedException;
use Interop\Async\Awaitable;
use Interop\Async\Loop;
/**
* Trait used by Observable implementations. Do not use this trait in your code, instead compose your class from one of
* the available classes implementing \Amp\Observable.
*
* @internal
*/
trait Producer {
use Placeholder {
resolve as complete;
}
/**
* @var callable[]
*/
private $subscribers = [];
/**
* @var \Amp\Future|null
*/
private $waiting;
/**
* @var \Amp\Future[]
*/
private $futures = [];
/**
* @var string
*/
private $nextId = "a";
/**
* @var callable
*/
private $unsubscribe;
/**
* Initializes the trait. Use as constructor or call within using class constructor.
*/
public function init()
{
$this->waiting = new Future;
$this->unsubscribe = function ($id, $exception = null) {
$this->unsubscribe($id, $exception);
};
}
/**
* @param callable $onNext
*
* @return \Amp\Subscriber
*/
public function subscribe(callable $onNext) {
if ($this->result !== null) {
return new Subscriber(
$this->nextId++,
$this->result instanceof Awaitable ? $this->result : new Success($this->result),
$this->unsubscribe
);
}
$id = $this->nextId++;
$this->futures[$id] = $future = new Future;
$this->subscribers[$id] = $onNext;
if ($this->waiting !== null) {
$waiting = $this->waiting;
$this->waiting = null;
$waiting->resolve();
}
return new Subscriber($id, $future, $this->unsubscribe);
}
/**
* @param string $id
* @param \Throwable|\Exception|null $exception
*/
protected function unsubscribe($id, $exception = null) {
if (!isset($this->futures[$id])) {
return;
}
$future = $this->futures[$id];
unset($this->subscribers[$id], $this->futures[$id]);
if (empty($this->subscribers)) {
$this->waiting = new Future;
}
$future->fail($exception ?: new UnsubscribedException);
}
/**
* Emits a value from the observable. The returned awaitable is resolved with the emitted value once all subscribers
* have been invoked.
*
* @param mixed $value
*
* @return \Interop\Async\Awaitable
*
* @throws \LogicException If the observable has resolved.
*/
protected function emit($value = null) {
if ($this->resolved) {
throw new \LogicException("The observable has been resolved; cannot emit more values");
}
return new Coroutine($this->push($value));
}
/**
* @coroutine
*
* @param mixed $value
*
* @return \Generator
*
* @throws \InvalidArgumentException
* @throws \Throwable|\Exception
*/
private function push($value) {
while ($this->waiting !== null) {
yield $this->waiting;
}
try {
if ($value instanceof Observable) {
$subscriber = $value->subscribe(function ($value) {
return $this->emit($value);
});
yield Coroutine::result(yield $subscriber);
return;
}
if ($value instanceof Awaitable) {
$value = (yield $value);
}
} catch (\Throwable $exception) {
if (!$this->resolved) {
$this->fail($exception);
}
throw $exception;
} catch (\Exception $exception) {
if (!$this->resolved) {
$this->fail($exception);
}
throw $exception;
}
$awaitables = [];
foreach ($this->subscribers as $id => $onNext) {
try {
$result = $onNext($value);
if ($result instanceof Awaitable) {
$awaitables[$id] = $result;
}
} catch (\Throwable $exception) {
$this->unsubscribe($id, $exception);
} catch (\Exception $exception) {
$this->unsubscribe($id, $exception);
}
}
foreach ($awaitables as $id => $awaitable) {
try {
yield $awaitable;
} catch (\Throwable $exception) {
$this->unsubscribe($id, $exception);
} catch (\Exception $exception) {
$this->unsubscribe($id, $exception);
}
}
yield Coroutine::result($value);
}
/**
* Resolves the observable with the given value.
*
* @param mixed $value
*
* @throws \LogicException If the observable has already been resolved.
*/
protected function resolve($value = null) {
$futures = $this->futures;
$this->subscribers = $this->futures = [];
if ($this->waiting !== null) {
$waiting = $this->waiting;
$this->waiting = null;
$waiting->resolve();
}
$this->complete($value);
foreach ($futures as $future) {
$future->resolve($value);
}
}
}

25
lib/Observable.php Normal file
View File

@ -0,0 +1,25 @@
<?php
namespace Amp;
use Interop\Async\Awaitable;
/**
* Represents a set of asynchronous values. An observable is analogous to an asynchronous generator, yielding (emitting)
* values when they are available, returning a value (success value) when the observable completes or throwing an
* exception (failure reason).
*/
interface Observable extends Awaitable {
/**
* Registers a callback to be invoked each time value is emitted from the observable. If the function returns an
* awaitable, back-pressure is applied to the awaitable until the returned awaitable is resolved.
*
* Exceptions thrown from $onNext (or failures of awaitables returned from $onNext) will fail the returned
* Subscriber with the thrown exception.
*
* @param callable $onNext Function invoked each time a value is emitted from the observable.
*
* @return \Amp\Subscriber
*/
public function subscribe(callable $onNext);
}

183
lib/Observer.php Normal file
View File

@ -0,0 +1,183 @@
<?php
namespace Amp;
/**
* Asynchronous iterator that can be used within a coroutine to iterate over the emitted values from an Observable.
*
* Example:
* $observer = new Observer($observable); // $observable is an instance of \Amp\Observable
* while (yield $observer->next()) {
* $emitted = $observer->getCurrent();
* }
* $result = $observer->getResult();
*/
final class Observer {
/**
* @var \Amp\Subscriber
*/
private $subscriber;
/**
* @var mixed[]
*/
private $values = [];
/**
* @var \Amp\Future[]
*/
private $futures = [];
/**
* @var int
*/
private $position = -1;
/**
* @var \Amp\Deferred|null
*/
private $deferred;
/**
* @var bool
*/
private $resolved = false;
/**
* @var mixed
*/
private $result;
/**
* @var \Throwable|\Exception|null
*/
private $exception;
/**
* @param \Amp\Observable $observable
*/
public function __construct(Observable $observable) {
$deferred = &$this->deferred;
$values = &$this->values;
$futures = &$this->futures;
$this->subscriber = $observable->subscribe(static function ($value) use (&$deferred, &$values, &$futures) {
$values[] = $value;
$futures[] = $future = new Future;
if ($deferred !== null) {
$temp = $deferred;
$deferred = null;
$temp->resolve($value);
}
return $future;
});
$resolved = &$this->resolved;
$result = &$this->result;
$error = &$this->exception;
$this->subscriber->when(static function ($exception, $value) use (&$deferred, &$result, &$error, &$resolved) {
$resolved = true;
if ($exception) {
$result = null;
$error = $exception;
if ($deferred !== null) {
$deferred->fail($exception);
}
return;
}
$result = $value;
if ($deferred !== null) {
$deferred->resolve(false);
}
});
}
/**
* Unsubscribes the internal subscriber from the observable.
*/
public function __destruct() {
if (!$this->resolved) {
$this->subscriber->unsubscribe();
}
foreach ($this->futures as $future) {
$future->resolve();
}
}
/**
* Succeeds with true if an emitted value is available by calling getCurrent() or false if the observable has
* resolved. If the observable fails, the returned awaitable will fail with the same exception.
*
* @return \Interop\Async\Awaitable
*/
public function next() {
if (isset($this->futures[$this->position])) {
$future = $this->futures[$this->position];
unset($this->values[$this->position], $this->futures[$this->position]);
$future->resolve();
}
++$this->position;
if (isset($this->values[$this->position])) {
return new Success(true);
}
if ($this->resolved) {
if ($this->exception) {
return new Failure($this->exception);
}
return new Success(false);
}
$this->deferred = new Deferred;
return $this->deferred->getAwaitable();
}
/**
* Gets the last emitted value or throws an exception if the observable has completed.
*
* @return mixed Value emitted from observable.
*
* @throws \LogicException If the observable has resolved or next() was not called before calling this method.
*/
public function getCurrent() {
if (empty($this->values) && $this->resolved) {
throw new \LogicException("The observable has completed");
}
if (!isset($this->values[$this->position])) {
throw new \LogicException("Awaitable returned from next() must resolve before calling this method");
}
return $this->values[$this->position];
}
/**
* Gets the result of the observable or throws the failure reason. Also throws an exception if the observable has
* not completed.
*
* @return mixed Final return value of the observable.
*
* @throws \LogicException If the observable has not completed.
* @throws \Throwable|\Exception The exception used to fail the observable.
*/
public function getResult() {
if (!$this->resolved) {
throw new \LogicException("The observable has not resolved");
}
if ($this->exception) {
throw $this->exception;
}
return $this->result;
}
}

98
lib/Postponed.php Normal file
View File

@ -0,0 +1,98 @@
<?php
namespace Amp;
try {
if (@assert(false)) {
production: // PHP 7 production environment (zend.assertions=0)
final class Postponed implements Observable {
use Internal\Producer {
init as __construct;
emit as public;
resolve as public;
fail as public;
}
/**
* @return \Amp\Observable
*/
public function getObservable() {
return $this;
}
}
} else {
development: // PHP 7 development (zend.assertions=1) or PHP 5.x
final class Postponed {
/**
* @var \Amp\Observable
*/
private $observable;
/**
* @var callable
*/
private $emit;
/**
* @var callable
*/
private $resolve;
/**
* @var callable
*/
private $fail;
public function __construct() {
$this->observable = new Internal\PrivateObservable(
function (callable $emit, callable $resolve, callable $fail) {
$this->emit = $emit;
$this->resolve = $resolve;
$this->fail = $fail;
}
);
}
/**
* @return \Amp\Observable
*/
public function getObservable() {
return $this->observable;
}
/**
* Emits a value from the observable.
*
* @param mixed $value
*
* @return \Interop\Async\Awaitable
*/
public function emit($value = null) {
$emit = $this->emit;
return $emit($value);
}
/**
* Resolves the observable with the given value.
*
* @param mixed $value
*/
public function resolve($value = null) {
$resolve = $this->resolve;
$resolve($value);
}
/**
* Fails the observable with the given reason.
*
* @param \Throwable|\Exception $reason
*/
public function fail($reason) {
$fail = $this->fail;
$fail($reason);
}
}
}
} catch (\AssertionError $exception) {
goto development; // zend.assertions=1 and assert.exception=1, use development definition.
}

15
lib/Producer.php Normal file
View File

@ -0,0 +1,15 @@
<?php
namespace Amp;
/**
* Observable implementation that should not be returned from a public API, but used only internally.
*/
final class Producer implements Observable {
use Internal\Producer {
init as __construct;
emit as public;
resolve as public;
fail as public;
}
}

51
lib/Subscriber.php Normal file
View File

@ -0,0 +1,51 @@
<?php
namespace Amp;
use Interop\Async\Awaitable;
/**
* Subscriber implementation returned from implementors of \Amp\Observable.
*/
final class Subscriber implements Awaitable {
/**
* @var string
*/
private $id;
/**
* @var \Interop\Async\Awaitable
*/
private $awaitable;
/**
* @var callable
*/
private $unsubscribe;
/**
* @param string $id
* @param \Interop\Async\Awaitable $awaitable
* @param callable $unsubscribe
*/
public function __construct($id, Awaitable $awaitable, callable $unsubscribe) {
$this->id = $id;
$this->awaitable = $awaitable;
$this->unsubscribe = $unsubscribe;
}
/**
* {@inheritdoc}
*/
public function when(callable $onResolved) {
$this->awaitable->when($onResolved);
}
/**
* {@inheritdoc}
*/
public function unsubscribe() {
$unsubscribe = $this->unsubscribe;
$unsubscribe($this->id);
}
}

View File

@ -0,0 +1,5 @@
<?php
namespace Amp;
class UnsubscribedException extends \RuntimeException {}

View File

@ -507,3 +507,181 @@ function map(callable $callback /* array ...$awaitables */) {
return \call_user_func_array("array_map", $args);
}
/**
* @param \Amp\Observable $observable
* @param callable(mixed $value): mixed $onNext
* @param callable(mixed $value): mixed|null $onComplete
*
* @return \Amp\Observable
*/
function each(Observable $observable, callable $onNext, callable $onComplete = null) {
return new Emitter(function (callable $emit) use ($observable, $onNext, $onComplete) {
$result = (yield $observable->subscribe(function ($value) use ($emit, $onNext) {
return $emit($onNext($value));
}));
if ($onComplete === null) {
yield Coroutine::result($result);
return;
}
yield Coroutine::result($onComplete($result));
});
}
/**
* @param \Amp\Observable $observable
* @param callable(mixed $value): bool $filter
*
* @return \Amp\Observable
*/
function filter(Observable $observable, callable $filter) {
return new Emitter(function (callable $emit) use ($observable, $filter) {
yield Coroutine::result(yield $observable->subscribe(function ($value) use ($emit, $filter) {
if (!$filter($value)) {
return null;
}
return $emit($value);
}));
});
}
/**
* Creates an observable that emits values emitted from any observable in the array of observables. Values in the
* array are passed through the from() function, so they may be observables, arrays of values to emit, awaitables,
* or any other value.
*
* @param \Amp\Observable[] $observables
*
* @return \Amp\Observable
*/
function merge(array $observables) {
foreach ($observables as $observable) {
if (!$observable instanceof Observable) {
throw new \InvalidArgumentException("Non-observable provided");
}
}
return new Emitter(function (callable $emit) use ($observables) {
$subscriptions = [];
foreach ($observables as $observable) {
$subscriptions[] = $observable->subscribe($emit);
}
try {
$result = (yield all($subscriptions));
} finally {
foreach ($subscriptions as $subscription) {
$subscription->unsubscribe();
}
}
yield Coroutine::result($result);
});
}
/**
* Creates an observable from the given array of observables, emitting the success value of each provided awaitable or
* failing if any awaitable fails.
*
* @param \Interop\Async\Awaitable[] $awaitables
*
* @return \Amp\Observable
*/
function stream(array $awaitables) {
$postponed = new Postponed;
if (empty($awaitables)) {
$postponed->complete();
return $postponed;
}
$pending = \count($awaitables);
$onResolved = function ($exception, $value) use (&$pending, $postponed) {
if ($pending <= 0) {
return;
}
if ($exception) {
$pending = 0;
$postponed->fail($exception);
return;
}
$postponed->emit($value);
if (--$pending === 0) {
$postponed->complete();
}
};
foreach ($awaitables as $awaitable) {
if (!$awaitable instanceof Awaitable) {
throw new \InvalidArgumentException("Non-awaitable provided");
}
$awaitable->when($onResolved);
}
return $postponed;
}
/**
* Returns an observable that emits a value every $interval milliseconds after the previous value has been consumed
* (up to $count times (or indefinitely if $count is 0). The value emitted is an integer of the number of times the
* observable emitted a value.
*
* @param int $interval Time interval between emitted values in milliseconds.
* @param int $count Use 0 to emit values indefinitely.
*
* @return \Amp\Observable
*/
function interval($interval, $count = 0) {
$count = (int) $count;
if (0 > $count) {
throw new \InvalidArgumentException("The number of times to emit must be a non-negative value");
}
$postponed = new Postponed;
Loop::repeat($interval, function ($watcher) use (&$i, $postponed, $count) {
$postponed->emit(++$i);
if ($i === $count) {
Loop::cancel($watcher);
$postponed->resolve();
}
});
return $postponed->getObservable();
}
/**
* @param int $start
* @param int $end
* @param int $step
*
* @return \Amp\Observable
*/
function range($start, $end, $step = 1) {
$start = (int) $start;
$end = (int) $end;
$step = (int) $step;
if (0 === $step) {
throw new \InvalidArgumentException("Step must be a non-zero integer");
}
if ((($end - $start) ^ $step) < 0) {
throw new \InvalidArgumentException("Step is not of the correct sign");
}
return new Emitter(function (callable $emit) use ($start, $end, $step) {
for ($i = $start; $i <= $end; $i += $step) {
yield $emit($i);
}
});
}