1
0
mirror of https://github.com/danog/amp.git synced 2024-11-30 04:29:08 +01:00

Initial commit

This commit is contained in:
Aaron Piotrowski 2016-05-24 11:47:14 -05:00
commit a9362780ed
15 changed files with 804 additions and 0 deletions

6
.gitattributes vendored Normal file
View File

@ -0,0 +1,6 @@
example export-ignore
test export-ignore
.gitattributes export-ignore
.gitignore export-ignore
.travis.yml export-ignore
phpunit.xml.dist export-ignore

4
.gitignore vendored Normal file
View File

@ -0,0 +1,4 @@
build
composer.lock
phpunit.xml
vendor

22
LICENSE Normal file
View File

@ -0,0 +1,22 @@
The MIT License (MIT)
Copyright (c) 2016 amphp
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

34
composer.json Normal file
View File

@ -0,0 +1,34 @@
{
"name": "amphp/observable",
"description": "",
"keywords": [
"asynchronous",
"async",
"observable",
"emitter"
],
"homepage": "http://amphp.org",
"license": "MIT",
"require": {
"amphp/awaitable": "dev-master",
"async-interop/event-loop": "dev-master",
"async-interop/event-loop-implementation": "dev-master"
},
"require-dev": {
"amphp/loop": "dev-master"
},
"minimum-stability": "dev",
"autoload": {
"psr-4": {
"Amp\\": "lib"
},
"files": [
"lib/functions.php"
]
},
"autoload-dev": {
"psr-4": {
"Amp\\Test\\": "test"
}
}
}

36
example/emitter.php Normal file
View File

@ -0,0 +1,36 @@
#!/usr/bin/env php
<?php
require dirname(__DIR__) . '/vendor/autoload.php';
use Interop\Async\Loop;
use Amp\Pause;
use Amp\Emitter;
use Amp\Loop\NativeLoop;
Loop::execute(Amp\coroutine(function () {
try {
$observable = new Emitter(function (callable $emit) {
yield $emit(new Pause(500, 1));
yield $emit(new Pause(1500, 2));
yield $emit(new Pause(1000, 3));
yield $emit(new Pause(1000, 4));
yield $emit(5); // The values starting here will be emitted in 0.5 second intervals because the coroutine
yield $emit(6); // consuming values below takes 0.5 seconds per iteration. This behavior occurs because
yield $emit(7); // observables respect back-pressure from consumers, waiting to emit a value until all
yield $emit(8); // consumers have finished processing (if desired, see the docs on using and avoiding
yield $emit(9); // back-pressure).
yield $emit(10);
});
$iterator = $observable->getIterator();
while (yield $iterator->isValid()) {
printf("Observable emitted %d\n", $iterator->getCurrent());
yield new Pause(500); // Artificial back-pressure on observable.
}
} catch (\Throwable $exception) {
printf("Exception: %s\n", $exception);
}
}), $loop = new NativeLoop());

View File

@ -0,0 +1,5 @@
<?php
namespace Amp;
class CompletedException extends \RuntimeException {}

View File

@ -0,0 +1,5 @@
<?php
namespace Amp;
class DisposedException extends \RuntimeException {}

50
lib/Emitter.php Normal file
View File

@ -0,0 +1,50 @@
<?php
namespace Amp;
use Interop\Async\Loop;
class Emitter implements Observable {
/**
* @var callable|null
*/
private $emitter;
/**
* @var \Amp\Internal\EmitQueue
*/
private $queue;
/**
* @param callable $emitter
*/
public function __construct(callable $emitter) {
$this->emitter = $emitter;
$this->queue = new Internal\EmitQueue;
}
/**
* {@inheritdoc}
*/
public function dispose() {
$this->emitter = null;
$this->queue->dispose();
}
/**
* {@inheritdoc}
*/
public function getIterator() {
if ($this->emitter !== null) {
$emitter = $this->emitter;
$this->emitter = null;
// Asynchronously start the emitter.
Loop::defer(function () use ($emitter) {
$this->queue->start($emitter);
});
}
return new Internal\EmitterIterator($this->queue);
}
}

View File

@ -0,0 +1,5 @@
<?php
namespace Amp;
class IncompleteException extends \RuntimeException {}

240
lib/Internal/EmitQueue.php Normal file
View File

