1
0
mirror of https://github.com/danog/amp.git synced 2024-11-30 04:29:08 +01:00

Refactor coroutine functionality

This commit is contained in:
Daniel Lowrey 2015-03-16 15:00:10 -04:00
parent 446e09a95d
commit 834255163c
8 changed files with 128 additions and 694 deletions

View File

@ -1,312 +0,0 @@
<?php
namespace Amp;
abstract class CoroutineResolver implements Reactor {
/**
* Resolve the specified generator
*
* Upon resolution the final yielded value is used to succeed the returned promise. If an
* error occurs the returned promise is failed appropriately.
*
* @param \Generator $generator
* @return Promise
*/
public function coroutine(\Generator $gen) {
$promisor = new Future;
$this->advanceGenerator($gen, $promisor, null);
return $promisor;
}
private function advanceGenerator(\Generator $gen, Promisor $promisor, $return) {
try {
if (!$gen->valid()) {
$promisor->succeed($return);
return;
}
list($promise, $noWait, $return) = $this->promisifyGeneratorYield($gen, $return);
$this->immediately(function() use ($gen, $promisor, $return, $promise, $noWait) {
if ($noWait) {
$this->sendToGenerator($gen, $promisor, $return);
} else {
$promise->when(function($error, $result) use ($gen, $promisor, $return) {
$this->sendToGenerator($gen, $promisor, $return, $error, $result);
});
}
});
} catch (\Exception $uncaught) {
$promisor->fail($uncaught);
}
}
private function sendToGenerator(\Generator $gen, Promisor $promisor, $return = null, \Exception $error = null, $result = null) {
try {
if ($error) {
$gen->throw($error);
} else {
$gen->send($result);
}
$this->advanceGenerator($gen, $promisor, $return);
} catch (\Exception $uncaught) {
$promisor->fail($uncaught);
}
}
private function promisifyGeneratorYield(\Generator $gen, $return) {
$noWait = false;
$promise = null;
$key = $gen->key();
$yielded = $gen->current();
if (is_string($key)) {
goto explicit_key;
}
// Fall through to implicit_key if no string key was yielded
implicit_key: {
if (!isset($yielded)) {
$promise = new Success;
} elseif (!is_object($yielded)) {
$promise = new Failure(new \LogicException(
sprintf(
"Unresolvable implicit yield of type %s; key required",
is_object($yielded) ? get_class($yielded) : gettype($yielded)
)
));
} elseif ($yielded instanceof Promise) {
$promise = $yielded;
} elseif ($yielded instanceof \Generator) {
$promise = $this->coroutine($yielded);
} else {
$promise = new Failure(new \LogicException(
sprintf(
"Unresolvable implicit yield of type %s; key required",
is_object($yielded) ? get_class($yielded) : gettype($yielded)
)
));
}
goto return_struct;
}
explicit_key: {
$key = strtolower($key);
if ($key[0] === self::NOWAIT_PREFIX) {
$noWait = true;
$key = substr($key, 1);
}
switch ($key) {
case self::ASYNC:
goto async;
case self::COROUTINE:
goto coroutine;
case self::CORETURN:
goto coreturn;
case self::ALL:
// fallthrough
case self::ANY:
// fallthrough
case self::SOME:
goto combinator;
case self::PAUSE:
goto pause;
case self::BIND:
goto bind;
case self::IMMEDIATELY:
goto immediately;
case self::ONCE:
// fallthrough
case self::REPEAT:
goto schedule;
case self::ON_READABLE:
$ioWatchMethod = 'onReadable';
goto io_watcher;
case self::ON_WRITABLE:
$ioWatchMethod = 'onWritable';
goto io_watcher;
case self::ENABLE:
// fallthrough
case self::DISABLE:
// fallthrough
case self::CANCEL:
goto watcher_control;
case self::NOWAIT:
$noWait = true;
goto implicit_key;
default:
if ($noWait) {
$promise = new Failure(new \LogicException(
'Cannot use standalone @ "nowait" prefix'
));
goto return_struct;
} else {
goto unknown_key;
}
}
}
coreturn: {
$return = $yielded;
$promise = new Success;
goto return_struct;
}
unknown_key: {
$promise = new Failure(new \DomainException(
sprintf("Unknown yield key: %s", $key)
));
goto return_struct;
}
async: {
if (is_object($yielded) && $yielded instanceof Promise) {
$promise = $yielded;
} else {
$promise = new Failure(new \DomainException(
sprintf(
"%s yield command expects Promise; %s yielded",
$key,
gettype($yielded)
)
));
}
goto return_struct;
}
coroutine: {
if (is_object($yielded) && $yielded instanceof \Generator) {
$promise = $this->coroutine($yielded);
} else {
$promise = new Failure(new \DomainException(
sprintf(
"%s yield command expects Generator; %s yielded",
$key,
gettype($yielded)
)
));
}
goto return_struct;
}
combinator: {
if (!is_array($yielded)) {
$promise = new Failure(new \DomainException(
sprintf("%s yield command expects array; %s yielded", $key, gettype($yielded))
));
goto return_struct;
}
$promises = [];
foreach ($yielded as $index => $element) {
if ($element instanceof Promise) {
$promise = $element;
} elseif ($element instanceof \Generator) {
$promise = $this->coroutine($element);
} else {
$promise = new Success($element);
}
$promises[$index] = $promise;
}
$combinatorFunction = __NAMESPACE__ . "\\{$key}";
$promise = $combinatorFunction($promises);
goto return_struct;
}
immediately: {
if (is_callable($yielded)) {
$watcherId = $this->immediately($yielded);
$promise = new Success($watcherId);
} else {
$promise = new Failure(new \DomainException(
sprintf(
"%s yield command requires callable; %s provided",
$key,
gettype($yielded)
)
));
}
goto return_struct;
}
schedule: {
if (is_array($yielded) && isset($yielded[0], $yielded[1]) && is_callable($yielded[0])) {
list($func, $msDelay) = $yielded;
$watcherId = $this->{$key}($func, $msDelay);
$promise = new Success($watcherId);
} else {
$promise = new Failure(new \DomainException(
sprintf(
"%s yield command requires [callable \$func, int \$msDelay]; %s provided",
$key,
gettype($yielded)
)
));
}
goto return_struct;
}
io_watcher: {
if (is_array($yielded) && isset($yielded[0], $yielded[1]) && is_callable($yielded[1])) {
list($stream, $func, $enableNow) = $yielded;
$watcherId = $this->{$ioWatchMethod}($stream, $func, $enableNow);
$promise = new Success($watcherId);
} else {
$promise = new Failure(new \DomainException(
sprintf(
"%s yield command requires [resource \$stream, callable \$func, bool \$enableNow]; %s provided",
$key,
gettype($yielded)
)
));
}
goto return_struct;
}
pause: {
$promise = new Future;
$this->once(function() use ($promise) {
$promise->succeed();
}, (int) $yielded);
goto return_struct;
}
bind: {
if (is_callable($yielded)) {
$promise = new Success(function() use ($yielded) {
$result = call_user_func_array($yielded, func_get_args());
return $result instanceof \Generator
? $this->coroutine($result)
: $result;
});
} else {
$promise = new Failure(new \DomainException(
sprintf("bind yield command requires callable; %s provided", gettype($yielded))
));
}
goto return_struct;
}
watcher_control: {
$this->{$key}($yielded);
$promise = new Success;
goto return_struct;
}
return_struct: {
return [$promise, $noWait, $return];
}
}
}

