diff --git a/lib/BasicResolver.php b/lib/BasicResolver.php index 2da4f18..cf66f79 100644 --- a/lib/BasicResolver.php +++ b/lib/BasicResolver.php @@ -4,7 +4,6 @@ namespace Amp\Dns; use Amp\Cache\ArrayCache; use Amp\Cache\Cache; -use Amp\Coroutine; use Amp\MultiReasonException; use Amp\Promise; use LibDNS\Messages\Message; @@ -111,7 +110,7 @@ class BasicResolver implements Resolver { } } - return array_merge(...$records); + return \array_merge(...$records); }); } @@ -135,83 +134,81 @@ class BasicResolver implements Resolver { /** @inheritdoc */ public function query(string $name, int $type): Promise { - return new Coroutine($this->doQuery($name, $type)); - } + return call(function () use ($name, $type) { + if (!$this->config) { + $this->config = yield $this->configLoader->loadConfig(); + } - public function doQuery(string $name, int $type): \Generator { - if (!$this->config) { - $this->config = yield $this->configLoader->loadConfig(); - } + $name = $this->normalizeName($name, $type); + $question = $this->createQuestion($name, $type); - $name = $this->normalizeName($name, $type); - $question = $this->createQuestion($name, $type); + if (null !== $cachedValue = yield $this->cache->get($this->getCacheKey($name, $type))) { + return $this->decodeCachedResult($name, $type, $cachedValue); + } - if (null !== $cachedValue = yield $this->cache->get($this->getCacheKey($name, $type))) { - return $this->decodeCachedResult($name, $type, $cachedValue); - } + $nameservers = $this->config->getNameservers(); + $attempts = $this->config->getAttempts(); + $receivedTruncatedResponse = false; - $nameservers = $this->config->getNameservers(); - $attempts = $this->config->getAttempts(); - $receivedTruncatedResponse = false; - - for ($attempt = 0; $attempt < $attempts; ++$attempt) { - $i = $attempt % \count($nameservers); - $protocol = $receivedTruncatedResponse ? "tcp" : "udp"; - - /** @var \Amp\Dns\Server $server */ - $server = yield $this->getServer($protocol . "://" . $nameservers[$i]); - - if (!$server->isAlive()) { - unset($this->servers[$protocol . "://" . $nameservers[$i]]); + for ($attempt = 0; $attempt < $attempts; ++$attempt) { + $i = $attempt % \count($nameservers); + $protocol = $receivedTruncatedResponse ? "tcp" : "udp"; /** @var \Amp\Dns\Server $server */ $server = yield $this->getServer($protocol . "://" . $nameservers[$i]); - } - /** @var Message $response */ - $response = yield $server->ask($question); - $this->assertAcceptableResponse($response); + if (!$server->isAlive()) { + unset($this->servers[$protocol . "://" . $nameservers[$i]]); - if ($response->isTruncated()) { - if (!$receivedTruncatedResponse) { - // Retry with TCP, don't count attempt - $receivedTruncatedResponse = true; - $attempt--; - continue; + /** @var \Amp\Dns\Server $server */ + $server = yield $this->getServer($protocol . "://" . $nameservers[$i]); } - throw new ResolutionException("Server returned truncated response"); + /** @var Message $response */ + $response = yield $server->ask($question); + $this->assertAcceptableResponse($response); + + if ($response->isTruncated()) { + if (!$receivedTruncatedResponse) { + // Retry with TCP, don't count attempt + $receivedTruncatedResponse = true; + $attempt--; + continue; + } + + throw new ResolutionException("Server returned truncated response"); + } + + $answers = $response->getAnswerRecords(); + $result = []; + $ttls = []; + + /** @var \LibDNS\Records\Resource $record */ + foreach ($answers as $record) { + $recordType = $record->getType(); + + $result[$recordType][] = (string) $record->getData(); + $ttls[$recordType] = \min($ttls[$recordType] ?? \PHP_INT_MAX, $record->getTTL()); + } + + foreach ($result as $recordType => $records) { + // We don't care here whether storing in the cache fails + $this->cache->set($this->getCacheKey($name, $recordType), \json_encode($records), $ttls[$recordType]); + } + + if (!isset($result[$type])) { + // "it MUST NOT cache it for longer than five (5) minutes" per RFC 2308 section 7.1 + $this->cache->set($this->getCacheKey($name, $type), \json_encode([]), 300); + throw new NoRecordException("No records returned for {$name}"); + } + + return \array_map(function ($data) use ($type, $ttls) { + return new Record($data, $type, $ttls[$type]); + }, $result[$type]); } - $answers = $response->getAnswerRecords(); - $result = []; - $ttls = []; - - /** @var \LibDNS\Records\Resource $record */ - foreach ($answers as $record) { - $recordType = $record->getType(); - - $result[$recordType][] = (string) $record->getData(); - $ttls[$recordType] = \min($ttls[$recordType] ?? \PHP_INT_MAX, $record->getTTL()); - } - - foreach ($result as $recordType => $records) { - // We don't care here whether storing in the cache fails - $this->cache->set($this->getCacheKey($name, $recordType), \json_encode($records), $ttls[$recordType]); - } - - if (!isset($result[$type])) { - // "it MUST NOT cache it for longer than five (5) minutes" per RFC 2308 section 7.1 - $this->cache->set($this->getCacheKey($name, $type), \json_encode([]), 300); - throw new NoRecordException("No records returned for {$name}"); - } - - return array_map(function ($data) use ($type, $ttls) { - return new Record($data, $type, $ttls[$type]); - }, $result[$type]); - } - - throw new ResolutionException("No response from any nameserver after {$attempts} attempts"); + throw new ResolutionException("No response from any nameserver after {$attempts} attempts"); + }); } /** @@ -269,9 +266,9 @@ class BasicResolver implements Resolver { if ($type === Record::PTR) { if (($packedIp = @inet_pton($name)) !== false) { if (isset($packedIp[4])) { // IPv6 - $name = wordwrap(strrev(bin2hex($packedIp)), 1, ".", true) . ".ip6.arpa"; + $name = \wordwrap(\strrev(\bin2hex($packedIp)), 1, ".", true) . ".ip6.arpa"; } else { // IPv4 - $name = inet_ntop(strrev($packedIp)) . ".in-addr.arpa"; + $name = \inet_ntop(\strrev($packedIp)) . ".in-addr.arpa"; } } } else if (\in_array($type, [Record::A, Record::AAAA])) { @@ -286,7 +283,7 @@ class BasicResolver implements Resolver { return $this->servers[$uri]; } - if (substr($uri, 0, 3) === "udp") { + if (\substr($uri, 0, 3) === "udp") { $server = UdpServer::connect($uri); } else { $server = TcpServer::connect($uri); diff --git a/lib/Server.php b/lib/Server.php index 74fff6f..0043c64 100644 --- a/lib/Server.php +++ b/lib/Server.php @@ -4,13 +4,14 @@ namespace Amp\Dns; use Amp\ByteStream\ResourceInputStream; use Amp\ByteStream\ResourceOutputStream; -use Amp\Coroutine; +use Amp\ByteStream\StreamException; use Amp\Deferred; use Amp\Promise; use LibDNS\Messages\Message; use LibDNS\Messages\MessageFactory; use LibDNS\Messages\MessageTypes; use LibDNS\Records\Question; +use function Amp\call; abstract class Server { /** @var \Amp\ByteStream\ResourceInputStream */ @@ -61,44 +62,69 @@ abstract class Server { $this->onResolve = function (\Throwable $exception = null, Message $message = null) { if ($exception) { + $questions = $this->questions; + $this->questions = []; + foreach ($questions as $deferred) { + $deferred->fail($exception); + } return; } $id = $message->getId(); if (!isset($this->questions[$id])) { - return; + return; // Ignore duplicate response. } $deferred = $this->questions[$id]; unset($this->questions[$id]); + + $empty = empty($this->questions); + $deferred->resolve($message); + + if (!$empty) { + $this->receive()->onResolve($this->onResolve); + } }; } + /** + * @param \LibDNS\Records\Question $question + * + * @return \Amp\Promise<\LibDNS\Messages\Message> + */ public function ask(Question $question): Promise { - return new Coroutine($this->doAsk($question)); - } + return call(function () use ($question) { + $id = $this->nextId++; + if ($this->nextId > 0xffff) { + $this->nextId %= 0xffff; + } - private function doAsk(Question $question): \Generator { - $id = $this->nextId++; - if ($this->nextId > 0xffff) { - $this->nextId %= 0xffff; - } + $empty = empty($this->questions); - if (isset($this->questions[$id])) { - $deferred = $this->questions[$id]; - unset($this->questions[$id]); - $deferred->fail(new ResolutionException("Request hasn't been answered with 65k requests in between")); - } + if (isset($this->questions[$id])) { + $deferred = $this->questions[$id]; + unset($this->questions[$id]); + $deferred->fail(new ResolutionException("Request hasn't been answered with 65k requests in between")); + } - $message = $this->createMessage($question, $id); - $this->questions[$id] = $deferred = new Deferred; + $message = $this->createMessage($question, $id); - yield $this->send($message); - $this->receive()->onResolve($this->onResolve); + try { + yield $this->send($message); + } catch (StreamException $exception) { + throw new ResolutionException("Sending the request failed", 0, $exception); + } - return yield $deferred->promise(); + if ($empty) { + $this->receive()->onResolve($this->onResolve); + } + + $this->questions[$id] = $deferred = new Deferred; + + return yield $deferred->promise(); + }); } protected function read(): Promise {