@ -0,0 +1,240 @@
<?php
namespace Amp\Internal;
use Amp\CompletedException;
use Amp\Coroutine;
use Amp\DisposedException;
use Amp\Future;
use Amp\Observable;
use Interop\Async\Awaitable;
class EmitQueue {
/**
* @var \Amp\Observable|null
*/
private $observable;
/**
* @var bool
*/
private $busy = false;
/**
* @var bool
*/
private $complete = false;
/**
* @var \Throwable|\Exception|null
*/
private $reason;
/**
* @var \Amp\Future
*/
private $future;
/**
* @var \Amp\Internal\Emitted
*/
private $emitted;
/**
* @var int Number of listening iterators.
*/
private $listeners = 0;
public function __construct() {
$this->future = new Future;
$this->emitted = new Emitted($this->future);
}
/**
* @param callable $emitter
*/
public function start(callable $emitter) {
/**
* Emits a value from the observable.
*
* @param mixed $value If $value is an instance of \Interop\Async\Awaitable, the success value is used as the
* value to emit or the failure reason is used to fail the awaitable returned from this function.
*
* @return \Interop\Async\Awaitable
*
* @resolve mixed The emitted value (the resolution value of $value)
*
* @throws \Amp\CompletedException If the observable has been completed.
* @throws \Amp\DisposedException If the observable has been disposed.
*/
$emit = function ($value = null) {
return new Coroutine($this->push($value));
};
try {
$generator = $emitter($emit);
if (!$generator instanceof \Generator) {
throw new \LogicException("Callable must be a coroutine");
}
$coroutine = new Coroutine($generator);
$coroutine->when([$this, 'done']);
} catch (\Throwable $exception) {
$this->done($exception);
} catch (\Exception $exception) {
$this->done($exception);
}
}
/**
* @coroutine
*
* @param mixed $value
*
* @return \Generator
*
* @throws \InvalidArgumentException
* @throws \Throwable|\Exception
*/
public function push($value) {
if ($this->complete) {
throw $this->reason ?: new CompletedException("The observable has completed");
}
if ($this->busy) {
throw new \LogicException("Cannot emit values simultaneously");
}
$this->busy = true;
try {
if ($value instanceof Observable) {
if ($value === $this->observable) {
throw new \InvalidArgumentException("Cannot emit an observable within itself");
}
$iterator = $value->getIterator();
while (yield $iterator->isValid()) {
yield $this->emit($iterator->getCurrent());
}
yield Coroutine::result($iterator->getReturn());
return;
}
if ($value instanceof Awaitable) {
$value = (yield $value);
}
yield $this->emit($value);
} catch (\Throwable $exception) {
$this->done($exception);
throw $exception;
} catch (\Exception $exception) {
$this->done($exception);
throw $exception;
} finally {
$this->busy = false;
}
yield Coroutine::result($value);
}
/**
* @param mixed $value Value to emit.
*
* @return \Interop\Async\Awaitable
*/
private function emit($value) {
$future = $this->future;
$emitted = $this->emitted;
$this->future = new Future;
$this->emitted = new Emitted($this->future);
$future->resolve($value);
return $emitted->wait();
}
/**
* Increments the number of listening iterators.
*/
public function increment() {
++$this->listeners;
}
/**
* Decrements the number of listening iterators. Marks the queue as disposed if the count reaches 0.
*/
public function decrement() {
if (--$this->listeners <= 0 && !$this->complete) {
$this->dispose(new DisposedException("The observable was automatically disposed"));
}
}
/**
* @return \Amp\Internal\Emitted
*/
public function pull() {
return $this->emitted;
}
/**
* Marks the observable as complete, failing with the given exception or completing with the given value.
*
* @param \Throwable|\Exception|null $exception
* @param mixed $value
*/
public function done($exception, $value = null) {
if ($this->complete) {
return;
}
$this->complete = true;
if ($exception) {
$this->reason = $exception;
$this->future->fail($exception);
return;
}
$this->future->resolve($value);
}
/**
* Disposes the observable.
*
* @param \Exception|null $exception
*/
public function dispose(\Exception $exception = null) {
$this->done($exception ?: new DisposedException("The observable was disposed"));
}
/**
* @return bool
*/
public function isComplete() {
return $this->complete;
}
/**
* @return bool
*/
public function isFailed() {
return $this->reason !== null;
}
/**
* @return \Exception|\Throwable
*/
public function getReason() {
if ($this->reason === null) {
throw new \LogicException("The observable has not failed");
}
return $this->reason;
}
}