10
lib/CoroutineStruct.php Normal file
View File

@ -0,0 +1,10 @@
<?php
namespace Amp;
final class CoroutineStruct extends Struct {
public $reactor;
public $promisor;
public $generator;
public $promisifier;
}

View File

@ -2,7 +2,7 @@
namespace Amp;
class LibeventReactor extends CoroutineResolver implements SignalReactor {
class LibeventReactor implements SignalReactor {
private $base;
private $watchers = [];
private $immediates = [];
@ -85,7 +85,7 @@ class LibeventReactor extends CoroutineResolver implements SignalReactor {
);
$result = $callback($this, $watcherId);
if ($result instanceof \Generator) {
$this->coroutine($result)->when($this->onCallbackResolution);
resolve($result, $this)->when($this->onCallbackResolution);
}
} catch (\Exception $e) {
$this->handleRunError($e);
@ -222,7 +222,7 @@ class LibeventReactor extends CoroutineResolver implements SignalReactor {
$watcherId = $watcher->id;
$result = $callback($this, $watcherId);
if ($result instanceof \Generator) {
$this->coroutine($result)->when($this->onCallbackResolution);
resolve($result, $this)->when($this->onCallbackResolution);
}
$this->cancel($watcherId);
} catch (\Exception $e) {
@ -287,7 +287,7 @@ class LibeventReactor extends CoroutineResolver implements SignalReactor {
try {
$result = $callback($this, $watcherId);
if ($result instanceof \Generator) {
$this->coroutine($result)->when($this->onCallbackResolution);
resolve($result, $this)->when($this->onCallbackResolution);
}
// If the watcher cancelled itself this will no longer be set
@ -361,7 +361,7 @@ class LibeventReactor extends CoroutineResolver implements SignalReactor {
try {
$result = $callback($this, $watcherId, $stream);
if ($result instanceof \Generator) {
$this->coroutine($result)->when($this->onCallbackResolution);
resolve($result, $this)->when($this->onCallbackResolution);
}
} catch (\Exception $e) {
$this->handleRunError($e);
@ -409,7 +409,7 @@ class LibeventReactor extends CoroutineResolver implements SignalReactor {
try {
$result = $callback($this, $watcherId, $signo);
if ($result instanceof \Generator) {
$this->coroutine($result)->when($this->onCallbackResolution);
resolve($result, $this)->when($this->onCallbackResolution);
}
} catch (\Exception $e) {
$this->handleRunError($e);

View File

@ -2,7 +2,7 @@
namespace Amp;
class NativeReactor extends CoroutineResolver implements Reactor {
class NativeReactor implements Reactor {
private $alarms = [];
private $immediates = [];
private $alarmOrder = [];
@ -121,7 +121,7 @@ class NativeReactor extends CoroutineResolver implements Reactor {
foreach ($immediates as $watcherId => $callback) {
$result = $callback($this, $watcherId);
if ($result instanceof \Generator) {
$this->coroutine($result)->when($this->onCallbackResolution);
resolve($result, $this)->when($this->onCallbackResolution);
}
}
}
@ -177,7 +177,7 @@ class NativeReactor extends CoroutineResolver implements Reactor {
foreach ($this->readCallbacks[$streamId] as $watcherId => $callback) {
$result = $callback($this, $watcherId, $readableStream);
if ($result instanceof \Generator) {
$this->coroutine($result)->when($this->onCallbackResolution);
resolve($result, $this)->when($this->onCallbackResolution);
}
}
}
@ -186,7 +186,7 @@ class NativeReactor extends CoroutineResolver implements Reactor {
foreach ($this->writeCallbacks[$streamId] as $watcherId => $callback) {
$result = $callback($this, $watcherId, $writableStream);
if ($result instanceof \Generator) {
$this->coroutine($result)->when($this->onCallbackResolution);
resolve($result, $this)->when($this->onCallbackResolution);
}
}
}
@ -217,7 +217,7 @@ class NativeReactor extends CoroutineResolver implements Reactor {
$result = $callback($this, $watcherId);
if ($result instanceof \Generator) {
$this->coroutine($result)->when($this->onCallbackResolution);
resolve($result, $this)->when($this->onCallbackResolution);
}
}
}

View File

@ -3,25 +3,6 @@
namespace Amp;
interface Reactor {
const ALL = 'all';
const ANY = 'any';
const SOME = 'some';
const PAUSE = 'pause';
const BIND = 'bind';
const IMMEDIATELY = 'immediately';
const ONCE = 'once';
const REPEAT = 'repeat';
const ON_READABLE = 'onreadable';
const ON_WRITABLE = 'onwritable';
const ENABLE = 'enable';
const DISABLE = 'disable';
const CANCEL = 'cancel';
const NOWAIT = 'nowait';
const NOWAIT_PREFIX = '@';
const ASYNC = 'async';
const COROUTINE = 'coroutine';
const CORETURN = 'return';
/**
* Start the event reactor and assume program flow control
*
@ -131,17 +112,6 @@ interface Reactor {
*/
public function enable($watcherId);
/**
* Resolve the specified generator
*
* Upon resolution the final yielded value is used to succeed the returned promise. If an
* error occurs the returned promise is failed appropriately.
*
* @param \Generator $generator
* @return Promise
*/
public function coroutine(\Generator $generator);
/**
* An optional "last-chance" exception handler for errors resulting during callback invocation
*

View File

@ -93,7 +93,7 @@ class UvReactor extends CoroutineResolver implements SignalReactor {
);
$result = $callback($this, $watcherId);
if ($result instanceof \Generator) {
$this->coroutine($result)->when($this->onCallbackResolution);
resolve($result, $this)->when($this->onCallbackResolution);
}
} catch (\Exception $e) {
$this->handleRunError($e);
@ -219,7 +219,7 @@ class UvReactor extends CoroutineResolver implements SignalReactor {
try {
$result = $callback($this, $watcher->id);
if ($result instanceof \Generator) {
$this->coroutine($result)->when($this->onCallbackResolution);
resolve($result, $this)->when($this->onCallbackResolution);
}
// The isset() check is necessary because the "once" timer
// callback may have cancelled itself when it was invoked.
@ -378,7 +378,7 @@ class UvReactor extends CoroutineResolver implements SignalReactor {
$callback = $watcher->callback;
$result = $callback($this, $watcher->id, $watcher->stream);
if ($result instanceof \Generator) {
$this->coroutine($result)->when($this->onCallbackResolution);
resolve($result, $this)->when($this->onCallbackResolution);
}
} catch (\Exception $e) {
$this->handleRunError($e);
@ -414,7 +414,7 @@ class UvReactor extends CoroutineResolver implements SignalReactor {
try {
$result = $callback($this, $watcher->id, $watcher->signo);
if ($result instanceof \Generator) {
$this->coroutine($result)->when($this->onCallbackResolution);
resolve($result, $this)->when($this->onCallbackResolution);
}
} catch (\Exception $e) {
$this->handleRunError($e);

View File

@ -193,21 +193,6 @@ function onWritable($stream, callable $func, $enableNow = true) {
return getReactor()->onWritable($stream, $func, $enableNow);
}
/**
* Resolve the specified generator
*
* Upon resolution the final yielded value is used to succeed the returned promise. If an
* error occurs the returned promise is failed appropriately.
*
* @param \Generator $generator
* @param Reactor $reactor optional reactor instance (uses global reactor if not specified)
* @return Promise
*/
function coroutine(\Generator $generator, $reactor = null) {
$reactor = $reactor ?: getReactor();
return $reactor->coroutine($generator);
}
/**
* React to process control signals
*
@ -439,7 +424,7 @@ function map(array $promises, callable $functor) {
$promise = ($resolvable instanceof Promise) ? $resolvable : new Success($resolvable);
$promise->when(function($error, $result) use (&$remaining, &$results, $key, $promisor, $functor) {
if (empty($remaining)) {
// If the future already failed we don't bother.
// If the promise already failed we don't bother.
return;
}
if ($error) {
@ -553,6 +538,99 @@ function wait(Promise $promise, Reactor $reactor = null) {
return $resolvedResult;
}
/**
* Return a function that will be resolved as a coroutine once invoked
*
* @param \Generator $generator
* @param \Amp\Reactor $reactor
* @param callable $promisifier
* @return callable
*/
function coroutine(callable $func, Reactor $reactor = null, callable $promisifier = null) {
return function(...$args) use ($func) {
$result = $func(...$args);
return ($result instanceof \Generator)
? resolve($result, $reactor, $promisifier)
: $result;
};
}
/**
* Resolve a Generator function as a coroutine
*
* Upon resolution the Generator return value is used to succeed the promised result. If an
* error occurs during coroutine resolution the promise fails.
*
* @param \Generator $generator
* @param \Amp\Reactor $reactor
* @param callable $promisifier
* @return \Amp\Promise
*/
function resolve(\Generator $generator, Reactor $reactor = null, callable $promisifier = null) {
$cs = new CoroutineStruct;
$cs->reactor = $reactor ?: getReactor();
$cs->promisor = new Future;
$cs->generator = $generator;
$cs->promisifier = $promisifier;
__coroutineAdvance($cs);
return $cs->promisor->promise();
}
function __coroutineAdvance(CoroutineStruct $cs) {
try {
if ($cs->generator->valid()) {
$promise = __coroutinePromisify($cs);
$cs->reactor->immediately(function() use ($cs, $promise) {
$promise->when(function($error, $result) use ($cs) {
__coroutineSend($cs, $error, $result);
});
});
} else {
$cs->promisor->succeed($cs->generator->getReturn());
}
} catch (\Exception $uncaught) {
$cs->promisor->fail($uncaught);
}
}
function __coroutineSend(CoroutineStruct $cs, \Exception $error = null, $result = null) {
try {
if ($error) {
$cs->generator->throw($error);
} else {
$cs->generator->send($result);
}
__coroutineAdvance($cs);
} catch (\Exception $uncaught) {
$cs->promisor->fail($uncaught);
}
}
function __coroutinePromisify(CoroutineStruct $cs) : Promise {
$yielded = $cs->generator->current();
if (!isset($yielded)) {
return new Success;
}
if (is_object($yielded) && $yielded instanceof Promise) {
return $yielded;
}
if ($cs->promisifier) {
return call_user_func($cs->promisifier, $cs->generator->key(), $yielded);
}
return new Failure(new \DomainException(
sprintf(
"Unexpected value of type %s yielded; Promise expected",
is_object($yielded) ? get_class($yielded) : gettype($yielded)
)
));
}
// === DEPRECATED FUNCTIONS ========================================================================
/**

View File

@ -4,6 +4,11 @@ namespace Amp\Test;
use Amp\Success;
use Amp\Failure;
use function Amp\resolve;
use function Amp\coroutine;
use function Amp\all;
use function Amp\any;
use function Amp\some;
abstract class ReactorTest extends \PHPUnit_Framework_TestCase {
abstract protected function getReactor();
@ -277,7 +282,7 @@ abstract class ReactorTest extends \PHPUnit_Framework_TestCase {
public function testOnStartGeneratorResolvesAutomatically() {
$test = '';
$this->getReactor()->run(function($reactor) use (&$test) {
yield "pause" => 1;
yield;
$test = "Thus Spake Zarathustra";
$reactor->once(function() use ($reactor) { $reactor->stop(); }, 50);
});
@ -288,7 +293,7 @@ abstract class ReactorTest extends \PHPUnit_Framework_TestCase {
$reactor = $this->getReactor();
$test = '';
$gen = function($reactor) use (&$test) {
yield "pause" => 1;
yield;
$test = "The abyss will gaze back into you";
$reactor->once(function() use ($reactor) { $reactor->stop(); }, 50);
};
@ -301,7 +306,7 @@ abstract class ReactorTest extends \PHPUnit_Framework_TestCase {
$reactor = $this->getReactor();
$test = '';
$gen = function($reactor) use (&$test) {
yield "pause" => 1;
yield;
$test = "There are no facts, only interpretations.";
$reactor->once(function() use ($reactor) { $reactor->stop(); }, 50);
};
@ -315,7 +320,7 @@ abstract class ReactorTest extends \PHPUnit_Framework_TestCase {
$test = '';
$gen = function($reactor, $watcherId) use (&$test) {
$reactor->cancel($watcherId);
yield "pause" => 1;
yield;
$test = "Art is the supreme task";
$reactor->stop();
};
@ -384,321 +389,4 @@ abstract class ReactorTest extends \PHPUnit_Framework_TestCase {
public function testAllResolvesWithArrayOfResults() {
$this->getReactor()->run(function($reactor) {
$expected = ['r1' => 42, 'r2' => 41];
$actual = (yield 'all' => [
'r1' => 42,
'r2' => new Success(41),
]);
$this->assertSame($expected, $actual);
});
}
/**
* @expectedException \RuntimeException
* @expectedExceptionMessage zanzibar
*/
public function testAllThrowsIfAnyIndividualPromiseFails() {
$this->getReactor()->run(function($reactor) {
$exception = new \RuntimeException('zanzibar');
$promises = [
'r1' => new Success(42),
'r2' => new Failure($exception),
'r3' => new Success(40),
];
$results = (yield 'all' => $promises);
});
}
public function testSomeReturnsArrayOfErrorsAndResults() {
$this->getReactor()->run(function($reactor) {
$exception = new \RuntimeException('zanzibar');
$promises = [
'r1' => new Success(42),
'r2' => new Failure($exception),
'r3' => new Success(40),
];
list($errors, $results) = (yield 'some' => $promises);
$this->assertSame(['r2' => $exception], $errors);
$this->assertSame(['r1' => 42, 'r3' => 40], $results);
});
}
/**
* @expectedException \RuntimeException
* @expectedExceptionMessage All promises failed
*/
public function testSomeThrowsIfNoPromisesResolveSuccessfully() {
$this->getReactor()->run(function($reactor) {
$promises = [
'r1' => new Failure(new \RuntimeException),
'r2' => new Failure(new \RuntimeException),
];
list($errors, $results) = (yield 'some' => $promises);
});
}
public function testResolvedValueEqualsReturnKeyYield() {
$this->getReactor()->run(function($reactor) {
$gen = function() {
$a = (yield new Success(21));
$b = (yield new Success(2));
yield 'return' => ($a * $b);
};
$result = (yield 'coroutine' => $gen());
$this->assertSame(42, $result);
});
}
public function testResolutionFailuresAreThrownIntoGenerator() {
$this->getReactor()->run(function($reactor) {
$gen = function() {
$a = (yield new Success(21));
$b = 1;
try {
yield new Failure(new \Exception('test'));
$this->fail('Code path should not be reached');
} catch (\Exception $e) {
$this->assertSame('test', $e->getMessage());
$b = 2;
}
yield 'return' => ($a * $b);
};
$result = (yield 'coroutine' => $gen());
$this->assertSame(42, $result);
});
}
/**
* @expectedException \Exception
* @expectedExceptionMessage When in the chronicle of wasted time
*/
public function testUncaughtGeneratorExceptionFailsResolverPromise() {
$this->getReactor()->run(function($reactor) {
$gen = function() {
yield "pause" => 1;
throw new \Exception('When in the chronicle of wasted time');
yield "pause" => 1;
};
yield 'coroutine' => $gen();
});
}
public function testAllCombinatorResolution() {
$this->getReactor()->run(function($reactor) {
$gen = function() {
list($a, $b) = (yield 'all' => [
new Success(21),
new Success(2),
]);
yield 'return' => ($a * $b);
};
$result = (yield 'coroutine' => $gen());
$this->assertSame(42, $result);
});
}
public function testAllCombinatorResolutionWithNonPromises() {
$this->getReactor()->run(function($reactor) {
$gen = function() {
list($a, $b, $c) = (yield 'all' => [new Success(21), new Success(2), 10]);
yield 'return' => ($a * $b * $c);
};
$result = (yield 'coroutine' => $gen());
$this->assertSame(420, $result);
});
}
/**
* @expectedException \Exception
* @expectedExceptionMessage When in the chronicle of wasted time
*/
public function testAllCombinatorResolutionThrowsIfAnyOnePromiseFails() {
$gen = function() {
list($a, $b) = (yield 'all' => [
new Success(21),
new Failure(new \Exception('When in the chronicle of wasted time')),
]);
};
$this->getReactor()->run(function($reactor) use ($gen) {
yield 'coroutine' => $gen();
});
}
public function testCombinatorResolvesGeneratorInArray() {
$this->getReactor()->run(function($reactor) {
$gen1 = function() {
yield 'return' => 21;
};
$gen2 = function() use ($gen1) {
list($a, $b) = (yield 'all' => [
\Amp\coroutine($gen1(), $reactor),
new Success(2)
]);
yield 'return' => ($a * $b);
};
$result = (yield 'coroutine' => $gen2());
$this->assertSame(42, $result);
});
}
public function testExplicitAllCombinatorResolution() {
$this->getReactor()->run(function($reactor) {
$gen = function() {
list($a, $b, $c) = (yield 'all' => [
new Success(21),
new Success(2),
10
]);
yield 'return' => ($a * $b * $c);
};
$result = (yield 'coroutine' => $gen());
$this->assertSame(420, $result);
});
}
public function testExplicitAnyCombinatorResolution() {
$this->getReactor()->run(function($reactor) {
$gen = function() {
$any = (yield 'any' => [
'a' => new Success(21),
'b' => new Failure(new \Exception('test')),
]);
yield 'return' => $any;
};
list($errors, $results) = (yield 'coroutine' => $gen());
$this->assertSame('test', $errors['b']->getMessage());
$this->assertSame(21, $results['a']);
});
}
/**
* @expectedException \RuntimeException
* @expectedExceptionMessage All promises failed
*/
public function testExplicitSomeCombinatorResolutionFailsOnError() {
$this->getReactor()->run(function($reactor) {
$gen = function() {
yield 'some' => [
'r1' => new Failure(new \RuntimeException),
'r2' => new Failure(new \RuntimeException),
];
};
yield 'coroutine' => $gen();
});
}
/**
* @expectedException \DomainException
* @expectedExceptionMessage some yield command expects array; string yielded
*/
public function testExplicitCombinatorResolutionFailsIfNonArrayYielded() {
$this->getReactor()->run(function($reactor) {
$gen = function() {
yield 'some' => 'test';
};
yield 'coroutine' => $gen();
});
}
public function testCallableBindYield() {
$this->getReactor()->run(function($reactor) {
// Register a repeating callback so the reactor run loop doesn't break
// without our intervention.
$repeatWatcherId = (yield 'repeat' => [function(){}, 1000]);
$func = function() use ($repeatWatcherId) {
yield "cancel" => $repeatWatcherId;
};
$boundFunc = (yield "bind" => $func);
// Because this Generator function is bound to the reactor it should be
// automatically resolved and our repeating watcher should be cancelled
// allowing the reactor to stop running.
$result = $boundFunc();
$this->assertInstanceOf('Amp\\Promise', $result);
});
}
public function testExplicitImmediatelyYieldResolution() {
$this->getReactor()->run(function($reactor) {
$gen = function() {
$var = null;
yield 'immediately' => function() use (&$var) { $var = 42; };
yield 'pause' => 100; // pause for 100ms so the immediately callback executes
yield 'return' => $var;
};
$result = (yield 'coroutine' => $gen());
$this->assertSame(42, $result);
});
}
public function testExplicitOnceYieldResolution() {
$this->getReactor()->run(function($reactor) {
$gen = function() {
$var = null;
yield 'once' => [function() use (&$var) { $var = 42; }, $msDelay = 1];
yield 'pause' => 100; // pause for 100ms so the once callback executes
yield 'return' => $var;
};
$result = (yield 'coroutine' => $gen());
$this->assertSame(42, $result);
});
}
public function testExplicitRepeatYieldResolution() {
$this->getReactor()->run(function($reactor) {
$var = null;
$repeatFunc = function($reactor, $watcherId) use (&$var) {
$var = 1;
yield 'cancel' => $watcherId;
$var++;
};
$gen = function() use (&$var, $repeatFunc) {
yield 'repeat' => [$repeatFunc, $msDelay = 1];
yield 'pause' => 100; // pause for 100ms so we can be sure the repeat callback executes
yield 'return' => $var;
};
$result = (yield 'coroutine' => $gen());
$this->assertSame(2, $result);
});
}
}