diff --git a/lib/BasicResolver.php b/lib/BasicResolver.php index 00355f4..0bccda9 100644 --- a/lib/BasicResolver.php +++ b/lib/BasicResolver.php @@ -9,7 +9,6 @@ use Amp\MultiReasonException; use Amp\Promise; use Amp\Success; use LibDNS\Messages\Message; -use LibDNS\Messages\MessageTypes; use LibDNS\Records\Question; use LibDNS\Records\QuestionFactory; use function Amp\call; @@ -51,6 +50,10 @@ final class BasicResolver implements Resolver { $this->questionFactory = new QuestionFactory; $this->gcWatcher = Loop::repeat(5000, function () { + if (empty($this->servers)) { + return; + } + $now = \time(); foreach ($this->servers as $key => $server) { @@ -183,21 +186,21 @@ final class BasicResolver implements Resolver { $nameservers = $this->config->getNameservers(); $attempts = $this->config->getAttempts(); - $receivedTruncatedResponse = false; + $protocol = "udp"; + $attempt = 0; - for ($attempt = 0; $attempt < $attempts; ++$attempt) { - $i = $attempt % \count($nameservers); - $protocol = $receivedTruncatedResponse ? "tcp" : "udp"; + /** @var Server $server */ + $uri = $protocol . "://" . $nameservers[0]; + $server = yield $this->getServer($uri); + while ($attempt < $attempts) { try { - /** @var \Amp\Dns\Server $server */ - $server = yield $this->getServer($protocol . "://" . $nameservers[$i]); - if (!$server->isAlive()) { - $this->servers[$protocol . "://" . $nameservers[$i]]->close(); - unset($this->servers[$protocol . "://" . $nameservers[$i]]); + unset($this->servers[$uri]); + $server->close(); /** @var \Amp\Dns\Server $server */ + $i = $attempt % \count($nameservers); $server = yield $this->getServer($protocol . "://" . $nameservers[$i]); } @@ -206,10 +209,11 @@ final class BasicResolver implements Resolver { $this->assertAcceptableResponse($response); if ($response->isTruncated()) { - if (!$receivedTruncatedResponse) { + if ($protocol !== "tcp") { // Retry with TCP, don't count attempt - $receivedTruncatedResponse = true; - $attempt--; + $protocol = "tcp"; + $i = $attempt % \count($nameservers); + $server = yield $this->getServer($protocol . "://" . $nameservers[$i]); continue; } @@ -244,6 +248,9 @@ final class BasicResolver implements Resolver { return new Record($data, $type, $ttls[$type]); }, $result[$type]); } catch (TimeoutException $e) { + $i = ++$attempt % \count($nameservers); + $server = yield $this->getServer($protocol . "://" . $nameservers[$i]); + continue; } } @@ -327,6 +334,10 @@ final class BasicResolver implements Resolver { } private function getServer($uri): Promise { + if (\substr($uri, 0, 3) === "udp") { + return UdpServer::connect($uri); + } + if (isset($this->servers[$uri])) { return new Success($this->servers[$uri]); } @@ -335,16 +346,12 @@ final class BasicResolver implements Resolver { return $this->pendingServers[$uri]; } - if (\substr($uri, 0, 3) === "udp") { - $server = UdpServer::connect($uri); - } else { - $server = TcpServer::connect($uri); - } + $server = TcpServer::connect($uri); $server->onResolve(function ($error, $server) use ($uri) { - if ($error) { - unset($this->pendingServers[$uri]); - } else { + unset($this->pendingServers[$uri]); + + if (!$error) { $this->servers[$uri] = $server; } }); diff --git a/lib/Server.php b/lib/Server.php index 59916ea..59cdf5e 100644 --- a/lib/Server.php +++ b/lib/Server.php @@ -16,6 +16,8 @@ use function Amp\call; /** @internal */ abstract class Server { + const MAX_CONCURRENT_REQUESTS = 500; + /** @var ResourceInputStream */ private $input; @@ -28,9 +30,6 @@ abstract class Server { /** @var MessageFactory */ private $messageFactory; - /** @var int */ - private $nextId = 0; - /** @var callable */ private $onResolve; @@ -40,6 +39,9 @@ abstract class Server { /** @var bool */ private $receiving = false; + /** @var array */ + private $queue = []; + /** * @param string $uri * @@ -113,17 +115,15 @@ abstract class Server { return call(function () use ($question, $timeout) { $this->lastActivity = \time(); - $id = $this->nextId++; - if ($this->nextId > 0xffff) { - $this->nextId %= 0xffff; + if (\count($this->pending) > self::MAX_CONCURRENT_REQUESTS) { + $deferred = new Deferred; + $this->queue[] = $deferred; + yield $deferred->promise(); } - if (isset($this->pending[$id])) { - /** @var Deferred $deferred */ - $deferred = $this->pending[$id]->deferred; - unset($this->pending[$id]); - $deferred->fail(new ResolutionException("Request hasn't been answered with 65k requests in between")); - } + do { + $id = \random_int(0, 0xffff); + } while (isset($this->pending[$id])); $message = $this->createMessage($question, $id); @@ -158,10 +158,17 @@ abstract class Server { return yield Promise\timeout($deferred->promise(), $timeout); } catch (Amp\TimeoutException $exception) { unset($this->pending[$id]); + if (empty($this->pending)) { $this->input->unreference(); } + throw new TimeoutException("Didn't receive a response within {$timeout} milliseconds."); + } finally { + if ($this->queue) { + $deferred = array_shift($this->queue); + $deferred->resolve(); + } } }); }