mirror of
https://github.com/danog/dns.git
synced 2024-11-30 04:29:06 +01:00
Clean up unused sockets
This commit is contained in:
parent
b0aa8daeca
commit
dd938ce846
@ -4,8 +4,10 @@ namespace Amp\Dns;
|
||||
|
||||
use Amp\Cache\ArrayCache;
|
||||
use Amp\Cache\Cache;
|
||||
use Amp\Loop;
|
||||
use Amp\MultiReasonException;
|
||||
use Amp\Promise;
|
||||
use Amp\Success;
|
||||
use LibDNS\Messages\Message;
|
||||
use LibDNS\Messages\MessageTypes;
|
||||
use LibDNS\Records\Question;
|
||||
@ -27,12 +29,18 @@ class BasicResolver implements Resolver {
|
||||
/** @var Cache */
|
||||
private $cache;
|
||||
|
||||
/** @var array */
|
||||
/** @var Server[] */
|
||||
private $servers = [];
|
||||
|
||||
/** @var array */
|
||||
/** @var Promise[] */
|
||||
private $pendingServers = [];
|
||||
|
||||
/** @var Promise[] */
|
||||
private $pendingQueries = [];
|
||||
|
||||
/** @var string */
|
||||
private $gcWatcher;
|
||||
|
||||
public function __construct(Cache $cache = null, ConfigLoader $configLoader = null) {
|
||||
$this->cache = $cache ?? new ArrayCache;
|
||||
$this->configLoader = $configLoader ?? \stripos(PHP_OS, "win") === 0
|
||||
@ -40,6 +48,22 @@ class BasicResolver implements Resolver {
|
||||
: new UnixConfigLoader;
|
||||
|
||||
$this->questionFactory = new QuestionFactory;
|
||||
|
||||
$this->gcWatcher = Loop::repeat(5000, function () {
|
||||
$now = \time();
|
||||
|
||||
foreach ($this->servers as $server) {
|
||||
if ($server->getLastActivity() < $now - 60) {
|
||||
unset($this->servers);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Loop::unreference($this->gcWatcher);
|
||||
}
|
||||
|
||||
public function __destruct() {
|
||||
Loop::cancel($this->gcWatcher);
|
||||
}
|
||||
|
||||
/** @inheritdoc */
|
||||
@ -296,7 +320,11 @@ class BasicResolver implements Resolver {
|
||||
|
||||
private function getServer($uri): Promise {
|
||||
if (isset($this->servers[$uri])) {
|
||||
return $this->servers[$uri];
|
||||
return new Success($this->servers[$uri]);
|
||||
}
|
||||
|
||||
if (isset($this->pendingServers[$uri])) {
|
||||
return $this->pendingServers[$uri];
|
||||
}
|
||||
|
||||
if (\substr($uri, 0, 3) === "udp") {
|
||||
@ -305,9 +333,11 @@ class BasicResolver implements Resolver {
|
||||
$server = TcpServer::connect($uri);
|
||||
}
|
||||
|
||||
$server->onResolve(function ($error) use ($uri) {
|
||||
$server->onResolve(function ($error, $server) use ($uri) {
|
||||
if ($error) {
|
||||
unset($this->servers[$uri]);
|
||||
unset($this->pendingServers[$uri]);
|
||||
} else {
|
||||
$this->servers[$uri] = $server;
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -15,15 +15,16 @@ use LibDNS\Records\Question;
|
||||
use function Amp\call;
|
||||
|
||||
abstract class Server {
|
||||
/** @var \Amp\ByteStream\ResourceInputStream */
|
||||
/** @var ResourceInputStream */
|
||||
private $input;
|
||||
|
||||
/** @var \Amp\ByteStream\ResourceOutputStream */
|
||||
/** @var ResourceOutputStream */
|
||||
private $output;
|
||||
|
||||
/** @var \Amp\Deferred[] */
|
||||
/** @var Deferred[] */
|
||||
private $questions = [];
|
||||
|
||||
/** @var MessageFactory */
|
||||
private $messageFactory;
|
||||
|
||||
/** @var int */
|
||||
@ -32,22 +33,25 @@ abstract class Server {
|
||||
/** @var callable */
|
||||
private $onResolve;
|
||||
|
||||
/** @var int */
|
||||
private $lastActivity;
|
||||
|
||||
/**
|
||||
* @param string $uri
|
||||
*
|
||||
* @return \Amp\Promise<\Amp\Dns\Server>
|
||||
* @return Promise<\Amp\Dns\Server>
|
||||
*/
|
||||
abstract public static function connect(string $uri): Promise;
|
||||
|
||||
/**
|
||||
* @param \LibDNS\Messages\Message $message
|
||||
* @param Message $message
|
||||
*
|
||||
* @return \Amp\Promise<int>
|
||||
* @return Promise<int>
|
||||
*/
|
||||
abstract protected function send(Message $message): Promise;
|
||||
|
||||
/**
|
||||
* @return \Amp\Promise<\LibDNS\Messages\Message>
|
||||
* @return Promise<Message>
|
||||
*/
|
||||
abstract protected function receive(): Promise;
|
||||
|
||||
@ -56,12 +60,19 @@ abstract class Server {
|
||||
*/
|
||||
abstract public function isAlive(): bool;
|
||||
|
||||
public function getLastActivity(): int {
|
||||
return $this->lastActivity;
|
||||
}
|
||||
|
||||
protected function __construct($socket) {
|
||||
$this->input = new ResourceInputStream($socket);
|
||||
$this->output = new ResourceOutputStream($socket);
|
||||
$this->messageFactory = new MessageFactory;
|
||||
$this->lastActivity = \time();
|
||||
|
||||
$this->onResolve = function (\Throwable $exception = null, Message $message = null) {
|
||||
$this->lastActivity = \time();
|
||||
|
||||
if ($exception) {
|
||||
$questions = $this->questions;
|
||||
$this->questions = [];
|
||||
@ -98,6 +109,8 @@ abstract class Server {
|
||||
*/
|
||||
public function ask(Question $question, int $timeout): Promise {
|
||||
return call(function () use ($question, $timeout) {
|
||||
$this->lastActivity = \time();
|
||||
|
||||
$id = $this->nextId++;
|
||||
if ($this->nextId > 0xffff) {
|
||||
$this->nextId %= 0xffff;
|
||||
|
Loading…
Reference in New Issue
Block a user