1
0
mirror of https://github.com/danog/amp.git synced 2025-01-21 21:01:16 +01:00

- Added Reactor::coroutine() method

- Added `Amp\coroutine()` function
- `YieldCommands` "enum" constant class removed -- yield keys now live in
  the reactor class
- New optional `"coroutine"` yield key for self-documenting generator
  yields.
- New optional `"async"` yield key for self-documenting promise yields.
- New `"return"` yield key for specifying the return value of a resolved
  Generator coroutine. If not specified a resolved coroutine result is
  equal to null.
- The final value yielded by a resolved `Generator` is *no longer* used
  as its "return" value. Instead, generators must manually use the new
  `"return"` yield key specifically to designate the value that should
  be used to resolve the promise associated with generator resolution.
- `GeneratorResolver` trait renamed to `CoroutineResolver` and is now an
  abstract class extended by the various `Reactor` implementations.
- Implicit "all" array combinator resolution is now removed. Use the
  explicit form instead:

```php
function() {
    list($a, $b, $c) = (yield 'all' => [$promise1, $promise2, $promise3]);
};
```
This commit is contained in:
Daniel Lowrey 2015-02-03 00:21:07 -05:00
parent 7e451251ab
commit 99e38762a7
11 changed files with 731 additions and 607 deletions

View File

@ -1,6 +1,29 @@
### master
- n/a
- Added `Reactor::coroutine()` method
- Added `Amp\coroutine()` function
- `YieldCommands` "enum" constant class removed -- yield keys now live in
the reactor class
- New optional `"coroutine"` yield key for self-documenting generator
yields.
- New optional `"async"` yield key for self-documenting promise yields.
- New `"return"` yield key for specifying the return value of a resolved
Generator coroutine. If not specified a resolved coroutine result is
equal to null.
- The final value yielded by a resolved `Generator` is *no longer* used
as its "return" value. Instead, generators must manually use the new
`"return"` yield key specifically to designate the value that should
be used to resolve the promise associated with generator resolution.
- `GeneratorResolver` trait renamed to `CoroutineResolver` and is now an
abstract class extended by the various `Reactor` implementations.
- Implicit "all" array combinator resolution is now removed. Use the
explicit form instead:
```php
function() {
list($a, $b, $c) = (yield 'all' => [$promise1, $promise2, $promise3]);
};
```
### v0.15.3

307
lib/CoroutineResolver.php Normal file
View File

