1
0
mirror of https://github.com/danog/amp.git synced 2024-11-30 04:29:08 +01:00

Merge concurrency primitives

This commit is contained in:
Daniel Lowrey 2014-09-22 16:47:48 -04:00
parent df47c6f09b
commit 56e720cd50
20 changed files with 1767 additions and 273 deletions

53
lib/Failure.php Normal file
View File

@ -0,0 +1,53 @@
<?php
namespace Alert;
/**
* Represents the failed resolution of a Promisor's future computation
*/
class Failure implements Promise {
private $error;
/**
* @param \Exception $error
*/
public function __construct(\Exception $error) {
$this->error = $error;
}
/**
* Pass the resolved failure Exception to the specified callback
*
* NOTE: because this object represents a failed Promise it will *always* invoke the specified
* $func callback immediately.
*
* @param callable $func
* @return void
*/
public function when(callable $func) {
$func($this->error, $result = null);
}
/**
* Does nothing -- a resolved promise has no progress updates
*
* @param callable $func
* @return void
*/
public function watch(callable $func) {
return;
}
/**
* Wait for Future value resolution
*
* NOTE: because this object represents a failed Promise it will *always* immediately throw the
* exception responsible for resolution failure.
*
* @throws \Exception
* @return void
*/
public function wait() {
throw $this->error;
}
}

177
lib/Future.php Normal file
View File

@ -0,0 +1,177 @@
<?php
namespace Alert;
class Future implements Promisor, Promise {
private $reactor;
private $isWaiting = false;
private $isResolved = false;
private $watchers = [];
private $whens = [];
private $error;
private $result;
/**
* @param \Alert\Reactor $reactor
*/
public function __construct(Reactor $reactor = null) {
$this->reactor = $reactor ?: ReactorFactory::select();
}
/**
* Retrieve the Promise placeholder for this deferred value
*
* This implementation acts as both Promisor and Promise so we simply return the
* current instance. If users require a Promisor that can only be resolved by code
* holding a reference to the Promisor they may instead use Alert\PrivateFuture.
*
* @return \Alert\Promise
*/
public function promise() {
return $this;
}
/**
* Notify the $func callback when the promise resolves (whether successful or not)
*
* $func callbacks are invoked with parameters in error-first style.
*
* @param callable $func
* @return self
*/
public function when(callable $func) {
if ($this->isResolved) {
$func($this->error, $this->result);
} else {
$this->whens[] = $func;
}
return $this;
}
/**
* Notify the $func callback when resolution progress events are emitted
*
* @param callable $func
* @return self
*/
public function watch(callable $func) {
if (!$this->isResolved) {
$this->watchers[] = $func;
}
return $this;
}
/**
* Block script execution indefinitely until the promise resolves
*
* @throws \Exception
* @return mixed
*/
public function wait() {
if ($this->error) {
throw $this->error;
} elseif ($this->isResolved) {
return $this->result;
}
$resolvedError;
$resolvedResult;
$this->whens[] = function($error, $result) use (&$resolvedError, &$resolvedResult) {
$resolvedError = $error;
$resolvedResult = $result;
$this->isWaiting = false;
};
$this->isWaiting = true;
while ($this->isWaiting) {
$this->reactor->tick();
}
if ($resolvedError) {
throw $resolvedError;
}
return $resolvedResult;
}
/**
* Update watchers of resolution progress events
*
* @param mixed $progress
* @throws \LogicException
* @return void
*/
public function update($progress) {
if ($this->isResolved) {
throw new \LogicException(
'Cannot update resolved promise'
);
}
foreach ($this->watchers as $watcher) {
$watcher($progress);
}
}
/**
* Resolve the promised value as a success
*
* @param mixed $result
* @throws \LogicException
* @return void
*/
public function succeed($result = null) {
if ($this->isResolved) {
throw new \LogicException(
'Promise already resolved'
);
} elseif ($result === $this) {
throw new \LogicException(
'A Promise cannot act as its own resolution result'
);
} elseif ($result instanceof Promise) {
$result->when(function(\Exception $error = null, $result = null) {
if ($error) {
$this->fail($error);
} else {
$this->succeed($result);
}
});
} else {
$this->isResolved = true;
$this->result = $result;
$error = null;
foreach ($this->whens as $when) {
$when($error, $result);
}
$this->whens = $this->watchers = [];
}
}
/**
* Resolve the promised value as a failure
*
* @param \Exception $error
* @throws \LogicException If the Promise has already resolved
* @return void
*/
public function fail(\Exception $error) {
if ($this->isResolved) {
throw new \LogicException(
'Promise already resolved'
);
}
$this->isResolved = true;
$this->error = $error;
$result = null;
foreach ($this->whens as $when) {
$when($error, $result);
}
$this->whens = $this->watchers = [];
}
}

View File

