1
0
mirror of https://github.com/danog/amp.git synced 2025-01-22 05:11:42 +01:00

Miscellaneous function changes (more after jump)

- Combinator functions optimized for performance
- Amp\reactor() now accepts an optional assignment parameter for
  modifying the global default event reactor instance to allow for
  third-party Reactor implementations.
- Renamed functions:
    . Amp\getReactor() -> Amp\reactor()
- Removed functions:
    . Amp\chooseReactor()
    . Amp\tick()
    . Amp\immediately()
    . Amp\once()
    . Amp\repeat()
    . Amp\onReadable()
    . Amp\onWritable()
    . Amp\onSignal()
    . Amp\enable()
    . Amp\disable()
    . Amp\cancel()
This commit is contained in:
Daniel Lowrey 2015-05-20 15:18:30 -04:00
parent 9fa6010f19
commit c7e4e8d0c3
4 changed files with 290 additions and 270 deletions

View File

@ -1,5 +1,27 @@
### HEAD
- Combinator functions optimized for performance
- Amp\reactor() now accepts an optional assignment parameter for
modifying the global default event reactor instance to allow for
third-party Reactor implementations.
- Renamed functions:
. Amp\getReactor() -> Amp\reactor()
- Removed functions:
. Amp\chooseReactor()
. Amp\tick()
. Amp\immediately()
. Amp\once()
. Amp\repeat()
. Amp\onReadable()
. Amp\onWritable()
. Amp\onSignal()
. Amp\enable()
. Amp\disable()
. Amp\cancel()
v1.0.0-beta4
------------
- Expose `AMP_PRODUCTION_MODE` constant to alternate performance/safety
- Removed `Amp\PrivateFuture` -- the safety of the `Amp\Future`
implementation is now dependent upon `AMP_PRODUCTION_MODE`

View File

@ -12,7 +12,7 @@ class Pause implements Promise {
$msTimeout
));
}
$reactor = $reactor ?: getReactor();
$reactor = $reactor ?: reactor();
$reactor->once(function() { $this->resolve(); }, $msTimeout);
}
}

View File

