1
0
mirror of https://github.com/danog/dns.git synced 2025-01-23 05:51:11 +01:00

Update for Revolt changes and Futures

This commit is contained in:
Aaron Piotrowski 2021-09-14 20:56:17 -05:00
parent 7e78dd0c96
commit 223e61f720
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
10 changed files with 211 additions and 226 deletions

View File

@ -45,11 +45,11 @@
"php": ">=8",
"ext-json": "*",
"ext-filter": "*",
"amphp/amp": "dev-v3-revolt",
"amphp/byte-stream": "dev-v2-revolt",
"amphp/cache": "dev-v2-revolt",
"amphp/amp": "dev-v3-revolt as 3",
"amphp/byte-stream": "dev-v2-revolt as 2",
"amphp/cache": "dev-v2-revolt as 2",
"amphp/parser": "^1",
"amphp/windows-registry": "dev-v1-revolt",
"amphp/windows-registry": "dev-v1-revolt as 1",
"daverandom/libdns": "^2.0.1"
},
"require-dev": {

View File

@ -18,7 +18,7 @@ final class Config
private bool $rotation = false;
public function __construct(array $nameservers, array $knownHosts = [], int $timeout = 3000, int $attempts = 2)
public function __construct(array $nameservers, array $knownHosts = [], float $timeout = 3, int $attempts = 2)
{
if (\count($nameservers) < 1) {
throw new ConfigException("At least one nameserver is required for a valid config");
@ -96,7 +96,7 @@ final class Config
return $this->knownHosts;
}
public function getTimeout(): int
public function getTimeout(): float
{
return $this->timeout;
}

View File

@ -5,18 +5,16 @@ namespace Amp\Dns\Internal;
use Amp\ByteStream\ResourceInputStream;
use Amp\ByteStream\ResourceOutputStream;
use Amp\ByteStream\StreamException;
use Amp\CancelledException;
use Amp\Deferred;
use Amp\Dns\DnsException;
use Amp\Dns\TimeoutException;
use Amp\Promise;
use Amp\TimeoutException as PromiseTimeoutException;
use Amp\TimeoutCancellationToken;
use LibDNS\Messages\Message;
use LibDNS\Messages\MessageFactory;
use LibDNS\Messages\MessageTypes;
use LibDNS\Records\Question;
use Revolt\EventLoop\Internal\Struct;
use function Amp\async;
use function Amp\await;
use function Revolt\EventLoop\defer;
/** @internal */
abstract class Socket
@ -35,8 +33,6 @@ abstract class Socket
/** @var array Contains already sent queries with no response yet. For UDP this is exactly zero or one item. */
private array $pending = [];
private MessageFactory $messageFactory;
/** @var callable */
private $onResolve;
/** @var int Used for determining whether the socket can be garbage collected, because it's inactive. */
private int $lastActivity;
private bool $receiving = false;
@ -49,35 +45,46 @@ abstract class Socket
$this->output = new ResourceOutputStream($socket);
$this->messageFactory = new MessageFactory;
$this->lastActivity = \time();
}
$this->onResolve = function (\Throwable $exception = null, Message $message = null) {
$this->lastActivity = \time();
$this->receiving = false;
if ($exception) {
$this->error($exception);
return;
private function fetch(): void {
defer(function (): void {
try {
$this->handleResolution(null, $this->receive());
} catch (\Throwable $exception) {
$this->handleResolution($exception);
}
});
}
\assert($message instanceof Message);
$id = $message->getId();
private function handleResolution(?\Throwable $exception, ?Message $message = null): void
{
$this->lastActivity = \time();
$this->receiving = false;
// Ignore duplicate and invalid responses.
if (isset($this->pending[$id]) && $this->matchesQuestion($message, $this->pending[$id]->question)) {
/** @var Deferred $deferred */
$deferred = $this->pending[$id]->deferred;
unset($this->pending[$id]);
$deferred->resolve($message);
}
if ($exception) {
$this->error($exception);
return;
}
if (empty($this->pending)) {
$this->input->unreference();
} elseif (!$this->receiving) {
$this->input->reference();
$this->receiving = true;
async(fn () => $this->receive())->onResolve($this->onResolve);
}
};
\assert($message instanceof Message);
$id = $message->getId();
// Ignore duplicate and invalid responses.
if (isset($this->pending[$id]) && $this->matchesQuestion($message, $this->pending[$id]->question)) {
/** @var Deferred $deferred */
$deferred = $this->pending[$id]->deferred;
unset($this->pending[$id]);
$deferred->complete($message);
}
if (empty($this->pending)) {
$this->input->unreference();
} elseif (!$this->receiving) {
$this->input->reference();
$this->receiving = true;
$this->fetch();
}
}
abstract public function isAlive(): bool;
@ -89,18 +96,18 @@ abstract class Socket
/**
* @param Question $question
* @param int $timeout
* @param float $timeout
*
* @return Message
*/
final public function ask(Question $question, int $timeout): Message
final public function ask(Question $question, float $timeout): Message
{
$this->lastActivity = \time();
if (\count($this->pending) > self::MAX_CONCURRENT_REQUESTS) {
$deferred = new Deferred;
$this->queue[] = $deferred;
await($deferred->promise());
$deferred->getFuture()->join();
}
do {
@ -109,8 +116,6 @@ abstract class Socket
$deferred = new Deferred;
$pending = new class {
use Struct;
public Deferred $deferred;
public Question $question;
};
@ -133,19 +138,19 @@ abstract class Socket
if (!$this->receiving) {
$this->receiving = true;
async(fn () => $this->receive())->onResolve($this->onResolve);
$this->fetch();
}
try {
return await(Promise\timeout($deferred->promise(), $timeout));
} catch (PromiseTimeoutException $exception) {
return $deferred->getFuture()->join(new TimeoutCancellationToken($timeout));
} catch (CancelledException $exception) {
unset($this->pending[$id]);
if (empty($this->pending)) {
$this->input->unreference();
}
throw new TimeoutException("Didn't receive a response within {$timeout} milliseconds.");
throw new TimeoutException("Didn't receive a response within {$timeout} seconds.");
} finally {
if ($this->queue) {
$deferred = \array_shift($this->queue);
@ -202,7 +207,7 @@ abstract class Socket
foreach ($pending as $pendingQuestion) {
/** @var Deferred $deferred */
$deferred = $pendingQuestion->deferred;
$deferred->fail($exception);
$deferred->error($exception);
}
}

View File

@ -2,18 +2,17 @@
namespace Amp\Dns\Internal;
use Amp\CancelledException;
use Amp\Deferred;
use Amp\Dns\DnsException;
use Amp\Dns\TimeoutException;
use Amp\Parser\Parser;
use Amp\Promise;
use Amp\TimeoutException as PromiseTimeoutException;
use Amp\TimeoutCancellationToken;
use LibDNS\Decoder\DecoderFactory;
use LibDNS\Encoder\Encoder;
use LibDNS\Encoder\EncoderFactory;
use LibDNS\Messages\Message;
use Revolt\EventLoop\Loop;
use function Amp\await;
/** @internal */
final class TcpSocket extends Socket
@ -35,12 +34,12 @@ final class TcpSocket extends Socket
$watcher = Loop::onWritable($socket, static function (string $watcher) use ($socket, $deferred): void {
Loop::cancel($watcher);
$deferred->resolve(new self($socket));
$deferred->complete(new self($socket));
});
try {
return await(Promise\timeout($deferred->promise(), $timeout));
} catch (PromiseTimeoutException $e) {
return $deferred->getFuture()->join(new TimeoutCancellationToken($timeout));
} catch (CancelledException $e) {
Loop::cancel($watcher);
throw new TimeoutException("Name resolution timed out, could not connect to server at $uri");
}

View File

@ -4,17 +4,15 @@ namespace Amp\Dns;
use Amp\Cache\ArrayCache;
use Amp\Cache\Cache;
use Amp\CompositeException;
use Amp\Dns\Internal\Socket;
use Amp\Dns\Internal\TcpSocket;
use Amp\Dns\Internal\UdpSocket;
use Amp\MultiReasonException;
use Amp\Promise;
use Amp\Future;
use LibDNS\Messages\Message;
use LibDNS\Records\Question;
use LibDNS\Records\QuestionFactory;
use Revolt\EventLoop\Loop;
use function Amp\async;
use function Amp\await;
final class Rfc1035StubResolver implements Resolver
{
@ -31,17 +29,17 @@ final class Rfc1035StubResolver implements Resolver
private int $configStatus = self::CONFIG_NOT_LOADED;
private ?Promise $pendingConfig = null;
private ?Future $pendingConfig = null;
private Cache $cache;
/** @var Socket[] */
private array $sockets = [];
/** @var Promise[] */
/** @var Future[] */
private array $pendingSockets = [];
/** @var Promise[] */
/** @var Future[] */
private array $pendingQueries = [];
private string $gcWatcher;
@ -164,13 +162,11 @@ final class Rfc1035StubResolver implements Resolver
}
try {
[, $records] = await(Promise\some([
async(fn () => $this->query($searchName, Record::A)),
async(fn () => $this->query($searchName, Record::AAAA)),
]));
return \array_merge(...$records);
} catch (MultiReasonException $e) {
return Future\any([
Future\spawn(fn () => $this->query($searchName, Record::A)),
Future\spawn(fn () => $this->query($searchName, Record::AAAA)),
]);
} catch (CompositeException $e) {
$errors = [];
foreach ($e->getReasons() as $reason) {
@ -228,10 +224,10 @@ final class Rfc1035StubResolver implements Resolver
public function reloadConfig(): Config
{
if ($this->pendingConfig) {
return await($this->pendingConfig);
$this->pendingConfig->join();
}
$promise = async(function (): Config {
$this->pendingConfig = Future\spawn(function (): Config {
try {
$this->config = $this->configLoader->loadConfig();
$this->configStatus = self::CONFIG_LOADED;
@ -255,18 +251,14 @@ final class Rfc1035StubResolver implements Resolver
);
\restore_error_handler();
}
} finally {
$this->pendingConfig = null;
}
return $this->config;
});
$this->pendingConfig = $promise;
$promise->onResolve(function (): void {
$this->pendingConfig = null;
});
return await($promise);
return $this->pendingConfig->join();
}
/** @inheritdoc */
@ -275,132 +267,133 @@ final class Rfc1035StubResolver implements Resolver
$pendingQueryKey = $type . " " . $name;
if (isset($this->pendingQueries[$pendingQueryKey])) {
return await($this->pendingQueries[$pendingQueryKey]);
return $this->pendingQueries[$pendingQueryKey]->join();
}
$promise = async(function () use ($name, $type): array {
if ($this->configStatus === self::CONFIG_NOT_LOADED) {
$this->reloadConfig();
}
if ($this->configStatus === self::CONFIG_FAILED) {
return $this->blockingFallbackResolver->query($name, $type);
}
$future = Future\spawn(function () use ($name, $type): array {
try {
if ($this->configStatus === self::CONFIG_NOT_LOADED) {
$this->reloadConfig();
}
if ($this->configStatus === self::CONFIG_FAILED) {
return $this->blockingFallbackResolver->query($name, $type);
}
$name = $this->normalizeName($name, $type);
$question = $this->createQuestion($name, $type);
$name = $this->normalizeName($name, $type);
$question = $this->createQuestion($name, $type);
if (null !== $cachedValue = $this->cache->get($this->getCacheKey($name, $type))) {
return $this->decodeCachedResult($name, $type, $cachedValue);
}
if (null !== $cachedValue = $this->cache->get($this->getCacheKey($name, $type))) {
return $this->decodeCachedResult($name, $type, $cachedValue);
}
$nameservers = $this->selectNameservers();
$nameserversCount = \count($nameservers);
$attempts = $this->config->getAttempts();
$protocol = "udp";
$attempt = 0;
$nameservers = $this->selectNameservers();
$nameserversCount = \count($nameservers);
$attempts = $this->config->getAttempts();
$protocol = "udp";
$attempt = 0;
/** @var Socket $socket */
$uri = $protocol . "://" . $nameservers[0];
$socket = $this->getSocket($uri);
/** @var Socket $socket */
$uri = $protocol . "://" . $nameservers[0];
$socket = $this->getSocket($uri);
$attemptDescription = [];
$attemptDescription = [];
while ($attempt < $attempts) {
try {
if (!$socket->isAlive()) {
unset($this->sockets[$uri]);
$socket->close();
$uri = $protocol . "://" . $nameservers[$attempt % $nameserversCount];
$socket = $this->getSocket($uri);
}
$attemptDescription[] = $uri;
$response = $socket->ask($question, $this->config->getTimeout());
$this->assertAcceptableResponse($response, $name);
// UDP sockets are never reused, they're not in the $this->sockets map
if ($protocol === "udp") {
// Defer call, because it interferes with the unreference() call in Internal\Socket otherwise
Loop::defer(static function () use ($socket): void {
while ($attempt < $attempts) {
try {
if (!$socket->isAlive()) {
unset($this->sockets[$uri]);
$socket->close();
});
}
if ($response->isTruncated()) {
if ($protocol !== "tcp") {
// Retry with TCP, don't count attempt
$protocol = "tcp";
$uri = $protocol . "://" . $nameservers[$attempt % $nameserversCount];
$socket = $this->getSocket($uri);
continue;
}
throw new DnsException("Server returned a truncated response for '{$name}' (" . Record::getName($type) . ")");
$attemptDescription[] = $uri;
$response = $socket->ask($question, $this->config->getTimeout());
$this->assertAcceptableResponse($response, $name);
// UDP sockets are never reused, they're not in the $this->sockets map
if ($protocol === "udp") {
// Defer call, because it interferes with the unreference() call in Internal\Socket otherwise
Loop::defer(static function () use ($socket): void {
$socket->close();
});
}
if ($response->isTruncated()) {
if ($protocol !== "tcp") {
// Retry with TCP, don't count attempt
$protocol = "tcp";
$uri = $protocol . "://" . $nameservers[$attempt % $nameserversCount];
$socket = $this->getSocket($uri);
continue;
}
throw new DnsException("Server returned a truncated response for '{$name}' (" . Record::getName($type) . ")");
}
$answers = $response->getAnswerRecords();
$result = [];
$ttls = [];
/** @var \LibDNS\Records\Resource $record */
foreach ($answers as $record) {
$recordType = $record->getType();
$result[$recordType][] = (string) $record->getData();
// Cache for max one day
$ttls[$recordType] = \min($ttls[$recordType] ?? 86400, $record->getTTL());
}
foreach ($result as $recordType => $records) {
// We don't care here whether storing in the cache fails
$this->cache->set(
$this->getCacheKey($name, $recordType),
\json_encode($records),
$ttls[$recordType]
);
}
if (!isset($result[$type])) {
// "it MUST NOT cache it for longer than five (5) minutes" per RFC 2308 section 7.1
$this->cache->set($this->getCacheKey($name, $type), \json_encode([]), 300);
throw new NoRecordException("No records returned for '{$name}' (" . Record::getName($type) . ")");
}
return \array_map(static function ($data) use ($type, $ttls) {
return new Record($data, $type, $ttls[$type]);
}, $result[$type]);
} catch (TimeoutException $e) {
// Defer call, because it might interfere with the unreference() call in Internal\Socket otherwise
Loop::defer(function () use ($socket, $uri): void {
unset($this->sockets[$uri]);
$socket->close();
});
$uri = $protocol . "://" . $nameservers[++$attempt % $nameserversCount];
$socket = $this->getSocket($uri);
continue;
}
$answers = $response->getAnswerRecords();
$result = [];
$ttls = [];
/** @var \LibDNS\Records\Resource $record */
foreach ($answers as $record) {
$recordType = $record->getType();
$result[$recordType][] = (string) $record->getData();
// Cache for max one day
$ttls[$recordType] = \min($ttls[$recordType] ?? 86400, $record->getTTL());
}
foreach ($result as $recordType => $records) {
// We don't care here whether storing in the cache fails
$this->cache->set(
$this->getCacheKey($name, $recordType),
\json_encode($records),
$ttls[$recordType]
);
}
if (!isset($result[$type])) {
// "it MUST NOT cache it for longer than five (5) minutes" per RFC 2308 section 7.1
$this->cache->set($this->getCacheKey($name, $type), \json_encode([]), 300);
throw new NoRecordException("No records returned for '{$name}' (" . Record::getName($type) . ")");
}
return \array_map(static function ($data) use ($type, $ttls) {
return new Record($data, $type, $ttls[$type]);
}, $result[$type]);
} catch (TimeoutException $e) {
// Defer call, because it might interfere with the unreference() call in Internal\Socket otherwise
Loop::defer(function () use ($socket, $uri): void {
unset($this->sockets[$uri]);
$socket->close();
});
$uri = $protocol . "://" . $nameservers[++$attempt % $nameserversCount];
$socket = $this->getSocket($uri);
continue;
}
throw new TimeoutException(\sprintf(
"No response for '%s' (%s) from any nameserver within %d ms after %d attempts, tried %s",
$name,
Record::getName($type),
$this->config->getTimeout(),
$attempts,
\implode(", ", $attemptDescription)
));
} finally {
unset($this->pendingQueries[$type . " " . $name]);
}
throw new TimeoutException(\sprintf(
"No response for '%s' (%s) from any nameserver within %d ms after %d attempts, tried %s",
$name,
Record::getName($type),
$this->config->getTimeout(),
$attempts,
\implode(", ", $attemptDescription)
));
});
$this->pendingQueries[$type . " " . $name] = $promise;
$promise->onResolve(function () use ($name, $type) {
unset($this->pendingQueries[$type . " " . $name]);
});
$this->pendingQueries[$type . " " . $name] = $future;
return await($promise);
return $future->join();
}
private function queryHosts(string $name, int $typeRestriction = null): array
@ -494,20 +487,22 @@ final class Rfc1035StubResolver implements Resolver
}
if (isset($this->pendingSockets[$uri])) {
return await($this->pendingSockets[$uri]);
return $this->pendingSockets[$uri]->join();
}
$promise = async(fn () => TcpSocket::connect($uri));
$promise->onResolve(function (?\Throwable $error, TcpSocket $server) use ($uri): void {
unset($this->pendingSockets[$uri]);
if (!$error) {
$this->sockets[$uri] = $server;
$future = Future\spawn(function () use ($uri) {
try {
$socket = TcpSocket::connect($uri);
$this->sockets[$uri] = $socket;
return $socket;
} finally {
unset($this->pendingSockets[$uri]);
}
});
return await($promise);
$this->pendingSockets[$uri] = $future;
return $future->join();
}
/**

View File

@ -7,11 +7,11 @@ final class UnixConfigLoader implements ConfigLoader
public const MAX_NAMESERVERS = 3;
public const MAX_DNS_SEARCH = 6;
public const MAX_TIMEOUT = 30 * 1000;
public const MAX_TIMEOUT = 30;
public const MAX_ATTEMPTS = 5;
public const MAX_NDOTS = 15;
public const DEFAULT_TIMEOUT = 5 * 1000;
public const DEFAULT_TIMEOUT = 5;
public const DEFAULT_ATTEMPTS = 2;
public const DEFAULT_NDOTS = 1;
@ -153,7 +153,7 @@ final class UnixConfigLoader implements ConfigLoader
return []; // don't overwrite option value
}
// The value for this option is silently capped to 30s
return ["timeout", (int) \min($value * 1000, self::MAX_TIMEOUT)];
return ["timeout", (int) \min($value, self::MAX_TIMEOUT)];
case "attempts":
$value = (int) $value;

View File

@ -4,8 +4,6 @@ namespace Amp\Dns;
use Revolt\EventLoop\Loop;
const LOOP_STATE_IDENTIFIER = Resolver::class;
/**
* Retrieve the application-wide dns resolver instance.
*
@ -15,19 +13,15 @@ const LOOP_STATE_IDENTIFIER = Resolver::class;
*/
function resolver(Resolver $resolver = null): Resolver
{
if ($resolver === null) {
$resolver = Loop::getState(LOOP_STATE_IDENTIFIER);
static $map;
$map ??= new \WeakMap();
$driver = Loop::getDriver();
if ($resolver) {
return $resolver;
}
$resolver = createDefaultResolver();
if ($resolver) {
return $map[$driver] = $resolver;
}
Loop::setState(LOOP_STATE_IDENTIFIER, $resolver);
return $resolver;
return $map[$driver] ??= createDefaultResolver();
}
/**

View File

@ -10,18 +10,10 @@ use Amp\Dns\UnixConfigLoader;
use Amp\Dns\WindowsConfigLoader;
use Amp\PHPUnit\AsyncTestCase;
use PHPUnit\Framework\MockObject\MockObject;
use function Amp\async;
use function Amp\await;
use function Amp\Future\spawn;
class IntegrationTest extends AsyncTestCase
{
public function setUp(): void
{
parent::setUp();
$this->ignoreLoopWatchers();
}
/**
* @param string $hostname
*
@ -165,10 +157,10 @@ class IntegrationTest extends AsyncTestCase
*/
public function testRequestSharing(): void
{
$promise1 = async(fn () => Dns\query("example.com", Record::A));
$promise2 = async(fn () => Dns\query("example.com", Record::A));
$promise1 = spawn(fn () => Dns\query("example.com", Record::A));
$promise2 = spawn(fn () => Dns\query("example.com", Record::A));
self::assertSame(await($promise1), await($promise2));
self::assertSame($promise1->join(), $promise2->join());
}
public function provideHostnames(): array

View File

@ -29,19 +29,19 @@ class TcpSocketTest extends SocketTest
$socket = $this->connect();
$result = $socket->ask($question, 3000);
$result = $socket->ask($question, 3);
self::assertInstanceOf(Message::class, $result);
self::assertSame(MessageTypes::RESPONSE, $result->getType());
// Google's DNS times out really fast
delay(3000);
delay(3);
$this->expectException(Dns\DnsException::class);
$this->expectExceptionMessageMatches("(Sending the request failed|Reading from the server failed)");
$socket->ask($question, 3000);
$socket->ask($question, 3);
}
protected function connect(): Dns\Internal\Socket

View File

@ -19,7 +19,7 @@ class UnixConfigLoaderTest extends AsyncTestCase
"[2001:4860:4860::8888]:53",
], $result->getNameservers());
self::assertSame(30000, $result->getTimeout());
self::assertSame(30.0, $result->getTimeout());
self::assertSame(3, $result->getAttempts());
self::assertEmpty($result->getSearchList());
self::assertSame(1, $result->getNdots());
@ -37,7 +37,7 @@ class UnixConfigLoaderTest extends AsyncTestCase
"[2001:4860:4860::8888]:53",
], $result->getNameservers());
self::assertSame(30000, $result->getTimeout());
self::assertSame(30.0, $result->getTimeout());
self::assertSame(3, $result->getAttempts());
self::assertSame(['local', 'local1', 'local2', 'local3', 'local4', 'local5'], $result->getSearchList());
self::assertSame(15, $result->getNdots());
@ -55,7 +55,7 @@ class UnixConfigLoaderTest extends AsyncTestCase
"[2001:4860:4860::8888]:53",
], $result->getNameservers());
self::assertSame(5000, $result->getTimeout());
self::assertSame(5.0, $result->getTimeout());
self::assertSame(2, $result->getAttempts());
self::assertTrue($result->isRotationEnabled());
}
@ -71,7 +71,7 @@ class UnixConfigLoaderTest extends AsyncTestCase
"[2001:4860:4860::8888]:53",
], $result->getNameservers());
self::assertSame(5000, $result->getTimeout());
self::assertSame(5.0, $result->getTimeout());
self::assertSame(2, $result->getAttempts());
self::assertSame(1, $result->getNdots());
}
@ -92,7 +92,7 @@ class UnixConfigLoaderTest extends AsyncTestCase
self::assertSame(['local'], $result->getSearchList());
self::assertSame(1000, $result->getTimeout());
self::assertSame(1.0, $result->getTimeout());
self::assertSame(5, $result->getAttempts());
self::assertSame(10, $result->getNdots());
self::assertTrue($result->isRotationEnabled());