@ -0,0 +1,307 @@
<?php
namespace Amp;
abstract class CoroutineResolver implements Reactor {
/**
* Resolve the specified generator
*
* Upon resolution the final yielded value is used to succeed the returned promise. If an
* error occurs the returned promise is failed appropriately.
*
* @param \Generator $generator
* @return Promise
*/
public function coroutine(\Generator $gen) {
$promisor = new Future;
$this->advanceGenerator($gen, $promisor, null);
return $promisor;
}
private function advanceGenerator(\Generator $gen, Promisor $promisor, $return) {
try {
if (!$gen->valid()) {
$promisor->succeed($return);
return;
}
list($promise, $noWait, $return) = $this->promisifyGeneratorYield($gen, $return);
$this->immediately(function() use ($gen, $promisor, $return, $promise, $noWait) {
if ($noWait) {
$this->sendToGenerator($gen, $promisor, $return);
} else {
$promise->when(function($error, $result) use ($gen, $promisor, $return) {
$this->sendToGenerator($gen, $promisor, $return, $error, $result);
});
}
});
} catch (\Exception $uncaught) {
$promisor->fail($uncaught);
}
}
private function sendToGenerator(\Generator $gen, Promisor $promisor, $return = null, \Exception $error = null, $result = null) {
try {
if ($error) {
$gen->throw($error);
} else {
$gen->send($result);
}
$this->advanceGenerator($gen, $promisor, $return);
} catch (\Exception $uncaught) {
$promisor->fail($uncaught);
}
}
private function promisifyGeneratorYield(\Generator $gen, $return) {
$noWait = false;
$promise = null;
$key = $gen->key();
$yielded = $gen->current();
if (is_string($key)) {
goto explicit_key;
} else {
goto implicit_key;
}
implicit_key: {
if ($yielded instanceof Promise) {
$promise = $yielded;
} elseif ($yielded instanceof \Generator) {
$promise = $this->coroutine($yielded);
} elseif (isset($yielded)) {
$promise = new Failure(new \LogicException(
sprintf(
'Unresolvable %s type requires yield key',
is_object($yielded) ? get_class($yielded) : gettype($yielded)
)
));
} else {
$promise = new Failure(new \LogicException(
'Empty yield without key (yield; or yield null;)'
));
}
goto return_struct;
}
explicit_key: {
$key = strtolower($key);
if ($key[0] === self::NOWAIT_PREFIX) {
$noWait = true;
$key = substr($key, 1);
}
switch ($key) {
case self::ASYNC:
goto async;
case self::COROUTINE:
goto coroutine;
case self::CORETURN:
goto coreturn;
case self::ALL:
// fallthrough
case self::ANY:
// fallthrough
case self::SOME:
goto combinator;
case self::PAUSE:
goto pause;
case self::BIND:
goto bind;
case self::IMMEDIATELY:
goto immediately;
case self::ONCE:
// fallthrough
case self::REPEAT:
goto schedule;
case self::ON_READABLE:
$ioWatchMethod = 'onReadable';
goto io_watcher;
case self::ON_WRITABLE:
$ioWatchMethod = 'onWritable';
goto io_watcher;
case self::ENABLE:
// fallthrough
case self::DISABLE:
// fallthrough
case self::CANCEL:
goto watcher_control;
case self::NOWAIT:
$noWait = true;
goto implicit_key;
default:
if ($noWait) {
$promise = new Failure(new \LogicException(
'Cannot use standalone @ "nowait" prefix'
));
goto return_struct;
} else {
goto unknown_key;
}
}
}
coreturn: {
$return = $yielded;
$promise = new Success;
goto return_struct;
}
unknown_key: {
$promise = new Failure(new \DomainException(
sprintf('Unknown yield key: "%s"', $key)
));
goto return_struct;
}
async: {
if ($yielded instanceof Promise) {
$promise = $yielded;
} else {
$promise = new Failure(new \DomainException(
sprintf(
'"%s" yield command expects Promise; %s yielded',
$key,
gettype($yielded)
)
));
}
goto return_struct;
}
coroutine: {
if ($yielded instanceof \Generator) {
$promise = $this->coroutine($yielded);
} else {
$promise = new Failure(new \DomainException(
sprintf(
'"%s" yield command expects Generator; %s yielded',
$key,
gettype($yielded)
)
));
}
goto return_struct;
}
combinator: {
if (!is_array($yielded)) {
$promise = new Failure(new \DomainException(
sprintf('"%s" yield command expects array; %s yielded', $key, gettype($yielded))
));
goto return_struct;
}
$promises = [];
foreach ($yielded as $index => $element) {
if ($element instanceof Promise) {
$promise = $element;
} elseif ($element instanceof \Generator) {
$promise = $this->coroutine($element);
} else {
$promise = new Success($element);
}
$promises[$index] = $promise;
}
$combinatorFunction = __NAMESPACE__ . "\\{$key}";
$promise = $combinatorFunction($promises);
goto return_struct;
}
immediately: {
if (is_callable($yielded)) {
$watcherId = $this->immediately($yielded);
$promise = new Success($watcherId);
} else {
$promise = new Failure(new \DomainException(
sprintf(
'"%s" yield command requires callable; %s provided',
$key,
gettype($yielded)
)
));
}
goto return_struct;
}
schedule: {
if (is_array($yielded) && isset($yielded[0], $yielded[1]) && is_callable($yielded[0])) {
list($func, $msDelay) = $yielded;
$watcherId = $this->{$key}($func, $msDelay);
$promise = new Success($watcherId);
} else {
$promise = new Failure(new \DomainException(
sprintf(
'"%s" yield command requires [callable $func, int $msDelay]; %s provided',
$key,
gettype($yielded)
)
));
}
goto return_struct;
}
io_watcher: {
if (is_array($yielded) && isset($yielded[0], $yielded[1]) && is_callable($yielded[1])) {
list($stream, $func, $enableNow) = $yielded;
$watcherId = $this->{$ioWatchMethod}($stream, $func, $enableNow);
$promise = new Success($watcherId);
} else {
$promise = new Failure(new \DomainException(
sprintf(
'"%s" yield command requires [resource $stream, callable $func, bool $enableNow]; %s provided',
$key,
gettype($yielded)
)
));
}
goto return_struct;
}
pause: {
$promise = new Future;
$this->once(function() use ($promise) {
$promise->succeed();
}, (int) $yielded);
goto return_struct;
}
bind: {
if (is_callable($yielded)) {
$promise = new Success(function() use ($yielded) {
$result = call_user_func_array($yielded, func_get_args());
return $result instanceof \Generator
? $this->coroutine($result)
: $result;
});
} else {
$promise = new Failure(new \DomainException(
sprintf('"bind" yield command requires callable; %s provided', gettype($yielded))
));
}
goto return_struct;
}
watcher_control: {
$this->{$key}($yielded);
$promise = new Success;
goto return_struct;
}
return_struct: {
return [$promise, $noWait, $return];
}
}
}

View File

