mirror of
https://github.com/danog/dns.git
synced 2024-11-30 04:29:06 +01:00
Implement TCP retry and server reuse
This commit is contained in:
parent
48e37f4014
commit
0331eadcc8
@ -6,6 +6,7 @@ use Amp\Cache\ArrayCache;
|
||||
use Amp\Cache\Cache;
|
||||
use Amp\Coroutine;
|
||||
use Amp\Promise;
|
||||
use LibDNS\Messages\Message;
|
||||
use LibDNS\Messages\MessageTypes;
|
||||
use LibDNS\Records\Question;
|
||||
use LibDNS\Records\QuestionFactory;
|
||||
@ -25,6 +26,9 @@ class BasicResolver implements Resolver {
|
||||
/** @var Cache */
|
||||
private $cache;
|
||||
|
||||
/** @var array */
|
||||
private $servers = [];
|
||||
|
||||
public function __construct(Cache $cache = null, ConfigLoader $configLoader = null) {
|
||||
$this->cache = $cache ?? new ArrayCache;
|
||||
$this->configLoader = $configLoader ?? \stripos(PHP_OS, "win") === 0
|
||||
@ -58,27 +62,35 @@ class BasicResolver implements Resolver {
|
||||
|
||||
$nameservers = $this->config->getNameservers();
|
||||
$attempts = $this->config->getAttempts();
|
||||
$receivedTruncatedResponse = false;
|
||||
|
||||
for ($attempt = 0; $attempt < $attempts; ++$attempt) {
|
||||
$i = $attempt % \count($nameservers);
|
||||
$uri = "udp://" . $nameservers[$i];
|
||||
$protocol = $receivedTruncatedResponse ? "tcp" : "udp";
|
||||
|
||||
/** @var \Amp\Dns\Server $server */
|
||||
$server = yield UdpServer::connect($uri);
|
||||
$server = yield $this->getServer($protocol . "://" . $nameservers[$i]);
|
||||
|
||||
/** @var \LibDNS\Messages\Message $response */
|
||||
if (!$server->isAlive()) {
|
||||
unset($this->servers[$protocol . "://" . $nameservers[$i]]);
|
||||
|
||||
/** @var \Amp\Dns\Server $server */
|
||||
$server = yield $this->getServer($protocol . "://" . $nameservers[$i]);
|
||||
}
|
||||
|
||||
/** @var Message $response */
|
||||
$response = yield $server->ask($question);
|
||||
|
||||
if ($response->getResponseCode() !== 0) {
|
||||
throw new ResolutionException(\sprintf("Server returned error code: %d", $response->getResponseCode()));
|
||||
}
|
||||
|
||||
if ($response->getType() !== MessageTypes::RESPONSE) {
|
||||
throw new ResolutionException("Invalid server reply; expected RESPONSE but received QUERY");
|
||||
}
|
||||
$this->assertAcceptableResponse($response);
|
||||
|
||||
if ($response->isTruncated()) {
|
||||
// TODO: Retry via TCP
|
||||
if (!$receivedTruncatedResponse) {
|
||||
// Retry with TCP, don't count attempt
|
||||
$receivedTruncatedResponse = true;
|
||||
$attempt--;
|
||||
continue;
|
||||
}
|
||||
|
||||
throw new ResolutionException("Server returned truncated response");
|
||||
}
|
||||
|
||||
$answers = $response->getAnswerRecords();
|
||||
@ -165,4 +177,34 @@ class BasicResolver implements Resolver {
|
||||
|
||||
return $name;
|
||||
}
|
||||
|
||||
private function getServer($uri): Promise {
|
||||
if (isset($this->servers[$uri])) {
|
||||
return $this->servers[$uri];
|
||||
}
|
||||
|
||||
if (substr($uri, 0, 3) === "udp") {
|
||||
$server = UdpServer::connect($uri);
|
||||
} else {
|
||||
$server = TcpServer::connect($uri);
|
||||
}
|
||||
|
||||
$server->onResolve(function ($error) use ($uri) {
|
||||
if ($error) {
|
||||
unset($this->servers[$uri]);
|
||||
}
|
||||
});
|
||||
|
||||
return $server;
|
||||
}
|
||||
|
||||
private function assertAcceptableResponse(Message $response) {
|
||||
if ($response->getResponseCode() !== 0) {
|
||||
throw new ResolutionException(\sprintf("Server returned error code: %d", $response->getResponseCode()));
|
||||
}
|
||||
|
||||
if ($response->getType() !== MessageTypes::RESPONSE) {
|
||||
throw new ResolutionException("Invalid server reply; expected RESPONSE but received QUERY");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -49,6 +49,11 @@ abstract class Server {
|
||||
*/
|
||||
abstract protected function receive(): Promise;
|
||||
|
||||
/**
|
||||
* @return bool
|
||||
*/
|
||||
abstract public function isAlive(): bool;
|
||||
|
||||
protected function __construct($socket) {
|
||||
$this->input = new ResourceInputStream($socket);
|
||||
$this->output = new ResourceOutputStream($socket);
|
||||
@ -107,4 +112,4 @@ abstract class Server {
|
||||
$request->setID($id);
|
||||
return $request;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -23,6 +23,9 @@ class TcpServer extends Server {
|
||||
/** @var Parser */
|
||||
private $parser;
|
||||
|
||||
/** @var bool */
|
||||
private $isAlive = true;
|
||||
|
||||
public static function connect(string $uri, int $timeout = 5000): Promise {
|
||||
if (!$socket = @\stream_socket_client($uri, $errno, $errstr, 0, STREAM_CLIENT_ASYNC_CONNECT)) {
|
||||
throw new ResolutionException(\sprintf(
|
||||
@ -72,18 +75,26 @@ class TcpServer extends Server {
|
||||
$this->parser = new Parser(self::parser([$this->queue, 'push']));
|
||||
}
|
||||
|
||||
public function send(Message $message): Promise {
|
||||
protected function send(Message $message): Promise {
|
||||
$data = $this->encoder->encode($message);
|
||||
return $this->write(\pack("n", \strlen($data)) . $data);
|
||||
$promise = $this->write(\pack("n", \strlen($data)) . $data);
|
||||
$promise->onResolve(function ($error) {
|
||||
if ($error) {
|
||||
$this->isAlive = false;
|
||||
}
|
||||
});
|
||||
|
||||
return $promise;
|
||||
}
|
||||
|
||||
public function receive(): Promise {
|
||||
protected function receive(): Promise {
|
||||
if ($this->queue->isEmpty()) {
|
||||
return call(function () {
|
||||
do {
|
||||
$chunk = $this->read();
|
||||
|
||||
if ($chunk === null) {
|
||||
$this->isAlive = false;
|
||||
throw new ResolutionException("Reading from the server failed");
|
||||
}
|
||||
|
||||
@ -96,4 +107,8 @@ class TcpServer extends Server {
|
||||
|
||||
return new Success($this->queue->shift());
|
||||
}
|
||||
|
||||
public function isAlive(): bool {
|
||||
return $this->isAlive;
|
||||
}
|
||||
}
|
||||
|
@ -52,4 +52,8 @@ class UdpServer extends Server {
|
||||
return $this->decoder->decode($data);
|
||||
});
|
||||
}
|
||||
|
||||
public function isAlive(): bool {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user