14 KiB
Managing Concurrency
The weak link when managing concurrency is humans; we simply don't think asynchronously or in parallel. Instead, we're really good at doing one thing at a time and the world around us generally fits this model. So to effectively design for concurrent processing in our code we have a couple of options:
- Get smarter (not feasible);
- Abstract concurrent task execution to make it feel synchronous.
Promises
The basic unit of concurrency in an Amp application is the Amp\Promise
. These objects should be thought of as "placeholders" for values or tasks that aren't yet complete. By using placeholders we're able to reason about the results of concurrent operations as if they were already complete variables.
Note
Amp promises do not conform to the "Thenables" abstraction common in javascript promise implementations. It is this author's opinion that chaining .then() calls is a suboptimal method for avoiding callback hell in a world with generator coroutines. Instead, Amp utilizes PHP generators to "synchronize" concurrent task execution.
The Promise API
interface Promise {
public function when(callable $func, $cbData = null);
public function watch(callable $func, $cbData = null);
}
In its simplest form the Amp\Promise
aggregates callbacks for dealing with computational results once they eventually resolve. While most code will not interact with this API directly thanks to the magic of Generators, let's take a quick look at the two simple API methods exposed on Amp\Promise
implementations:
Method | Callback Signature |
---|---|
when | function($error = null, $result = null, $cbData = null) |
watch | function($updateData, $cbData = null) |
when()
Amp\Promise::when()
accepts an error-first callback. This callback is responsible for reacting to the eventual result of the computation represented by the promise placeholder. For example:
<?php
$promise = someFunctionThatReturnsAPromise();
$promise->when(function($error = null, $result = null) {
if ($error) {
printf(
"Something went wrong:\n%s\n",
$e->getMessage()
);
} else {
printf(
"Hurray! Our result is:\n%s\n",
print_r($result, true)
);
}
});
Note
We do not use type declarations here, as PHP 7 introduced the new
Throwable
interface and Amp is PHP 5 compatible.
Those familiar with javascript code generally reflect that the above interface quickly devolves into "callback hell", and they're correct. We will shortly see how to avoid this problem in the Generators section.
Optional Callback Data
The optional $cbData
can be used to avoid creating a new closure binding the value and thus avoiding the overhead. It is passed as a parameter to the callback.
watch()
Amp\Promise::watch()
affords promise-producers (Promisors) the ability to broadcast progress updates while a placeholder value resolves. Whether or not to actually send progress updates is left to individual libraries, but the functionality is available should applications require it. A simple example:
<?php
$promise = someAsyncFunctionWithProgressUpdates();
$promise->watch(function($update) {
printf(
"Woot, we got an update of some kind:\n%s\n",
print_r($update, true)
);
});
Optional Callback Data
The optional $cbData
can be used to avoid creating a new closure binding the value and thus avoiding the overhead. It is passed as a parameter to the callback.
Promisors
Amp\Promisor
is the abstraction responsible for resolving future values once they become available. A library that resolves values asynchronously creates an Amp\Promisor
and uses it to return an Amp\Promise
to API consumers. Once the async library determines that the value is ready it resolves the promise held by the API consumer using methods on the linked promisor.
The Promisor API
interface Promisor {
public function promise();
public function update($progress);
public function succeed($result = null);
public function fail($error);
}
promise()
Returns the corresponding Promise
instance. Promisor
and Promise
are separated, so the consumer of the promise can't fulfill it.
update()
Updates the promise. Invokes all registered Promise::watch()
callbacks.
succeed()
Resolves the promise with the first parameter as value, otherwise null
. If a Amp\Promise
is passed, the resolution will wait until the passed promise has been resolved. Invokes all registered Promise::when()
callbacks.
fail()
Makes the promise fail. Invokes all registered Promise::when()
callbacks with the passed Exception
/ Throwable
as $error
argument.
Note
We do not use type declarations here, as PHP 7 introduced the new
Throwable
interface and Amp is PHP 5 compatible.
Deferred
Amp\Deferred
is the standard Amp\Promisor
implementation.
Here's a simple example of an async value producer asyncMultiply()
creating a promisor and returning the associated promise to its API consumer. Note that the code below would work exactly the same had we used a PrivateFuture
as our promisor instead of the Future
employed below.
<?php // Example async producer using promisor
function asyncMultiply($x, $y) {
// Create a new promisor
$deferred = new Amp\Deferred;
// Resolve the async result one second from now
Amp\once(function() use ($deferred, $x, $y) {
$deferred->succeed($x * $y);
}, $msDelay = 1000);
return $deferred->promise();
}
$promise = asyncMultiply(6, 7);
$result = Amp\wait($promise);
var_dump($result); // int(42)
Combinators
all()
The all()
functor combines an array of promise objects into a single promise that will resolve
when all promises in the group resolve. If any one of the Amp\Promise
instances fails the
combinator's Promise
will fail. Otherwise the resulting Promise
succeeds with an array matching
keys from the input array to their resolved values.
The all()
combinator is extremely powerful because it allows us to concurrently execute many
asynchronous operations at the same time. Let's look at a simple example using the Amp HTTP client
(Artax) to retrieve multiple HTTP resources concurrently ...
<?php
use function Amp\run;
use function Amp\all;
use function Amp\stop;
run(function() {
$httpClient = new Amp\Artax\Client;
$promiseArray = $httpClient->requestMulti([
"google" => "http://www.google.com",
"news" => "http://news.google.com",
"bing" => "http://www.bing.com",
"yahoo" => "https://www.yahoo.com",
]);
try {
// magic combinator sauce to flatten the promise
// array into a single promise
$responses = (yield all($promiseArray));
foreach ($responses as $key => $response) {
printf(
"%s | HTTP/%s %d %s\n",
$key,
$response->getProtocol(),
$response->getStatus(),
$response->getReason()
);
}
} catch (Amp\CombinatorException $e) {
// If any one of the requests fails the combo
// promise returned by Amp\all() will fail and
// be thrown back into our generator here.
echo $e->getMessage(), "\n";
}
stop();
});
some()
The some()
functor is the same as all()
except that it tolerates individual failures. As long
as at least one promise in the passed array the combined promise will succeed. The successful
resolution value is an array of the form [$arrayOfErrors, $arrayOfSuccesses]
. The individual keys
in the component arrays are preserved from the promise array passed to the functor for evaluation.
any()
The any()
functor is the same as some()
except that it tolerates all failures. It will succeed even if all promises failed.
first()
Resolves with the first successful result. The resulting Promise will only fail if all promises in the group fail or if the promise array is empty.
map()
Maps eventual promise results using the specified callable.
filter()
Filters eventual promise results using the specified callable.
If the functor returns a truthy value the resolved promise result is retained, otherwise it is discarded. Array keys are retained for any results not filtered out by the functor.
Generators
The addition of generators in PHP 5.5 trivializes synchronization and error handling in async contexts. The Amp event reactor builds in coroutine support for all reactor callbacks so we can use the yield
keyword to make async code feel synchronous. Let's look at a simple example executing inside the event reactor run loop:
<?php
function asyncMultiply($x, $y) {
yield new Amp\Pause($millisecondsToPause = 100);
return ($x * $y);
}
Amp\run(function() {
try {
// Yield control until the generator resolves
// and return its eventual result.
$result = yield from asyncMultiply(2, 21); // int(42)
} catch (Exception $e) {
// If promise resolution fails the exception is
// thrown back to us and we handle it as needed.
}
});
As you can see in the above example there is no need for callbacks or .then()
chaining. Instead,
we're able to use yield
statements to control program flow even when future computational results
are still pending.
Note
Any time a generator yields an
Amp\Promise
there exists the possibility that the associated async operation(s) could fail. When this happens the appropriate exception is thrown back into the calling generator. Applications should generally wrap their promise yields intry/catch
blocks as an error handling mechanism in this case.
Subgenerators
Using PHP 7, you can use yield from
to delegate a sub task to another generator. That generator will be embedded into the currently running generator. If you're using PHP 5, you can achieve the same using yield Amp\resolve($generator);
.
Implicit Yield Behavior
Any value yielded without an associated string yield key is referred to as an "implicit" yield. All implicit yields must be one of the following two types ...
Yieldable | Description |
---|---|
Amp\Promise |
Any promise instance may be yielded and control will be returned to the generator once the promise resolves. If resolution fails the relevant exception is thrown into the generator and must be handled by the application or it will bubble up. If resolution succeeds the promise's resolved value is sent back into the generator. |
null |
Gives the event loop time to run other tasks. Continues the generator in the next tick of the loop, just like Amp\immediately . |
Important
Any yielded value that is not an
Amp\Promise
ornull
will be treated as an error and an appropriate exception will be thrown back into the original yielding generator. This strict behavior differs from older versions of the library in which implicit yield values were simply sent back to the yielding generator function.
Helpers
pipe()
Takes a Promise
as first and a callable
as second argument. Upon resolution of the promise, the callable
is invoked in case of a success and can be used to transform the value. The returned promise resolves to the returned value in case of a success. In case of a thrown exception or promise failure, the promise is failed with that exception.
promises()
Normalizes an array of mixed values / Promises / Promisors to an array of promises.
timeout()
Takes a Promise
as first and timeout in milliseconds as second parameter. Returns a promise that's resolved / failed with the original promise's return value / failure reason or a TimeoutException
in case the given promise doesn't resolve within the specified timeout.
coroutine()
Transforms a callable
given as first argument into a coroutine function.
resolve()
Resolves a Generator
coroutine into a promise. It accepts the Generator
or a callable
returning a Generator
as first and only argument.
Upon resolution the Generator
return value is used to succeed the promised result. If an error occurs during coroutine resolution the returned promise fails.
A Generator
coroutine executes the Generator
until a Promise
is yielded. It waits for the promise to complete and resumes the Generator
execution with the resolution value of the yielded promise or throws an exception into the Generator
in case the yielded promise failed.
wait()
Block script execution indefinitely until the specified Promise
resolves. The Promise
is passed as the first and only argument.
In the event of promise failure this method will throw the exception responsible for the failure. Otherwise the promise's resolved value is returned.
This function should only be used outside of Amp\run
when mixing synchronous and asynchronous code.