@ -1,243 +0,0 @@
<?php
namespace Amp;
trait GeneratorResolver {
private function resolveGenerator(\Generator $gen) {
$promisor = new Future;
$this->advanceGenerator($gen, $promisor);
return $promisor;
}
private function advanceGenerator(\Generator $gen, Promisor $promisor, $previous = null) {
try {
if ($gen->valid()) {
$key = $gen->key();
$current = $gen->current();
$promiseStruct = $this->promisifyGeneratorYield($key, $current);
$this->immediately(function() use ($gen, $promisor, $promiseStruct) {
list($promise, $noWait) = $promiseStruct;
if ($noWait) {
$this->sendToGenerator($gen, $promisor);
} else {
$promise->when(function($error, $result) use ($gen, $promisor) {
$this->sendToGenerator($gen, $promisor, $error, $result);
});
}
});
} else {
$promisor->succeed($previous);
}
} catch (\Exception $error) {
$promisor->fail($error);
}
}
private function promisifyGeneratorYield($key, $current) {
$noWait = false;
if ($key === (string) $key) {
goto explicit_key;
} else {
goto implicit_key;
}
explicit_key: {
$key = strtolower($key);
if ($key[0] === YieldCommands::NOWAIT_PREFIX) {
$noWait = true;
$key = substr($key, 1);
}
switch ($key) {
case YieldCommands::ALL:
// fallthrough
case YieldCommands::ANY:
// fallthrough
case YieldCommands::SOME:
if (is_array($current)) {
goto combinator;
} else {
$promise = new Failure(new \DomainException(
sprintf('"%s" yield command expects array; %s yielded', $key, gettype($current))
));
goto return_struct;
}
case YieldCommands::PAUSE:
goto pause;
case YieldCommands::BIND:
goto bind;
case YieldCommands::IMMEDIATELY:
goto immediately;
case YieldCommands::ONCE:
// fallthrough
case YieldCommands::REPEAT:
goto schedule;
case YieldCommands::ON_READABLE:
$ioWatchMethod = 'onReadable';
goto io_watcher;
case YieldCommands::ON_WRITABLE:
$ioWatchMethod = 'onWritable';
goto io_watcher;
case YieldCommands::ENABLE:
// fallthrough
case YieldCommands::DISABLE:
// fallthrough
case YieldCommands::CANCEL:
goto watcher_control;
case YieldCommands::NOWAIT:
$noWait = true;
goto implicit_key;
default:
if ($noWait) {
goto implicit_key;
}
$promise = new Failure(new \DomainException(
sprintf('Unknown or invalid yield key: "%s"', $key)
));
goto return_struct;
}
}
implicit_key: {
if ($current instanceof Promise) {
$promise = $current;
} elseif ($current instanceof \Generator) {
$promise = $this->resolveGenerator($current);
} elseif (is_array($current)) {
$key = YieldCommands::ALL;
goto combinator;
} else {
$promise = new Success($current);
}
goto return_struct;
}
combinator: {
$promises = [];
foreach ($current as $index => $element) {
if ($element instanceof Promise) {
$promise = $element;
} elseif ($element instanceof \Generator) {
$promise = $this->resolveGenerator($element);
} else {
$promise = new Success($element);
}
$promises[$index] = $promise;
}
$combinatorFunction = __NAMESPACE__ . "\\{$key}";
$promise = $combinatorFunction($promises);
goto return_struct;
}
immediately: {
if (is_callable($current)) {
$watcherId = $this->immediately($current);
$promise = new Success($watcherId);
} else {
$promise = new Failure(new \DomainException(
sprintf(
'"%s" yield command requires callable; %s provided',
$key,
gettype($current)
)
));
}
goto return_struct;
}
schedule: {
if (is_array($current) && isset($current[0], $current[1]) && is_callable($current[0])) {
list($func, $msDelay) = $current;
$watcherId = $this->{$key}($func, $msDelay);
$promise = new Success($watcherId);
} else {
$promise = new Failure(new \DomainException(
sprintf(
'"%s" yield command requires [callable $func, int $msDelay]; %s provided',
$key,
gettype($current)
)
));
}
goto return_struct;
}
io_watcher: {
if (is_array($current) && isset($current[0], $current[1]) && is_callable($current[1])) {
list($stream, $func, $enableNow) = $current;
$watcherId = $this->{$ioWatchMethod}($stream, $func, $enableNow);
$promise = new Success($watcherId);
} else {
$promise = new Failure(new \DomainException(
sprintf(
'"%s" yield command requires [resource $stream, callable $func, bool $enableNow]; %s provided',
$key,
gettype($current)
)
));
}
goto return_struct;
}
pause: {
$promisor = new Future;
$this->once(function() use ($promisor) {
$promisor->succeed();
}, (int) $current);
$promise = $promisor;
goto return_struct;
}
bind: {
if (is_callable($current)) {
$promise = new Success(function() use ($current) {
$result = call_user_func_array($current, func_get_args());
return $result instanceof \Generator
? $this->resolveGenerator($result)
: $result;
});
} else {
$promise = new Failure(new \DomainException(
sprintf('"bind" yield command requires callable; %s provided', gettype($current))
));
}
goto return_struct;
}
watcher_control: {
$this->{$key}($current);
$promise = new Success;
goto return_struct;
}
return_struct: {
return [$promise, $noWait];
}
}
private function sendToGenerator(\Generator $gen, Promisor $promisor, \Exception $error = null, $result = null) {
try {
if ($error) {
$gen->throw($error);
} else {
$gen->send($result);
}
$this->advanceGenerator($gen, $promisor, $result);
} catch (\Exception $error) {
$promisor->fail($error);
}
}
}

View File

