1
0
mirror of https://github.com/danog/amp.git synced 2025-01-22 05:11:42 +01:00

Rollback static coroutine methods to namespaced functions

This commit is contained in:
Daniel Lowrey 2015-07-23 01:30:53 -04:00
parent a4636851d6
commit 55e379e332
10 changed files with 314 additions and 331 deletions

View File

@ -1,173 +0,0 @@
<?php
namespace Amp;
class Coroutine {
/**
* Return a new function that will be resolved as a coroutine when invoked
*
* @param callable $func The callable to be wrapped for coroutine resolution
* @param \Amp\Reactor $reactor
* @return callable Returns a wrapped callable
* @TODO Use variadic function instead of func_get_args() once PHP5.5 is no longer supported
*/
public static function wrap(callable $func, Reactor $reactor = null) {
return function() use ($func, $reactor) {
$result = \call_user_func_array($func, \func_get_args());
return ($result instanceof \Generator)
? self::resolve($result, $reactor)
: $result;
};
}
/**
* Create a "return" value for a generator coroutine
*
* Prior to PHP7 Generators do not support return expressions. In order to work around
* this language limitation coroutine authors may yield the result of this function to
* indicate a coroutine's "return" value in a cross-version-compatible manner.
*
* Amp users who want their code to work in both PHP5 and PHP7 environments should yield
* this function's return value to indicate coroutine results.
*
* Example:
*
* // PHP 5 can't use generator return expressions
* function() {
* $foo = (yield someAsyncThing());
* yield Coroutine::result($foo + 42);
* };
*
* // PHP 7 doesn't require any extra work:
* function() {
* $foo = (yield someAsyncThing());
* return $foo + 42;
* };
*
* @param mixed $result The coroutine "return" result
* @return \Amp\CoroutineResult
* @TODO This method is only necessary for PHP5; remove once PHP7 is required
*/
public static function result($result) {
return new CoroutineResult($result);
}
/**
* Resolve a Generator function as a coroutine
*
* Upon resolution the Generator return value is used to succeed the promised result. If an
* error occurs during coroutine resolution the promise fails.
*
* @param \Generator $generator The generator to resolve as a coroutine
* @param \Amp\Reactor $reactor
*/
public static function resolve(\Generator $generator, Reactor $reactor = null) {
$cs = new CoroutineState;
$cs->reactor = $reactor ?: reactor();
$cs->promisor = new Deferred;
$cs->generator = $generator;
$cs->returnValue = null;
$cs->currentPromise = null;
$cs->nestingLevel = 0;
self::__advance($cs);
return $cs->promisor->promise();
}
private static function __advance(CoroutineState $cs) {
try {
$yielded = $cs->generator->current();
if (!isset($yielded)) {
if ($cs->generator->valid()) {
$cs->reactor->immediately("Amp\Coroutine::__nextTick", ["cb_data" => $cs]);
} elseif (isset($cs->returnValue)) {
$cs->promisor->succeed($cs->returnValue);
} else {
$result = (PHP_MAJOR_VERSION >= 7) ? $cs->generator->getReturn() : null;
$cs->promisor->succeed($result);
}
} elseif ($yielded instanceof Promise) {
if ($cs->nestingLevel < 3) {
$cs->nestingLevel++;
$yielded->when("Amp\Coroutine::__send", $cs);
$cs->nestingLevel--;
} else {
$cs->currentPromise = $yielded;
$cs->reactor->immediately("Amp\Coroutine::__nextTick", ["cb_data" => $cs]);
}
} elseif ($yielded instanceof CoroutineResult) {
/**
* @TODO This block is necessary for PHP5; remove once PHP7 is required and
* we have return expressions in generators
*/
$cs->returnValue = $yielded->getReturn();
self::__send(null, null, $cs);
} else {
/**
* @TODO Remove CoroutineResult from error message once PHP7 is required
*/
$error = new \DomainException(makeGeneratorError($cs->generator, \sprintf(
"Unexpected yield (Promise|CoroutineResult|null expected); %s yielded at key %s",
\is_object($yielded) ? \get_class($yielded) : \gettype($yielded),
$cs->generator->key()
)));
$cs->reactor->immediately(function() use ($cs, $error) {
$cs->promisor->fail($error);
});
}
} catch (\Throwable $uncaught) {
/**
* @codeCoverageIgnoreStart
* @TODO Remove these coverage ignore lines once PHP7 is required
*/
$cs->reactor->immediately(function() use ($cs, $uncaught) {
$cs->promisor->fail($uncaught);
});
/**
* @codeCoverageIgnoreEnd
*/
} catch (\Exception $uncaught) {
/**
* @TODO This extra catch block is necessary for PHP5; remove once PHP7 is required
*/
$cs->reactor->immediately(function() use ($cs, $uncaught) {
$cs->promisor->fail($uncaught);
});
}
}
/**
* This method is only public for performance reasons. It must not be considered
* part of the public API and library users should never invoke it directly.
*/
public static function __nextTick(Reactor $reactor, $watcherId, CoroutineState $cs) {
if ($cs->currentPromise) {
$promise = $cs->currentPromise;
$cs->currentPromise = null;
$promise->when("Amp\Coroutine::__send", $cs);
} else {
self::__send(null, null, $cs);
}
}
/**
* This method is only public for performance reasons. It must not be considered
* part of the public API and library users should never invoke it directly.
*/
public static function __send($error, $result, CoroutineState $cs) {
try {
if ($error) {
$cs->generator->throw($error);
} else {
$cs->generator->send($result);
}
self::__advance($cs);
} catch (\Exception $uncaught) {
$cs->reactor->immediately(function() use ($cs, $uncaught) {
$cs->promisor->fail($uncaught);
});
}
}
}

