From 3aaf2708f7d6f012ac9044d72acc47ee4bfd272e Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Sat, 24 Jun 2017 00:50:34 -0500 Subject: [PATCH] Use flag to control reading requests --- composer.json | 6 +++++- lib/Server.php | 44 +++++++++++++++++++++----------------------- 2 files changed, 26 insertions(+), 24 deletions(-) diff --git a/composer.json b/composer.json index dc99a06..9c57cf4 100644 --- a/composer.json +++ b/composer.json @@ -27,12 +27,16 @@ { "name": "Niklas Keller", "email": "me@kelunik.com" + }, + { + "name": "Aaron Piotrowski", + "email": "aaron@trowski.com" } ], "require": { "php": ">=7.0", "amphp/amp": "^2", - "amphp/byte-stream": "^1", + "amphp/byte-stream": "^1.1", "amphp/cache": "^1", "amphp/file": "^0.2", "amphp/parser": "^1", diff --git a/lib/Server.php b/lib/Server.php index bf5bab9..54b5367 100644 --- a/lib/Server.php +++ b/lib/Server.php @@ -37,6 +37,9 @@ abstract class Server { /** @var int */ private $lastActivity; + /** @var bool */ + private $receiving = false; + /** * @param string $uri * @@ -73,6 +76,7 @@ abstract class Server { $this->onResolve = function (\Throwable $exception = null, Message $message = null) { $this->lastActivity = \time(); + $this->receiving = false; if ($exception) { $this->error($exception); @@ -81,18 +85,17 @@ abstract class Server { $id = $message->getId(); - if (!isset($this->questions[$id])) { - return; // Ignore duplicate response. + if (isset($this->questions[$id])) { // Ignore duplicate response. + $deferred = $this->questions[$id]; + unset($this->questions[$id]); + $deferred->resolve($message); } - $deferred = $this->questions[$id]; - unset($this->questions[$id]); - - $empty = empty($this->questions); - - $deferred->resolve($message); - - if (!$empty) { + if (empty($this->questions)) { + $this->input->unreference(); + } elseif (!$this->receiving) { + $this->input->reference(); + $this->receiving = true; $this->receive()->onResolve($this->onResolve); } }; @@ -113,8 +116,6 @@ abstract class Server { $this->nextId %= 0xffff; } - $empty = empty($this->questions); - if (isset($this->questions[$id])) { $deferred = $this->questions[$id]; unset($this->questions[$id]); @@ -133,7 +134,10 @@ abstract class Server { $this->questions[$id] = $deferred = new Deferred; - if ($empty) { + $this->input->reference(); + + if (!$this->receiving) { + $this->receiving = true; $this->receive()->onResolve($this->onResolve); } @@ -141,23 +145,17 @@ abstract class Server { return yield Promise\timeout($deferred->promise(), $timeout); } catch (Amp\TimeoutException $exception) { unset($this->questions[$id]); - $exception = new TimeoutException("Didn't receive a response within {$timeout} milliseconds."); - $this->error($exception); - throw $exception; + if (empty($this->questions)) { + $this->input->unreference(); + } + throw new TimeoutException("Didn't receive a response within {$timeout} milliseconds."); } }); } public function close() { - if ($this->input === null) { - return; - } - $this->input->close(); $this->output->close(); - - $this->input = null; - $this->output = null; } private function error(\Throwable $exception) {