57
lib/Internal/Emitted.php Normal file
View File

@ -0,0 +1,57 @@
<?php
namespace Amp\Internal;
use Amp\Future;
use Interop\Async\Awaitable;
final class Emitted {
/**
* @var \Amp\Future
*/
private $future;
/**
* @var \Interop\Async\Awaitable
*/
private $awaitable;
/**
* @var int
*/
private $waiting = 0;
/**
* @param \Interop\Async\Awaitable $awaitable
*/
public function __construct(Awaitable $awaitable) {
$this->awaitable = $awaitable;
$this->future = new Future;
}
/**
* @return \Interop\Async\Awaitable
*/
public function getAwaitable() {
++$this->waiting;
return $this->awaitable;
}
/**
* Notifies the placeholder that the consumer is ready.
*/
public function ready() {
if (0 === --$this->waiting) {
$this->future->resolve();
}
}
/**
* Returns an awaitable that is fulfilled once all consumers are ready.
*
* @return \Interop\Async\Awaitable
*/
public function wait() {
return $this->future;
}
}

View File

@ -0,0 +1,136 @@
<?php
namespace Amp\Internal;
use Amp\CompletedException;
use Amp\Coroutine;
use Amp\IncompleteException;
use Amp\ObservableIterator;
class EmitterIterator implements ObservableIterator {
/**
* @var \Amp\Internal\Emitted
*/
private $emitted;
/**
* @var mixed
*/
private $current;
/**
* @var \Amp\Internal\EmitQueue
*/
private $queue;
/**
* @var \Interop\Async\Awaitable
*/
private $awaitable;
/**
* @var bool
*/
private $complete = false;
/**
* @param \Amp\Internal\EmitQueue $queue
*/
public function __construct(EmitQueue $queue) {
$this->queue = $queue;
$this->queue->increment();
}
/**
* Removes queue from collection.
*/
public function __destruct() {
if ($this->emitted !== null) {
$this->emitted->ready();
}
$this->queue->decrement();
}
/**
* {@inheritdoc}
*/
public function isValid() {
return new Coroutine($this->doValid());
}
/**
* @coroutine
*
* @return \Generator
*
* @resolve bool
*
* @throws \Throwable|\Exception
*/
private function doValid() {
if ($this->awaitable !== null) {
throw new \LogicException("Simultaneous calls to isValid() are not allowed");
}
try {
$emitted = $this->queue->pull();
if ($this->emitted !== null) {
$this->emitted->ready();
}
$this->emitted = $emitted;
$this->current = (yield $this->awaitable = $this->emitted->getAwaitable());
} catch (\Throwable $exception) {
$this->current = null;
throw $exception;
} catch (\Exception $exception) {
$this->current = null;
throw $exception;
} finally {
$this->complete = $this->queue->isComplete();
$this->awaitable = null;
}
yield Coroutine::result(!$this->complete);
}
/**
* {@inheritdoc}
*/
public function getCurrent() {
if ($this->emitted === null || $this->awaitable !== null) {
throw new \LogicException("isValid() must be called before calling this method");
}
if ($this->complete) {
if ($this->queue->isFailed()) {
throw $this->queue->getReason();
}
throw new CompletedException("The observable has completed and the iterator is invalid");
}
return $this->current;
}
/**
* {@inheritdoc}
*/
public function getReturn() {
if ($this->emitted === null || $this->awaitable !== null) {
throw new \LogicException("isValid() must be called before calling this method");
}
if (!$this->complete) {
throw new IncompleteException("The observable has not completed");
}
if ($this->queue->isFailed()) {
throw $this->queue->getReason();
}
return $this->current;
}
}

17
lib/Observable.php Normal file
View File

@ -0,0 +1,17 @@
<?php
namespace Amp;
interface Observable
{
/**
* @return \Amp\ObservableIterator
*/
public function getIterator();
/**
* Disposes of the observable, halting emission of values and failing the observable with an instance of
* \Amp\DisposedException.
*/
public function dispose();
}

View File

