resolve(new self($socket)); }); try { return yield Promise\timeout($deferred->promise(), $timeout); } catch (Amp\TimeoutException $e) { throw new TimeoutException("Name resolution timed out, could not connect to server at $uri"); } finally { Loop::cancel($watcher); } }); }

    /**
     * Generator that splits an incoming byte stream into length-prefixed
     * messages and hands each decoded message to $callback.
     *
     * The generator yields the number of bytes it needs next: first 2 bytes
     * for the length prefix (unsigned 16-bit, big endian), then that many
     * bytes of payload. It never terminates on its own.
     *
     * @param callable $callback Invoked with each decoded message.
     *
     * @return \Generator
     */
    public static function parser(callable $callback): \Generator
    {
        $decoder = (new DecoderFactory)->create();

        while (true) {
            // Two-byte big-endian length prefix precedes every message.
            $length = yield 2;
            $length = \unpack("n", $length)[1];

            $rawData = yield $length;
            $callback($decoder->decode($rawData));
        }
    }

    /**
     * Sets up the encoder, the queue of decoded messages and the parser
     * that feeds that queue.
     *
     * @param mixed $socket Passed through unchanged to the parent constructor.
     */
    protected function __construct($socket)
    {
        parent::__construct($socket);

        $this->encoder = (new EncoderFactory)->create();
        $this->queue = new \SplQueue;
        // Every message the parser decodes is pushed onto the queue.
        $this->parser = new Parser(self::parser([$this->queue, 'push']));
    }

    /**
     * Encodes a message and writes it prefixed with its 16-bit length.
     *
     * If the write fails, the connection is marked as no longer alive.
     *
     * @param Message $message Message to send.
     *
     * @return Promise The promise returned by the underlying write.
     */
    protected function send(Message $message): Promise
    {
        $data = $this->encoder->encode($message);
        $promise = $this->write(\pack("n", \strlen($data)) . $data);

        $promise->onResolve(function ($error) {
            if ($error) {
                $this->isAlive = false;
            }
        });

        return $promise;
    }

    /**
     * Returns the next decoded message, reading from the connection until
     * one is available.
     *
     * If a message is already queued it is returned immediately; otherwise
     * chunks are read and fed to the parser until it produces a message.
     *
     * @return Promise Resolves with the next decoded message; fails with
     *                 ResolutionException when the read yields null.
     */
    protected function receive(): Promise
    {
        if ($this->queue->isEmpty()) {
            return call(function () {
                do {
                    // The read must be awaited: without `yield`, $chunk would be
                    // the pending operation itself rather than the bytes read, so
                    // the null check below could never fire and the parser would
                    // be handed a non-string value.
                    $chunk = yield $this->read();

                    if ($chunk === null) {
                        $this->isAlive = false;
                        throw new ResolutionException("Reading from the server failed");
                    }

                    $this->parser->push($chunk);
                } while ($this->queue->isEmpty());

                return $this->queue->shift();
            });
        }

        return new Success($this->queue->shift());
    }

    /**
     * Reports whether the connection is still considered usable.
     *
     * @return bool False once a write or read failure has been observed
     *              by send() or receive().
     */
    public function isAlive(): bool
    {
        return $this->isAlive;
    }
}