1
0
mirror of https://github.com/danog/dns.git synced 2024-11-26 20:14:51 +01:00

Keep receiving if there are still pending requests

This commit is contained in:
Aaron Piotrowski 2017-06-23 10:06:30 -05:00
parent f536ddfd8a
commit a4b714c0b0
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
2 changed files with 111 additions and 88 deletions

View File

@ -4,7 +4,6 @@ namespace Amp\Dns;
use Amp\Cache\ArrayCache;
use Amp\Cache\Cache;
use Amp\Coroutine;
use Amp\MultiReasonException;
use Amp\Promise;
use LibDNS\Messages\Message;
@ -111,7 +110,7 @@ class BasicResolver implements Resolver {
}
}
return array_merge(...$records);
return \array_merge(...$records);
});
}
@ -135,83 +134,81 @@ class BasicResolver implements Resolver {
/** @inheritdoc */
public function query(string $name, int $type): Promise {
return new Coroutine($this->doQuery($name, $type));
}
return call(function () use ($name, $type) {
if (!$this->config) {
$this->config = yield $this->configLoader->loadConfig();
}
public function doQuery(string $name, int $type): \Generator {
if (!$this->config) {
$this->config = yield $this->configLoader->loadConfig();
}
$name = $this->normalizeName($name, $type);
$question = $this->createQuestion($name, $type);
$name = $this->normalizeName($name, $type);
$question = $this->createQuestion($name, $type);
if (null !== $cachedValue = yield $this->cache->get($this->getCacheKey($name, $type))) {
return $this->decodeCachedResult($name, $type, $cachedValue);
}
if (null !== $cachedValue = yield $this->cache->get($this->getCacheKey($name, $type))) {
return $this->decodeCachedResult($name, $type, $cachedValue);
}
$nameservers = $this->config->getNameservers();
$attempts = $this->config->getAttempts();
$receivedTruncatedResponse = false;
$nameservers = $this->config->getNameservers();
$attempts = $this->config->getAttempts();
$receivedTruncatedResponse = false;
for ($attempt = 0; $attempt < $attempts; ++$attempt) {
$i = $attempt % \count($nameservers);
$protocol = $receivedTruncatedResponse ? "tcp" : "udp";
/** @var \Amp\Dns\Server $server */
$server = yield $this->getServer($protocol . "://" . $nameservers[$i]);
if (!$server->isAlive()) {
unset($this->servers[$protocol . "://" . $nameservers[$i]]);
for ($attempt = 0; $attempt < $attempts; ++$attempt) {
$i = $attempt % \count($nameservers);
$protocol = $receivedTruncatedResponse ? "tcp" : "udp";
/** @var \Amp\Dns\Server $server */
$server = yield $this->getServer($protocol . "://" . $nameservers[$i]);
}
/** @var Message $response */
$response = yield $server->ask($question);
$this->assertAcceptableResponse($response);
if (!$server->isAlive()) {
unset($this->servers[$protocol . "://" . $nameservers[$i]]);
if ($response->isTruncated()) {
if (!$receivedTruncatedResponse) {
// Retry with TCP, don't count attempt
$receivedTruncatedResponse = true;
$attempt--;
continue;
/** @var \Amp\Dns\Server $server */
$server = yield $this->getServer($protocol . "://" . $nameservers[$i]);
}
throw new ResolutionException("Server returned truncated response");
/** @var Message $response */
$response = yield $server->ask($question);
$this->assertAcceptableResponse($response);
if ($response->isTruncated()) {
if (!$receivedTruncatedResponse) {
// Retry with TCP, don't count attempt
$receivedTruncatedResponse = true;
$attempt--;
continue;
}
throw new ResolutionException("Server returned truncated response");
}
$answers = $response->getAnswerRecords();
$result = [];
$ttls = [];
/** @var \LibDNS\Records\Resource $record */
foreach ($answers as $record) {
$recordType = $record->getType();
$result[$recordType][] = (string) $record->getData();
$ttls[$recordType] = \min($ttls[$recordType] ?? \PHP_INT_MAX, $record->getTTL());
}
foreach ($result as $recordType => $records) {
// We don't care here whether storing in the cache fails
$this->cache->set($this->getCacheKey($name, $recordType), \json_encode($records), $ttls[$recordType]);
}
if (!isset($result[$type])) {
// "it MUST NOT cache it for longer than five (5) minutes" per RFC 2308 section 7.1
$this->cache->set($this->getCacheKey($name, $type), \json_encode([]), 300);
throw new NoRecordException("No records returned for {$name}");
}
return \array_map(function ($data) use ($type, $ttls) {
return new Record($data, $type, $ttls[$type]);
}, $result[$type]);
}
$answers = $response->getAnswerRecords();
$result = [];
$ttls = [];
/** @var \LibDNS\Records\Resource $record */
foreach ($answers as $record) {
$recordType = $record->getType();
$result[$recordType][] = (string) $record->getData();
$ttls[$recordType] = \min($ttls[$recordType] ?? \PHP_INT_MAX, $record->getTTL());
}
foreach ($result as $recordType => $records) {
// We don't care here whether storing in the cache fails
$this->cache->set($this->getCacheKey($name, $recordType), \json_encode($records), $ttls[$recordType]);
}
if (!isset($result[$type])) {
// "it MUST NOT cache it for longer than five (5) minutes" per RFC 2308 section 7.1
$this->cache->set($this->getCacheKey($name, $type), \json_encode([]), 300);
throw new NoRecordException("No records returned for {$name}");
}
return array_map(function ($data) use ($type, $ttls) {
return new Record($data, $type, $ttls[$type]);
}, $result[$type]);
}
throw new ResolutionException("No response from any nameserver after {$attempts} attempts");
throw new ResolutionException("No response from any nameserver after {$attempts} attempts");
});
}
/**
@ -269,9 +266,9 @@ class BasicResolver implements Resolver {
if ($type === Record::PTR) {
if (($packedIp = @inet_pton($name)) !== false) {
if (isset($packedIp[4])) { // IPv6
$name = wordwrap(strrev(bin2hex($packedIp)), 1, ".", true) . ".ip6.arpa";
$name = \wordwrap(\strrev(\bin2hex($packedIp)), 1, ".", true) . ".ip6.arpa";
} else { // IPv4
$name = inet_ntop(strrev($packedIp)) . ".in-addr.arpa";
$name = \inet_ntop(\strrev($packedIp)) . ".in-addr.arpa";
}
}
} else if (\in_array($type, [Record::A, Record::AAAA])) {
@ -286,7 +283,7 @@ class BasicResolver implements Resolver {
return $this->servers[$uri];
}
if (substr($uri, 0, 3) === "udp") {
if (\substr($uri, 0, 3) === "udp") {
$server = UdpServer::connect($uri);
} else {
$server = TcpServer::connect($uri);

View File

@ -4,13 +4,14 @@ namespace Amp\Dns;
use Amp\ByteStream\ResourceInputStream;
use Amp\ByteStream\ResourceOutputStream;
use Amp\Coroutine;
use Amp\ByteStream\StreamException;
use Amp\Deferred;
use Amp\Promise;
use LibDNS\Messages\Message;
use LibDNS\Messages\MessageFactory;
use LibDNS\Messages\MessageTypes;
use LibDNS\Records\Question;
use function Amp\call;
abstract class Server {
/** @var \Amp\ByteStream\ResourceInputStream */
@ -61,44 +62,69 @@ abstract class Server {
$this->onResolve = function (\Throwable $exception = null, Message $message = null) {
if ($exception) {
$questions = $this->questions;
$this->questions = [];
foreach ($questions as $deferred) {
$deferred->fail($exception);
}
return;
}
$id = $message->getId();
if (!isset($this->questions[$id])) {
return;
return; // Ignore duplicate response.
}
$deferred = $this->questions[$id];
unset($this->questions[$id]);
$empty = empty($this->questions);
$deferred->resolve($message);
if (!$empty) {
$this->receive()->onResolve($this->onResolve);
}
};
}
/**
* @param \LibDNS\Records\Question $question
*
* @return \Amp\Promise<\LibDNS\Messages\Message>
*/
public function ask(Question $question): Promise {
return new Coroutine($this->doAsk($question));
}
return call(function () use ($question) {
$id = $this->nextId++;
if ($this->nextId > 0xffff) {
$this->nextId %= 0xffff;
}
private function doAsk(Question $question): \Generator {
$id = $this->nextId++;
if ($this->nextId > 0xffff) {
$this->nextId %= 0xffff;
}
$empty = empty($this->questions);
if (isset($this->questions[$id])) {
$deferred = $this->questions[$id];
unset($this->questions[$id]);
$deferred->fail(new ResolutionException("Request hasn't been answered with 65k requests in between"));
}
if (isset($this->questions[$id])) {
$deferred = $this->questions[$id];
unset($this->questions[$id]);
$deferred->fail(new ResolutionException("Request hasn't been answered with 65k requests in between"));
}
$message = $this->createMessage($question, $id);
$this->questions[$id] = $deferred = new Deferred;
$message = $this->createMessage($question, $id);
yield $this->send($message);
$this->receive()->onResolve($this->onResolve);
try {
yield $this->send($message);
} catch (StreamException $exception) {
throw new ResolutionException("Sending the request failed", 0, $exception);
}
return yield $deferred->promise();
if ($empty) {
$this->receive()->onResolve($this->onResolve);
}
$this->questions[$id] = $deferred = new Deferred;
return yield $deferred->promise();
});
}
protected function read(): Promise {