mirror of
https://github.com/danog/dns.git
synced 2024-11-26 20:14:51 +01:00
Use flag to control reading requests
This commit is contained in:
parent
5f73365b9b
commit
3aaf2708f7
@ -27,12 +27,16 @@
|
||||
{
|
||||
"name": "Niklas Keller",
|
||||
"email": "me@kelunik.com"
|
||||
},
|
||||
{
|
||||
"name": "Aaron Piotrowski",
|
||||
"email": "aaron@trowski.com"
|
||||
}
|
||||
],
|
||||
"require": {
|
||||
"php": ">=7.0",
|
||||
"amphp/amp": "^2",
|
||||
"amphp/byte-stream": "^1",
|
||||
"amphp/byte-stream": "^1.1",
|
||||
"amphp/cache": "^1",
|
||||
"amphp/file": "^0.2",
|
||||
"amphp/parser": "^1",
|
||||
|
@ -37,6 +37,9 @@ abstract class Server {
|
||||
/** @var int */
|
||||
private $lastActivity;
|
||||
|
||||
/** @var bool */
|
||||
private $receiving = false;
|
||||
|
||||
/**
|
||||
* @param string $uri
|
||||
*
|
||||
@ -73,6 +76,7 @@ abstract class Server {
|
||||
|
||||
$this->onResolve = function (\Throwable $exception = null, Message $message = null) {
|
||||
$this->lastActivity = \time();
|
||||
$this->receiving = false;
|
||||
|
||||
if ($exception) {
|
||||
$this->error($exception);
|
||||
@ -81,18 +85,17 @@ abstract class Server {
|
||||
|
||||
$id = $message->getId();
|
||||
|
||||
if (!isset($this->questions[$id])) {
|
||||
return; // Ignore duplicate response.
|
||||
}
|
||||
|
||||
if (isset($this->questions[$id])) { // Ignore duplicate response.
|
||||
$deferred = $this->questions[$id];
|
||||
unset($this->questions[$id]);
|
||||
|
||||
$empty = empty($this->questions);
|
||||
|
||||
$deferred->resolve($message);
|
||||
}
|
||||
|
||||
if (!$empty) {
|
||||
if (empty($this->questions)) {
|
||||
$this->input->unreference();
|
||||
} elseif (!$this->receiving) {
|
||||
$this->input->reference();
|
||||
$this->receiving = true;
|
||||
$this->receive()->onResolve($this->onResolve);
|
||||
}
|
||||
};
|
||||
@ -113,8 +116,6 @@ abstract class Server {
|
||||
$this->nextId %= 0xffff;
|
||||
}
|
||||
|
||||
$empty = empty($this->questions);
|
||||
|
||||
if (isset($this->questions[$id])) {
|
||||
$deferred = $this->questions[$id];
|
||||
unset($this->questions[$id]);
|
||||
@ -133,7 +134,10 @@ abstract class Server {
|
||||
|
||||
$this->questions[$id] = $deferred = new Deferred;
|
||||
|
||||
if ($empty) {
|
||||
$this->input->reference();
|
||||
|
||||
if (!$this->receiving) {
|
||||
$this->receiving = true;
|
||||
$this->receive()->onResolve($this->onResolve);
|
||||
}
|
||||
|
||||
@ -141,23 +145,17 @@ abstract class Server {
|
||||
return yield Promise\timeout($deferred->promise(), $timeout);
|
||||
} catch (Amp\TimeoutException $exception) {
|
||||
unset($this->questions[$id]);
|
||||
$exception = new TimeoutException("Didn't receive a response within {$timeout} milliseconds.");
|
||||
$this->error($exception);
|
||||
throw $exception;
|
||||
if (empty($this->questions)) {
|
||||
$this->input->unreference();
|
||||
}
|
||||
throw new TimeoutException("Didn't receive a response within {$timeout} milliseconds.");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
public function close() {
|
||||
if ($this->input === null) {
|
||||
return;
|
||||
}
|
||||
|
||||
$this->input->close();
|
||||
$this->output->close();
|
||||
|
||||
$this->input = null;
|
||||
$this->output = null;
|
||||
}
|
||||
|
||||
private function error(\Throwable $exception) {
|
||||
|
Loading…
Reference in New Issue
Block a user