View File

@ -3,6 +3,29 @@
namespace Amp;
/**
* Create a "return" value for a generator coroutine
*
* Prior to PHP7 Generators do not support return expressions. In order to work around
* this language limitation coroutine authors may yield the result of this function to
* indicate a coroutine's "return" value in a cross-version-compatible manner.
*
* Amp users who want their code to work in both PHP5 and PHP7 environments should yield
* this object to indicate coroutine results.
*
* Example:
*
* // PHP 5 can't use generator return expressions
* function() {
* $foo = (yield someAsyncThing());
* yield new Amp\CoroutineResult($foo + 42);
* };
*
* // PHP 7 doesn't require any extra work:
* function() {
* $foo = yield someAsyncThing();
* return $foo + 42;
* };
*
* @TODO This class is only necessary for PHP5; remove once PHP7 is required
*/
class CoroutineResult {

View File

@ -3,7 +3,7 @@
namespace Amp;
/**
* @TODO This class is only necessary for PHP5; remove in favor of an anon class once PHP7 is required
* @TODO This class is only necessary for PHP5; use an anonymous class once PHP7 is required
*/
class CoroutineState {
use Struct;

View File

@ -119,7 +119,7 @@ class EvReactor implements ExtensionReactor {
unset($this->enabledImmediates[$watcherId]);
$out = \call_user_func($callback, $this, $watcherId, $cbData);
if ($out instanceof \Generator) {
Coroutine::resolve($out, $this)->when($this->onCoroutineResolution);
resolve($out, $this)->when($this->onCoroutineResolution);
}
} catch (\Throwable $e) {
// @TODO Remove coverage ignore block once PHP5 support is no longer required
@ -272,7 +272,7 @@ class EvReactor implements ExtensionReactor {
$out = \call_user_func($callback, $this, $watcherId, $evHandle->data);
}
if ($out instanceof \Generator) {
Coroutine::resolve($out, $this)->when($this->onCoroutineResolution);
resolve($out, $this)->when($this->onCoroutineResolution);
}
} catch (\Throwable $e) {
// @TODO Remove coverage ignore block once PHP5 support is no longer required

View File

@ -98,7 +98,7 @@ class LibeventReactor implements ExtensionReactor {
);
$result = \call_user_func($watcher->callback, $this, $watcherId, $watcher->callbackData);
if ($result instanceof \Generator) {
Coroutine::resolve($result, $this)->when($this->onCoroutineResolution);
resolve($result, $this)->when($this->onCoroutineResolution);
}
} catch (\Throwable $e) {
// @TODO Remove coverage ignore block once PHP5 support is no longer required
@ -220,7 +220,7 @@ class LibeventReactor implements ExtensionReactor {
break;
}
if ($result instanceof \Generator) {
Coroutine::resolve($result, $this)->when($this->onCoroutineResolution);
resolve($result, $this)->when($this->onCoroutineResolution);
}
} catch (\Throwable $e) {
// @TODO Remove coverage ignore block once PHP5 support is no longer required

View File

@ -124,7 +124,7 @@ class NativeReactor implements Reactor {
);
$result = call_user_func($watcher->callback, $this, $watcherId, $watcher->callbackData);
if ($result instanceof \Generator) {
Coroutine::resolve($result, $this)->when($this->onCoroutineResolution);
resolve($result, $this)->when($this->onCoroutineResolution);
}
}
}
@ -186,7 +186,7 @@ class NativeReactor implements Reactor {
foreach ($this->readWatchers[$streamId] as $watcherId => $watcher) {
$result = call_user_func($watcher->callback, $this, $watcherId, $readableStream, $watcher->callbackData);
if ($result instanceof \Generator) {
Coroutine::resolve($result, $this)->when($this->onCoroutineResolution);
resolve($result, $this)->when($this->onCoroutineResolution);
}
}
}
@ -195,7 +195,7 @@ class NativeReactor implements Reactor {
foreach ($this->writeWatchers[$streamId] as $watcherId => $watcher) {
$result = call_user_func($watcher->callback, $this, $watcherId, $writableStream, $watcher->callbackData);
if ($result instanceof \Generator) {
Coroutine::resolve($result, $this)->when($this->onCoroutineResolution);
resolve($result, $this)->when($this->onCoroutineResolution);
}
}
}
@ -222,7 +222,7 @@ class NativeReactor implements Reactor {
$result = call_user_func($watcher->callback, $this, $watcherId, $watcher->callbackData);
if ($result instanceof \Generator) {
Coroutine::resolve($result, $this)->when($this->onCoroutineResolution);
resolve($result, $this)->when($this->onCoroutineResolution);
}
if ($watcher->type === Watcher::TIMER_ONCE) {

View File

@ -104,7 +104,7 @@ class UvReactor implements ExtensionReactor {
);
$result = \call_user_func($watcher->callback, $this, $watcherId, $watcher->callbackData);
if ($result instanceof \Generator) {
Coroutine::resolve($result, $this)->when($this->onCoroutineResolution);
resolve($result, $this)->when($this->onCoroutineResolution);
}
} catch (\Throwable $e) {
// @TODO Remove coverage ignore block once PHP5 support is no longer required
@ -230,7 +230,7 @@ class UvReactor implements ExtensionReactor {
$watcherId = $watcher->id;
$result = \call_user_func($callback, $this, $watcherId, $watcher->callbackData);
if ($result instanceof \Generator) {
Coroutine::resolve($result, $this)->when($this->onCoroutineResolution);
resolve($result, $this)->when($this->onCoroutineResolution);
}
// The isset() check is necessary because the "once" timer
// callback may have cancelled itself when it was invoked.
@ -382,7 +382,7 @@ class UvReactor implements ExtensionReactor {
try {
$result = \call_user_func($watcher->callback, $this, $watcher->id, $watcher->stream, $watcher->callbackData);
if ($result instanceof \Generator) {
Coroutine::resolve($result, $this)->when($this->onCoroutineResolution);
resolve($result, $this)->when($this->onCoroutineResolution);
}
} catch (\Throwable $e) {
// @TODO Remove coverage ignore block once PHP5 support is no longer required
@ -423,7 +423,7 @@ class UvReactor implements ExtensionReactor {
try {
$result = \call_user_func($callback, $this, $watcher->id, $watcher->signo, $watcher->callbackData);
if ($result instanceof \Generator) {
Coroutine::resolve($result, $this)->when($this->onCoroutineResolution);
resolve($result, $this)->when($this->onCoroutineResolution);
}
} catch (\Throwable $e) {
// @TODO Remove coverage ignore block once PHP5 support is no longer required

View File

@ -564,6 +564,148 @@ function wait(Promise $promise, Reactor $reactor = null) {
return $resolvedResult;
}
/**
* Return a new function that will be resolved as a coroutine when invoked
*
* @param callable $func The callable to be wrapped for coroutine resolution
* @param \Amp\Reactor $reactor
* @return callable Returns a wrapped callable
* @TODO Use variadic function instead of func_get_args() once PHP5.5 is no longer supported
*/
function coroutine(callable $func, Reactor $reactor = null) {
return function() use ($func, $reactor) {
$out = \call_user_func_array($func, \func_get_args());
return ($out instanceof \Generator)
? resolve($out, $reactor)
: $out;
};
}
/**
* Resolve a Generator coroutine function
*
* Upon resolution the Generator return value is used to succeed the promised result. If an
* error occurs during coroutine resolution the returned promise fails.
*
* @param \Generator $generator The generator to resolve as a coroutine
* @param \Amp\Reactor $reactor
*/
function resolve(\Generator $generator, Reactor $reactor = null) {
$cs = new CoroutineState;
$cs->reactor = $reactor ?: reactor();
$cs->promisor = new Deferred;
$cs->generator = $generator;
$cs->returnValue = null;
$cs->currentPromise = null;
$cs->nestingLevel = 0;
__coroutineAdvance($cs);
return $cs->promisor->promise();
}
/**
* This function is used internally when resolving coroutines.
* It is not considered part of the public API and library users
* should not rely upon it in applications.
*/
function __coroutineAdvance(CoroutineState $cs) {
try {
$yielded = $cs->generator->current();
if (!isset($yielded)) {
if ($cs->generator->valid()) {
$cs->reactor->immediately("Amp\__coroutineNextTick", ["cb_data" => $cs]);
} elseif (isset($cs->returnValue)) {
$cs->promisor->succeed($cs->returnValue);
} else {
$result = (PHP_MAJOR_VERSION >= 7) ? $cs->generator->getReturn() : null;
$cs->promisor->succeed($result);
}
} elseif ($yielded instanceof Promise) {
if ($cs->nestingLevel < 3) {
$cs->nestingLevel++;
$yielded->when("Amp\__coroutineSend", $cs);
$cs->nestingLevel--;
} else {
$cs->currentPromise = $yielded;
$cs->reactor->immediately("Amp\__coroutineNextTick", ["cb_data" => $cs]);
}
} elseif ($yielded instanceof CoroutineResult) {
/**
* @TODO This block is necessary for PHP5; remove once PHP7 is required and
* we have return expressions in generators
*/
$cs->returnValue = $yielded->getReturn();
__coroutineSend(null, null, $cs);
} else {
/**
* @TODO Remove CoroutineResult from error message once PHP7 is required
*/
$error = new \DomainException(makeGeneratorError($cs->generator, \sprintf(
"Unexpected yield (Promise|CoroutineResult|null expected); %s yielded at key %s",
\is_object($yielded) ? \get_class($yielded) : \gettype($yielded),
$cs->generator->key()
)));
$cs->reactor->immediately(function() use ($cs, $error) {
$cs->promisor->fail($error);
});
}
} catch (\Throwable $uncaught) {
/**
* @codeCoverageIgnoreStart
* @TODO Remove these coverage ignore lines once PHP7 is required
*/
$cs->reactor->immediately(function() use ($cs, $uncaught) {
$cs->promisor->fail($uncaught);
});
/**
* @codeCoverageIgnoreEnd
*/
} catch (\Exception $uncaught) {
/**
* @TODO This extra catch block is necessary for PHP5; remove once PHP7 is required
*/
$cs->reactor->immediately(function() use ($cs, $uncaught) {
$cs->promisor->fail($uncaught);
});
}
}
/**
* This function is used internally when resolving coroutines.
* It is not considered part of the public API and library users
* should not rely upon it in applications.
*/
function __coroutineNextTick(Reactor $reactor, $watcherId, CoroutineState $cs) {
if ($cs->currentPromise) {
$promise = $cs->currentPromise;
$cs->currentPromise = null;
$promise->when("Amp\__coroutineSend", $cs);
} else {
__coroutineSend(null, null, $cs);
}
}
/**
* This function is used internally when resolving coroutines.
* It is not considered part of the public API and library users
* should not rely upon it in applications.
*/
function __coroutineSend($error, $result, CoroutineState $cs) {
try {
if ($error) {
$cs->generator->throw($error);
} else {
$cs->generator->send($result);
}
__coroutineAdvance($cs);
} catch (\Exception $uncaught) {
$cs->reactor->immediately(function() use ($cs, $uncaught) {
$cs->promisor->fail($uncaught);
});
}
}
/**
* A general purpose function for creating error messages from generator yields
*

View File

@ -1,144 +0,0 @@
<?php
namespace Amp\Test;
use Amp\NativeReactor;
use Amp\Coroutine;
use Amp\Success;
use Amp\Failure;
use Amp\Pause;
class CoroutineTest extends \PHPUnit_Framework_TestCase {
public function testWrap() {
$invoked = 0;
(new NativeReactor)->run(function($reactor) use (&$invoked) {
$co = function($reactor) use (&$invoked) {
yield new Success;
yield;
yield new Pause(25, $reactor);
$invoked++;
};
$wrapped = Coroutine::wrap($co, $reactor);
$wrapped($reactor);
});
$this->assertSame(1, $invoked);
}
public function testNestedResolutionContinuation() {
$invoked = 0;
(new NativeReactor)->run(function($reactor) use (&$invoked) {
$co = function() use (&$invoked) {
yield new Success;
yield new Success;
yield new Success;
yield new Success;
yield new Success;
yield Coroutine::result(42);
$invoked++;
};
$result = (yield Coroutine::resolve($co(), $reactor));
$this->assertSame(42, $result);
});
$this->assertSame(1, $invoked);
}
public function testCoroutineFauxReturnValue() {
$invoked = 0;
(new NativeReactor)->run(function($reactor) use (&$invoked) {
$co = function() use (&$invoked) {
yield;
yield Coroutine::result(42);
yield;
$invoked++;
};
$result = (yield Coroutine::resolve($co(), $reactor));
$this->assertSame(42, $result);
});
$this->assertSame(1, $invoked);
}
public function testResolutionFailuresAreThrownIntoGenerator() {
$invoked = 0;
(new NativeReactor)->run(function($reactor) use (&$invoked) {
$foo = function() {
$a = (yield new Success(21));
$b = 1;
try {
yield new Failure(new \Exception("test"));
$this->fail("Code path should not be reached");
} catch (\Exception $e) {
$this->assertSame("test", $e->getMessage());
$b = 2;
}
};
$result = (yield Coroutine::resolve($foo(), $reactor));
$invoked++;
});
$this->assertSame(1, $invoked);
}
/**
* @expectedException \Exception
* @expectedExceptionMessage a moveable feast
*/
public function testExceptionOnInitialAdvanceFailsResolution() {
(new NativeReactor)->run(function($reactor) use (&$invoked) {
$co = function() {
throw new \Exception("a moveable feast");
yield;
};
$result = (yield Coroutine::resolve($co(), $reactor));
});
}
/**
* @dataProvider provideInvalidYields
*/
public function testInvalidYieldFailsResolution($badYield) {
try {
(new NativeReactor)->run(function($reactor) use (&$invoked, $badYield) {
$gen = function() use ($badYield) {
yield;
yield $badYield;
yield;
};
yield Coroutine::resolve($gen(), $reactor);
});
$this->fail("execution should not reach this point");
} catch (\DomainException $e) {
$pos = strpos($e->getMessage(), "Unexpected yield (Promise|CoroutineResult|null expected);");
$this->assertSame(0, $pos);
return;
}
$this->fail("execution should not reach this point");
}
public function provideInvalidYields() {
return [
[42],
[3.14],
["string"],
[true],
[new \StdClass],
];
}
/**
* @expectedException \Exception
* @expectedExceptionMessage When in the chronicle of wasted time
*/
public function testUncaughtGeneratorExceptionFailsResolution() {
$invoked = 0;
(new NativeReactor)->run(function($reactor) use (&$invoked) {
$gen = function() {
yield;
throw new \Exception("When in the chronicle of wasted time");
yield;
};
yield Coroutine::resolve($gen(), $reactor);
$invoked++;
});
$this->assertSame(1, $invoked);
}
}

View File

@ -7,6 +7,8 @@ use Amp\Success;
use Amp\Failure;
use Amp\Deferred;
use Amp\PromiseStream;
use Amp\Pause;
use Amp\CoroutineResult;
class FunctionsTest extends \PHPUnit_Framework_TestCase {
@ -140,7 +142,7 @@ class FunctionsTest extends \PHPUnit_Framework_TestCase {
$this->assertNull($error);
$this->assertSame([1=>"test2", 2=>"test2", 3=>"test2"], $result);
}
public function testFilterReturnsEmptySuccessOnEmptyInput() {
$promise = \Amp\filter([], function () {});
$this->assertInstanceOf("Amp\Success", $promise);
@ -636,4 +638,137 @@ class FunctionsTest extends \PHPUnit_Framework_TestCase {
});
$this->assertTrue($completed);
}
public function testCoroutine() {
$invoked = 0;
(new NativeReactor)->run(function($reactor) use (&$invoked) {
$co = function($reactor) use (&$invoked) {
yield new Success;
yield;
yield new Pause(25, $reactor);
$invoked++;
};
$wrapped = \Amp\coroutine($co, $reactor);
$wrapped($reactor);
});
$this->assertSame(1, $invoked);
}
public function testNestedCoroutineResolutionContinuation() {
$invoked = 0;
(new NativeReactor)->run(function($reactor) use (&$invoked) {
$co = function() use (&$invoked) {
yield new Success;
yield new Success;
yield new Success;
yield new Success;
yield new Success;
yield new CoroutineResult(42);
$invoked++;
};
$result = (yield \Amp\resolve($co(), $reactor));
$this->assertSame(42, $result);
});
$this->assertSame(1, $invoked);
}
public function testCoroutineFauxReturnValue() {
$invoked = 0;
(new NativeReactor)->run(function($reactor) use (&$invoked) {
$co = function() use (&$invoked) {
yield;
yield new CoroutineResult(42);
yield;
$invoked++;
};
$result = (yield \Amp\resolve($co(), $reactor));
$this->assertSame(42, $result);
});
$this->assertSame(1, $invoked);
}
public function testResolutionFailuresAreThrownIntoGeneratorCoroutine() {
$invoked = 0;
(new NativeReactor)->run(function($reactor) use (&$invoked) {
$foo = function() {
$a = (yield new Success(21));
$b = 1;
try {
yield new Failure(new \Exception("test"));
$this->fail("Code path should not be reached");
} catch (\Exception $e) {
$this->assertSame("test", $e->getMessage());
$b = 2;
}
};
$result = (yield \Amp\resolve($foo(), $reactor));
$invoked++;
});
$this->assertSame(1, $invoked);
}
/**
* @expectedException \Exception
* @expectedExceptionMessage a moveable feast
*/
public function testExceptionOnInitialAdvanceFailsCoroutineResolution() {
(new NativeReactor)->run(function($reactor) use (&$invoked) {
$co = function() {
throw new \Exception("a moveable feast");
yield;
};
$result = (yield \Amp\resolve($co(), $reactor));
});
}
/**
* @dataProvider provideInvalidYields
*/
public function testInvalidYieldFailsCoroutineResolution($badYield) {
try {
(new NativeReactor)->run(function($reactor) use (&$invoked, $badYield) {
$gen = function() use ($badYield) {
yield;
yield $badYield;
yield;
};
yield \Amp\resolve($gen(), $reactor);
});
$this->fail("execution should not reach this point");
} catch (\DomainException $e) {
$pos = strpos($e->getMessage(), "Unexpected yield (Promise|CoroutineResult|null expected);");
$this->assertSame(0, $pos);
return;
}
$this->fail("execution should not reach this point");
}
public function provideInvalidYields() {
return [
[42],
[3.14],
["string"],
[true],
[new \StdClass],
];
}
/**
* @expectedException \Exception
* @expectedExceptionMessage When in the chronicle of wasted time
*/
public function testUncaughtGeneratorExceptionFailsCoroutineResolution() {
$invoked = 0;
(new NativeReactor)->run(function($reactor) use (&$invoked) {
$gen = function() {
yield;
throw new \Exception("When in the chronicle of wasted time");
yield;
};
yield \Amp\resolve($gen(), $reactor);
$invoked++;
});
$this->assertSame(1, $invoked);
}
}