@ -3,193 +3,102 @@
namespace Amp;
/**
* Get the global singleton event reactor instance
* Get the default event reactor instance
*
* @param \Amp\Reactor $assignReactor Optionally specify a new default event reactor instance
* @return \Amp\Reactor Returns the default reactor instance
*/
function getReactor() {
function reactor(Reactor $assignReactor = null) {
static $reactor;
return $reactor ?: ($reactor = chooseReactor());
}
/**
* Select the most appropriate event reactor given the current execution environment
*/
function chooseReactor() {
if (extension_loaded('uv')) {
return new UvReactor;
if ($assignReactor) {
return ($reactor = $assignReactor);
} elseif ($reactor) {
return $reactor;
} elseif (\extension_loaded('uv')) {
return ($reactor = new UvReactor);
} else {
return new NativeReactor;
return ($reactor = new NativeReactor);
}
}
/**
* Start an event reactor and assume program flow control
* Start the default event reactor and assume program flow control
*
* This is a shortcut function for invoking Reactor::run() on the global
* default event reactor.
*
* @param callable $onStart An optional callback to invoke immediately when the Reactor starts
* @return void
*/
function run(callable $onStart = null) {
getReactor()->run($onStart);
reactor()->run($onStart);
}
/**
* Execute a single event loop iteration
*/
function tick($noWait = false) {
getReactor()->tick($noWait);
}
/**
* Stop the event reactor
* Stop the default event reactor and return program flow control
*
* This is a shortcut function for invoking Reactor::stop() on the global
* default event reactor.
*
* @return void
*/
function stop() {
getReactor()->stop();
reactor()->stop();
}
/**
* Schedule a callback for immediate invocation in the next event loop iteration
* Flatten an array of promises into a single promise
*
* NOTE: Watchers registered using this function are automatically garbage collected after execution.
*/
function immediately(callable $func) {
return getReactor()->immediately($func);
}
/**
* Schedule a callback to execute once
* Upon resolution the returned promise's $result parameter is set to an array
* whose keys match the original input array and whose values match the individual
* resolution results of its component promises.
*
* NOTE: Watchers registered using this function are automatically garbage collected after execution.
*/
function once(callable $func, $millisecondDelay) {
return getReactor()->once($func, $millisecondDelay);
}
/**
* Schedule a recurring callback to execute every $interval seconds until cancelled
* If any one of the Promises fails the resulting Promise will immediately fail.
*
* IMPORTANT: Watchers registered using this function must be manually cleared using cancel() to
* free the associated memory. Failure to cancel repeating watchers (even if disable() is used)
* will lead to memory leaks.
*/
function repeat(callable $func, $millisecondDelay) {
return getReactor()->repeat($func, $millisecondDelay);
}
/**
* Enable a disabled timer or stream IO watcher
*
* Calling enable() on an already-enabled watcher will have no effect.
*/
function enable($watcherId) {
getReactor()->enable($watcherId);
}
/**
* Temporarily disable (but don't cancel) an existing timer/stream watcher
*
* Calling disable() on a nonexistent or previously-disabled watcher will have no effect.
*
* NOTE: Disabling a repeating or stream watcher is not sufficient to free associated resources.
* When the watcher is no longer needed applications must still use cancel() to clear related
* memory and avoid leaks.
*/
function disable($watcherId) {
getReactor()->disable($watcherId);
}
/**
* Cancel an existing timer/stream watcher
*
* Calling cancel() on a non-existent watcher ID will have no effect.
*/
function cancel($watcherId) {
getReactor()->cancel($watcherId);
}
/**
* Watch a stream IO resource for readable data and trigger the specified callback when actionable
*
* IMPORTANT: Watchers registered using this function must be manually cleared using cancel() to
* free the associated memory. Failure to cancel repeating watchers (even if disable() is used)
* will lead to memory leaks.
*
* @param resource $stream
*/
function onReadable($stream, callable $func, $enableNow = true) {
getReactor()->onReadable($stream, $func, $enableNow);
}
/**
* Watch a stream IO resource for writability and trigger the specified callback when actionable
*
* NOTE: Sockets are essentially "always writable" (as long as their write buffer is not full).
* Therefore, it's critical that applications disable or cancel write watchers as soon as all data
* is written or the watcher will trigger endlessly and hammer the CPU.
*
* IMPORTANT: Watchers registered using this function must be manually cleared using cancel() to
* free the associated memory. Failure to cancel repeating watchers (even if disable() is used)
* will lead to memory leaks.
*
* @param resource $stream
*/
function onWritable($stream, callable $func, $enableNow = true) {
getReactor()->onWritable($stream, $func, $enableNow);
}
/**
* React to process control signals
*/
function onSignal($signo, callable $onSignal) {
/**
* @var $reactor \Amp\SignalReactor
*/
$reactor = getReactor();
if ($reactor instanceof SignalReactor) {
return $reactor->onSignal($signo, $onSignal);
} else {
throw new \RuntimeException(
'Your PHP environment does not support signal handling. Please install pecl/libevent or the php-uv extension'
);
}
}
/**
* If any one of the Promises fails the resulting Promise will fail. Otherwise
* the resulting Promise succeeds with an array matching keys from the input array
* to their resolved values.
* @param array An array of promises to flatten into a single promise
* @return \Amp\Promise
*/
function all(array $promises) {
if (empty($promises)) {
return new Success([]);
}
$results = [];
$remaining = count($promises);
$promisor = new Deferred;
$struct = new \StdClass;
$struct->remaining = count($promises);
$struct->results = [];
$struct->promisor = new Deferred;
foreach ($promises as $key => $resolvable) {
if (!$resolvable instanceof Promise) {
$resolvable = new Success($resolvable);
$onResolve = function($error, $result, $cbData) {
list($struct, $key) = $cbData;
if (empty($struct->remaining)) {
// If the promisor already resolved we don't need to bother
return;
}
if ($error) {
$struct->results = null;
$struct->remaining = 0;
$struct->promisor->fail($error);
return;
}
$resolvable->when(function($error, $result) use (&$remaining, &$results, $key, $promisor) {
// If the promisor already failed don't bother
if (empty($remaining)) {
return;
}
$struct->results[$key] = $result;
if (--$struct->remaining === 0) {
$struct->promisor->succeed($struct->results);
}
};
if ($error) {
$remaining = 0;
$promisor->fail($error);
return;
foreach ($promises as $key => $promise) {
if ($promise instanceof Promise) {
$promise->when($onResolve, [$struct, $key]);
} else {
$struct->results[$key] = $promise;
if (--$struct->remaining === 0) {
$struct->promisor->succeed($struct->results);
}
$results[$key] = $result;
if (--$remaining === 0) {
$promisor->succeed($results);
}
});
}
}
return $promisor->promise();
return $struct->promisor->promise();
}
/**
@ -204,44 +113,55 @@ function all(array $promises) {
*
* The individual keys in the resulting arrays are preserved from the initial Promise array
* passed to the function for evaluation.
*
* @param array An array of promises to flatten into a single promise
* @return \Amp\Promise
*/
function some(array $promises) {
if (empty($promises)) {
return new Failure(new \LogicException(
'No promises or values provided for resolution'
"No promises or values provided for resolution"
));
}
$errors = [];
$results = [];
$remaining = count($promises);
$promisor = new Deferred;
$struct = new \StdClass;
$struct->remaining = count($promises);
$struct->errors = [];
$struct->results = [];
$struct->promisor = new Deferred;
foreach ($promises as $key => $resolvable) {
if (!$resolvable instanceof Promise) {
$resolvable = new Success($resolvable);
$onResolve = function($error, $result, $cbData) {
list($struct, $key) = $cbData;
if ($error) {
$struct->errors[$key] = $error;
} else {
$struct->results[$key] = $result;
}
if (--$struct->remaining) {
return;
}
if (empty($struct->results)) {
array_unshift($struct->errors, "All promises passed to Amp\some() failed");
$struct->promisor->fail(new \RuntimeException(
implode("\n\n", $struct->errors)
));
} else {
$struct->promisor->succeed([$struct->errors, $struct->results]);
}
};
$resolvable->when(function($error, $result) use (&$remaining, &$results, &$errors, $key, $promisor) {
if ($error) {
$errors[$key] = $error;
} else {
$results[$key] = $result;
foreach ($promises as $key => $promise) {
if ($promise instanceof Promise) {
$promise->when($onResolve, [$struct, $key]);
} else {
$struct->results[$key] = $promise;
if (--$struct->remaining === 0) {
$struct->promisor->succeed([$struct->errors, $struct->results]);
}
if (--$remaining > 0) {
return;
} elseif (empty($results)) {
$promisor->fail(new \RuntimeException(
'All promises failed'
));
} else {
$promisor->succeed([$errors, $results]);
}
});
}
}
return $promisor->promise();
return $struct->promisor->promise();
}
/**
@ -249,115 +169,156 @@ function some(array $promises) {
*
* This function is the same as some() with the notable exception that it will never fail even
* if all promises in the array resolve unsuccessfully.
*
* @param array An array of promises to flatten into a single promise
* @return \Amp\Promise
*/
function any(array $promises) {
if (empty($promises)) {
return new Success([[], []]);
}
$results = [];
$errors = [];
$remaining = count($promises);
$promisor = new Deferred;
$struct = new \StdClass;
$struct->remaining = count($promises);
$struct->errors = [];
$struct->results = [];
$struct->promisor = new Deferred;
foreach ($promises as $key => $resolvable) {
if (!$resolvable instanceof Promise) {
$resolvable = new Success($resolvable);
$onResolve = function($error, $result, $cbData) {
list($struct, $key) = $cbData;
if ($error) {
$struct->errors[$key] = $error;
} else {
$struct->results[$key] = $result;
}
if (--$struct->remaining === 0) {
$struct->promisor->succeed([$struct->errors, $struct->results]);
}
};
$resolvable->when(function($error, $result) use (&$remaining, &$results, &$errors, $key, $promisor) {
if ($error) {
$errors[$key] = $error;
} else {
$results[$key] = $result;
foreach ($promises as $key => $promise) {
if ($promise instanceof Promise) {
$promise->when($onResolve, [$struct, $key]);
} else {
$struct->results[$key] = $promise;
if (--$struct->remaining === 0) {
$struct->promisor->succeed([$struct->errors, $struct->results]);
}
if (--$remaining === 0) {
$promisor->succeed([$errors, $results]);
}
});
}
}
return $promisor->promise();
return $struct->promisor->promise();
}
/**
* Resolves with the first successful Promise value. The resulting Promise will only fail if all
* Promise values in the group fail or if the initial Promise array is empty.
*
* @param array An array of promises to flatten into a single promise
* @return \Amp\Promise
*/
function first(array $promises) {
if (empty($promises)) {
return new Failure(new \LogicException(
'No promises or values provided for resolution'
"No promises or values provided for first() resolution"
));
}
$remaining = count($promises);
$isComplete = false;
$promisor = new Deferred;
$struct = new \StdClass;
$struct->remaining = count($promises);
$struct->promisor = new Deferred;
foreach ($promises as $resolvable) {
if (!$resolvable instanceof Promise) {
$promisor->succeed($resolvable);
$onResolve = function($error, $result, $cbData) {
list($struct, $key) = $cbData;
if (empty($struct->remaining)) {
return;
}
if (empty($error)) {
$struct->remaining = 0;
$struct->promisor->succeed($result);
return;
}
if (--$struct->remaining === 0) {
$struct->promisor->fail(new \RuntimeException(
"All promises passed for first() resolution failed"
));
}
};
foreach ($promises as $key => $promise) {
if ($promise instanceof Promise) {
$promise->when($onResolve, [$struct, $key]);
} else {
$struct->remaining = 0;
$struct->promisor->succeed($promise);
break;
}
$promise->when(function($error, $result) use (&$remaining, &$isComplete, $promisor) {
if ($isComplete) {
// we don't care about Deferreds that resolve after the first
return;
} elseif ($error && --$remaining === 0) {
$promisor->fail(new \RuntimeException(
'All promises failed'
));
} elseif (empty($error)) {
$isComplete = true;
$promisor->succeed($result);
}
});
}
return $promisor->promise();
return $struct->promisor->promise();
}
/**
* Map promised deferred values using the specified functor
*
* @param array An array of promises whose values -- once resoved -- will be mapped by the functor
* @param callable $functor The mapping function to apply to eventual promise results
* @return \Amp\Promise
*/
function map(array $promises, callable $functor) {
if (empty($promises)) {
return new Success([]);
}
$results = [];
$remaining = count($promises);
$promisor = new Deferred;
$struct = new \StdClass;
$struct->remaining = count($promises);
$struct->results = [];
$struct->promisor = new Deferred;
$struct->functor = $functor;
foreach ($promises as $key => $resolvable) {
$promise = ($resolvable instanceof Promise) ? $resolvable : new Success($resolvable);
$promise->when(function($error, $result) use (&$remaining, &$results, $key, $promisor, $functor) {
if (empty($remaining)) {
// If the promise already failed we don't bother.
return;
}
if ($error) {
$remaining = 0;
$promisor->fail($error);
return;
}
$onResolve = function($error, $result, $cbData) {
list($struct, $key) = $cbData;
if (empty($struct->remaining)) {
// If the promisor already resolved we don't need to bother
return;
}
if ($error) {
$struct->results = null;
$struct->remaining = 0;
$struct->promisor->fail($error);
return;
}
$struct->remaining--;
try {
$struct->results[$key] = call_user_func($struct->functor, $result);
} catch (\Exception $e) {
$struct->remaining = 0;
$struct->promisor->fail($e);
return;
}
if ($struct->remaining === 0) {
$struct->promisor->succeed($struct->results);
}
};
foreach ($promises as $key => $promise) {
if ($promise instanceof Promise) {
$promise->when($onResolve, [$struct, $key]);
} else {
$struct->remaining--;
try {
$results[$key] = $functor($result);
if (--$remaining === 0) {
$promisor->succeed($results);
}
} catch (\Exception $error) {
$remaining = 0;
$promisor->fail($error);
$struct->results[$key] = call_user_func($struct->functor, $promise);
} catch (\Exception $e) {
$struct->remaining = 0;
$struct->promisor->fail($e);
}
});
if ($struct->remaining === 0) {
break;
}
}
}
return $promisor->promise();
return $struct->promisor->promise();
}
/**
@ -365,42 +326,69 @@ function map(array $promises, callable $functor) {
*
* If the functor returns a truthy value the resolved promise result is retained, otherwise it is
* discarded. Array keys are retained for any results not filtered out by the functor.
*
* @param array An array of promises whose values -- once resoved -- will be filtered by the functor
* @param callable $functor The filtering function to apply to eventual promise results
* @return \Amp\Promise
*/
function filter(array $promises, callable $functor) {
if (empty($promises)) {
return new Success([]);
}
$results = [];
$remaining = count($promises);
$promisor = new Deferred;
$struct = new \StdClass;
$struct->remaining = count($promises);
$struct->results = [];
$struct->promisor = new Deferred;
$struct->functor = $functor;
foreach ($promises as $key => $resolvable) {
$promise = ($resolvable instanceof Promise) ? $resolvable : new Success($resolvable);
$promise->when(function($error, $result) use (&$remaining, &$results, $key, $promisor, $functor) {
if (empty($remaining)) {
// If the deferred result already failed we don't bother.
return;
}
if ($error) {
$remaining = 0;
$promisor->fail($error);
return;
$onResolve = function($error, $result, $cbData) {
list($struct, $key) = $cbData;
if (empty($struct->remaining)) {
// If the promisor already resolved we don't need to bother
return;
}
if ($error) {
$struct->results = null;
$struct->remaining = 0;
$struct->promisor->fail($error);
return;
}
$struct->remaining--;
try {
if (call_user_func($struct->functor, $result)) {
$struct->results[$key] = $result;
}
} catch (\Exception $e) {
$struct->remaining = 0;
$struct->promisor->fail($e);
return;
}
if ($struct->remaining === 0) {
$struct->promisor->succeed($struct->results);
}
};
foreach ($promises as $key => $promise) {
if ($promise instanceof Promise) {
$promise->when($onResolve, [$struct, $key]);
} else {
$struct->remaining--;
try {
if ($functor($result)) {
$results[$key] = $result;
if (call_user_func($struct->functor, $promise)) {
$struct->results[$key] = $promise;
}
if (--$remaining === 0) {
$promisor->succeed($results);
}
} catch (\Exception $error) {
$promisor->fail($error);
} catch (\Exception $e) {
$struct->remaining = 0;
$struct->promisor->fail($e);
}
});
if ($struct->remaining === 0) {
break;
}
}
}
return $promisor->promise();
return $struct->promisor->promise();
}
/**
@ -446,7 +434,10 @@ function pipe($promise, callable $functor) {
* this can lead to hard-to-debug failures. If the async value producer uses a different event
* reactor instance from that specified in this method the wait() call will never return.
*
* @param \Amp\Promise $promise The promise on which to wait
* @param \Amp\Reactor $reactor
* @throws \Exception if the promise fails
* @return mixed Returns the eventual resolution result for the specified promise
*/
function wait(Promise $promise, Reactor $reactor = null) {
$isWaiting = true;
@ -459,7 +450,7 @@ function wait(Promise $promise, Reactor $reactor = null) {
$resolvedResult = $result;
});
$reactor = $reactor ?: getReactor();
$reactor = $reactor ?: reactor();
while ($isWaiting) {
$reactor->tick();
}
@ -473,6 +464,11 @@ function wait(Promise $promise, Reactor $reactor = null) {
/**
* Return a function that will be resolved as a coroutine once invoked
*
* @param callable $func The callable to be wrapped for coroutine resolution
* @param \Amp\Reactor $reactor
* @param callable $promisifier
* @return callable Returns the wrapped callable
*/
function coroutine(callable $func, Reactor $reactor = null, callable $promisifier = null) {
return function($data) use ($func, $reactor, $promisifier) {
@ -488,10 +484,14 @@ function coroutine(callable $func, Reactor $reactor = null, callable $promisifie
*
* Upon resolution the Generator return value is used to succeed the promised result. If an
* error occurs during coroutine resolution the promise fails.
*
* @param \Generator $generator The generator to resolve as a coroutine
* @param \Amp\Reactor $reactor
* @param callable $promisifier
*/
function resolve(\Generator $generator, Reactor $reactor = null, callable $promisifier = null) {
$cs = new \StdClass;
$cs->reactor = $reactor ?: getReactor();
$cs->reactor = $reactor ?: reactor();
$cs->promisor = new Deferred;
$cs->generator = $generator;
$cs->promisifier = $promisifier;

View File

@ -93,7 +93,7 @@ class FunctionsTest extends \PHPUnit_Framework_TestCase {
'r1' => new Success(42),
'r2' => new Failure($exception),
'r3' => new Success(40),
])->when(function($error) {
])->when(function(\Exception $error) {
throw $error;
});
}
@ -113,7 +113,6 @@ class FunctionsTest extends \PHPUnit_Framework_TestCase {
/**
* @expectedException \RuntimeException
* @expectedExceptionMessage All promises failed
*/
public function testSomeThrowsIfNoPromisesResolveSuccessfully() {
\Amp\some([
@ -233,7 +232,6 @@ class FunctionsTest extends \PHPUnit_Framework_TestCase {
/**
* @expectedException \RuntimeException
* @expectedExceptionMessage All promises failed
*/
public function testExplicitSomeCombinatorResolutionFailsOnError() {
(new NativeReactor)->run(function($reactor) {