1
0
mirror of https://github.com/danog/amp.git synced 2025-01-22 13:21:16 +01:00

Drop loop wrapper functions

This commit is contained in:
Aaron Piotrowski 2016-12-29 16:57:08 -06:00
parent b503836e32
commit b5d5b8dff1
7 changed files with 55 additions and 281 deletions

View File

@ -3,16 +3,14 @@
require dirname(__DIR__) . '/vendor/autoload.php'; require dirname(__DIR__) . '/vendor/autoload.php';
use Amp\Coroutine; use Amp\{ Coroutine, Pause, Postponed, Loop\NativeLoop };
use Amp\Pause; use Interop\Async\Loop;
use Amp\Postponed;
use Amp\Loop\NativeLoop;
Amp\execute(function () { Loop::execute(Amp\wrap(function () {
try { try {
$postponed = new Postponed; $postponed = new Postponed;
$observable = $postponed->getObservable(); $observable = $postponed->observe();
$observable->subscribe(function ($value) { $observable->subscribe(function ($value) {
printf("Observable emitted %d\n", $value); printf("Observable emitted %d\n", $value);
@ -47,4 +45,4 @@ Amp\execute(function () {
} catch (\Exception $exception) { } catch (\Exception $exception) {
printf("Exception: %s\n", $exception); printf("Exception: %s\n", $exception);
} }
}, $loop = new NativeLoop()); }), $loop = new NativeLoop());

View File

@ -3,13 +3,10 @@
require dirname(__DIR__) . '/vendor/autoload.php'; require dirname(__DIR__) . '/vendor/autoload.php';
use Amp\Coroutine; use Amp\{ Coroutine, Emitter, Observable, Observer, Pause };
use Amp\Emitter; use Interop\Async\Loop;
use Amp\Observable;
use Amp\Observer;
use Amp\Pause;
Amp\execute(function () { Loop::execute(Amp\wrap(function () {
try { try {
$emitter = new Emitter(function (callable $emit) { $emitter = new Emitter(function (callable $emit) {
yield $emit(1); yield $emit(1);
@ -41,4 +38,4 @@ Amp\execute(function () {
} catch (\Exception $exception) { } catch (\Exception $exception) {
printf("Exception: %s\n", $exception); printf("Exception: %s\n", $exception);
} }
}); }));

View File

@ -3,14 +3,10 @@
require dirname(__DIR__) . '/vendor/autoload.php'; require dirname(__DIR__) . '/vendor/autoload.php';
use Amp\Coroutine; use Amp\{ Coroutine, Observable, Observer, Pause, Postponed, Loop\NativeLoop };
use Amp\Observable; use Interop\Async\Loop;
use Amp\Observer;
use Amp\Pause;
use Amp\Postponed;
use Amp\Loop\NativeLoop;
Amp\execute(function () { Loop::execute(Amp\wrap(function () {
try { try {
$postponed = new Postponed; $postponed = new Postponed;
@ -29,7 +25,7 @@ Amp\execute(function () {
$postponed->resolve(11); $postponed->resolve(11);
}); });
$observable = $postponed->getObservable(); $observable = $postponed->observe();
$generator = function (Observable $observable) { $generator = function (Observable $observable) {
$observer = new Observer($observable); $observer = new Observer($observable);
@ -47,4 +43,4 @@ Amp\execute(function () {
} catch (\Exception $exception) { } catch (\Exception $exception) {
printf("Exception: %s\n", $exception); printf("Exception: %s\n", $exception);
} }
}, $loop = new NativeLoop()); }), $loop = new NativeLoop());

View File

@ -2,235 +2,11 @@
namespace Amp; namespace Amp;
use Interop\Async\{ Promise, Loop, Loop\Driver }; use Interop\Async\{ Loop, Promise };
/**
* Execute a callback within the event loop scope.
* Returned Generators are run as coroutines. Failures of the coroutine are forwarded to the loop error handler.
*
* @see \Interop\Async\Loop::execute()
*
* @param callable $callback
* @param \Interop\Async\Loop\Driver|null $driver
*/
function execute(callable $callback, Driver $driver = null) {
Loop::execute(function () use ($callback) {
$result = $callback();
if ($result instanceof \Generator) {
rethrow(new Coroutine($result));
}
}, $driver);
}
/**
* Stops the event loop.
*
* @see \Interop\Async\Loop::stop()
*/
function stop() {
Loop::stop();
}
/**
* Execute a callback when a stream resource becomes readable.
* Returned Generators are run as coroutines. Failures of the coroutine are forwarded to the loop error handler.
*
* @see \Interop\Async\Loop::onReadable()
*
* @param resource $stream The stream to monitor.
* @param callable(string $watcherId, resource $stream, mixed $data) $callback The callback to execute.
* @param mixed $data
*
* @return string Watcher identifier.
*/
function onReadable($stream, callable $callback, $data = null): string {
return Loop::onReadable($stream, function ($watcherId, $stream, $data) use ($callback) {
$result = $callback($watcherId, $stream, $data);
if ($result instanceof \Generator) {
rethrow(new Coroutine($result));
return;
}
}, $data);
}
/**
* Execute a callback when a stream resource becomes writable.
* Returned Generators are run as coroutines. Failures of the coroutine are forwarded to the loop error handler.
*
* @see \Interop\Async\Loop::onWritable()
*
* @param resource $stream The stream to monitor.
* @param callable(string $watcherId, resource $stream, mixed $data) $callback The callback to execute.
* @param mixed $data
*
* @return string Watcher identifier.
*/
function onWritable($stream, callable $callback, $data = null): string {
return Loop::onWritable($stream, function ($watcherId, $stream, $data) use ($callback) {
$result = $callback($watcherId, $stream, $data);
if ($result instanceof \Generator) {
rethrow(new Coroutine($result));
}
}, $data);
}
/**
* Execute a callback when a signal is received.
* Returned Generators are run as coroutines. Failures of the coroutine are forwarded to the loop error handler.
*
* @see \Interop\Async\Loop::onSignal()
*
* @param int $signo The signal number to monitor.
* @param callable(string $watcherId, int $signo, mixed $data) $callback The callback to execute.
* @param mixed $data
*
* @return string Watcher identifier.
*/
function onSignal(int $signo, callable $callback, $data = null): string {
return Loop::onSignal($signo, function ($watcherId, $signo, $data) use ($callback) {
$result = $callback($watcherId, $signo, $data);
if ($result instanceof \Generator) {
rethrow(new Coroutine($result));
}
}, $data);
}
/**
* Defer the execution of a callback.
* Returned Generators are run as coroutines. Failures of the coroutine are forwarded to the loop error handler.
*
* @see \Interop\Async\Loop::defer()
*
* @param callable(string $watcherId, mixed $data) $callback The callback to delay.
* @param mixed $data
*
* @return string Watcher identifier.
*/
function defer(callable $callback, $data = null): string {
return Loop::defer(function ($watcherId, $data) use ($callback) {
$result = $callback($watcherId, $data);
if ($result instanceof \Generator) {
rethrow(new Coroutine($result));
}
}, $data);
}
/**
* Delay the execution of a callback.
* Returned Generators are run as coroutines. Failures of the coroutine are forwarded to the loop error handler.
*
* @see \Interop\Async\Loop::delay()
*
* @param int $time
* @param callable(string $watcherId, mixed $data) $callback The callback to delay.
* @param mixed $data
*
* @return string Watcher identifier.
*/
function delay(int $time, callable $callback, $data = null): string {
return Loop::delay($time, function ($watcherId, $data) use ($callback) {
$result = $callback($watcherId, $data);
if ($result instanceof \Generator) {
rethrow(new Coroutine($result));
}
}, $data);
}
/**
* Repeatedly execute a callback.
* Returned Generators are run as coroutines. Failures of the coroutine are forwarded to the loop error handler.
*
* @see \Interop\Async\Loop::repeat()
*
* @param int $time
* @param callable(string $watcherId, mixed $data) $callback The callback to delay.
* @param mixed $data
*
* @return string Watcher identifier.
*/
function repeat(int $time, callable $callback, $data = null): string {
return Loop::repeat($time, function ($watcherId, $data) use ($callback) {
$result = $callback($watcherId, $data);
if ($result instanceof \Generator) {
rethrow(new Coroutine($result));
}
}, $data);
}
/**
* Enable a watcher.
*
* @see \Interop\Async\Loop::enable()
*
* @param string $watcherId
*/
function enable(string $watcherId) {
Loop::enable($watcherId);
}
/**
* Disable a watcher.
*
* @see \Interop\Async\Loop::disable()
*
* @param string $watcherId
*/
function disable(string $watcherId) {
Loop::disable($watcherId);
}
/**
* Cancel a watcher.
*
* @see \Interop\Async\Loop::cancel()
*
* @param string $watcherId
*/
function cancel(string $watcherId) {
Loop::cancel($watcherId);
}
/**
* Reference a watcher.
*
* @see \Interop\Async\Loop::reference()
*
* @param string $watcherId
*/
function reference(string $watcherId) {
Loop::reference($watcherId);
}
/**
* Unreference a watcher.
*
* @see \Interop\Async\Loop::unreference()
*
* @param string $watcherId
*/
function unreference(string $watcherId) {
Loop::unreference($watcherId);
}
/**
* Returned Generators are run as coroutines. Failures of the coroutine are forwarded to the loop error handler.
*
* @see \Interop\Async\Loop::setErrorHandler()
*
* @param callable $callback
*/
function setErrorHandler(callable $callback) {
Loop::setErrorHandler(function ($exception) use ($callback) {
$result = $callback($exception);
if ($result instanceof \Generator) {
rethrow(new Coroutine($result));
}
});
}
/** /**
* Wraps the callback in a promise/coroutine-aware function that automatically upgrades Generators to coroutines and * Wraps the callback in a promise/coroutine-aware function that automatically upgrades Generators to coroutines and
* calls rethrow() on the created coroutine. * calls rethrow() on the returned promises (or the coroutine created).
* *
* @param callable(...$args): \Generator|\Interop\Async\Promise|mixed $callback * @param callable(...$args): \Generator|\Interop\Async\Promise|mixed $callback
* *
@ -239,8 +15,13 @@ function setErrorHandler(callable $callback) {
function wrap(callable $callback): callable { function wrap(callable $callback): callable {
return function (...$args) use ($callback) { return function (...$args) use ($callback) {
$result = $callback(...$args); $result = $callback(...$args);
if ($result instanceof \Generator) { if ($result instanceof \Generator) {
rethrow(new Coroutine($result)); $result = new Coroutine($result);
}
if ($result instanceof Promise) {
rethrow($result);
} }
}; };
} }

View File

@ -4,6 +4,7 @@ namespace Amp\Test;
use Amp; use Amp;
use Amp\{ Deferred, Emitter, Pause }; use Amp\{ Deferred, Emitter, Pause };
use Interop\Async\Loop;
class EmitterTest extends \PHPUnit_Framework_TestCase { class EmitterTest extends \PHPUnit_Framework_TestCase {
const TIMEOUT = 100; const TIMEOUT = 100;
@ -18,7 +19,7 @@ class EmitterTest extends \PHPUnit_Framework_TestCase {
public function testEmit() { public function testEmit() {
$invoked = false; $invoked = false;
Amp\execute(function () use (&$invoked) { Loop::execute(Amp\wrap(function () use (&$invoked) {
$value = 1; $value = 1;
$emitter = new Emitter(function (callable $emit) use ($value) { $emitter = new Emitter(function (callable $emit) use ($value) {
@ -37,7 +38,7 @@ class EmitterTest extends \PHPUnit_Framework_TestCase {
$emitter->when(function ($exception, $result) use ($value) { $emitter->when(function ($exception, $result) use ($value) {
$this->assertSame($result, $value); $this->assertSame($result, $value);
}); });
}); }));
$this->assertTrue($invoked); $this->assertTrue($invoked);
} }
@ -47,7 +48,7 @@ class EmitterTest extends \PHPUnit_Framework_TestCase {
*/ */
public function testEmitSuccessfulPromise() { public function testEmitSuccessfulPromise() {
$invoked = false; $invoked = false;
Amp\execute(function () use (&$invoked) { Loop::execute(Amp\wrap(function () use (&$invoked) {
$deferred = new Deferred(); $deferred = new Deferred();
$emitter = new Emitter(function (callable $emit) use ($deferred) { $emitter = new Emitter(function (callable $emit) use ($deferred) {
@ -64,7 +65,7 @@ class EmitterTest extends \PHPUnit_Framework_TestCase {
$emitter->subscribe($callback); $emitter->subscribe($callback);
$deferred->resolve($value); $deferred->resolve($value);
}); }));
$this->assertTrue($invoked); $this->assertTrue($invoked);
} }
@ -74,7 +75,7 @@ class EmitterTest extends \PHPUnit_Framework_TestCase {
*/ */
public function testEmitFailedPromise() { public function testEmitFailedPromise() {
$exception = new \Exception; $exception = new \Exception;
Amp\execute(function () use ($exception) { Loop::execute(Amp\wrap(function () use ($exception) {
$deferred = new Deferred(); $deferred = new Deferred();
$emitter = new Emitter(function (callable $emit) use ($deferred) { $emitter = new Emitter(function (callable $emit) use ($deferred) {
@ -86,7 +87,7 @@ class EmitterTest extends \PHPUnit_Framework_TestCase {
$emitter->when(function ($reason) use ($exception) { $emitter->when(function ($reason) use ($exception) {
$this->assertSame($reason, $exception); $this->assertSame($reason, $exception);
}); });
}); }));
} }
/** /**
@ -94,7 +95,7 @@ class EmitterTest extends \PHPUnit_Framework_TestCase {
*/ */
public function testEmitBackPressure() { public function testEmitBackPressure() {
$emits = 3; $emits = 3;
Amp\execute(function () use (&$time, $emits) { Loop::execute(Amp\wrap(function () use (&$time, $emits) {
$emitter = new Emitter(function (callable $emit) use (&$time, $emits) { $emitter = new Emitter(function (callable $emit) use (&$time, $emits) {
$time = microtime(true); $time = microtime(true);
for ($i = 0; $i < $emits; ++$i) { for ($i = 0; $i < $emits; ++$i) {
@ -106,7 +107,7 @@ class EmitterTest extends \PHPUnit_Framework_TestCase {
$emitter->subscribe(function () { $emitter->subscribe(function () {
return new Pause(self::TIMEOUT); return new Pause(self::TIMEOUT);
}); });
}); }));
$this->assertGreaterThan(self::TIMEOUT * $emits, $time * 1000); $this->assertGreaterThan(self::TIMEOUT * $emits, $time * 1000);
} }
@ -118,7 +119,7 @@ class EmitterTest extends \PHPUnit_Framework_TestCase {
$exception = new \Exception; $exception = new \Exception;
try { try {
Amp\execute(function () use ($exception) { Loop::execute(Amp\wrap(function () use ($exception) {
$emitter = new Emitter(function (callable $emit) { $emitter = new Emitter(function (callable $emit) {
yield $emit(1); yield $emit(1);
yield $emit(2); yield $emit(2);
@ -127,7 +128,7 @@ class EmitterTest extends \PHPUnit_Framework_TestCase {
$emitter->subscribe(function () use ($exception) { $emitter->subscribe(function () use ($exception) {
throw $exception; throw $exception;
}); });
}); }));
} catch (\Exception $caught) { } catch (\Exception $caught) {
$this->assertSame($exception, $caught); $this->assertSame($exception, $caught);
} }
@ -140,14 +141,14 @@ class EmitterTest extends \PHPUnit_Framework_TestCase {
$exception = new \Exception; $exception = new \Exception;
try { try {
Amp\execute(function () use ($exception) { Loop::execute(Amp\wrap(function () use ($exception) {
$emitter = new Emitter(function (callable $emit) use ($exception) { $emitter = new Emitter(function (callable $emit) use ($exception) {
yield $emit(1); yield $emit(1);
throw $exception; throw $exception;
}); });
Amp\wait($emitter); Amp\wait($emitter);
}); }));
} catch (\Exception $caught) { } catch (\Exception $caught) {
$this->assertSame($exception, $caught); $this->assertSame($exception, $caught);
} }

View File

@ -4,7 +4,7 @@ namespace Amp\Test;
use Amp; use Amp;
use Amp\{ Deferred, Failure, Success }; use Amp\{ Deferred, Failure, Success };
use Interop\Async\Promise; use Interop\Async\{ Loop, Promise };
class MapTest extends \PHPUnit_Framework_TestCase { class MapTest extends \PHPUnit_Framework_TestCase {
public function testEmptyArray() { public function testEmptyArray() {
@ -20,7 +20,7 @@ class MapTest extends \PHPUnit_Framework_TestCase {
} }
public function testSuccessfulPromisesArray() { public function testSuccessfulPromisesArray() {
Amp\execute(function () { Loop::execute(Amp\wrap(function () {
$promises = [new Success(1), new Success(2), new Success(3)];; $promises = [new Success(1), new Success(2), new Success(3)];;
$count = 0; $count = 0;
@ -39,7 +39,7 @@ class MapTest extends \PHPUnit_Framework_TestCase {
} }
$this->assertSame(\count($promises), $count); $this->assertSame(\count($promises), $count);
}); }));
} }
public function testPendingPromisesArray() { public function testPendingPromisesArray() {
@ -76,7 +76,7 @@ class MapTest extends \PHPUnit_Framework_TestCase {
} }
public function testFailedPromisesArray() { public function testFailedPromisesArray() {
Amp\execute(function () { Loop::execute(Amp\wrap(function () {
$exception = new \Exception; $exception = new \Exception;
$promises = [new Failure($exception), new Failure($exception), new Failure($exception)];; $promises = [new Failure($exception), new Failure($exception), new Failure($exception)];;
@ -95,7 +95,7 @@ class MapTest extends \PHPUnit_Framework_TestCase {
} }
$this->assertSame(0, $count); $this->assertSame(0, $count);
}); }));
} }
/** /**
@ -103,7 +103,7 @@ class MapTest extends \PHPUnit_Framework_TestCase {
*/ */
public function testCallbackThrowingExceptionRejectsPromises() public function testCallbackThrowingExceptionRejectsPromises()
{ {
Amp\execute(function () { Loop::execute(Amp\wrap(function () {
$promises = [new Success(1), new Success(2), new Success(3)];; $promises = [new Success(1), new Success(2), new Success(3)];;
$exception = new \Exception; $exception = new \Exception;
@ -124,7 +124,7 @@ class MapTest extends \PHPUnit_Framework_TestCase {
$this->assertSame($exception, $reason); $this->assertSame($exception, $reason);
} }
} }
}); }));
} }
/** /**

View File

@ -4,12 +4,13 @@ namespace Amp\Test;
use Amp; use Amp;
use Amp\{ Emitter, Observer, Pause, Postponed }; use Amp\{ Emitter, Observer, Pause, Postponed };
use Interop\Async\Loop;
class ObserverTest extends \PHPUnit_Framework_TestCase { class ObserverTest extends \PHPUnit_Framework_TestCase {
const TIMEOUT = 10; const TIMEOUT = 10;
public function testSingleEmittingObservable() { public function testSingleEmittingObservable() {
Amp\execute(function () { Loop::execute(Amp\wrap(function () {
$value = 1; $value = 1;
$observable = new Emitter(function (callable $emit) use ($value) { $observable = new Emitter(function (callable $emit) use ($value) {
yield $emit($value); yield $emit($value);
@ -23,14 +24,14 @@ class ObserverTest extends \PHPUnit_Framework_TestCase {
} }
$this->assertSame($observer->getResult(), $value); $this->assertSame($observer->getResult(), $value);
}); }));
} }
/** /**
* @depends testSingleEmittingObservable * @depends testSingleEmittingObservable
*/ */
public function testFastEmittingObservable() { public function testFastEmittingObservable() {
Amp\execute(function () { Loop::execute(Amp\wrap(function () {
$count = 10; $count = 10;
$postponed = new Postponed; $postponed = new Postponed;
@ -49,14 +50,14 @@ class ObserverTest extends \PHPUnit_Framework_TestCase {
$this->assertSame($count, $i); $this->assertSame($count, $i);
$this->assertSame($observer->getResult(), $i); $this->assertSame($observer->getResult(), $i);
}); }));
} }
/** /**
* @depends testSingleEmittingObservable * @depends testSingleEmittingObservable
*/ */
public function testSlowEmittingObservable() { public function testSlowEmittingObservable() {
Amp\execute(function () { Loop::execute(Amp\wrap(function () {
$count = 10; $count = 10;
$observable = new Emitter(function (callable $emit) use ($count) { $observable = new Emitter(function (callable $emit) use ($count) {
for ($i = 0; $i < $count; ++$i) { for ($i = 0; $i < $count; ++$i) {
@ -74,14 +75,14 @@ class ObserverTest extends \PHPUnit_Framework_TestCase {
$this->assertSame($count, $i); $this->assertSame($count, $i);
$this->assertSame($observer->getResult(), $i); $this->assertSame($observer->getResult(), $i);
}); }));
} }
/** /**
* @depends testFastEmittingObservable * @depends testFastEmittingObservable
*/ */
public function testDrain() { public function testDrain() {
Amp\execute(function () { Loop::execute(Amp\wrap(function () {
$count = 10; $count = 10;
$postponed = new Postponed; $postponed = new Postponed;
@ -97,7 +98,7 @@ class ObserverTest extends \PHPUnit_Framework_TestCase {
$values = $observer->drain(); $values = $observer->drain();
$this->assertSame(\range(0, $count - 1), $values); $this->assertSame(\range(0, $count - 1), $values);
}); }));
} }
/** /**
@ -113,7 +114,7 @@ class ObserverTest extends \PHPUnit_Framework_TestCase {
} }
public function testFailingObservable() { public function testFailingObservable() {
Amp\execute(function () { Loop::execute(Amp\wrap(function () {
$exception = new \Exception; $exception = new \Exception;
$postponed = new Postponed; $postponed = new Postponed;
@ -135,7 +136,7 @@ class ObserverTest extends \PHPUnit_Framework_TestCase {
} catch (\Exception $reason) { } catch (\Exception $reason) {
$this->assertSame($exception, $reason); $this->assertSame($exception, $reason);
} }
}); }));
} }
/** /**
@ -171,12 +172,12 @@ class ObserverTest extends \PHPUnit_Framework_TestCase {
* @expectedExceptionMessage The observable has not resolved * @expectedExceptionMessage The observable has not resolved
*/ */
public function testGetResultBeforeResolution() { public function testGetResultBeforeResolution() {
Amp\execute(function () { Loop::execute(Amp\wrap(function () {
$postponed = new Postponed; $postponed = new Postponed;
$observer = new Observer($postponed->observe()); $observer = new Observer($postponed->observe());
$observer->getResult(); $observer->getResult();
}); }));
} }
} }