isset($result[Record::CNAME]) + isset($result[Record::DNAME])) { unset($result[Record::CNAME], $result[Record::DNAME]); yield new CoroutineResult($result); return; } // @TODO check for potentially using recursion and iterate over *all* CNAME/DNAME // @FIXME check higher level for CNAME? foreach ([Record::CNAME, Record::DNAME] as $type) { if (isset($result[$type])) { list($lookupName) = $result[$type][0]; } } } throw new ResolutionException("CNAME or DNAME chain too long (possible recursion?)"); } function __doRequest($state, $uri, $name, $type) { $server = __loadExistingServer($state, $uri) ?: __loadNewServer($state, $uri); // Get the next available request ID do { $requestId = $state->requestIdCounter++; if ($state->requestIdCounter >= MAX_REQUEST_ID) { $state->requestIdCounter = 1; } } while (isset($state->pendingRequests[$requestId])); // Create question record $question = $state->questionFactory->create($type); $question->setName($name); // Create request message $request = $state->messageFactory->create(MessageTypes::QUERY); $request->getQuestionRecords()->add($question); $request->isRecursionDesired(true); $request->setID($requestId); // Encode request message $requestPacket = $state->encoder->encode($request); if (substr($uri, 0, 6) == "tcp://") { $requestPacket = pack("n", strlen($requestPacket)) . $requestPacket; } // Send request $bytesWritten = \fwrite($server->socket, $requestPacket); if ($bytesWritten === false || isset($packet[$bytesWritten])) { throw new ResolutionException( "Request send failed" ); } $promisor = new Deferred; $server->pendingRequests[$requestId] = true; $state->pendingRequests[$requestId] = [$promisor, $name, $type, $uri]; return $promisor->promise(); } function __doResolve($name, array $types, $options) { static $state; $state = $state ?: (yield \Amp\resolve(__init())); if (empty($types)) { yield new CoroutineResult([]); return; } assert(array_reduce($types, function ($result, $val) { return $result && \is_int($val); }, true), 'The $types passed to DNS functions must all be integers (from \Amp\Dns\Record class)'); $name = \strtolower($name); $result = []; // Check for cache hits if (!isset($options["cache"]) || $options["cache"]) { foreach ($types as $k => $type) { $cacheKey = "$name#$type"; $cacheValue = (yield $state->arrayCache->get($cacheKey)); if ($cacheValue !== null) { $result[$type] = $cacheValue; unset($types[$k]); } } if (empty($types)) { yield new CoroutineResult($result); return; } } $timeout = empty($options["timeout"]) ? $state->config["timeout"] : (int) $options["timeout"]; if (empty($options["server"])) { if (empty($state->config["nameservers"])) { throw new ResolutionException("No nameserver specified in system config"); } $uri = "udp://" . $state->config["nameservers"][0]; } else { $uri = __parseCustomServerUri($options["server"]); } foreach ($types as $type) { $promises[] = __doRequest($state, $uri, $name, $type); } try { list( , $resultArr) = (yield \Amp\timeout(\Amp\some($promises), $timeout)); foreach ($resultArr as $value) { $result += $value; } } catch (\Amp\TimeoutException $e) { if (substr($uri, 0, 6) == "tcp://") { throw new TimeoutException( "Name resolution timed out for {$name}" ); } else { $options["server"] = \preg_replace("#[a-z.]+://#", "tcp://", $uri); yield new CoroutineResult(\Amp\resolve(__doResolve($name, $types, $options))); } } catch (ResolutionException $e) { if (empty($result)) { // if we have no cached results throw $e; } } catch (\RuntimeException $e) { // if all promises in Amp\some fail if (empty($result)) { // if we have no cached results throw new ResolutionException("All name resolution requests failed", 0, $e); } } yield new CoroutineResult($result); } function __init() { $state = new \StdClass; $state->messageFactory = new MessageFactory; $state->questionFactory = new QuestionFactory; $state->encoder = (new EncoderFactory)->create(); $state->decoder = (new DecoderFactory)->create(); $state->arrayCache = new ArrayCache; $state->requestIdCounter = 1; $state->pendingRequests = []; $state->serverIdMap = []; $state->serverUriMap = []; $state->serverIdTimeoutMap = []; $state->now = \time(); $state->serverTimeoutWatcher = \Amp\repeat(function ($watcherId) use ($state) { $state->now = $now = \time(); foreach ($state->serverIdTimeoutMap as $id => $expiry) { if ($now > $expiry) { __unloadServer($state, $id); } } if (empty($state->serverIdMap)) { \Amp\disable($watcherId); } }, 1000, $options = [ "enable" => true, "keep_alive" => false, ]); $state->config = (yield \Amp\resolve(__loadResolvConf())); yield new CoroutineResult($state); } /** @link http://man7.org/linux/man-pages/man5/resolv.conf.5.html */ function __loadResolvConf($path = null) { $result = [ "nameservers" => [ "8.8.8.8", "8.8.4.4", ], "timeout" => 3000, "attempts" => 2, ]; if (\stripos(PHP_OS, "win") !== 0) { $path = $path ?: "/etc/resolv.conf"; try { $lines = explode("\n", (yield \Amp\File\get($path))); $result["nameservers"] = []; foreach ($lines as $line) { $line = \preg_split('#\s+#', $line, 2); if (\count($line) !== 2) { continue; } list($type, $value) = $line; if ($type === "nameserver") { $line[1] = trim($line[1]); $ip = @\inet_pton($line[1]); if ($ip === false) { continue; } $result["nameservers"][] = $line[1] . ":53"; } elseif ($type === "options") { $optline = preg_split('#\s+#', $value, 2); if (\count($optline) !== 2) { continue; } // TODO: Respect the contents of the attempts setting during resolution list($option, $value) = $optline; if (in_array($option, ["timeout", "attempts"])) { $result[$option] = (int) $value; } } } } catch (\Amp\File\FilesystemException $e) {} } yield new CoroutineResult($result); } function __loadHostsFile($path = null) { $data = []; if (empty($path)) { $path = \stripos(PHP_OS, "win") === 0 ? 'C:\Windows\system32\drivers\etc\hosts' : '/etc/hosts'; } try { $contents = (yield \Amp\File\get($path)); } catch (\Exception $e) { yield new CoroutineResult($data); return; } $lines = \array_filter(\array_map("trim", \explode("\n", $contents))); foreach ($lines as $line) { if ($line[0] === "#") { continue; } $parts = \preg_split('/\s+/', $line); if (!($ip = @\inet_pton($parts[0]))) { continue; } elseif (isset($ip[4])) { $key = Record::AAAA; } else { $key = Record::A; } for ($i = 1, $l = \count($parts); $i < $l; $i++) { if (__isValidHostName($parts[$i])) { $data[$key][strtolower($parts[$i])] = $parts[0]; } } } yield new CoroutineResult($data); } function __parseCustomServerUri($uri) { if (!\is_string($uri)) { throw new ResolutionException( 'Invalid server address ($uri must be a string IP address, ' . gettype($uri) . " given)" ); } if (strpos("://", $uri) !== false) { return $uri; } if (($colonPos = strrpos(":", $uri)) !== false) { $addr = \substr($uri, 0, $colonPos); $port = \substr($uri, $colonPos); } else { $addr = $uri; $port = 53; } $addr = trim($addr, "[]"); if (!$inAddr = @\inet_pton($addr)) { throw new ResolutionException( 'Invalid server $uri; string IP address required' ); } return isset($inAddr[4]) ? "udp://[{$addr}]:{$port}" : "udp://{$addr}:{$port}"; } function __loadExistingServer($state, $uri) { if (empty($state->serverUriMap[$uri])) { return null; } $server = $state->serverUriMap[$uri]; if (\is_resource($server->socket)) { unset($state->serverIdTimeoutMap[$server->id]); \Amp\enable($server->watcherId); return $server; } __unloadServer($state, $server->id); return null; } function __loadNewServer($state, $uri) { if (!$socket = @\stream_socket_client($uri, $errno, $errstr)) { throw new ResolutionException(sprintf( "Connection to %s failed: [Error #%d] %s", $uri, $errno, $errstr )); } \stream_set_blocking($socket, false); $id = (int) $socket; $server = new \StdClass; $server->id = $id; $server->uri = $uri; $server->socket = $socket; $server->buffer = ""; $server->length = INF; $server->pendingRequests = []; $server->watcherId = \Amp\onReadable($socket, 'Amp\Dns\__onReadable', [ "enable" => true, "keep_alive" => true, "cb_data" => $state, ]); $state->serverIdMap[$id] = $server; $state->serverUriMap[$uri] = $server; return $server; } function __unloadServer($state, $serverId, $error = null) { $server = $state->serverIdMap[$serverId]; \Amp\cancel($server->watcherId); unset( $state->serverIdMap[$serverId], $state->serverUriMap[$server->uri] ); if (\is_resource($server->socket)) { @\fclose($server->socket); } if ($error && $server->pendingRequests) { foreach (array_keys($server->pendingRequests) as $requestId) { list($promisor) = $state->pendingRequests[$requestId]; $promisor->fail($error); } } } function __onReadable($watcherId, $socket, $state) { $serverId = (int) $socket; $packet = @\fread($socket, 512); if ($packet != "") { $server = $state->serverIdMap[$serverId]; if (\substr($server->uri, 0, 6) == "tcp://") { if ($server->length == INF) { $server->length = unpack("n", $packet)[1]; $packet = substr($packet, 2); } $server->buffer .= $packet; while ($server->length <= \strlen($server->buffer)) { __decodeResponsePacket($state, $serverId, substr($server->buffer, 0, $server->length)); $server->buffer = substr($server->buffer, $server->length); if (\strlen($server->buffer) >= 2 + $server->length) { $server->length = unpack("n", $server->buffer)[1]; $server->buffer = substr($server->buffer, 2); } else { $server->length = INF; } } } else { __decodeResponsePacket($state, $serverId, $packet); } } else { __unloadServer($state, $serverId, new ResolutionException( "Server connection failed" )); } } function __decodeResponsePacket($state, $serverId, $packet) { try { $response = $state->decoder->decode($packet); $requestId = $response->getID(); $responseCode = $response->getResponseCode(); $responseType = $response->getType(); if ($responseCode !== 0) { __finalizeResult($state, $serverId, $requestId, new ResolutionException( "Server returned error code: {$responseCode}" )); } elseif ($responseType !== MessageTypes::RESPONSE) { __unloadServer($state, $serverId, new ResolutionException( "Invalid server reply; expected RESPONSE but received QUERY" )); } else { __processDecodedResponse($state, $serverId, $requestId, $response); } } catch (\Exception $e) { __unloadServer($state, $serverId, new ResolutionException( "Response decode error", 0, $e )); } } function __processDecodedResponse($state, $serverId, $requestId, $response) { list($promisor, $name, $type, $uri) = $state->pendingRequests[$requestId]; // Retry via tcp if message has been truncated if ($response->isTruncated()) { if (\substr($uri, 0, 6) != "tcp://") { $uri = \preg_replace("#[a-z.]+://#", "tcp://", $uri); $promisor->succeed(__doRequest($state, $uri, $name, $type)); } else { __finalizeResult($state, $serverId, $requestId, new ResolutionException( "Server returned truncated response" )); } return; } $answers = $response->getAnswerRecords(); foreach ($answers as $record) { $result[$record->getType()][] = [(string) $record->getData(), $record->getType(), $record->getTTL()]; } if (empty($result)) { $state->arrayCache->set("$name#$type", [], 300); // "it MUST NOT cache it for longer than five (5) minutes" per RFC 2308 section 7.1 __finalizeResult($state, $serverId, $requestId, new NoRecordException( "No records returned for {$name}" )); } else { __finalizeResult($state, $serverId, $requestId, $error = null, $result); } } function __finalizeResult($state, $serverId, $requestId, $error = null, $result = null) { if (empty($state->pendingRequests[$requestId])) { return; } list($promisor, $name) = $state->pendingRequests[$requestId]; $server = $state->serverIdMap[$serverId]; unset( $state->pendingRequests[$requestId], $server->pendingRequests[$requestId] ); if (empty($server->pendingRequests)) { $state->serverIdTimeoutMap[$server->id] = $state->now + IDLE_TIMEOUT; \Amp\disable($server->watcherId); \Amp\enable($state->serverTimeoutWatcher); } if ($error) { $promisor->fail($error); } else { foreach ($result as $type => $records) { $minttl = INF; foreach ($records as list( , $ttl)) { if ($ttl && $minttl > $ttl) { $minttl = $ttl; } } $state->arrayCache->set("$name#$type", $records, $minttl); } $promisor->succeed($result); } }