@ -0,0 +1,41 @@
<?php
namespace Amp;
interface ObservableIterator
{
/**
* Succeeds with true if a new value is available by calling getCurrent() or false if the observable has completed.
* Calling getCurrent() will throw an exception if the observable completed. If an error occurs with the observable,
* this coroutine will be rejected with the exception used to fail the observable.
*
* @return \Interop\Async\Awaitable
*
* @resolve bool
*
* @throws \Throwable|\Exception Exception used to fail the observable.
*/
public function isValid();
/**
* Gets the last emitted value or throws an exception if the observable has completed.
*
* @return mixed Value emitted from observable.
*
* @throws \Amp\CompletedException If the observable has successfully completed.
* @throws \LogicException If isValid() was not called before calling this method.
* @throws \Throwable|\Exception Exception used to fail the observable.
*/
public function getCurrent();
/**
* Gets the return value of the observable or throws the failure reason. Also throws an exception if the
* observable has not completed.
*
* @return mixed Final return value of the observable.
*
* @throws \Amp\IncompleteException If the observable has not completed.
* @throws \Throwable|\Exception Exception used to fail the observable.
*/
public function getReturn();
}

146
lib/functions.php Normal file
View File

@ -0,0 +1,146 @@
<?php
namespace Amp;
use Interop\Async\Loop;
/**
* Throttles an observable to only emit a value every $interval milliseconds.
*
* @param \Amp\Observable $observable
* @param int $interval
*
* @return \Amp\Observable
*/
function throttle(Observable $observable, $interval) {
if (0 >= $interval) {
throw new \InvalidArgumentException("The interval should be greater than 0");
}
return new Emitter(function (callable $emit) use ($observable, $interval) {
$iterator = $observable->getIterator();
$start = (int) (\microtime(true) - $interval);
while (yield $iterator->isValid()) {
$diff = $interval + $start - (int) (\microtime(true) * 1e3);
if (0 < $diff) {
yield new Pause($diff);
}
$start = (int) (\microtime(true) * 1e3);
yield $emit($iterator->getCurrent());
}
yield Coroutine::result($iterator->getReturn());
});
}
/**
* Creates an observable that emits values emitted from any observable in the array of observables. Values in the
* array are passed through the from() function, so they may be observables, arrays of values to emit, awaitables,
* or any other value.
*
* @param \Amp\Observable[] $observables
*
* @return \Amp\Observable
*/
function merge(array $observables) {
foreach ($observables as $observable) {
if (!$observable instanceof Observable) {
throw new \InvalidArgumentException("Non-observable provided");
}
}
return new Emitter(function (callable $emit) use ($observables) {
$generator = function (Observable $observable) use (&$emitting, $emit) {
$iterator = $observable->getIterator();
while (yield $iterator->isValid()) {
while ($emitting !== null) {
yield $emitting; // Prevent simultaneous emit.
}
yield $emitting = $emit($iterator->getCurrent());
$emitting = null;
}
yield Coroutine::result($iterator->getReturn());
};
/** @var \Amp\Coroutine[] $coroutines */
$coroutines = [];
foreach ($observables as $observable) {
$coroutines[] = new Coroutine($generator($observable));
}
yield Coroutine::result(yield all($coroutines));
});
}
/**
* Returns an observable that emits a value every $interval milliseconds after the previous value has been consumed
* (up to $count times (or indefinitely if $count is 0). The value emitted is an integer of the number of times the
* observable emitted a value.
*
* @param int $interval Time interval between emitted values in milliseconds.
* @param int $count Use 0 to emit values indefinitely.
*
* @return \Amp\Observable
*/
function interval($interval, $count = 0) {
$count = (int) $count;
if (0 > $count) {
throw new \InvalidArgumentException("The number of times to emit must be a non-negative value");
}
return new Emitter(function (callable $emit) use ($interval, $count) {
$i = 0;
$future = new Future;
$watcher = Loop::repeat($interval, function ($watcher) use (&$future, &$i) {
Loop::disable($watcher);
$awaitable = $future;
$future = new Future;
$awaitable->resolve(++$i);
});
try {
while (0 === $count || $i < $count) {
yield $emit($future);
Loop::enable($watcher);
}
} finally {
Loop::cancel($watcher);
}
});
}
/**
* @param int $start
* @param int $end
* @param int $step
*
* @return \Amp\Observable
*/
function range($start, $end, $step = 1) {
$start = (int) $start;
$end = (int) $end;
$step = (int) $step;
if (0 === $step) {
throw new \InvalidArgumentException("Step must be a non-zero integer");
}
if ((($end - $start) ^ $step) < 0) {
throw new \InvalidArgumentException("Step is not of the correct sign");
}
return new Emitter(function (callable $emit) use ($start, $end, $step) {
for ($i = $start; $i <= $end; $i += $step) {
yield $emit($i);
}
});
}