1
0
mirror of https://github.com/danog/dns.git synced 2024-11-26 20:14:51 +01:00

Update for Amp v2

This commit is contained in:
Bob Weinand 2016-08-02 23:57:40 +02:00
parent d021dd20f1
commit 46589b730a
5 changed files with 71 additions and 68 deletions

View File

@ -30,12 +30,15 @@
], ],
"require": { "require": {
"php": ">=5.5", "php": ">=5.5",
"amphp/amp": "^1", "amphp/amp": "dev-master as 2.0",
"amphp/cache": "^0.1", "amphp/cache": "dev-amp_v2 as 0.2",
"amphp/file": "^0.1", "amphp/file": "dev-amp_v2 as 0.2",
"daverandom/libdns": "^1" "daverandom/libdns": "^1"
}, },
"minimum-stability": "dev",
"prefer-stable": true,
"require-dev": { "require-dev": {
"amphp/loop": "dev-master",
"phpunit/phpunit": "^4.8", "phpunit/phpunit": "^4.8",
"fabpot/php-cs-fixer": "^1.9" "fabpot/php-cs-fixer": "^1.9"
}, },

View File

@ -3,12 +3,14 @@
namespace Amp\Dns; namespace Amp\Dns;
use Amp\Cache\ArrayCache; use Amp\Cache\ArrayCache;
use Amp\CombinatorException; use Amp\MultiReasonException;
use Amp\CoroutineResult; use Amp\Coroutine;
use Amp\Deferred; use Amp\Deferred;
use Amp\Failure; use Amp\Failure;
use Amp\File\FilesystemException; use Amp\File\FilesystemException;
use Amp\Success; use Amp\Success;
use Amp\TimeoutException;
use Interop\Async\Loop;
use LibDNS\Decoder\DecoderFactory; use LibDNS\Decoder\DecoderFactory;
use LibDNS\Encoder\EncoderFactory; use LibDNS\Encoder\EncoderFactory;
use LibDNS\Messages\MessageFactory; use LibDNS\Messages\MessageFactory;
@ -42,7 +44,7 @@ class DefaultResolver implements Resolver {
$this->serverUriMap = []; $this->serverUriMap = [];
$this->serverIdTimeoutMap = []; $this->serverIdTimeoutMap = [];
$this->now = \time(); $this->now = \time();
$this->serverTimeoutWatcher = \Amp\repeat(function ($watcherId) { $this->serverTimeoutWatcher = \Amp\repeat(1000, function ($watcherId) {
$this->now = $now = \time(); $this->now = $now = \time();
foreach ($this->serverIdTimeoutMap as $id => $expiry) { foreach ($this->serverIdTimeoutMap as $id => $expiry) {
if ($now > $expiry) { if ($now > $expiry) {
@ -52,10 +54,8 @@ class DefaultResolver implements Resolver {
if (empty($this->serverIdMap)) { if (empty($this->serverIdMap)) {
\Amp\disable($watcherId); \Amp\disable($watcherId);
} }
}, 1000, $options = [ });
"enable" => true, \Amp\unreference($this->serverTimeoutWatcher);
"keep_alive" => false,
]);
} }
/** /**
@ -75,12 +75,12 @@ class DefaultResolver implements Resolver {
} }
/** /**
* {$inheritdoc} * {@inheritdoc}
*/ */
public function query($name, $type, array $options = []) { public function query($name, $type, array $options = []) {
$handler = [$this, empty($options["recurse"]) ? "doResolve" : "doRecurse"]; $handler = [$this, empty($options["recurse"]) ? "doResolve" : "doRecurse"];
$types = (array) $type; $types = (array) $type;
return $this->pipeResult(\Amp\resolve($handler($name, $types, $options)), $types); return $this->pipeResult(new Coroutine($handler($name, $types, $options)), $types);
} }
private function isValidHostName($name) { private function isValidHostName($name) {
@ -92,8 +92,8 @@ REGEX;
} }
// flatten $result while preserving order according to $types (append unspecified types for e.g. Record::ALL queries) // flatten $result while preserving order according to $types (append unspecified types for e.g. Record::ALL queries)
private function pipeResult($promise, array $types) { private function pipeResult($awaitable, array $types) {
return \Amp\pipe($promise, function (array $result) use ($types) { return \Amp\pipe($awaitable, function (array $result) use ($types) {
$retval = []; $retval = [];
foreach ($types as $type) { foreach ($types as $type) {
if (isset($result[$type])) { if (isset($result[$type])) {
@ -110,7 +110,7 @@ REGEX;
if (!isset($options["hosts"]) || $options["hosts"]) { if (!isset($options["hosts"]) || $options["hosts"]) {
static $hosts = null; static $hosts = null;
if ($hosts === null || !empty($options["reload_hosts"])) { if ($hosts === null || !empty($options["reload_hosts"])) {
return \Amp\pipe(\Amp\resolve($this->loadHostsFile()), function ($value) use (&$hosts, $name, $types, $options) { return \Amp\pipe(new Coroutine($this->loadHostsFile()), function ($value) use (&$hosts, $name, $types, $options) {
unset($options["reload_hosts"]); // avoid recursion unset($options["reload_hosts"]); // avoid recursion
$hosts = $value; $hosts = $value;
return $this->recurseWithHosts($name, $types, $options); return $this->recurseWithHosts($name, $types, $options);
@ -128,7 +128,7 @@ REGEX;
} }
} }
return \Amp\resolve($this->doRecurse($name, $types, $options)); return new Coroutine($this->doRecurse($name, $types, $options));
} }
private function doRecurse($name, array $types, $options) { private function doRecurse($name, array $types, $options) {
@ -139,10 +139,10 @@ REGEX;
$types = array_merge($types, [Record::CNAME, Record::DNAME]); $types = array_merge($types, [Record::CNAME, Record::DNAME]);
$lookupName = $name; $lookupName = $name;
for ($i = 0; $i < 30; $i++) { for ($i = 0; $i < 30; $i++) {
$result = (yield \Amp\resolve($this->doResolve($lookupName, $types, $options))); $result = (yield new Coroutine($this->doResolve($lookupName, $types, $options)));
if (count($result) > isset($result[Record::CNAME]) + isset($result[Record::DNAME])) { if (count($result) > isset($result[Record::CNAME]) + isset($result[Record::DNAME])) {
unset($result[Record::CNAME], $result[Record::DNAME]); unset($result[Record::CNAME], $result[Record::DNAME]);
yield new CoroutineResult($result); yield Coroutine::result($result);
return; return;
} }
// @TODO check for potentially using recursion and iterate over *all* CNAME/DNAME // @TODO check for potentially using recursion and iterate over *all* CNAME/DNAME
@ -200,20 +200,20 @@ REGEX;
); );
} }
$promisor = new Deferred; $deferred = new Deferred;
$server->pendingRequests[$requestId] = true; $server->pendingRequests[$requestId] = true;
$this->pendingRequests[$requestId] = [$promisor, $name, $type, $uri]; $this->pendingRequests[$requestId] = [$deferred, $name, $type, $uri];
return $promisor->promise(); return $deferred->getAwaitable();
} }
private function doResolve($name, array $types, $options) { private function doResolve($name, array $types, $options) {
if (!$this->config) { if (!$this->config) {
$this->config = (yield \Amp\resolve($this->loadResolvConf())); $this->config = (yield new Coroutine($this->loadResolvConf()));
} }
if (empty($types)) { if (empty($types)) {
yield new CoroutineResult([]); yield Coroutine::result([]);
return; return;
} }
@ -237,7 +237,7 @@ REGEX;
if (empty(array_filter($result))) { if (empty(array_filter($result))) {
throw new NoRecordException("No records returned for {$name} (cached result)"); throw new NoRecordException("No records returned for {$name} (cached result)");
} else { } else {
yield new CoroutineResult($result); yield Coroutine::result($result);
return; return;
} }
} }
@ -256,29 +256,29 @@ REGEX;
} }
foreach ($types as $type) { foreach ($types as $type) {
$promises[] = $this->doRequest($uri, $name, $type); $awaitables[] = $this->doRequest($uri, $name, $type);
} }
try { try {
list( , $resultArr) = (yield \Amp\timeout(\Amp\some($promises), $timeout)); list( , $resultArr) = (yield \Amp\timeout(\Amp\some($awaitables), $timeout));
foreach ($resultArr as $value) { foreach ($resultArr as $value) {
$result += $value; $result += $value;
} }
} catch (\Amp\TimeoutException $e) { } catch (TimeoutException $e) {
if (substr($uri, 0, 6) == "tcp://") { if (substr($uri, 0, 6) == "tcp://") {
throw new TimeoutException( throw new TimeoutException(
"Name resolution timed out for {$name}" "Name resolution timed out for {$name}"
); );
} else { } else {
$options["server"] = \preg_replace("#[a-z.]+://#", "tcp://", $uri); $options["server"] = \preg_replace("#[a-z.]+://#", "tcp://", $uri);
yield new CoroutineResult(\Amp\resolve($this->doResolve($name, $types, $options))); yield Coroutine::result(\Amp\coroutine($this->doResolve($name, $types, $options)));
return; return;
} }
} catch (ResolutionException $e) { } catch (ResolutionException $e) {
if (empty($result)) { // if we have no cached results if (empty($result)) { // if we have no cached results
throw $e; throw $e;
} }
} catch (CombinatorException $e) { // if all promises in Amp\some fail } catch (MultiReasonException $e) { // if all promises in Amp\some fail
if (empty($result)) { // if we have no cached results if (empty($result)) { // if we have no cached results
foreach ($e->getExceptions() as $ex) { foreach ($e->getExceptions() as $ex) {
if ($ex instanceof NoRecordException) { if ($ex instanceof NoRecordException) {
@ -289,7 +289,7 @@ REGEX;
} }
} }
yield new CoroutineResult($result); yield Coroutine::result($result);
} }
/** @link http://man7.org/linux/man-pages/man5/resolv.conf.5.html */ /** @link http://man7.org/linux/man-pages/man5/resolv.conf.5.html */
@ -349,7 +349,7 @@ REGEX;
} }
} }
yield new CoroutineResult($result); yield Coroutine::result($result);
} }
private function loadHostsFile($path = null) { private function loadHostsFile($path = null) {
@ -397,7 +397,7 @@ REGEX;
} }
} }
yield new CoroutineResult($data); yield Coroutine::result($data);
} }
private function parseCustomServerUri($uri) { private function parseCustomServerUri($uri) {
@ -462,27 +462,25 @@ REGEX;
$server->buffer = ""; $server->buffer = "";
$server->length = INF; $server->length = INF;
$server->pendingRequests = []; $server->pendingRequests = [];
$server->watcherId = \Amp\onReadable($socket, $this->makePrivateCallable("onReadable"), [ $server->watcherId = \Amp\onReadable($socket, $this->makePrivateCallable("onReadable"));
"enable" => true, \Amp\unreference($server->watcherId);
"keep_alive" => true,
]);
$this->serverIdMap[$id] = $server; $this->serverIdMap[$id] = $server;
$this->serverUriMap[$uri] = $server; $this->serverUriMap[$uri] = $server;
if (substr($uri, 0, 6) == "tcp://") { if (substr($uri, 0, 6) == "tcp://") {
$promisor = new Deferred; $deferred = new Deferred;
$server->connect = $promisor->promise(); $server->connect = $deferred->getAwaitable();
$watcher = \Amp\onWritable($server->socket, static function($watcher) use ($server, $promisor, &$timer) { $watcher = \Amp\onWritable($server->socket, static function($watcher) use ($server, $deferred, &$timer) {
\Amp\cancel($watcher); \Amp\cancel($watcher);
\Amp\cancel($timer); \Amp\cancel($timer);
unset($server->connect); unset($server->connect);
$promisor->succeed(); $deferred->resolve();
}); });
$timer = \Amp\once(function() use ($id, $promisor, $watcher, $uri) { $timer = \Amp\delay(5000, function() use ($id, $deferred, $watcher, $uri) {
\Amp\cancel($watcher); \Amp\cancel($watcher);
$this->unloadServer($id); $this->unloadServer($id);
$promisor->fail(new TimeoutException("Name resolution timed out, could not connect to server at $uri")); $deferred->fail(new TimeoutException("Name resolution timed out, could not connect to server at $uri"));
}, 5000); });
} }
return $server; return $server;
@ -505,8 +503,8 @@ REGEX;
} }
if ($error && $server->pendingRequests) { if ($error && $server->pendingRequests) {
foreach (array_keys($server->pendingRequests) as $requestId) { foreach (array_keys($server->pendingRequests) as $requestId) {
list($promisor) = $this->pendingRequests[$requestId]; list($deferred) = $this->pendingRequests[$requestId];
$promisor->fail($error); $deferred->fail($error);
} }
} }
} }
@ -568,13 +566,13 @@ REGEX;
} }
private function processDecodedResponse($serverId, $requestId, $response) { private function processDecodedResponse($serverId, $requestId, $response) {
list($promisor, $name, $type, $uri) = $this->pendingRequests[$requestId]; list($deferred, $name, $type, $uri) = $this->pendingRequests[$requestId];
// Retry via tcp if message has been truncated // Retry via tcp if message has been truncated
if ($response->isTruncated()) { if ($response->isTruncated()) {
if (\substr($uri, 0, 6) != "tcp://") { if (\substr($uri, 0, 6) != "tcp://") {
$uri = \preg_replace("#[a-z.]+://#", "tcp://", $uri); $uri = \preg_replace("#[a-z.]+://#", "tcp://", $uri);
$promisor->succeed($this->doRequest($uri, $name, $type)); $deferred->resolve($this->doRequest($uri, $name, $type));
} else { } else {
$this->finalizeResult($serverId, $requestId, new ResolutionException( $this->finalizeResult($serverId, $requestId, new ResolutionException(
"Server returned truncated response" "Server returned truncated response"
@ -602,7 +600,7 @@ REGEX;
return; return;
} }
list($promisor, $name) = $this->pendingRequests[$requestId]; list($deferred, $name) = $this->pendingRequests[$requestId];
$server = $this->serverIdMap[$serverId]; $server = $this->serverIdMap[$serverId];
unset( unset(
$this->pendingRequests[$requestId], $this->pendingRequests[$requestId],
@ -614,7 +612,7 @@ REGEX;
\Amp\enable($this->serverTimeoutWatcher); \Amp\enable($this->serverTimeoutWatcher);
} }
if ($error) { if ($error) {
$promisor->fail($error); $deferred->fail($error);
} else { } else {
foreach ($result as $type => $records) { foreach ($result as $type => $records) {
$minttl = INF; $minttl = INF;
@ -625,7 +623,7 @@ REGEX;
} }
$this->arrayCache->set("$name#$type", $records, $minttl); $this->arrayCache->set("$name#$type", $records, $minttl);
} }
$promisor->succeed($result); $deferred->resolve($result);
} }
} }

View File

@ -1,22 +1,27 @@
<?php <?php
namespace Amp\Dns; namespace Amp\Dns;
use Interop\Async\Loop;
const LOOP_STATE_IDENTIFIER = Resolver::class;
/** /**
* Retrieve the application-wide dns resolver instance * Retrieve the application-wide dns resolver instance
* *
* @param \Amp\Dns\Resolver $assign Optionally specify a new default dns resolver instance * @param \Amp\Dns\Resolver $resolver Optionally specify a new default dns resolver instance
* @return \Amp\Dns\Resolver Returns the application-wide dns resolver instance * @return \Amp\Dns\Resolver Returns the application-wide dns resolver instance
*/ */
function resolver(Resolver $assign = null) { function resolver(Resolver $resolver = null) {
static $resolver; if ($resolver === null) {
if ($assign) { $resolver = Loop::fetchState(LOOP_STATE_IDENTIFIER);
return ($resolver = $assign); if ($resolver) {
} elseif ($resolver) { return $resolver;
return $resolver; }
} else {
return $resolver = driver(); $resolver = driver();
} }
Loop::storeState(LOOP_STATE_IDENTIFIER, $resolver);
return $resolver;
} }
/** /**
* Create a new dns resolver best-suited for the current environment * Create a new dns resolver best-suited for the current environment
@ -55,7 +60,7 @@ function driver() {
* *
* @param string $name The hostname to resolve * @param string $name The hostname to resolve
* @param array $options * @param array $options
* @return \Amp\Promise * @return \Interop\Async\Awaitable
* @TODO add boolean "clear_cache" option flag * @TODO add boolean "clear_cache" option flag
*/ */
function resolve($name, array $options = []) { function resolve($name, array $options = []) {
@ -67,7 +72,7 @@ function resolve($name, array $options = []) {
* @param string $name Unlike resolve(), query() allows for requesting _any_ name (as DNS RFC allows for arbitrary strings) * @param string $name Unlike resolve(), query() allows for requesting _any_ name (as DNS RFC allows for arbitrary strings)
* @param int|int[] $type Use constants of Amp\Dns\Record * @param int|int[] $type Use constants of Amp\Dns\Record
* @param array $options @see resolve documentation * @param array $options @see resolve documentation
* @return \Amp\Promise * @return \Interop\Async\Awaitable
*/ */
function query($name, $type, array $options = []) { function query($name, $type, array $options = []) {
return resolver()->query($name, $type, $options); return resolver()->query($name, $type, $options);

View File

@ -3,15 +3,11 @@
namespace Amp\Dns\Test; namespace Amp\Dns\Test;
class IntegrationTest extends \PHPUnit_Framework_TestCase { class IntegrationTest extends \PHPUnit_Framework_TestCase {
protected function setUp() {
\Amp\reactor(\Amp\driver());
}
/** /**
* @group internet * @group internet
*/ */
public function testResolve() { public function testResolve() {
\Amp\run(function () { \Amp\execute(function () {
$names = [ $names = [
"google.com", "google.com",
"github.com", "github.com",

View File

@ -2,6 +2,7 @@
namespace Amp\Dns\Test; namespace Amp\Dns\Test;
use Amp\Coroutine;
use ReflectionObject; use ReflectionObject;
class ResolvConfTest extends \PHPUnit_Framework_TestCase { class ResolvConfTest extends \PHPUnit_Framework_TestCase {
@ -10,7 +11,7 @@ class ResolvConfTest extends \PHPUnit_Framework_TestCase {
$method = $reflector->getMethod("loadResolvConf"); $method = $reflector->getMethod("loadResolvConf");
$method->setAccessible(true); $method->setAccessible(true);
$result = \Amp\wait(\Amp\resolve($method->invoke(\Amp\Dns\resolver(), __DIR__ . "/data/resolv.conf"))); $result = \Amp\wait(new Coroutine($method->invoke(\Amp\Dns\resolver(), __DIR__ . "/data/resolv.conf")));
$this->assertSame([ $this->assertSame([
"nameservers" => [ "nameservers" => [
@ -27,7 +28,7 @@ class ResolvConfTest extends \PHPUnit_Framework_TestCase {
$method = $reflector->getMethod("loadResolvConf"); $method = $reflector->getMethod("loadResolvConf");
$method->setAccessible(true); $method->setAccessible(true);
$result = \Amp\wait(\Amp\resolve($method->invoke(\Amp\Dns\resolver(), __DIR__ . "/data/invalid.conf"))); $result = \Amp\wait(new Coroutine($method->invoke(\Amp\Dns\resolver(), __DIR__ . "/data/invalid.conf")));
$this->assertSame([ $this->assertSame([
"nameservers" => [ "nameservers" => [