*/
    abstract public static function connect(string $uri): Promise;

    /**
     * Sends a message to the server. ask() awaits the returned promise and
     * treats a StreamException from it as a failed request.
     *
     * @param \LibDNS\Messages\Message $message
     *
     * @return \Amp\Promise
     */
    abstract protected function send(Message $message): Promise;

    /**
     * Produces the next message read from the server. The promise is consumed
     * by the shared $this->onResolve callback set up in the constructor.
     *
     * @return \Amp\Promise<\LibDNS\Messages\Message>
     */
    abstract protected function receive(): Promise;

    /**
     * @return bool
     */
    abstract public function isAlive(): bool;

    /**
     * Wraps the socket in input/output streams and builds the callback that
     * dispatches every received message to the question waiting for it.
     *
     * Invariant maintained together with ask(): a receive() is in flight if
     * and only if at least one question is registered in $this->questions.
     *
     * @param resource $socket Passed as-is to the stream wrappers (presumably a connected stream socket).
     */
    protected function __construct($socket) {
        $this->input = new ResourceInputStream($socket);
        $this->output = new ResourceOutputStream($socket);
        $this->messageFactory = new MessageFactory;

        $this->onResolve = function (\Throwable $exception = null, Message $message = null) {
            if ($exception) {
                // Receiving failed: fail every outstanding question. The map is
                // emptied first so that callbacks triggered by fail() observe a
                // consistent state and may register new questions.
                $questions = $this->questions;
                $this->questions = [];

                foreach ($questions as $deferred) {
                    $deferred->fail($exception);
                }

                return;
            }

            $id = $message->getId();

            if (!isset($this->questions[$id])) {
                // Ignore duplicate response, but keep the receive loop alive:
                // returning without re-arming would leave every other pending
                // question waiting forever, and later ask() calls would not
                // restart the loop because the map is not empty.
                if (!empty($this->questions)) {
                    $this->receive()->onResolve($this->onResolve);
                }

                return;
            }

            $deferred = $this->questions[$id];
            unset($this->questions[$id]);

            // Sampled before resolve(): callbacks run by resolve() may call
            // ask(), which starts the loop itself when it finds the map empty.
            $empty = empty($this->questions);

            $deferred->resolve($message);

            if (!$empty) {
                $this->receive()->onResolve($this->onResolve);
            }
        };
    }

    /**
     * Sends a query for the given question and resolves with the response
     * message carrying the same ID.
     *
     * The promise fails with a ResolutionException if sending fails or if the
     * ID is reused before an answer arrived, and with whatever exception a
     * failed receive() reports.
     *
     * @param \LibDNS\Records\Question $question
     *
     * @return \Amp\Promise<\LibDNS\Messages\Message>
     */
    public function ask(Question $question): Promise {
        return call(function () use ($question) {
            $id = $this->nextId++;

            // IDs are 16 bit (0..0xffff); wrap back to 0. The modulus has to be
            // 0x10000, otherwise 0x10000 maps to 1 and ID 0 is skipped.
            if ($this->nextId > 0xffff) {
                $this->nextId %= 0x10000;
            }

            $message = $this->createMessage($question, $id);

            try {
                yield $this->send($message);
            } catch (StreamException $exception) {
                throw new ResolutionException("Sending the request failed", 0, $exception);
            }

            // Everything below runs without yielding, so the decision whether
            // to start the receive loop is made on the current state rather
            // than on a snapshot taken before the send was awaited.
            $empty = empty($this->questions);
            $stale = $this->questions[$id] ?? null;

            // Register before starting to receive: if receive() is already
            // failed, the callback runs immediately and must see this question.
            $this->questions[$id] = $deferred = new Deferred;

            if ($empty) {
                $this->receive()->onResolve($this->onResolve);
            }

            if ($stale !== null) {
                // The ID wrapped around onto a question that is still pending.
                // It was replaced in place above (the map never became empty),
                // so the receive loop stays consistent; now report the failure.
                $stale->fail(new ResolutionException("Request hasn't been answered with 65k requests in between"));
            }

            return yield $deferred->promise();
        });
    }

    /**
     * Reads from the input stream wrapping the socket.
     *
     * @return \Amp\Promise
     */
    protected function read(): Promise {
        return $this->input->read();
    }

    /**
     * Writes raw data to the output stream wrapping the socket.
     *
     * @param string $data
     *
     * @return \Amp\Promise
     */
    protected function write(string $data): Promise {
        return $this->output->write($data);
    }

    /**
     * Builds a query message containing the question, with recursion desired
     * and the given ID.
     *
     * @param \LibDNS\Records\Question $question
     * @param int $id
     *
     * @return \LibDNS\Messages\Message
     */
    protected function createMessage(Question $question, int $id): Message {
        $request = $this->messageFactory->create(MessageTypes::QUERY);
        $request->getQuestionRecords()->add($question);
        $request->isRecursionDesired(true);
        $request->setID($id);

        return $request;
    }
}