@ -2,9 +2,7 @@
namespace Amp;
class LibeventReactor implements SignalReactor {
use GeneratorResolver;
class LibeventReactor extends CoroutineResolver implements SignalReactor {
private $base;
private $watchers = [];
private $immediates = [];
@ -87,7 +85,7 @@ class LibeventReactor implements SignalReactor {
);
$result = $callback($this, $watcherId);
if ($result instanceof \Generator) {
$this->resolveGenerator($result)->when($this->onCallbackResolution);
$this->coroutine($result)->when($this->onCallbackResolution);
}
} catch (\Exception $e) {
$this->handleRunError($e);
@ -224,7 +222,7 @@ class LibeventReactor implements SignalReactor {
$watcherId = $watcher->id;
$result = $callback($this, $watcherId);
if ($result instanceof \Generator) {
$this->resolveGenerator($result)->when($this->onCallbackResolution);
$this->coroutine($result)->when($this->onCallbackResolution);
}
$this->cancel($watcherId);
} catch (\Exception $e) {
@ -289,7 +287,7 @@ class LibeventReactor implements SignalReactor {
try {
$result = $callback($this, $watcherId);
if ($result instanceof \Generator) {
$this->resolveGenerator($result)->when($this->onCallbackResolution);
$this->coroutine($result)->when($this->onCallbackResolution);
}
// If the watcher cancelled itself this will no longer be set
@ -363,7 +361,7 @@ class LibeventReactor implements SignalReactor {
try {
$result = $callback($this, $watcherId, $stream);
if ($result instanceof \Generator) {
$this->resolveGenerator($result)->when($this->onCallbackResolution);
$this->coroutine($result)->when($this->onCallbackResolution);
}
} catch (\Exception $e) {
$this->handleRunError($e);
@ -411,7 +409,7 @@ class LibeventReactor implements SignalReactor {
try {
$result = $callback($this, $watcherId, $signo);
if ($result instanceof \Generator) {
$this->resolveGenerator($result)->when($this->onCallbackResolution);
$this->coroutine($result)->when($this->onCallbackResolution);
}
} catch (\Exception $e) {
$this->handleRunError($e);

View File

@ -2,9 +2,7 @@
namespace Amp;
class NativeReactor implements Reactor {
use GeneratorResolver;
class NativeReactor extends CoroutineResolver implements Reactor {
private $alarms = [];
private $immediates = [];
private $alarmOrder = [];
@ -123,7 +121,7 @@ class NativeReactor implements Reactor {
foreach ($immediates as $watcherId => $callback) {
$result = $callback($this, $watcherId);
if ($result instanceof \Generator) {
$this->resolveGenerator($result)->when($this->onCallbackResolution);
$this->coroutine($result)->when($this->onCallbackResolution);
}
}
}
@ -179,7 +177,7 @@ class NativeReactor implements Reactor {
foreach ($this->readCallbacks[$streamId] as $watcherId => $callback) {
$result = $callback($this, $watcherId, $readableStream);
if ($result instanceof \Generator) {
$this->resolveGenerator($result)->when($this->onCallbackResolution);
$this->coroutine($result)->when($this->onCallbackResolution);
}
}
}
@ -188,7 +186,7 @@ class NativeReactor implements Reactor {
foreach ($this->writeCallbacks[$streamId] as $watcherId => $callback) {
$result = $callback($this, $watcherId, $writableStream);
if ($result instanceof \Generator) {
$this->resolveGenerator($result)->when($this->onCallbackResolution);
$this->coroutine($result)->when($this->onCallbackResolution);
}
}
}
@ -219,7 +217,7 @@ class NativeReactor implements Reactor {
$result = $callback($this, $watcherId);
if ($result instanceof \Generator) {
$this->resolveGenerator($result)->when($this->onCallbackResolution);
$this->coroutine($result)->when($this->onCallbackResolution);
}
}
}

View File

@ -3,6 +3,25 @@
namespace Amp;
interface Reactor {
const ALL = 'all';
const ANY = 'any';
const SOME = 'some';
const PAUSE = 'pause';
const BIND = 'bind';
const IMMEDIATELY = 'immediately';
const ONCE = 'once';
const REPEAT = 'repeat';
const ON_READABLE = 'onreadable';
const ON_WRITABLE = 'onwritable';
const ENABLE = 'enable';
const DISABLE = 'disable';
const CANCEL = 'cancel';
const NOWAIT = 'nowait';
const NOWAIT_PREFIX = '@';
const ASYNC = 'async';
const COROUTINE = 'coroutine';
const CORETURN = 'return';
/**
* Start the event reactor and assume program flow control
*
@ -112,6 +131,17 @@ interface Reactor {
*/
public function enable($watcherId);
/**
* Resolve the specified generator
*
* Upon resolution the final yielded value is used to succeed the returned promise. If an
* error occurs the returned promise is failed appropriately.
*
* @param \Generator $generator
* @return Promise
*/
public function coroutine(\Generator $generator);
/**
* An optional "last-chance" exception handler for errors resulting during callback invocation
*

View File

@ -2,9 +2,7 @@
namespace Amp;
class UvReactor implements SignalReactor {
use GeneratorResolver;
class UvReactor extends CoroutineResolver implements SignalReactor {
private $loop;
private $lastWatcherId = 1;
private $watchers;
@ -95,7 +93,7 @@ class UvReactor implements SignalReactor {
);
$result = $callback($this, $watcherId);
if ($result instanceof \Generator) {
$this->resolveGenerator($result)->when($this->onCallbackResolution);
$this->coroutine($result)->when($this->onCallbackResolution);
}
} catch (\Exception $e) {
$this->handleRunError($e);
@ -221,12 +219,11 @@ class UvReactor implements SignalReactor {
try {
$result = $callback($this, $watcher->id);
if ($result instanceof \Generator) {
$this->resolveGenerator($result)->when($this->onCallbackResolution);
$this->coroutine($result)->when($this->onCallbackResolution);
}
// The isset() check is necessary because the "once" timer
// callback may have cancelled itself when it was invoked.
if ($watcher->type === Watcher::TIMER_ONCE && isset($this->watchers[$watcher->id])) {
$watcher->isEnabled = false;
$this->clearWatcher($watcher->id);
}
} catch (\Exception $e) {
@ -381,7 +378,7 @@ class UvReactor implements SignalReactor {
$callback = $watcher->callback;
$result = $callback($this, $watcher->id, $watcher->stream);
if ($result instanceof \Generator) {
$this->resolveGenerator($result)->when($this->onCallbackResolution);
$this->coroutine($result)->when($this->onCallbackResolution);
}
} catch (\Exception $e) {
$this->handleRunError($e);
@ -417,7 +414,7 @@ class UvReactor implements SignalReactor {
try {
$result = $callback($this, $watcher->id, $watcher->signo);
if ($result instanceof \Generator) {
$this->resolveGenerator($result)->when($this->onCallbackResolution);
$this->coroutine($result)->when($this->onCallbackResolution);
}
} catch (\Exception $e) {
$this->handleRunError($e);
@ -440,7 +437,6 @@ class UvReactor implements SignalReactor {
private function clearWatcher($watcherId) {
$watcher = $this->watchers[$watcherId];
unset($this->watchers[$watcherId]);
if ($watcher->isEnabled) {
$this->enabledWatcherCount--;
switch ($watcher->type) {
@ -455,8 +451,11 @@ class UvReactor implements SignalReactor {
case Watcher::IMMEDIATE:
unset($this->immediates[$watcherId]);
break;
case Watcher::TIMER_ONCE:
// we don't have to actually stop once timers
break;
default:
@uv_timer_stop($watcher->uvStruct);
uv_timer_stop($watcher->uvStruct);
break;
}
}

View File

@ -1,21 +0,0 @@
<?php
namespace Amp;
class YieldCommands {
const ALL = 'all';
const ANY = 'any';
const SOME = 'some';
const PAUSE = 'pause';
const BIND = 'bind';
const IMMEDIATELY = 'immediately';
const ONCE = 'once';
const REPEAT = 'repeat';
const ON_READABLE = 'onreadable';
const ON_WRITABLE = 'onwritable';
const ENABLE = 'enable';
const DISABLE = 'disable';
const CANCEL = 'cancel';
const NOWAIT = 'nowait';
const NOWAIT_PREFIX = '@';
}

View File

@ -13,10 +13,10 @@ function getReactor($forceNew = false) {
if ($forceNew) {
return chooseReactor();
} elseif (empty($reactor)) {
return $reactor = chooseReactor();
} else {
} elseif ($reactor) {
return $reactor;
} else {
return $reactor = chooseReactor();
}
}
@ -193,7 +193,20 @@ function onWritable($stream, callable $func, $enableNow = true) {
return getReactor()->onWritable($stream, $func, $enableNow);
}
/**
* Resolve the specified generator
*
* Upon resolution the final yielded value is used to succeed the returned promise. If an
* error occurs the returned promise is failed appropriately.
*
* @param \Generator $generator
* @param Reactor $reactor optional reactor instance (uses global reactor if not specified)
* @return Promise
*/
function coroutine(\Generator $generator, $reactor = null) {
$reactor = $reactor ?: getReactor();
return $reactor->coroutine($generator);
}
/**
* React to process control signals

View File

@ -1,305 +0,0 @@
<?php
namespace Amp\Test;
use Amp\Success;
use Amp\Failure;
use Amp\NativeReactor;
class GeneratorResolverTest extends \PHPUnit_Framework_TestCase {
public function testAllResolvesWithArrayOfResults() {
(new NativeReactor)->run(function($reactor) {
$expected = ['r1' => 42, 'r2' => 41];
$actual = (yield 'all' => [
'r1' => 42,
'r2' => new Success(41),
]);
$this->assertSame($expected, $actual);
});
}
/**
* @expectedException \RuntimeException
* @expectedExceptionMessage zanzibar
*/
public function testAllThrowsIfAnyIndividualPromiseFails() {
(new NativeReactor)->run(function($reactor) {
$exception = new \RuntimeException('zanzibar');
$promises = [
'r1' => new Success(42),
'r2' => new Failure($exception),
'r3' => new Success(40),
];
$results = (yield $promises);
});
}
public function testSomeReturnsArrayOfErrorsAndResults() {
(new NativeReactor)->run(function($reactor) {
$exception = new \RuntimeException('zanzibar');
$promises = [
'r1' => new Success(42),
'r2' => new Failure($exception),
'r3' => new Success(40),
];
list($errors, $results) = (yield 'some' => $promises);
$this->assertSame(['r2' => $exception], $errors);
$this->assertSame(['r1' => 42, 'r3' => 40], $results);
});
}
/**
* @expectedException \RuntimeException
* @expectedExceptionMessage All promises failed
*/
public function testSomeThrowsIfNoPromisesResolveSuccessfully() {
(new NativeReactor)->run(function($reactor) {
$promises = [
'r1' => new Failure(new \RuntimeException),
'r2' => new Failure(new \RuntimeException),
];
list($errors, $results) = (yield 'some' => $promises);
});
}
public function testResolvedValueEqualsFinalYield() {
(new NativeReactor)->run(function($reactor) {
$gen = function() {
$a = (yield 21);
$b = (yield new Success(2));
yield ($a * $b);
};
$result = (yield $gen());
$this->assertSame(42, $result);
});
}
public function testFutureErrorsAreThrownIntoGenerator() {
(new NativeReactor)->run(function($reactor) {
$gen = function() {
$a = (yield 21);
$b = 1;
try {
yield new Failure(new \Exception('test'));
$this->fail('Code path should not be reached');
} catch (\Exception $e) {
$this->assertSame('test', $e->getMessage());
$b = 2;
}
yield ($a * $b);
};
$result = (yield $gen());
$this->assertSame(42, $result);
});
}
/**
* @expectedException \Exception
* @expectedExceptionMessage When in the chronicle of wasted time
*/
public function testUncaughtGeneratorExceptionFailsResolverPromise() {
(new NativeReactor)->run(function($reactor) {
$gen = function() {
yield;
throw new \Exception('When in the chronicle of wasted time');
yield;
};
yield $gen();
});
}
public function testImplicitAllCombinatorResolution() {
(new NativeReactor)->run(function($reactor) {
$gen = function() {
list($a, $b) = (yield [
new Success(21),
new Success(2),
]);
yield ($a * $b);
};
$result = (yield $gen());
$this->assertSame(42, $result);
});
}
public function testImplicitAllCombinatorResolutionWithNonPromises() {
(new NativeReactor)->run(function($reactor) {
$gen = function() {
list($a, $b, $c) = (yield [new Success(21), new Success(2), 10]);
yield ($a * $b * $c);
};
$result = (yield $gen());
$this->assertSame(420, $result);
});
}
/**
* @expectedException \Exception
* @expectedExceptionMessage When in the chronicle of wasted time
*/
public function testImplicitAllCombinatorResolutionThrowsIfAnyOnePromiseFails() {
$gen = function() {
list($a, $b) = (yield [
new Success(21),
new Failure(new \Exception('When in the chronicle of wasted time')),
]);
};
$reactor = new NativeReactor;
$reactor->run(function($reactor) use ($gen) {
yield $gen();
});
}
public function testImplicitCombinatorResolvesGeneratorInArray() {
(new NativeReactor)->run(function($reactor) {
$gen1 = function() {
yield 21;
};
$gen2 = function() use ($gen1) {
list($a, $b) = (yield [
$gen1(),
new Success(2)
]);
yield ($a * $b);
};
$result = (yield $gen2());
$this->assertSame(42, $result);
});
}
public function testExplicitAllCombinatorResolution() {
(new NativeReactor)->run(function($reactor) {
$gen = function() {
list($a, $b, $c) = (yield 'all' => [
new Success(21),
new Success(2),
10
]);
yield ($a * $b * $c);
};
$result = (yield $gen());
$this->assertSame(420, $result);
});
}
public function testExplicitAnyCombinatorResolution() {
(new NativeReactor)->run(function($reactor) {
$gen = function() {
yield 'any' => [
'a' => new Success(21),
'b' => new Failure(new \Exception('test')),
];
};
list($errors, $results) = (yield $gen());
$this->assertSame('test', $errors['b']->getMessage());
$this->assertSame(21, $results['a']);
});
}
/**
* @expectedException \RuntimeException
* @expectedExceptionMessage All promises failed
*/
public function testExplicitSomeCombinatorResolutionFailsOnError() {
(new NativeReactor)->run(function($reactor) {
$gen = function() {
yield 'some' => [
'r1' => new Failure(new \RuntimeException),
'r2' => new Failure(new \RuntimeException),
];
};
yield $gen();
});
}
/**
* @expectedException \DomainException
* @expectedExceptionMessage "some" yield command expects array; string yielded
*/
public function testExplicitCombinatorResolutionFailsIfNonArrayYielded() {
(new NativeReactor)->run(function($reactor) {
$gen = function() {
yield 'some' => 'test';
};
yield $gen();
});
}
public function testExplicitImmediatelyYieldResolution() {
(new NativeReactor)->run(function($reactor) {
$gen = function() {
$var = null;
yield 'immediately' => function() use (&$var) { $var = 42; };
yield 'pause' => 100; // pause for 100ms so the immediately callback executes
yield $var;
};
$result = (yield $gen());
$this->assertSame(42, $result);
});
}
public function testExplicitOnceYieldResolution() {
(new NativeReactor)->run(function($reactor) {
$gen = function() {
$var = null;
yield 'once' => [function() use (&$var) { $var = 42; }, $msDelay = 1];
yield 'pause' => 100; // pause for 100ms so the once callback executes
yield $var;
};
$result = (yield $gen());
$this->assertSame(42, $result);
});
}
public function testExplicitRepeatYieldResolution() {
(new NativeReactor)->run(function($reactor) {
$var = null;
$repeatFunc = function($reactor, $watcherId) use (&$var) {
$var = 1;
yield 'cancel' => $watcherId;
$var++;
};
$gen = function() use (&$var, $repeatFunc) {
yield 'repeat' => [$repeatFunc, $msDelay = 1];
yield 'pause' => 100; // pause for 100ms so we can be sure the repeat callback executes
yield $var;
};
$result = (yield $gen());
$this->assertSame(2, $result);
});
}
public function testCallableBindYield() {
(new NativeReactor)->run(function($reactor) {
// Register a repeating callback so the reactor run loop doesn't break
// without our intervention.
$repeatWatcherId = (yield 'repeat' => [function(){}, 1000]);
$func = function() use ($repeatWatcherId) {
yield "cancel" => $repeatWatcherId;
};
$boundFunc = (yield "bind" => $func);
// Because this Generator function is bound to the reactor it should be
// automatically resolved and our repeating watcher should be cancelled
// allowing the reactor to stop running.
$result = $boundFunc();
$this->assertInstanceOf('Amp\\Promise', $result);
});
}
}

View File

@ -2,6 +2,9 @@
namespace Amp\Test;
use Amp\Success;
use Amp\Failure;
abstract class ReactorTest extends \PHPUnit_Framework_TestCase {
abstract protected function getReactor();
@ -272,14 +275,12 @@ abstract class ReactorTest extends \PHPUnit_Framework_TestCase {
}
public function testOnStartGeneratorResolvesAutomatically() {
$reactor = $this->getReactor();
$test = '';
$gen = function($reactor) use (&$test) {
yield;
$this->getReactor()->run(function($reactor) use (&$test) {
yield "pause" => 1;
$test = "Thus Spake Zarathustra";
$reactor->once(function() use ($reactor) { $reactor->stop(); }, 50);
};
$reactor->run($gen);
});
$this->assertSame("Thus Spake Zarathustra", $test);
}
@ -287,7 +288,7 @@ abstract class ReactorTest extends \PHPUnit_Framework_TestCase {
$reactor = $this->getReactor();
$test = '';
$gen = function($reactor) use (&$test) {
yield;
yield "pause" => 1;
$test = "The abyss will gaze back into you";
$reactor->once(function() use ($reactor) { $reactor->stop(); }, 50);
};
@ -300,7 +301,7 @@ abstract class ReactorTest extends \PHPUnit_Framework_TestCase {
$reactor = $this->getReactor();
$test = '';
$gen = function($reactor) use (&$test) {
yield;
yield "pause" => 1;
$test = "There are no facts, only interpretations.";
$reactor->once(function() use ($reactor) { $reactor->stop(); }, 50);
};
@ -314,7 +315,7 @@ abstract class ReactorTest extends \PHPUnit_Framework_TestCase {
$test = '';
$gen = function($reactor, $watcherId) use (&$test) {
$reactor->cancel($watcherId);
yield;
yield "pause" => 1;
$test = "Art is the supreme task";
$reactor->stop();
};
@ -376,4 +377,328 @@ abstract class ReactorTest extends \PHPUnit_Framework_TestCase {
$this->assertSame(3, $var1);
$this->assertSame(4, $var2);
}
public function testAllResolvesWithArrayOfResults() {
$this->getReactor()->run(function($reactor) {
$expected = ['r1' => 42, 'r2' => 41];
$actual = (yield 'all' => [
'r1' => 42,
'r2' => new Success(41),
]);
$this->assertSame($expected, $actual);
});
}
/**
* @expectedException \RuntimeException
* @expectedExceptionMessage zanzibar
*/
public function testAllThrowsIfAnyIndividualPromiseFails() {
$this->getReactor()->run(function($reactor) {
$exception = new \RuntimeException('zanzibar');
$promises = [
'r1' => new Success(42),
'r2' => new Failure($exception),
'r3' => new Success(40),
];
$results = (yield 'all' => $promises);
});
}
public function testSomeReturnsArrayOfErrorsAndResults() {
$this->getReactor()->run(function($reactor) {
$exception = new \RuntimeException('zanzibar');
$promises = [
'r1' => new Success(42),
'r2' => new Failure($exception),
'r3' => new Success(40),
];
list($errors, $results) = (yield 'some' => $promises);
$this->assertSame(['r2' => $exception], $errors);
$this->assertSame(['r1' => 42, 'r3' => 40], $results);
});
}
/**
* @expectedException \RuntimeException
* @expectedExceptionMessage All promises failed
*/
public function testSomeThrowsIfNoPromisesResolveSuccessfully() {
$this->getReactor()->run(function($reactor) {
$promises = [
'r1' => new Failure(new \RuntimeException),
'r2' => new Failure(new \RuntimeException),
];
list($errors, $results) = (yield 'some' => $promises);
});
}
public function testResolvedValueEqualsReturnKeyYield() {
$this->getReactor()->run(function($reactor) {
$gen = function() {
$a = (yield new Success(21));
$b = (yield new Success(2));
yield 'return' => ($a * $b);
};
$result = (yield 'coroutine' => $gen());
$this->assertSame(42, $result);
});
}
public function testResolutionFailuresAreThrownIntoGenerator() {
$this->getReactor()->run(function($reactor) {
$gen = function() {
$a = (yield new Success(21));
$b = 1;
try {
yield new Failure(new \Exception('test'));
$this->fail('Code path should not be reached');
} catch (\Exception $e) {
$this->assertSame('test', $e->getMessage());
$b = 2;
}
yield 'return' => ($a * $b);
};
$result = (yield 'coroutine' => $gen());
$this->assertSame(42, $result);
});
}
/**
* @expectedException \Exception
* @expectedExceptionMessage When in the chronicle of wasted time
*/
public function testUncaughtGeneratorExceptionFailsResolverPromise() {
$this->getReactor()->run(function($reactor) {
$gen = function() {
yield "pause" => 1;
throw new \Exception('When in the chronicle of wasted time');
yield "pause" => 1;
};
yield 'coroutine' => $gen();
});
}
public function testAllCombinatorResolution() {
$this->getReactor()->run(function($reactor) {
$gen = function() {
list($a, $b) = (yield 'all' => [
new Success(21),
new Success(2),
]);
yield 'return' => ($a * $b);
};
$result = (yield 'coroutine' => $gen());
$this->assertSame(42, $result);
});
}
public function testAllCombinatorResolutionWithNonPromises() {
$this->getReactor()->run(function($reactor) {
$gen = function() {
list($a, $b, $c) = (yield 'all' => [new Success(21), new Success(2), 10]);
yield 'return' => ($a * $b * $c);
};
$result = (yield 'coroutine' => $gen());
$this->assertSame(420, $result);
});
}
/**
* @expectedException \Exception
* @expectedExceptionMessage When in the chronicle of wasted time
*/
public function testAllCombinatorResolutionThrowsIfAnyOnePromiseFails() {
$gen = function() {
list($a, $b) = (yield 'all' => [
new Success(21),
new Failure(new \Exception('When in the chronicle of wasted time')),
]);
};
$this->getReactor()->run(function($reactor) use ($gen) {
yield 'coroutine' => $gen();
});
}
public function testCombinatorResolvesGeneratorInArray() {
$this->getReactor()->run(function($reactor) {
$gen1 = function() {
yield 'return' => 21;
};
$gen2 = function() use ($gen1) {
list($a, $b) = (yield 'all' => [
\Amp\coroutine($gen1(), $reactor),
new Success(2)
]);
yield 'return' => ($a * $b);
};
$result = (yield 'coroutine' => $gen2());
$this->assertSame(42, $result);
});
}
public function testExplicitAllCombinatorResolution() {
$this->getReactor()->run(function($reactor) {
$gen = function() {
list($a, $b, $c) = (yield 'all' => [
new Success(21),
new Success(2),
10
]);
yield 'return' => ($a * $b * $c);
};
$result = (yield 'coroutine' => $gen());
$this->assertSame(420, $result);
});
}
public function testExplicitAnyCombinatorResolution() {
$this->getReactor()->run(function($reactor) {
$gen = function() {
$any = (yield 'any' => [
'a' => new Success(21),
'b' => new Failure(new \Exception('test')),
]);
yield 'return' => $any;
};
list($errors, $results) = (yield 'coroutine' => $gen());
$this->assertSame('test', $errors['b']->getMessage());
$this->assertSame(21, $results['a']);
});
}
/**
* @expectedException \RuntimeException
* @expectedExceptionMessage All promises failed
*/
public function testExplicitSomeCombinatorResolutionFailsOnError() {
$this->getReactor()->run(function($reactor) {
$gen = function() {
yield 'some' => [
'r1' => new Failure(new \RuntimeException),
'r2' => new Failure(new \RuntimeException),
];
};
yield 'coroutine' => $gen();
});
}
/**
* @expectedException \DomainException
* @expectedExceptionMessage "some" yield command expects array; string yielded
*/
public function testExplicitCombinatorResolutionFailsIfNonArrayYielded() {
$this->getReactor()->run(function($reactor) {
$gen = function() {
yield 'some' => 'test';
};
yield 'coroutine' => $gen();
});
}
public function testCallableBindYield() {
$this->getReactor()->run(function($reactor) {
// Register a repeating callback so the reactor run loop doesn't break
// without our intervention.
$repeatWatcherId = (yield 'repeat' => [function(){}, 1000]);
$func = function() use ($repeatWatcherId) {
yield "cancel" => $repeatWatcherId;
};
$boundFunc = (yield "bind" => $func);
// Because this Generator function is bound to the reactor it should be
// automatically resolved and our repeating watcher should be cancelled
// allowing the reactor to stop running.
$result = $boundFunc();
$this->assertInstanceOf('Amp\\Promise', $result);
});
}
public function testExplicitImmediatelyYieldResolution() {
$this->getReactor()->run(function($reactor) {
$gen = function() {
$var = null;
yield 'immediately' => function() use (&$var) { $var = 42; };
yield 'pause' => 100; // pause for 100ms so the immediately callback executes
yield 'return' => $var;
};
$result = (yield 'coroutine' => $gen());
$this->assertSame(42, $result);
});
}
public function testExplicitOnceYieldResolution() {
$this->getReactor()->run(function($reactor) {
$gen = function() {
$var = null;
yield 'once' => [function() use (&$var) { $var = 42; }, $msDelay = 1];
yield 'pause' => 100; // pause for 100ms so the once callback executes
yield 'return' => $var;
};
$result = (yield 'coroutine' => $gen());
$this->assertSame(42, $result);
});
}
public function testExplicitRepeatYieldResolution() {
$this->getReactor()->run(function($reactor) {
$var = null;
$repeatFunc = function($reactor, $watcherId) use (&$var) {
$var = 1;
yield 'cancel' => $watcherId;
$var++;
};
$gen = function() use (&$var, $repeatFunc) {
yield 'repeat' => [$repeatFunc, $msDelay = 1];
yield 'pause' => 100; // pause for 100ms so we can be sure the repeat callback executes
yield 'return' => $var;
};
$result = (yield 'coroutine' => $gen());
$this->assertSame(2, $result);
});
}
}