If you're reading The Amp Guide as a GitHub markdown file you should click the link below to view the styled HTML version. The GitHub markdown display is really unflattering and misses out on features such as the Table of Contents. Please don't subject your eyes to this monstrosity; click here instead:
It may surprise people to learn that the PHP standard library already has everything we need to write event-driven and non-blocking applications. We only reach the limits of native PHP's functionality in this area when we ask it to poll several hundred streams for read/write capability at the same time. Even in this case, though, the fault lies not with PHP but with the underlying system `select()` call, whose performance degrades linearly as load increases.
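As a minimal sketch of the native capability described above, the following uses only `stream_select()` and `stream_socket_pair()` from the PHP standard library to poll a stream for readability. The helper name `waitForReadable()` is hypothetical and is not part of Amp; a connected local socket pair stands in for a real client connection.

```php
<?php
// Hypothetical helper (not part of Amp): wait up to $timeoutSec seconds and
// return the subset of $streams that are ready to be read.
function waitForReadable(array $streams, $timeoutSec) {
    $read = $streams;
    $write = null;
    $except = null;
    // stream_select() modifies $read in place, leaving only the ready streams.
    $ready = stream_select($read, $write, $except, $timeoutSec);
    return ($ready === false || $ready === 0) ? [] : $read;
}

// A connected pair of local sockets stands in for a real client connection.
list($a, $b) = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);

$before = waitForReadable([$b], 0); // nothing written yet, so nothing is readable
fwrite($a, "ping");
$after = waitForReadable([$b], 1);  // data is now pending on $b

echo count($before), " stream(s) readable before the write, ", count($after), " after\n";
```

Every call to `stream_select()` has to be handed the full list of watched streams, which is why this approach becomes costly once hundreds of streams are involved.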
For performance that scales out to high volume we require more advanced capabilities currently found only in extensions. If, for example, you wish to service 10,000 simultaneous clients in an Amp-backed socket server, you would definitely need to use one of the reactors based on a PHP extension. However, if you're using Amp in a strictly local program for non-blocking concurrency, or you don't need to handle more than ~100 simultaneous clients in a server application, the native PHP functionality is perfectly adequate.
Amp currently exposes several implementations of its standard `Reactor` interface. Each behaves exactly the same way from an external API perspective; the main differences lie in their underlying performance characteristics. The one capability the extension-based reactors offer that is unavailable in the native implementation is the ability to watch for process control signals. The current implementations are listed here:
Hopefully this output demonstrates the concept that what happens inside the event reactor's run loop is like its own separate program. Your script will not continue past the point of `Reactor::run()` unless either there are no more scheduled events or `Reactor::stop()` is invoked.
While an application can and often does take place entirely inside the confines of the run loop, we can also use the reactor to do things like the following example which imposes a short-lived timeout for interactive console input:
Obviously we could have simply used `fgets(STDIN)` synchronously in this example. We're just demonstrating that it's possible to move in and out of the event loop to mix synchronous tasks with non-blocking tasks as needed.
In the above example we use the reactor's object API to manipulate event watchers. Though it's possible to do so, note that it almost *never* makes sense to instantiate and run multiple event loop instances in a single-threaded process.
> Bugs arising from the existence of multiple reactor instances are exceedingly difficult to debug. The reason for this should be relatively clear: running one event loop will block script execution and prevent others from executing at the same time. This sort of "loop starvation" results in events that inexplicably fail to trigger. You should endeavor to always use the same reactor instance in your application. Because the event loop is often a truly global feature of an application the procedural API functions use a static instance to ensure the same `Reactor` is reused. Be careful about instantiating reactors manually and mixing in calls to the function API.
In the above example we've done a few very simple things:
- Register a readability watcher for a socket that will trigger our callback when there is
data available to read.
- When we read data from the stream in our triggered callback we pass that to a stateful parser
that does something domain-specific when certain conditions are met.
- If the `fread()` call indicates that the socket connection is dead we clean up any resources
we've allocated for the storage of this stream. This process should always include calling
`Reactor::cancel()` on any reactor watchers we registered in relation to the stream.
### onWritable()
- Streams are essentially *"always"* writable. The only time they aren't is when their
respective write buffers are full.
A common usage pattern for reacting to writability involves initializing a writability watcher without enabling it when a client first connects to a server. Once incomplete writes occur we're then able to "unpause" the write watcher using `Reactor::enable()` until data is fully sent without having to create and cancel new watcher resources on the same stream multiple times.
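The buffering side of this pattern can be sketched in plain PHP. This is only an illustration under stated assumptions: `flushWrites()` is a hypothetical helper (not part of Amp), a local socket pair stands in for a client connection, and the comments mark where the write watcher would be enabled or left disabled.

```php
<?php
// Hypothetical helper (not part of Amp): try to write the whole buffer and
// return whatever could not be written yet.
function flushWrites($socket, $buffer) {
    $written = @fwrite($socket, $buffer);
    if ($written === false) {
        $written = 0; // treat a failed write as "nothing sent" in this sketch
    }
    return (string) substr($buffer, $written);
}

list($client, $peer) = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, STREAM_IPPROTO_IP);
stream_set_blocking($client, false);

$pending = flushWrites($client, "hello");

if ($pending === "") {
    // Everything was sent: the write watcher can stay (or become) disabled.
    echo "write complete\n";
} else {
    // Incomplete write: this is where the write watcher would be enabled so the
    // remainder is retried once the stream's write buffer has room again.
    echo strlen($pending), " bytes still pending\n";
}
```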
## Pausing, Resuming and Cancelling Watchers
All watchers, regardless of type, can be temporarily disabled and enabled in addition to being
cleared via `Reactor::cancel()`. This allows for advanced capabilities such as disabling the acceptance of new socket clients in server applications when simultaneity limits are reached. In general, the performance characteristics of watcher reuse via pause/resume are favorable by comparison to repeatedly cancelling and re-registering watchers.
### disable()
A simple example that disables one watcher and later enables another:
```php
<?php
$reactor = new Amp\NativeReactor;

// Register a watcher we'll disable
$watcherIdToDisable = $reactor->once(function() {
    echo "I'll never execute in one second because: disable()\n";
}, $msDelay = 1000);

// Register a watcher to perform the disable() operation
$reactor->once(function() use ($watcherIdToDisable, $reactor) {
    $reactor->disable($watcherIdToDisable);
}, $msDelay = 500);

// Register a repeating watcher and disable it straight away
$myWatcherId = $reactor->repeat(function() {
    echo "tick\n";
}, $msInterval = 1000);
$reactor->disable($myWatcherId);

// Remember, nothing happens until the reactor runs, so it doesn't matter that we
// previously created and disabled $myWatcherId
$reactor->run(function($reactor) use ($myWatcherId) {
    // Immediately enable the watcher when the reactor starts
    $reactor->enable($myWatcherId);
    // Now that it's enabled we'll see tick output in our console every 1000ms.
});
```
For a slightly more complex use case, let's look at a common scenario where a server might create a write watcher that is initially disabled but subsequently enabled as necessary:
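```php
<?php
class Server {
    private $reactor;
    private $clients = [];

    public function __construct(Amp\Reactor $reactor) {
        $this->reactor = $reactor;
    }

    public function startServer() {
        // ... server bind and accept logic would exist here
        $this->reactor->run();
    }

    private function onNewClient($sock) {
        $socketId = (int) $sock;
        $client = new ClientStruct;
        $client->socket = $sock;
        $readWatcher = $this->reactor->onReadable($sock, function() use ($client) {
            $this->onReadable($client);
        });
        // Create the write watcher in a disabled state using the "enable" option
        // (assumed here to be passed in a trailing options array); it is enabled
        // later, only when a write could not be completed in one go.
        $writeWatcher = $this->reactor->onWritable($sock, function() use ($client) {
            $this->doWrite($client); // doWrite() is a hypothetical method that flushes pending data
        }, ["enable" => false]);
        $client->readWatcher = $readWatcher;
        $client->writeWatcher = $writeWatcher;
        $this->clients[$socketId] = $client;
    }

    // ... other class implementation details here ...
}
```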
### cancel()
It's important to *always* cancel persistent watchers once you're finished with them or you'll create memory leaks in your application. This functionality works in exactly the same way as the above enable/disable examples:
```php
<?php
Amp\run(function() {
    $myWatcherId = Amp\repeat(function() {
        echo "tick\n";
    }, $msInterval = 1000);
    // Cancel $myWatcherId in five seconds and exit the reactor loop
    Amp\once(function() use ($myWatcherId) {
        Amp\cancel($myWatcherId);
    }, $msDelay = 5000);
});
```
## Process Signal Watchers
The `Amp\SignalReactor` extends the base reactor interface to expose an API for handling process
control signals in your application like any other event. Simply use a compatible event reactor
implementation (`UvReactor` or `LibeventReactor`, preferably the former) and interact with its
`SignalReactor::onSignal()` method. Consider:
```php
<?php
(new Amp\UvReactor)->run(function($reactor) {
    // Let's tick off output once per second so we can see activity.
    $reactor->repeat(function() { echo "tick\n"; }, $msInterval = 1000);
    // ... signal watchers would be registered here via $reactor->onSignal() ...
});
```

| Option | Description |
| --- | --- |
| `"enable"` | All watchers are enabled by default. Passing the `"enable"` option with a falsy value will create a watcher in a disabled state. |
| `"msDelay"` | Used with `repeat()` watchers to specify a different millisecond timeout for the initial callback invocation. If not specified, repeating timer watchers wait until the `$msInterval` expires before their initial invocation. |
| `"callbackData"` | Optional user data to pass as the final parameter when invoking the watcher callback. If this option is unspecified a callback receives `null` as its final argument. |
A standard pattern in this area is to initialize writability watchers in a disabled state before subsequently enabling them at a later time as shown here:
@TODO Discuss how repeating timer watchers are rescheduled from `$timestampAtTickStart + $watcherMsInterval` and are not subject to drift but *may* stack up if executing very slow tasks with insufficiently low intervals in-between invocations.
The weak link when managing concurrency is humans; we simply don't think asynchronously or in parallel. Instead, we're really good at doing one thing at a time and the world around us generally fits this model. So to effectively design for concurrent processing in our code we have a couple of options:
2. Abstract concurrent task execution to make it feel synchronous.
## Promises
The basic unit of concurrency in an Amp application is the `Amp\Promise`. These objects should be thought of as "placeholders" for values or tasks that aren't yet complete. By using placeholders we're able to reason about the results of concurrent operations as if they were already complete variables.
> Amp promises do *not* conform to the "Thenables" abstraction common in JavaScript promise implementations. It is this author's opinion that chaining `.then()` calls is a suboptimal method for avoiding callback hell in a world with generator coroutines. Instead, Amp utilizes PHP generators to "synchronize" concurrent task execution.
In its simplest form the `Amp\Promise` aggregates callbacks for dealing with computational results once they eventually resolve. While most code will not interact with this API directly thanks to the magic of [Generators](#generators), let's take a quick look at the two simple API methods exposed on `Amp\Promise` implementations:
### when()
`Amp\Promise::when()` accepts an error-first callback. This callback is responsible for reacting to the eventual result of the computation represented by the promise placeholder. For example:
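The error-first convention can be illustrated with a self-contained sketch. The `SketchPlaceholder` class below is a hypothetical stand-in written for this illustration and is not Amp's implementation; only the shape of the callback passed to `when()` (error first, result second) is taken from the description above.

```php
<?php
// Hypothetical stand-in for a promise (NOT Amp's implementation): it stores
// error-first callbacks and invokes them once a result or an error is known.
class SketchPlaceholder {
    private $callbacks = [];
    private $isResolved = false;
    private $error;
    private $result;

    public function when(callable $cb) {
        if ($this->isResolved) {
            $cb($this->error, $this->result);
        } else {
            $this->callbacks[] = $cb;
        }
    }

    public function resolve($error = null, $result = null) {
        $this->isResolved = true;
        $this->error = $error;
        $this->result = $result;
        foreach ($this->callbacks as $cb) {
            $cb($error, $result);
        }
        $this->callbacks = [];
    }
}

$log = [];
$onResolve = function ($error, $result) use (&$log) {
    // Error-first: the first argument is non-null only on failure.
    $log[] = $error ? "failed: " . $error->getMessage() : "succeeded: " . $result;
};

$ok = new SketchPlaceholder;
$ok->when($onResolve);
$ok->resolve(null, 42);

$bad = new SketchPlaceholder;
$bad->when($onResolve);
$bad->resolve(new Exception("something went wrong"));

echo implode("\n", $log), "\n";
```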
Those familiar with JavaScript will recognize that the above interface quickly devolves into ["callback hell"](http://callbackhell.com/), and they're correct. We will shortly see how to avoid this problem in the [Generators](#generators) section.
### watch()
`Amp\Promise::watch()` affords promise-producers ([Promisors](#promisors)) the ability to broadcast progress updates while a placeholder value resolves. Whether or not to actually send progress updates is left to individual libraries, but the functionality is available should applications require it. A simple example:
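The broadcast relationship can be sketched without Amp at all. The `ProgressSketch` class below is hypothetical and exists only for this illustration; it mirrors the idea that watchers registered through `watch()` are notified each time the producing side calls `update()`.

```php
<?php
// Hypothetical sketch (NOT Amp's implementation): progress watchers are
// collected by watch() and notified on every update().
class ProgressSketch {
    private $watchers = [];

    public function watch(callable $cb) {
        $this->watchers[] = $cb;
    }

    public function update($progress) {
        foreach ($this->watchers as $cb) {
            $cb($progress);
        }
    }
}

$seen = [];
$download = new ProgressSketch;
$download->watch(function ($progress) use (&$seen) {
    $seen[] = $progress;
    echo "progress: {$progress}%\n";
});

// The producing side broadcasts updates while the value is still resolving.
foreach ([25, 50, 100] as $percent) {
    $download->update($percent);
}
```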
## Promisors
`Amp\Promisor` is the abstraction responsible for resolving future values once they become available. A library that resolves values asynchronously creates an `Amp\Promisor` and uses it to return an `Amp\Promise` to API consumers. Once the async library determines that the value is ready it resolves the promise held by the API consumer using methods on the linked promisor.
### The Promisor API
```php
interface Promisor {
    public function promise();
    public function update($progress);
    public function succeed($result = null);
    public function fail(\Exception $error);
}
```
Amp provides two base implementations for async value promisors: `Amp\Future` and `Amp\PrivateFuture`.
### Future Promisor
The standard `Amp\Future` is the more performant option of the two default `Amp\Promisor` implementations. It acts both as promisor and promise to minimize the number of new object/closure instantiations needed to resolve an async value. The drawback to this approach is that any code with a reference to the `Future` promisor can resolve the associated Promise.
### PrivateFuture Promisor
The `Amp\PrivateFuture` is more concerned with code safety than performance. It *does not* act as its own promise. Only code with a reference to a `PrivateFuture` instance can resolve its associated promise.
### Promisor Example
Here's a simple example of an async value producer `asyncMultiply()` creating a promisor and returning the associated promise to its API consumer. Note that the code below would work exactly the same had we used a `PrivateFuture` as our promisor instead of the `Future` employed below.
```php
<?php // Example async producer using promisor
function asyncMultiply($x, $y) {
    // Create a new promisor
    $promisor = new Amp\Future;
    // Resolve the async result one second from now
    Amp\once(function() use ($promisor, $x, $y) {
        $promisor->succeed($x * $y);
    }, $msDelay = 1000);
    return $promisor->promise();
}
$promise = asyncMultiply(6, 7);
$result = Amp\wait($promise);
var_dump($result); // int(42)
```
## Combinators
### all()
The `all()` functor combines an array of promise objects into a single promise that will resolve
when all promises in the group resolve. If any one of the `Amp\Promise` instances fails the
combinator's `Promise` will fail. Otherwise the resulting `Promise` succeeds with an array matching
keys from the input array to their resolved values.
The `all()` combinator is extremely powerful because it allows us to execute many asynchronous
operations concurrently. Let's look at a simple example using the Amp HTTP client
([artax](https://github.com/amphp/artax)) to retrieve multiple HTTP resources concurrently ...
```php
<?php
use function Amp\run;
use function Amp\all;
run(function() {
    $httpClient = new Amp\Artax\Client;
    $promiseArray = $httpClient->requestMulti([
        "google" => "http://www.google.com",
        "news" => "http://news.google.com",
        "bing" => "http://www.bing.com",
        "yahoo" => "https://www.yahoo.com",
    ]);
    try {
        // magic combinator sauce to flatten the promise
        // array into a single promise
        $responses = (yield all($promiseArray));
        foreach ($responses as $key => $response) {
            printf(
                "%s | HTTP/%s %d %s\n",
                $key,
                $response->getProtocol(),
                $response->getStatus(),
                $response->getReason()
            );
        }
    } catch (Exception $e) {
        // If any one of the requests fails the combo
        // promise returned by Amp\all() will fail and
        // be thrown back into our generator here.
        echo $e->getMessage(), "\n";
    }
    Amp\stop();
});
```
### some()
The `some()` functor is the same as `all()` except that it tolerates individual failures. As long
as at least one promise in the passed array succeeds, the combined promise will succeed. The successful
resolution value is an array of the form `[$arrayOfErrors, $arrayOfSuccesses]`. The individual keys
in the component arrays are preserved from the promise array passed to the functor for evaluation.
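To make the shape of that resolution value concrete, here is a plain-PHP sketch that does not use Amp. The function `partitionOutcomes()` is hypothetical, and it assumes each already-settled outcome is represented either by an `Exception` (failure) or by any other value (success); it only illustrates the `[$arrayOfErrors, $arrayOfSuccesses]` layout and the key preservation described above.

```php
<?php
// Hypothetical sketch (NOT Amp's implementation): split settled outcomes into
// errors and successes while preserving the original array keys.
function partitionOutcomes(array $outcomes) {
    $errors = [];
    $successes = [];
    foreach ($outcomes as $key => $outcome) {
        if ($outcome instanceof Exception) {
            $errors[$key] = $outcome;
        } else {
            $successes[$key] = $outcome;
        }
    }
    if (empty($successes)) {
        // With no successes at all, the combined result is a failure.
        throw new RuntimeException("All operations failed");
    }
    return [$errors, $successes];
}

list($errors, $successes) = partitionOutcomes([
    "fast"   => "response A",
    "broken" => new Exception("connection refused"),
    "slow"   => "response B",
]);

echo "failed: ", implode(", ", array_keys($errors)), "\n";
echo "succeeded: ", implode(", ", array_keys($successes)), "\n";
```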
### first()
Resolves with the first successful result. The resulting Promise will only fail if all
promises in the group fail or if the promise array is empty.
### map()
Maps eventual promise results using the specified callable.
### filter()
Filters eventual promise results using the specified callable.
If the functor returns a truthy value the resolved promise result is retained, otherwise it is
discarded. Array keys are retained for any results not filtered out by the functor.
## Generators
The addition of Generators in PHP 5.5 trivializes synchronization and error handling in async contexts. The Amp event reactor builds in co-routine support for all reactor callbacks so we can use the `yield` keyword to make async code feel synchronous. Let's look at a simple example executing inside the event reactor run loop:
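The mechanics behind this can be sketched in plain PHP without Amp. The `runSketch()` driver below is hypothetical and greatly simplified: each yielded callable stands in for an async operation and is executed immediately, whereas a real reactor would resume the generator later, once the yielded promise resolves. It shows only how results are sent back into a generator and how failures are thrown into it.

```php
<?php
// Hypothetical, greatly simplified coroutine driver (NOT Amp's resolver).
// Each yielded value is a callable standing in for an async operation: its
// return value is sent back into the generator, and any exception it throws
// is thrown into the generator at the point of the yield.
function runSketch(Generator $gen) {
    while ($gen->valid()) {
        $operation = $gen->current();
        try {
            $result = $operation();
        } catch (Exception $e) {
            $gen->throw($e); // uncaught exceptions bubble up out of runSketch()
            continue;
        }
        $gen->send($result);
    }
}

$log = [];
$task = function () use (&$log) {
    // Reads like synchronous code: the "result" arrives at the yield.
    $sum = (yield function () { return 1 + 2; });
    $log[] = "result: $sum";
    try {
        yield function () { throw new Exception("operation failed"); };
    } catch (Exception $e) {
        $log[] = "caught: " . $e->getMessage();
    }
};

runSketch($task());
echo implode("\n", $log), "\n";
```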
> Any time a generator yields an `Amp\Promise` there exists the possibility that the associated async operation(s) could fail. When this happens the appropriate exception is thrown back into the calling generator. Applications should generally wrap their promise yields in `try/catch` blocks as an error handling mechanism in this case.
Any value yielded without an associated string yield key is referred to as an "implicit" yield. All implicit yields must be one of the following two types ...
| Yieldable | Description |
| --- | --- |
| `Amp\Promise` | Any promise instance may be yielded and control will be returned to the generator once the promise resolves. If resolution fails the relevant exception is thrown into the generator and must be handled by the application or it will bubble up. If resolution succeeds the promise's resolved value is sent back into the generator. |
> Any yielded value that is not an `Amp\Promise` or `Generator` will be treated as an **error** and an appropriate exception will be thrown back into the original yielding generator. This strict behavior differs from older versions of the library in which implicit yield values were simply sent back to the yielding generator function.