@ -12,12 +12,20 @@ class LibeventReactor implements SignalReactor {
private $garbage = [];
private $gcEvent;
private $stopException;
private $onGeneratorError;
private $resolver;
public function __construct() {
$this->base = event_base_new();
$this->gcEvent = event_new();
event_timer_set($this->gcEvent, [$this, 'collectGarbage']);
event_base_set($this->gcEvent, $this->base);
$this->resolver = new Resolver($this);
$this->onGeneratorError = function($e, $r) {
if ($e) {
throw $e;
}
};
}
/**
@ -33,7 +41,12 @@ class LibeventReactor implements SignalReactor {
}
if ($onStart) {
$this->immediately(function() use ($onStart) { $onStart($this); });
$this->immediately(function() use ($onStart) {
$result = $onStart($this);
if ($result instanceof \Generator) {
$this->resolver->resolve($result)->when($this->onGeneratorError);
}
});
}
$this->doRun();
@ -141,7 +154,10 @@ class LibeventReactor implements SignalReactor {
try {
$callback = $watcher->callback;
$watcherId = $watcher->id;
$callback($this, $watcherId);
$result = $callback($this, $watcherId);
if ($result instanceof \Generator) {
$this->resolver->resolve($result)->when($this->onGeneratorError);
}
$this->cancel($watcherId);
} catch (\Exception $e) {
$this->stopException = $e;
@ -186,7 +202,10 @@ class LibeventReactor implements SignalReactor {
return function() use ($callback, $eventResource, $msDelay, $watcherId) {
try {
$callback($this, $watcherId);
$result = $callback($this, $watcherId);
if ($result instanceof \Generator) {
$this->resolver->resolve($result)->when($this->onGeneratorError);
}
event_add($eventResource, $msDelay);
} catch (\Exception $e) {
$this->stopException = $e;
@ -250,7 +269,10 @@ class LibeventReactor implements SignalReactor {
return function() use ($callback, $watcherId, $stream) {
try {
$callback($this, $watcherId, $stream);
$result = $callback($this, $watcherId, $stream);
if ($result instanceof \Generator) {
$this->resolver->resolve($result)->when($this->onGeneratorError);
}
} catch (\Exception $e) {
$this->stopException = $e;
$this->stop();
@ -317,7 +339,10 @@ class LibeventReactor implements SignalReactor {
return function() use ($callback, $watcherId, $signo) {
try {
$callback($this, $watcherId, $signo);
$result = $callback($this, $watcherId, $signo);
if ($result instanceof \Generator) {
$this->resolver->resolve($result)->when($this->onGeneratorError);
}
} catch (\Exception $e) {
$this->stopException = $e;
$this->stop();

View File

@ -16,6 +16,8 @@ class NativeReactor implements Reactor {
private $resolution = 1000;
private $lastWatcherId = 1;
private $isRunning = false;
private $resolver;
private $onGeneratorError;
private static $DISABLED_ALARM = 0;
private static $DISABLED_READ = 1;
@ -23,6 +25,15 @@ class NativeReactor implements Reactor {
private static $DISABLED_IMMEDIATE = 3;
private static $MICROSECOND = 1000000;
public function __construct() {
$this->resolver = new Resolver($this);
$this->onGeneratorError = function($e, $r) {
if ($e) {
throw $e;
}
};
}
/**
* Start the event reactor and assume program flow control
*
@ -37,7 +48,12 @@ class NativeReactor implements Reactor {
$this->isRunning = true;
if ($onStart) {
$this->immediately(function() use ($onStart) { $onStart($this); });
$this->immediately(function() use ($onStart) {
$result = $onStart($this);
if ($result instanceof \Generator) {
$this->resolver->resolve($result)->when($this->onGeneratorError);
}
});
}
$this->enableAlarms();
while ($this->isRunning) {
@ -84,7 +100,10 @@ class NativeReactor implements Reactor {
if ($immediates = $this->immediates) {
$this->immediates = [];
foreach ($immediates as $watcherId => $callback) {
$callback($this, $watcherId);
$result = $callback($this, $watcherId);
if ($result instanceof \Generator) {
$this->resolver->resolve($result)->when($this->onGeneratorError);
}
}
}
@ -122,13 +141,19 @@ class NativeReactor implements Reactor {
foreach ($r as $readableStream) {
$streamId = (int) $readableStream;
foreach ($this->readCallbacks[$streamId] as $watcherId => $callback) {
$callback($this, $watcherId, $readableStream);
$result = $callback($this, $watcherId, $readableStream);
if ($result instanceof \Generator) {
$this->resolver->resolve($result)->when($this->onGeneratorError);
}
}
}
foreach ($w as $writableStream) {
$streamId = (int) $writableStream;
foreach ($this->writeCallbacks[$streamId] as $watcherId => $callback) {
$callback($this, $watcherId, $writableStream);
$result = $callback($this, $watcherId, $writableStream);
if ($result instanceof \Generator) {
$this->resolver->resolve($result)->when($this->onGeneratorError);
}
}
}
}
@ -136,35 +161,33 @@ class NativeReactor implements Reactor {
private function executeAlarms() {
$now = microtime(true);
asort($this->alarmOrder);
foreach ($this->alarmOrder as $watcherId => $executionCutoff) {
if ($executionCutoff <= $now) {
$this->doAlarmCallback($watcherId);
} else {
if ($executionCutoff > $now) {
break;
}
list($callback, $nextExecution, $interval, $isRepeating) = $this->alarms[$watcherId];
if ($isRepeating) {
$nextExecution += $interval;
$this->alarms[$watcherId] = [$callback, $nextExecution, $interval, $isRepeating];
$this->alarmOrder[$watcherId] = $nextExecution;
} else {
unset(
$this->alarms[$watcherId],
$this->alarmOrder[$watcherId]
);
}
$result = $callback($this, $watcherId);
if ($result instanceof \Generator) {
$this->resolver->resolve($result)->when($this->onGeneratorError);
}
}
}
private function doAlarmCallback($watcherId) {
list($callback, $nextExecution, $interval, $isRepeating) = $this->alarms[$watcherId];
if ($isRepeating) {
$nextExecution += $interval;
$this->alarms[$watcherId] = [$callback, $nextExecution, $interval, $isRepeating];
$this->alarmOrder[$watcherId] = $nextExecution;
} else {
unset(
$this->alarms[$watcherId],
$this->alarmOrder[$watcherId]
);
}
$callback($this, $watcherId);
}
/**
* Schedule an event to trigger once at the specified time
*

74
lib/PrivateFuture.php Normal file
View File

@ -0,0 +1,74 @@
<?php
namespace Alert;
/**
* A PrivateFuture creates a read-only Promise that may *only* be fulfilled by holders of the
* actual PrivateFuture instance. This provides an additional layer of API protection over
* the standard Future Promisor implementation whose Promise can be resolved by any code
* holding a reference to the Future instance.
*/
class PrivateFuture implements Promisor {
private $resolver;
private $updater;
private $promise;
/**
* @param \Alert\Reactor $reactor
*/
public function __construct(Reactor $reactor = null) {
$reactor = $reactor ?: ReactorFactory::select();
$unresolved = new Unresolved($reactor);
$resolver = function(\Exception $error = null, $result = null) {
$this->resolve($error, $result); // bound to private Unresolved::resolve()
};
$updater = function($progress) {
$this->update($progress); // bound to private Unresolved::update()
};
$this->resolver = $resolver->bindTo($unresolved, $unresolved);
$this->updater = $updater->bindTo($unresolved, $unresolved);
$this->promise = $unresolved;
}
/**
* Promise future fulfillment via a temporary placeholder value
*
* @return \Alert\Promise
*/
public function promise() {
return $this->promise;
}
/**
* Update watchers of progress resolving the promised value
*
* @param mixed $progress
* @return void
*/
public function update($progress) {
$updater = $this->updater;
$updater($progress);
}
/**
* Resolve the promised value as a success
*
* @param mixed $result
* @return void
*/
public function succeed($result = null) {
$resolver = $this->resolver;
$resolver($error = null, $result);
}
/**
* Resolve the promised value as a failure
*
* @param \Exception $error
* @return void
*/
public function fail(\Exception $error) {
$resolver = $this->resolver;
$resolver($error, $result = null);
}
}

52
lib/Promise.php Normal file
View File

@ -0,0 +1,52 @@
<?php
namespace Alert;
/**
* A placeholder value for the future result of an asynchronous computation
*/
interface Promise {
/**
* Notify the $func callback when the promise resolves (whether successful or not)
*
* Implementations MUST invoke the $func callback in error-first style, e.g.:
*
* <?php
* $promise->when(function(\Exception $error = null, $result = null) {
* if ($error) {
* // failed
* } else {
* // succeeded
* }
* });
*
* Implementations MUST return the current object instance.
*
* @param callable $func
* @return self
*/
public function when(callable $func);
/**
* Notify the $func callback when resolution progress events are emitted
*
* Implementations MUST invoke $func callback with a single update parameter, e.g.:
*
* <?php
* $promise->watch(function($update) { ... });
*
* Implementations MUST return the current object instance.
*
* @param callable $func
* @return self
*/
public function watch(callable $func);
/**
* Block script execution indefinitely until the promise resolves
*
* In the event of promise failure, implementations MUST throw the Exception object used to
* fail the Promise. Upon success this method MUST return the successfully resolved value.
*/
public function wait();
}

63
lib/Promisor.php Normal file
View File

@ -0,0 +1,63 @@
<?php
namespace Alert;
/**
* A Promisor represents a contract to resolve a deferred value at some point in the future
*
* A Promisor resolves its associated placeholder value (Promise) Promisor::succeed() or
* Promisor::fail(). Promisor::update() may be used to notify watchers of progress resolving
* the future value.
*
* Example:
*
* function myAsyncProducer() {
* // Create a new promise that needs to be resolved
* $future = new Alert\Future;
*
* // When we eventually finish non-blocking value resolution we
* // simply call the relevant Promise method to notify any code
* // with references to the Promisor's associated Promise:
* // $future->succeed($value) -or- $future->fail($exceptionObj)
*
* return $future->promise();
* }
*
* The following correlations exist between Promisor and Promise methods:
*
* - Promisor::update | Promise::watch
* - Promisor::succeed | Promise::when
* - Promisor::fail | Promise::when
*/
interface Promisor {
/**
* Promise future fulfillment via a temporary placeholder value
*
* @return \Alert\Promise
*/
public function promise();
/**
* Update watchers of progress resolving the promised value
*
* @param mixed $progress
* @return void
*/
public function update($progress);
/**
* Resolve the promised value as a success
*
* @param mixed $result
* @return void
*/
public function succeed($result = null);
/**
* Resolve the promised value as a failure
*
* @param \Exception $error
* @return void
*/
public function fail(\Exception $error);
}

79
lib/Resolver.php Normal file
View File

@ -0,0 +1,79 @@
<?php
namespace Alert;
class Resolver {
private $reactor;
/**
* @param \Alert\Reactor $reactor
*/
public function __construct(Reactor $reactor = null) {
$this->reactor = $reactor ?: \Alert\reactor();
}
/**
* A co-routine to resolve Generators
*
* Returns a promise that will resolve when the generator completes. The final value yielded by
* the generator is used to resolve the returned promise on success.
*
* Generators are expected to yield Promise instances and/or other Generator instances.
*
* Example:
*
* $generator = function() {
* $a = (yield 2);
* $b = (yield new Success(21));
* yield $a * $b;
* };
*
* resolve($generator())->when(function($error, $result) {
* var_dump($result); // int(42)
* });
*
* @param \Generator
* @return \Alert\Promise
*/
public function resolve(\Generator $gen) {
$future = new Future($this->reactor);
$this->advance($gen, $future);
return $future;
}
private function advance(\Generator $gen, Future $future, $previousResult = null) {
try {
$current = $gen->current();
} catch (\Exception $e) {
return $future->fail($e);
}
if ($current instanceof Promise) {
$current->when(function($error, $result) use ($gen, $future) {
$this->send($gen, $future, $error, $result);
});
} elseif ($current instanceof \Generator) {
$this->resolve($current)->when(function($error, $result) use ($gen, $future) {
$this->send($gen, $future, $error, $result);
});
} elseif ($gen->valid()) {
$this->send($gen, $future, $error = null, $current);
} else {
$future->succeed($previousResult);
}
}
private function send(\Generator $gen, Future $future, \Exception $error = null, $result = null) {
try {
if ($error) {
$gen->throw($error);
} else {
$gen->send($result);
}
$this->advance($gen, $future, $result);
} catch (\Exception $error) {
$future->fail($error);
}
}
}

52
lib/Success.php Normal file
View File

@ -0,0 +1,52 @@
<?php
namespace Alert;
/**
* Represents the successful resolution of a Promisor's future computation
*/
class Success implements Promise {
private $result;
/**
* @param mixed $result
*/
public function __construct($result = null) {
$this->result = $result;
}
/**
* Pass the resolved result to the specified $func callback
*
* NOTE: because this object represents a successfully resolved Promise it will *always* invoke
* the specified $func callback immediately.
*
* @param callable $func
* @return void
*/
public function when(callable $func) {
$func($error = null, $this->result);
}
/**
* Does nothing -- a resolved promise has no progress updates
*
* @param callable $func
* @return void
*/
public function watch(callable $func) {
return;
}
/**
* Wait for Future value resolution
*
* NOTE: because this object represents a successfully resolved Promise it will *always* return
* the resolved result immediately.
*
* @return mixed
*/
public function wait() {
return $this->result;
}
}

124
lib/Unresolved.php Normal file
View File

@ -0,0 +1,124 @@
<?php
namespace Alert;
/**
* A placeholder value that will be resolved at some point in the future by
* the Promisor that created it.
*/
class Unresolved implements Promise {
private $reactor;
private $isWaiting = false;
private $isResolved = false;
private $watchers = [];
private $whens = [];
private $error;
private $result;
/**
* @param \Alert\Reactor $reactor
*/
public function __construct(Reactor $reactor = null) {
$this->reactor = $reactor ?: ReactorFactory::select();
}
/**
* Notify the $func callback when the promise resolves (whether successful or not)
*
* @param callable $func
* @return self
*/
public function when(callable $func) {
if ($this->isResolved) {
$func($this->error, $this->result);
} else {
$this->whens[] = $func;
}
return $this;
}
/**
* Notify the $func callback when resolution progress events are emitted
*
* @param callable $func
* @return self
*/
public function watch(callable $func) {
if (!$this->isResolved) {
$this->watchers[] = $func;
}
return $this;
}
/**
* Block script execution indefinitely until the promise resolves
*
* @throws \Exception
* @return mixed
*/
public function wait() {
if ($this->error) {
throw $error;
} elseif ($this->isResolved) {
return $this->result;
}
$resolvedError;
$resolvedResult;
$this->whens[] = function($error, $result) use (&$resolvedError, &$resolvedResult) {
$resolvedError = $error;
$resolvedResult = $result;
$this->isWaiting = false;
};
$this->isWaiting = true;
while ($this->isWaiting) {
$this->reactor->tick();
}
if ($resolvedError) {
throw $resolvedError;
}
return $resolvedResult;
}
private function resolve(\Exception $error = null, $result = null) {
if ($this->isResolved) {
throw new \LogicException(
'Promise already resolved'
);
} elseif ($result === $this) {
throw new \LogicException(
'A Promise cannot act as its own resolution result'
);
} elseif ($result instanceof Promise) {
$result->when(function($error, $result) {
$this->resolve($error, $result);
});
} else {
$this->isResolved = true;
$this->error = $error;
$this->result = $result;
foreach ($this->whens as $when) {
$when($error, $result);
}
$this->whens = $this->watchers = [];
}
}
private function update($progress) {
if ($this->isResolved) {
throw new \LogicException(
'Cannot update resolved promise'
);
}
foreach ($this->watchers as $watcher) {
$watcher($progress);
}
}
}

View File

@ -15,6 +15,8 @@ class UvReactor implements SignalReactor {
private $resolution = 1000;
private $isWindows;
private $immediates = [];
private $onGeneratorError;
private $resolver;
private static $MODE_ONCE = 0;
private static $MODE_REPEAT = 1;
@ -27,6 +29,12 @@ class UvReactor implements SignalReactor {
$this->gcWatcher = uv_timer_init($this->loop);
$this->gcCallback = function() { $this->collectGarbage(); };
$this->isWindows = (stripos(PHP_OS, 'win') === 0);
$this->resolver = new Resolver($this);
$this->onGeneratorError = function($e, $r) {
if ($e) {
throw $e;
}
};
}
private function collectGarbage() {
@ -48,7 +56,12 @@ class UvReactor implements SignalReactor {
$this->isRunning = true;
if ($onStart) {
$this->immediately(function() use ($onStart) { $onStart($this); });
$this->immediately(function() use ($onStart) {
$result = $onStart($this);
if ($result instanceof \Generator) {
$this->resolver->resolve($result)->when($this->onGeneratorError);
}
});
}
while ($this->isRunning) {
@ -68,7 +81,10 @@ class UvReactor implements SignalReactor {
private function doImmediates() {
$immediates = $this->immediates;
foreach ($immediates as $watcherId => $callback) {
$callback($this, $watcherId);
$result = $callback($this, $watcherId);
if ($result instanceof \Generator) {
$this->resolver->resolve($result)->when($this->onGeneratorError);
}
unset(
$this->immediates[$watcherId],
$this->watchers[$watcherId]
@ -188,7 +204,10 @@ class UvReactor implements SignalReactor {
private function wrapTimerCallback($watcher, $callback) {
return function() use ($watcher, $callback) {
try {
$callback($this, $watcher->id);
$result = $callback($this, $watcher->id);
if ($result instanceof \Generator) {
$this->resolver->resolve($result)->when($this->onGeneratorError);
}
if ($watcher->mode === self::$MODE_ONCE) {
$this->clearWatcher($watcher->id);
}
@ -309,7 +328,10 @@ class UvReactor implements SignalReactor {
private function wrapStreamCallback($watcher, $callback) {
return function() use ($watcher, $callback) {
try {
$callback($this, $watcher->id, $watcher->stream);
$result = $callback($this, $watcher->id, $watcher->stream);
if ($result instanceof \Generator) {
$this->resolver->resolve($result)->when($this->onGeneratorError);
}
} catch (\Exception $e) {
$this->stopException = $e;
$this->stop();
@ -343,7 +365,10 @@ class UvReactor implements SignalReactor {
private function wrapSignalCallback($watcher, $callback) {
return function() use ($watcher, $callback) {
try {
$callback($this, $watcher->id, $watcher->signo);
$result = $callback($this, $watcher->id, $watcher->signo);
if ($result instanceof \Generator) {
$this->resolver->resolve($result)->when($this->onGeneratorError);
}
} catch (\Exception $e) {
$this->stopException = $e;
$this->stop();

View File

@ -1,238 +1,471 @@
<?php
namespace Alert;
/**
* Schedule a callback for immediate invocation in the next event loop iteration
*
* Watchers registered using this function will be automatically garbage collected after execution.
*
* @param callable $func Any valid PHP callable
* @return int Returns the unique watcher ID for disable/enable/cancel
*/
function immediately(callable $func) {
static $reactor;
$reactor = $reactor ?: ReactorFactory::select();
return $reactor->immediately($func);
}
/**
* Schedule a callback to execute once
*
* Watchers registered using this function will be automatically garbage collected after execution.
*
* @param callable $func Any valid PHP callable
* @param int $msDelay The delay in milliseconds before the callback will trigger (may be zero)
* @return int Returns the unique watcher ID for disable/enable/cancel
*/
function once(callable $func, $msDelay) {
static $reactor;
$reactor = $reactor ?: ReactorFactory::select();
return $reactor->once($func, $msDelay);
}
/**
* Schedule a recurring callback to execute every $interval seconds until cancelled
*
* IMPORTANT: Watchers registered using this function must be manually cleared using cancel() to
* free the associated memory. Failure to cancel repeating watchers (even if disable() is used)
* will lead to memory leaks.
*
* @param callable $func Any valid PHP callable
* @param int $msDelay The delay in milliseconds in-between callback invocations (may be zero)
* @return int Returns the unique watcher ID for disable/enable/cancel
*/
function repeat(callable $func, $msDelay) {
static $reactor;
$reactor = $reactor ?: ReactorFactory::select();
return $reactor->repeat($func, $msDelay);
}
/**
* Schedule an event to trigger once at the specified time
*
* Watchers registered using this function will be automatically garbage collected after execution.
*
* @param callable $func Any valid PHP callable
* @param string $timeString Any string that can be parsed by strtotime() and is in the future
* @return int Returns the unique watcher ID for disable/enable/cancel
*/
function at(callable $func, $timeString) {
static $reactor;
$reactor = $reactor ?: ReactorFactory::select();
return $reactor->at($func, $timeString);
}
/**
* Enable a disabled timer or stream IO watcher
*
* Calling enable() on an already-enabled watcher will have no effect.
*
* @param int $watcherId
* @return void
*/
function enable($watcherId) {
static $reactor;
$reactor = $reactor ?: ReactorFactory::select();
$reactor->enable($watcherId);
}
/**
* Temporarily disable (but don't cancel) an existing timer/stream watcher
*
* Calling disable() on a nonexistent or previously-disabled watcher will have no effect.
*
* NOTE: Disabling a repeating or stream watcher is not sufficient to free associated resources.
* When the watcher is no longer needed applications must still use cancel() to clear related
* memory and avoid leaks.
*
* @param int $watcherId
* @return void
*/
function disable($watcherId) {
static $reactor;
$reactor = $reactor ?: ReactorFactory::select();
$reactor->disable($watcherId);
}
/**
* Cancel an existing timer/stream watcher
*
* Calling cancel() on a non-existent watcher will have no effect.
*
* @param int $watcherId
* @return void
*/
function cancel($watcherId) {
static $reactor;
$reactor = $reactor ?: ReactorFactory::select();
$reactor->cancel($watcherId);
}
/**
* Watch a stream IO resource for readable data and trigger the specified callback when actionable
*
* IMPORTANT: Watchers registered using this function must be manually cleared using cancel() to
* free the associated memory. Failure to cancel repeating watchers (even if disable() is used)
* will lead to memory leaks.
*
* @param resource $stream A stream resource to watch for readable data
* @param callable $func Any valid PHP callable
* @param bool $enableNow Should the watcher be enabled now or held for later use?
* @return int Returns the unique watcher ID for disable/enable/cancel
*/
function onReadable($stream, callable $func, $enableNow = true) {
static $reactor;
$reactor = $reactor ?: ReactorFactory::select();
return $reactor->onReadable($stream, $func, $enableNow);
}
/**
* Watch a stream IO resource for writability and trigger the specified callback when actionable
*
* NOTE: Sockets are essentially "always writable" (as long as their write buffer is not full).
* Therefore, it's critical that applications disable or cancel write watchers as soon as all data
* is written or the watcher will trigger endlessly and hammer the CPU.
*
* IMPORTANT: Watchers registered using this function must be manually cleared using cancel() to
* free the associated memory. Failure to cancel repeating watchers (even if disable() is used)
* will lead to memory leaks.
*
* @param resource $stream A stream resource to watch for writable data
* @param callable $func Any valid PHP callable
* @param bool $enableNow Should the watcher be enabled now or held for later use?
* @return int Returns the unique watcher ID for disable/enable/cancel
*/
function onWritable($stream, callable $func, $enableNow = true) {
static $reactor;
$reactor = $reactor ?: ReactorFactory::select();
return $reactor->onWritable($stream, $func, $enableNow);
}
/**
* Similar to onReadable/onWritable but uses a flag bitmask for extended option assignment
*
* IMPORTANT: Watchers registered using this function must be manually cleared using cancel() to
* free the associated memory. Failure to cancel repeating watchers (even if disable() is used)
* will lead to memory leaks.
*
* @param resource $stream A stream resource to watch for IO capability
* @param callable $func Any valid PHP callable
* @param int $flags Option bitmask (Reactor::WATCH_READ, Reactor::WATCH_WRITE, etc)
*/
function watchStream($stream, callable $func, $flags) {
static $reactor;
$reactor = $reactor ?: ReactorFactory::select();
return $reactor->watchStream($stream, $func, $flags);
}
/**
* Execute a single event loop iteration
*
* @return void
*/
function tick() {
static $reactor;
$reactor = $reactor ?: ReactorFactory::select();
$reactor->tick();
}
/**
* Start the event reactor and assume program flow control
*
* @param callable $onStart Optional callback to invoke immediately upon reactor start
* @return void
*/
function run(callable $onStart = null) {
static $reactor;
$reactor = $reactor ?: ReactorFactory::select();
$reactor->run($onStart);
}
/**
* Stop the event reactor
*
* @return void
*/
function stop() {
static $reactor;
$reactor = $reactor ?: ReactorFactory::select();
$reactor->stop();
}
/**
* Get the global event reactor
*
* Note that the $factory callable is only invoked if no global reactor has yet been initialized.
*
* @param callable $factory Optional factory callable for initializing a reactor
* @return \Alert\Reactor
*/
function reactor(callable $factory = null) {
static $reactor;
return ($reactor = $reactor ?: ReactorFactory::select($factory));
}
/**
* React to process control signals
*
* @param int $signo The signal number to watch for
* @param callable $onSignal
* @throws \RuntimeException if the current environment cannot support signal handling
* @return int Returns a unique integer watcher ID
*/
function onSignal($signo, callable $onSignal) {
/**
* @var $reactor \Alert\SignalReactor
*/
static $reactor;
if ($reactor) {
return $reactor->onSignal($signo, $onSignal);
} elseif (!($reactor = ReactorFactory::select()) instanceof SignalReactor) {
throw new \RuntimeException(
'Your PHP environment does not support signal handling. Please install pecl/libevent or the php-uv extension'
);
} else {
return $reactor->onSignal($signo, $onSignal);
}
}
<?php
namespace Alert;
/**
* Schedule a callback for immediate invocation in the next event loop iteration
*
* Watchers registered using this function will be automatically garbage collected after execution.
*
* @param callable $func Any valid PHP callable
* @return int Returns the unique watcher ID for disable/enable/cancel
*/
function immediately(callable $func) {
static $reactor;
$reactor = $reactor ?: ReactorFactory::select();
return $reactor->immediately($func);
}
/**
* Schedule a callback to execute once
*
* Watchers registered using this function will be automatically garbage collected after execution.
*
* @param callable $func Any valid PHP callable
* @param int $msDelay The delay in milliseconds before the callback will trigger (may be zero)
* @return int Returns the unique watcher ID for disable/enable/cancel
*/
function once(callable $func, $msDelay) {
static $reactor;
$reactor = $reactor ?: ReactorFactory::select();
return $reactor->once($func, $msDelay);
}
/**
* Schedule a recurring callback to execute every $interval seconds until cancelled
*
* IMPORTANT: Watchers registered using this function must be manually cleared using cancel() to
* free the associated memory. Failure to cancel repeating watchers (even if disable() is used)
* will lead to memory leaks.
*
* @param callable $func Any valid PHP callable
* @param int $msDelay The delay in milliseconds in-between callback invocations (may be zero)
* @return int Returns the unique watcher ID for disable/enable/cancel
*/
function repeat(callable $func, $msDelay) {
static $reactor;
$reactor = $reactor ?: ReactorFactory::select();
return $reactor->repeat($func, $msDelay);
}
/**
* Schedule an event to trigger once at the specified time
*
* Watchers registered using this function will be automatically garbage collected after execution.
*
* @param callable $func Any valid PHP callable
* @param string $timeString Any string that can be parsed by strtotime() and is in the future
* @return int Returns the unique watcher ID for disable/enable/cancel
*/
function at(callable $func, $timeString) {
static $reactor;
$reactor = $reactor ?: ReactorFactory::select();
return $reactor->at($func, $timeString);
}
/**
* Enable a disabled timer or stream IO watcher
*
* Calling enable() on an already-enabled watcher will have no effect.
*
* @param int $watcherId
* @return void
*/
function enable($watcherId) {
static $reactor;
$reactor = $reactor ?: ReactorFactory::select();
$reactor->enable($watcherId);
}
/**
* Temporarily disable (but don't cancel) an existing timer/stream watcher
*
* Calling disable() on a nonexistent or previously-disabled watcher will have no effect.
*
* NOTE: Disabling a repeating or stream watcher is not sufficient to free associated resources.
* When the watcher is no longer needed applications must still use cancel() to clear related
* memory and avoid leaks.
*
* @param int $watcherId
* @return void
*/
function disable($watcherId) {
static $reactor;
$reactor = $reactor ?: ReactorFactory::select();
$reactor->disable($watcherId);
}
/**
* Cancel an existing timer/stream watcher
*
* Calling cancel() on a non-existent watcher will have no effect.
*
* @param int $watcherId
* @return void
*/
function cancel($watcherId) {
static $reactor;
$reactor = $reactor ?: ReactorFactory::select();
$reactor->cancel($watcherId);
}
/**
* Watch a stream IO resource for readable data and trigger the specified callback when actionable
*
* IMPORTANT: Watchers registered using this function must be manually cleared using cancel() to
* free the associated memory. Failure to cancel repeating watchers (even if disable() is used)
* will lead to memory leaks.
*
* @param resource $stream A stream resource to watch for readable data
* @param callable $func Any valid PHP callable
* @param bool $enableNow Should the watcher be enabled now or held for later use?
* @return int Returns the unique watcher ID for disable/enable/cancel
*/
function onReadable($stream, callable $func, $enableNow = true) {
static $reactor;
$reactor = $reactor ?: ReactorFactory::select();
return $reactor->onReadable($stream, $func, $enableNow);
}
/**
* Watch a stream IO resource for writability and trigger the specified callback when actionable
*
* NOTE: Sockets are essentially "always writable" (as long as their write buffer is not full).
* Therefore, it's critical that applications disable or cancel write watchers as soon as all data
* is written or the watcher will trigger endlessly and hammer the CPU.
*
* IMPORTANT: Watchers registered using this function must be manually cleared using cancel() to
* free the associated memory. Failure to cancel repeating watchers (even if disable() is used)
* will lead to memory leaks.
*
* @param resource $stream A stream resource to watch for writable data
* @param callable $func Any valid PHP callable
* @param bool $enableNow Should the watcher be enabled now or held for later use?
* @return int Returns the unique watcher ID for disable/enable/cancel
*/
function onWritable($stream, callable $func, $enableNow = true) {
static $reactor;
$reactor = $reactor ?: ReactorFactory::select();
return $reactor->onWritable($stream, $func, $enableNow);
}
/**
* Similar to onReadable/onWritable but uses a flag bitmask for extended option assignment
*
* IMPORTANT: Watchers registered using this function must be manually cleared using cancel() to
* free the associated memory. Failure to cancel repeating watchers (even if disable() is used)
* will lead to memory leaks.
*
* @param resource $stream A stream resource to watch for IO capability
* @param callable $func Any valid PHP callable
* @param int $flags Option bitmask (Reactor::WATCH_READ, Reactor::WATCH_WRITE, etc)
*/
function watchStream($stream, callable $func, $flags) {
static $reactor;
$reactor = $reactor ?: ReactorFactory::select();
return $reactor->watchStream($stream, $func, $flags);
}
/**
* Execute a single event loop iteration
*
* @return void
*/
function tick() {
static $reactor;
$reactor = $reactor ?: ReactorFactory::select();
$reactor->tick();
}
/**
* Start the event reactor and assume program flow control
*
* @param callable $onStart Optional callback to invoke immediately upon reactor start
* @return void
*/
function run(callable $onStart = null) {
static $reactor;
$reactor = $reactor ?: ReactorFactory::select();
$reactor->run($onStart);
}
/**
* Stop the event reactor
*
* @return void
*/
function stop() {
static $reactor;
$reactor = $reactor ?: ReactorFactory::select();
$reactor->stop();
}
/**
* Get the global event reactor
*
* Note that the $factory callable is only invoked if no global reactor has yet been initialized.
*
* @param callable $factory Optional factory callable for initializing a reactor
* @return \Alert\Reactor
*/
function reactor(callable $factory = null) {
static $reactor;
return ($reactor = $reactor ?: ReactorFactory::select($factory));
}
/**
* React to process control signals
*
* @param int $signo The signal number to watch for
* @param callable $onSignal
* @throws \RuntimeException if the current environment cannot support signal handling
* @return int Returns a unique integer watcher ID
*/
function onSignal($signo, callable $onSignal) {
/**
* @var $reactor \Alert\SignalReactor
*/
static $reactor;
if ($reactor) {
return $reactor->onSignal($signo, $onSignal);
} elseif (!($reactor = ReactorFactory::select()) instanceof SignalReactor) {
throw new \RuntimeException(
'Your PHP environment does not support signal handling. Please install pecl/libevent or the php-uv extension'
);
} else {
return $reactor->onSignal($signo, $onSignal);
}
}
/**
* If any one of the Promises fails the resulting Promise will fail. Otherwise
* the resulting Promise succeeds with an array matching keys from the input array
* to their resolved values.
*
* @param array[\Alert\Promise] $promises
* @return \Alert\Promise
*/
function all(array $promises) {
if (empty($promises)) {
return new Success([]);
}
$results = [];
$count = count($promises);
$future = new Future;
$done = false;
foreach ($promises as $key => $promise) {
$promise = ($promise instanceof Promise) ? $promise : new Success($promise);
$promise->when(function($error, $result) use (&$count, &$results, $key, $future, &$done) {
if ($done) {
// If the future already failed we don't bother.
return;
}
if ($error) {
$done = true;
$future->fail($error);
return;
}
$results[$key] = $result;
if (--$count === 0) {
$done = true;
$future->succeed($results);
}
});
}
return $future->promise();
}
/**
* Resolves with a two-item array delineating successful and failed Promise results.
*
* The resulting Promise will only fail if ALL of the Promise values fail or if the
* Promise array is empty.
*
* The resulting Promise is resolved with an indexed two-item array of the following form:
*
* [$arrayOfFailures, $arrayOfSuccesses]
*
* The individual keys in the resulting arrays are preserved from the initial Promise array
* passed to the function for evaluation.
*
* @param array[\Alert\Promise] $promises
* @return \Alert\Promise
*/
function some(array $promises) {
if (empty($promises)) {
return new Failure(new \LogicException(
'No promises or values provided'
));
}
$results = $errors = [];
$count = count($promises);
$future = new Future;
foreach ($promises as $key => $promise) {
$promise = ($promise instanceof Promise) ? $promise : new Success($promise);
$promise->when(function($error, $result) use (&$count, &$results, &$errors, $key, $future) {
if ($error) {
$errors[$key] = $error;
} else {
$results[$key] = $result;
}
if (--$count > 0) {
return;
} elseif (empty($results)) {
$future->fail(new \RuntimeException(
'All promises failed'
));
} else {
$future->succeed([$errors, $results]);
}
});
}
return $future->promise();
}
/**
* Resolves with the first successful Promise value. The resulting Promise will only fail if all
* Promise values in the group fail or if the initial Promise array is empty.
*
* @param array[\Alert\Promise] $promises
* @return \Alert\Promise
*/
function first(array $promises) {
if (empty($promises)) {
return new Failure(new \LogicException(
'No promises or values provided'
));
}
$count = count($promises);
$done = false;
$future = new Future;
foreach ($promises as $promise) {
$promise = ($promise instanceof Promise) ? $promise : new Success($promise);
$promise->when(function($error, $result) use (&$count, &$done, $future) {
if ($done) {
// we don't care about Futures that resolve after the first
return;
} elseif ($error && --$count === 0) {
$future->fail(new \RuntimeException(
'All promises failed'
));
} elseif (empty($error)) {
$done = true;
$this->succeed($result);
}
});
}
return $future->promise();
}
/**
* Map future values using the specified callable
*
* @param array $promises
* @param callable $func
* @return \Alert\Promise
*/
function map(array $promises, callable $func) {
if (empty($promises)) {
return new Success([]);
}
$results = [];
$count = count($promises);
$future = new Future;
$done = false;
foreach ($promises as $key => $promise) {
$promise = ($promise instanceof Promise) ? $promise : new Success($promise);
$promise->when(function($error, $result) use (&$count, &$results, $key, $future, $func, &$done) {
if ($done) {
// If the future already failed we don't bother.
return;
}
if ($error) {
$done = true;
$future->fail($error);
return;
}
$results[$key] = $func($result);
if (--$count === 0) {
$future->succeed($results);
}
});
}
return $future->promise();
}
/**
* Filter future values using the specified callable
*
* If the functor returns a truthy value the resolved promise result is retained, otherwise it is
* discarded. Array keys are retained for any results not filtered out by the functor.
*
* @param array $promises
* @param callable $func
* @return \Amp\Promise
*/
function filter(array $promises, callable $func) {
if (empty($promises)) {
return new Success([]);
}
$results = [];
$count = count($promises);
$future = new Future;
$done = false;
foreach ($promises as $key => $promise) {
$promise = ($promise instanceof Promise) ? $promise : new Success($promise);
$promise->when(function($error, $result) use (&$count, &$results, $key, $future, $func, &$done) {
if ($done) {
// If the future result already failed we don't bother.
return;
}
if ($error) {
$done = true;
$future->fail($error);
return;
}
if ($func($result)) {
$results[$key] = $result;
}
if (--$count === 0) {
$future->succeed($results);
}
});
}
return $future->promise();
}
/**
* A co-routine to resolve Generators that yield Promise instances
*
* Returns a promise that will resolve when the generator completes. The final value yielded by the
* generator is used to resolve the returned promise.
*
* @param \Generator
* @return \Alert\Promise
*/
function resolve(\Generator $gen) {
static $resolver;
if (empty($resolver)) {
$resolver = new Resolver;
}
return $resolver->resolve($gen);
}

27
test/FailureTest.php Normal file
View File

@ -0,0 +1,27 @@
<?php
namespace AlertTest;
use Alert\Failure;
class FailureTest extends \PHPUnit_Framework_TestCase {
public function testWhenInvokedImmediately() {
$exception = new \Exception('test');
$failure = new Failure($exception);
$failure->when(function($error, $result) use ($exception) {
$this->assertNull($result);
$this->assertSame($exception, $error);
});
}
/**
* @expectedException \RuntimeException
* @expectedExceptionMessage test
*/
public function testWaitThrowsImmediately() {
$exception = new \RuntimeException('test');
$failure = new Failure($exception);
$failure->wait();
}
}

73
test/FunctionsTest.php Normal file
View File

@ -0,0 +1,73 @@
<?php
namespace AlertTest;
use Alert\Success;
use Alert\Failure;
class FunctionsTest extends \PHPUnit_Framework_TestCase {
public function testAllResolvesWithArrayOfResults() {
$promises = [
'r1' => new Success(42),
'r2' => new Success(41),
];
$expected = ['r1' => 42, 'r2' => 41];
$results = \Alert\all($promises)->wait();
$this->assertSame($expected, $results);
}
/**
* @expectedException \RuntimeException
* @expectedExceptionMessage zanzibar
*/
public function testAllThrowsIfAnyIndividualPromiseFails() {
$exception = new \RuntimeException('zanzibar');
$promises = [
'r1' => new Success(42),
'r2' => new Failure($exception),
'r3' => new Success(40),
];
$results = \Alert\all($promises)->wait();
}
public function testSomeReturnsArrayOfErrorsAndResults() {
$exception = new \RuntimeException('zanzibar');
$promises = [
'r1' => new Success(42),
'r2' => new Failure($exception),
'r3' => new Success(40),
];
list($errors, $results) = \Alert\some($promises)->wait();
$this->assertSame(['r2' => $exception], $errors);
$this->assertSame(['r1' => 42, 'r3' => 40], $results);
}
/**
* @expectedException \RuntimeException
* @expectedExceptionMessage All promises failed
*/
public function testSomeThrowsIfNoPromisesResolveSuccessfully() {
$promises = [
'r1' => new Failure(new \RuntimeException),
'r2' => new Failure(new \RuntimeException),
];
list($errors, $results) = \Alert\some($promises)->wait();
}
public function testResolveResolvesGeneratorResult() {
$gen = function() {
$a = (yield 21);
$b = (yield new Success(2));
yield ($a * $b);
};
$promise = \Alert\resolve($gen());
$expected = 42;
$actual = $promise->wait();
$this->assertSame($expected, $actual);
}
}

102
test/FutureTest.php Normal file
View File

@ -0,0 +1,102 @@
<?php
namespace AlertTest;
use Alert\Future;
use Alert\NativeReactor;
class FutureTest extends \PHPUnit_Framework_TestCase {
public function testPromiseReturnsSelf() {
$future = new Future($this->getMock('Alert\Reactor'));
$this->assertSame($future, $future->promise());
}
public function testWhenInvokesCallbackWithResultIfAlreadySucceeded() {
$deferred = new Future($this->getMock('Alert\Reactor'));
$promise = $deferred->promise();
$deferred->succeed(42);
$promise->when(function($e, $r) {
$this->assertSame(42, $r);
$this->assertNull($e);
});
}
public function testWhenInvokesCallbackWithErrorIfAlreadyFailed() {
$promisor = new Future($this->getMock('Alert\Reactor'));
$promise = $promisor->promise();
$exception = new \Exception('test');
$promisor->fail($exception);
$promise->when(function($e, $r) use ($exception) {
$this->assertSame($exception, $e);
$this->assertNull($r);
});
}
public function testWaitReturnsOnResolution() {
$reactor = new NativeReactor;
$promisor = new Future($reactor);
$reactor->once(function() use ($promisor) { $promisor->succeed(42); }, $msDelay = 100);
$this->assertSame(42, $promisor->promise()->wait());
}
/**
* @expectedException \LogicException
* @expectedExceptionMessage Promise already resolved
*/
public function testSucceedThrowsIfAlreadyResolved() {
$promisor = new Future($this->getMock('Alert\Reactor'));
$promisor->succeed(42);
$promisor->succeed('zanzibar');
}
/**
* @expectedException \LogicException
* @expectedExceptionMessage A Promise cannot act as its own resolution result
*/
public function testSucceedThrowsIfPromiseIsTheResolutionValue() {
$promisor = new Future($this->getMock('Alert\Reactor'));
$promise = $promisor->promise();
$promisor->succeed($promise);
}
/**
* @expectedException \LogicException
* @expectedExceptionMessage Promise already resolved
*/
public function testFailThrowsIfAlreadyResolved() {
$promisor = new Future($this->getMock('Alert\Reactor'));
$promisor->succeed(42);
$promisor->fail(new \Exception);
}
public function testSucceedingWithPromisePipelinesResult() {
$reactor = new NativeReactor;
$promisor = new Future($reactor);
$next = new Future($reactor);
$reactor->once(function() use ($next) {
$next->succeed(42);
}, $msDelay = 1);
$promisor->succeed($next->promise());
$this->assertSame(42, $promisor->promise()->wait());
}
/**
* @expectedException \RuntimeException
* @expectedExceptionMessage fugazi
*/
public function testFailingWithPromisePipelinesResult() {
$reactor = new NativeReactor;
$promisor = new Future($reactor);
$next = new Future($reactor);
$reactor->once(function() use ($next) {
$next->fail(new \RuntimeException('fugazi'));
}, $msDelay = 10);
$promisor->succeed($next->promise());
$promisor->promise()->wait();
}
}

View File

@ -0,0 +1,74 @@
<?php
namespace AlertTest;
use Alert\PrivateFuture;
use Alert\NativeReactor;
class PrivateFutureTest extends \PHPUnit_Framework_TestCase {
public function testPromiseReturnsUnresolvedInstance() {
$future = new PrivateFuture($this->getMock('Alert\Reactor'));
$this->assertInstanceOf('Alert\Unresolved', $future->promise());
}
/**
* @expectedException \LogicException
* @expectedExceptionMessage Promise already resolved
*/
public function testSucceedThrowsIfAlreadyResolved() {
$promisor = new PrivateFuture($this->getMock('Alert\Reactor'));
$promisor->succeed(42);
$promisor->succeed('zanzibar');
}
/**
* @expectedException \LogicException
* @expectedExceptionMessage A Promise cannot act as its own resolution result
*/
public function testSucceedThrowsIfPromiseIsTheResolutionValue() {
$promisor = new PrivateFuture($this->getMock('Alert\Reactor'));
$promise = $promisor->promise();
$promisor->succeed($promise);
}
/**
* @expectedException \LogicException
* @expectedExceptionMessage Promise already resolved
*/
public function testFailThrowsIfAlreadyResolved() {
$promisor = new PrivateFuture($this->getMock('Alert\Reactor'));
$promisor->succeed(42);
$promisor->fail(new \Exception);
}
public function testSucceedingWithPromisePipelinesResult() {
$reactor = new NativeReactor;
$promisor = new PrivateFuture($reactor);
$next = new PrivateFuture($reactor);
$reactor->once(function() use ($next) {
$next->succeed(42);
}, $msDelay = 1);
$promisor->succeed($next->promise());
$this->assertSame(42, $promisor->promise()->wait());
}
/**
* @expectedException \RuntimeException
* @expectedExceptionMessage fugazi
*/
public function testFailingWithPromisePipelinesResult() {
$reactor = new NativeReactor;
$promisor = new PrivateFuture($reactor);
$next = new PrivateFuture($reactor);
$reactor->once(function() use ($next) {
$next->fail(new \RuntimeException('fugazi'));
}, $msDelay = 10);
$promisor->succeed($next->promise());
$promisor->promise()->wait();
}
}

View File

@ -269,4 +269,56 @@ abstract class ReactorTest extends \PHPUnit_Framework_TestCase {
$reactor->once([$reactor, 'stop'], $msDelay = 100);
$reactor->run();
}
public function testOnStartGeneratorResolvesAutomatically() {
$reactor = $this->getReactor();
$test = '';
$gen = function($reactor) use (&$test) {
yield;
$test = "Thus Spake Zarathustra";
$reactor->once(function() use ($reactor) { $reactor->stop(); }, 50);
};
$reactor->run($gen);
$this->assertSame("Thus Spake Zarathustra", $test);
}
public function testImmediatelyGeneratorResolvesAutomatically() {
$reactor = $this->getReactor();
$test = '';
$gen = function($reactor) use (&$test) {
yield;
$test = "The abyss will gaze back into you";
$reactor->once(function() use ($reactor) { $reactor->stop(); }, 50);
};
$reactor->immediately($gen);
$reactor->run();
$this->assertSame("The abyss will gaze back into you", $test);
}
public function testOnceGeneratorResolvesAutomatically() {
$reactor = $this->getReactor();
$test = '';
$gen = function($reactor) use (&$test) {
yield;
$test = "There are no facts, only interpretations.";
$reactor->once(function() use ($reactor) { $reactor->stop(); }, 50);
};
$reactor->once($gen, 1);
$reactor->run();
$this->assertSame("There are no facts, only interpretations.", $test);
}
public function testRepeatGeneratorResolvesAutomatically() {
$reactor = $this->getReactor();
$test = '';
$gen = function($reactor, $watcherId) use (&$test) {
$reactor->cancel($watcherId);
yield;
$test = "Art is the supreme task";
$reactor->stop();
};
$reactor->repeat($gen, 50);
$reactor->run();
$this->assertSame("Art is the supreme task", $test);
}
}

63
test/ResolverTest.php Normal file
View File

@ -0,0 +1,63 @@
<?php
namespace AlertTest;
use Alert\Success;
use Alert\Failure;
use Alert\Resolver;
use Alert\NativeReactor;
class ResolverTest extends \PHPUnit_Framework_TestCase {
public function testResolvedValueEqualsFinalYield() {
$gen = function() {
$a = (yield 21);
$b = (yield new Success(2));
yield ($a * $b);
};
$expected = 42;
$resolver = new Resolver(new NativeReactor);
$actual = $resolver->resolve($gen())->wait();
$this->assertSame($expected, $actual);
}
public function testFutureErrorsAreThrownIntoGenerator() {
$gen = function() {
$a = (yield 21);
$b = 1;
try {
yield new Failure(new \Exception('test'));
$this->fail('Code path should not be reached');
} catch (\Exception $e) {
$this->assertSame('test', $e->getMessage());
$b = 2;
}
yield ($a * $b);
};
$expected = 42;
$resolver = new Resolver(new NativeReactor);
$actual = $resolver->resolve($gen())->wait();
$this->assertSame($expected, $actual);
}
/**
* @expectedException \Exception
* @expectedExceptionMessage When in the chronicle of wasted time
*/
public function testUncaughtGeneratorExceptionFailsPromise() {
$gen = function() {
yield;
throw new \Exception('When in the chronicle of wasted time');
yield;
};
$resolver = new Resolver(new NativeReactor);
$promise = $resolver->resolve($gen());
$promise->when(function($error, $result) {
throw $error;
});
}
}

23
test/SuccessTest.php Normal file
View File

@ -0,0 +1,23 @@
<?php
namespace AlertTest;
use Alert\Success;
class SuccessTest extends \PHPUnit_Framework_TestCase {
public function testWhenInvokedImmediately() {
$value = 42;
$success = new Success($value);
$success->when(function($error, $result) use ($value) {
$this->assertNull($error);
$this->assertSame($value, $result);
});
}
public function testWaitReturnsResolvedValue() {
$value = 42;
$success = new Success($value);
$this->assertSame($value, $success->wait());
}
}

100
test/UnresolvedTest.php Normal file
View File

@ -0,0 +1,100 @@
<?php
namespace AlertTest;
use Alert\PrivateFuture;
use Alert\Future;
use Alert\NativeReactor;
class UnresolvedTest extends \PHPUnit_Framework_TestCase {
public function testWatchInvokesCallbackWithResultIfAlreadySucceeded() {
$deferred = new PrivateFuture($this->getMock('Alert\Reactor'));
$promise = $deferred->promise();
$deferred->succeed(42);
$promise->watch(function($p, $e, $r) {
$this->assertSame(42, $r);
$this->assertNull($p);
$this->assertNull($e);
});
}
public function testWatchInvokesCallbackWithErrorIfAlreadyFailed() {
$promisor = new PrivateFuture($this->getMock('Alert\Reactor'));
$promise = $promisor->promise();
$exception = new \Exception('test');
$promisor->fail($exception);
$promise->watch(function($p, $e, $r) use ($exception) {
$this->assertSame($exception, $e);
$this->assertNull($p);
$this->assertNull($r);
});
}
public function testWaitReturnsOnResolution() {
$reactor = new NativeReactor;
$promisor = new PrivateFuture($reactor);
$reactor->once(function() use ($promisor) { $promisor->succeed(42); }, $msDelay = 100);
$this->assertSame(42, $promisor->promise()->wait());
}
/**
* @expectedException \LogicException
* @expectedExceptionMessage Promise already resolved
*/
public function testSucceedThrowsIfAlreadyResolved() {
$promisor = new PrivateFuture($this->getMock('Alert\Reactor'));
$promisor->succeed(42);
$promisor->succeed('zanzibar');
}
/**
* @expectedException \LogicException
* @expectedExceptionMessage A Promise cannot act as its own resolution result
*/
public function testSucceedThrowsIfPromiseIsTheResolutionValue() {
$promisor = new PrivateFuture($this->getMock('Alert\Reactor'));
$promise = $promisor->promise();
$promisor->succeed($promise);
}
/**
* @expectedException \LogicException
* @expectedExceptionMessage Promise already resolved
*/
public function testFailThrowsIfAlreadyResolved() {
$promisor = new PrivateFuture($this->getMock('Alert\Reactor'));
$promisor->succeed(42);
$promisor->fail(new \Exception);
}
public function testSucceedingWithPromisePipelinesResult() {
$reactor = new NativeReactor;
$promisor = new PrivateFuture($reactor);
$next = new Future($reactor);
$reactor->once(function() use ($next) {
$next->succeed(42);
}, $msDelay = 1);
$promisor->succeed($next->promise());
$this->assertSame(42, $promisor->promise()->wait());
}
/**
* @expectedException \RuntimeException
* @expectedExceptionMessage fugazi
*/
public function testFailingWithPromisePipelinesResult() {
$reactor = new NativeReactor;
$promisor = new PrivateFuture($reactor);
$next = new Future($reactor);
$reactor->once(function() use ($next) {
$next->fail(new \RuntimeException('fugazi'));
}, $msDelay = 10);
$promisor->succeed($next->promise());
$promisor->promise()->wait();
}
}