1
0
mirror of https://github.com/danog/dns.git synced 2025-01-23 05:51:11 +01:00

Refactor TcpServer connect logic and fix its parser

This commit is contained in:
Niklas Keller 2017-06-23 08:39:49 +02:00
parent c250d471c8
commit f0a67f5619

View File

@ -2,6 +2,7 @@
namespace Amp\Dns;
use Amp;
use Amp\Deferred;
use Amp\Loop;
use Amp\Parser\Parser;
@ -13,10 +14,13 @@ use LibDNS\Messages\Message;
use function Amp\call;
class TcpServer extends Server {
/** @var \LibDNS\Encoder\Encoder */
private $encoder;
/** @var \SplQueue */
private $queue;
/** @var Parser */
private $parser;
public static function connect(string $uri, int $timeout = 5000): Promise {
@ -31,28 +35,38 @@ class TcpServer extends Server {
\stream_set_blocking($socket, false);
$deferred = new Deferred;
$watcher = Loop::onWritable($socket, static function ($watcher) use ($socket, $deferred, &$timer) {
Loop::cancel($watcher);
Loop::cancel($timer);
$deferred->resolve(new self($socket));
});
$timer = Loop::delay($timeout, function () use ($deferred, $watcher, $uri) {
Loop::cancel($watcher);
$deferred->fail(new TimeoutException("Name resolution timed out, could not connect to server at $uri"));
});
return call(function () use ($uri, $socket, $timeout) {
$deferred = new Deferred;
return $deferred->promise();
$watcher = Loop::onWritable($socket, static function () use ($socket, $deferred) {
$deferred->resolve(new self($socket));
});
try {
return yield Promise\timeout($deferred->promise(), $timeout);
} catch (Amp\TimeoutException $e) {
throw new TimeoutException("Name resolution timed out, could not connect to server at $uri");
} finally {
Loop::cancel($watcher);
}
});
}
public static function parser(callable $callback) {
public static function parser(callable $callback): \Generator {
$decoder = (new DecoderFactory)->create();
$length = \unpack("n", yield 2)[1];
$callback($decoder->decode(yield $length));
while (true) {
$length = yield 2;
$length = \unpack("n", $length)[1];
$rawData = yield $length;
$callback($decoder->decode($rawData));
}
}
protected function __construct($socket) {
parent::__construct($socket);
$this->encoder = (new EncoderFactory)->create();
$this->queue = new \SplQueue;
$this->parser = new Parser(self::parser([$this->queue, 'push']));
@ -82,4 +96,4 @@ class TcpServer extends Server {
return new Success($this->queue->shift());
}
}
}