1
0
mirror of https://github.com/danog/dns.git synced 2024-12-12 09:29:46 +01:00
dns/lib/Internal/Socket.php

250 lines
6.8 KiB
PHP
Raw Normal View History

2017-06-23 07:34:11 +02:00
<?php
namespace Amp\Dns\Internal;
2017-06-23 07:34:11 +02:00
2017-06-23 18:57:22 +02:00
use Amp;
2017-06-23 07:34:11 +02:00
use Amp\ByteStream\ResourceInputStream;
use Amp\ByteStream\ResourceOutputStream;
use Amp\ByteStream\StreamException;
2017-06-23 07:34:11 +02:00
use Amp\Deferred;
use Amp\Dns\ResolutionException;
use Amp\Dns\TimeoutException;
2017-06-23 07:34:11 +02:00
use Amp\Promise;
use LibDNS\Messages\Message;
use LibDNS\Messages\MessageFactory;
use LibDNS\Messages\MessageTypes;
use LibDNS\Records\Question;
use function Amp\call;
2017-06-23 07:34:11 +02:00
/** @internal */
abstract class Socket {
const MAX_CONCURRENT_REQUESTS = 500;
2017-06-23 19:13:28 +02:00
/** @var ResourceInputStream */
2017-06-23 07:34:11 +02:00
private $input;
2017-06-23 19:13:28 +02:00
/** @var ResourceOutputStream */
2017-06-23 07:34:11 +02:00
private $output;
/** @var array */
private $pending = [];
2017-06-23 07:34:11 +02:00
2017-06-23 19:13:28 +02:00
/** @var MessageFactory */
2017-06-23 07:34:11 +02:00
private $messageFactory;
/** @var callable */
private $onResolve;
2017-06-23 19:13:28 +02:00
/** @var int */
private $lastActivity;
2017-06-24 07:50:34 +02:00
/** @var bool */
private $receiving = false;
/** @var array */
private $queue = [];
2017-06-23 07:34:11 +02:00
/**
* @param string $uri
*
2017-06-23 19:13:28 +02:00
* @return Promise<\Amp\Dns\Server>
2017-06-23 07:34:11 +02:00
*/
abstract public static function connect(string $uri): Promise;
/**
2017-06-23 19:13:28 +02:00
* @param Message $message
2017-06-23 07:34:11 +02:00
*
2017-06-23 19:13:28 +02:00
* @return Promise<int>
2017-06-23 07:34:11 +02:00
*/
abstract protected function send(Message $message): Promise;
/**
2017-06-23 19:13:28 +02:00
* @return Promise<Message>
2017-06-23 07:34:11 +02:00
*/
abstract protected function receive(): Promise;
2017-06-23 12:49:16 +02:00
/**
* @return bool
*/
abstract public function isAlive(): bool;
2017-06-23 19:13:28 +02:00
public function getLastActivity(): int {
return $this->lastActivity;
}
2017-06-23 07:34:11 +02:00
protected function __construct($socket) {
$this->input = new ResourceInputStream($socket);
$this->output = new ResourceOutputStream($socket);
$this->messageFactory = new MessageFactory;
2017-06-23 19:13:28 +02:00
$this->lastActivity = \time();
2017-06-23 07:34:11 +02:00
$this->onResolve = function (\Throwable $exception = null, Message $message = null) {
2017-06-23 19:13:28 +02:00
$this->lastActivity = \time();
2017-06-24 07:50:34 +02:00
$this->receiving = false;
2017-06-23 19:13:28 +02:00
2017-06-23 07:34:11 +02:00
if ($exception) {
2017-06-23 23:25:50 +02:00
$this->error($exception);
2017-06-23 07:34:11 +02:00
return;
}
$id = $message->getId();
// Ignore duplicate and invalid responses.
if (isset($this->pending[$id]) && $this->matchesQuestion($message, $this->pending[$id]->question)) {
/** @var Deferred $deferred */
$deferred = $this->pending[$id]->deferred;
unset($this->pending[$id]);
2017-06-24 07:50:34 +02:00
$deferred->resolve($message);
2017-06-23 07:34:11 +02:00
}
if (empty($this->pending)) {
2017-06-24 07:50:34 +02:00
$this->input->unreference();
} elseif (!$this->receiving) {
$this->input->reference();
$this->receiving = true;
$this->receive()->onResolve($this->onResolve);
}
2017-06-23 07:34:11 +02:00
};
}
/**
* @param \LibDNS\Records\Question $question
2017-06-23 18:57:22 +02:00
* @param int $timeout
*
* @return \Amp\Promise<\LibDNS\Messages\Message>
*/
2017-06-23 18:57:22 +02:00
public function ask(Question $question, int $timeout): Promise {
return call(function () use ($question, $timeout) {
2017-06-23 19:13:28 +02:00
$this->lastActivity = \time();
if (\count($this->pending) > self::MAX_CONCURRENT_REQUESTS) {
$deferred = new Deferred;
$this->queue[] = $deferred;
yield $deferred->promise();
}
2017-06-23 07:34:11 +02:00
do {
$id = \random_int(0, 0xffff);
} while (isset($this->pending[$id]));
2017-06-23 07:34:11 +02:00
$message = $this->createMessage($question, $id);
try {
yield $this->send($message);
} catch (StreamException $exception) {
$exception = new ResolutionException("Sending the request failed", 0, $exception);
2017-06-23 23:25:50 +02:00
$this->error($exception);
throw $exception;
}
$deferred = new Deferred;
$pending = new class {
use Amp\Struct;
public $deferred;
public $question;
};
$pending->deferred = $deferred;
$pending->question = $question;
$this->pending[$id] = $pending;
2017-06-24 07:50:34 +02:00
$this->input->reference();
if (!$this->receiving) {
$this->receiving = true;
$this->receive()->onResolve($this->onResolve);
}
2017-06-23 07:34:11 +02:00
2017-06-23 18:57:22 +02:00
try {
return yield Promise\timeout($deferred->promise(), $timeout);
2017-06-23 23:25:50 +02:00
} catch (Amp\TimeoutException $exception) {
unset($this->pending[$id]);
if (empty($this->pending)) {
2017-06-24 07:50:34 +02:00
$this->input->unreference();
}
2017-06-24 07:50:34 +02:00
throw new TimeoutException("Didn't receive a response within {$timeout} milliseconds.");
} finally {
if ($this->queue) {
$deferred = array_shift($this->queue);
$deferred->resolve();
}
2017-06-23 18:57:22 +02:00
}
});
2017-06-23 07:34:11 +02:00
}
public function close() {
$this->input->close();
$this->output->close();
}
2017-06-23 23:25:50 +02:00
private function error(\Throwable $exception) {
$this->close();
if (empty($this->pending)) {
2017-06-23 23:25:50 +02:00
return;
}
if (!$exception instanceof ResolutionException) {
$message = "Unexpected error during resolution: " . $exception->getMessage();
$exception = new ResolutionException($message, 0, $exception);
}
$pending = $this->pending;
$this->pending = [];
2017-06-23 23:25:50 +02:00
foreach ($pending as $pendingQuestion) {
/** @var Deferred $deferred */
$deferred = $pendingQuestion->deferred;
2017-06-23 23:25:50 +02:00
$deferred->fail($exception);
}
}
2017-06-23 07:34:11 +02:00
protected function read(): Promise {
return $this->input->read();
}
protected function write(string $data): Promise {
return $this->output->write($data);
}
protected function createMessage(Question $question, int $id): Message {
$request = $this->messageFactory->create(MessageTypes::QUERY);
$request->getQuestionRecords()->add($question);
$request->isRecursionDesired(true);
$request->setID($id);
return $request;
}
private function matchesQuestion(Message $message, Question $question): bool {
if ($message->getType() !== MessageTypes::RESPONSE) {
return false;
}
$questionRecords = $message->getQuestionRecords();
// We only ever ask one question at a time
if (\count($questionRecords) !== 1) {
return false;
}
$questionRecord = $questionRecords->current();
if ($questionRecord->getClass() !== $question->getClass()) {
return false;
}
if ($questionRecord->getType() !== $question->getType()) {
return false;
}
if ($questionRecord->getName()->getValue() !== $question->getName()->getValue()) {
return false;
}
return true;
}
2017-06-23 12:49:16 +02:00
}