From ff2e0c600ba19ae11e41b948cb1152717eeb8fee Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Fri, 22 May 2020 21:54:02 -0500 Subject: [PATCH] Refactor where unbuffered result is released --- src/PqHandle.php | 39 ++++++++++++++++++++--------------- src/PqUnbufferedResultSet.php | 38 +++++++++------------------------- 2 files changed, 32 insertions(+), 45 deletions(-) diff --git a/src/PqHandle.php b/src/PqHandle.php index cbebcaf..0edc7e5 100644 --- a/src/PqHandle.php +++ b/src/PqHandle.php @@ -182,7 +182,7 @@ final class PqHandle implements Handle } } - if ($this->handle === null) { + if (!$this->handle) { throw new ConnectionException("The connection to the database has been closed"); } @@ -216,6 +216,14 @@ final class PqHandle implements Handle return $result; } + /** + * @param pq\Result $result + * @param string|null $sql + * + * @return Result + * + * @throws FailureException + */ private function makeResult(pq\Result $result, ?string $sql): Result { switch ($result->status) { @@ -235,7 +243,7 @@ final class PqHandle implements Handle return new Coroutine($this->fetch($sql)); }, $result, - \Closure::fromCallable([$this, 'release']) + $this->busy->promise() ); case pq\Result::NONFATAL_ERROR: @@ -268,8 +276,8 @@ final class PqHandle implements Handle private function fetch(string $sql): \Generator { - if ($this->handle === null) { - return null; + if (!$this->handle) { + throw new ConnectionException("Connection closed"); } if (!$this->handle->busy) { // Results buffered. @@ -295,7 +303,16 @@ final class PqHandle implements Handle switch ($result->status) { case pq\Result::TUPLES_OK: // End of result set. - return $this->fetchNextResult($sql); + $deferred = $this->busy; + $this->busy = null; + + try { + $deferred->resolve($this->fetchNextResult($sql)); + } catch (\Throwable $exception) { + $deferred->fail($exception); + } + + return null; case pq\Result::SINGLE_TUPLE: return $result; @@ -305,18 +322,6 @@ final class PqHandle implements Handle } } - private function release(): void - { - \assert( - $this->busy instanceof Deferred && $this->busy !== $this->deferred, - "Connection in invalid state when releasing" - ); - - $deferred = $this->busy; - $this->busy = null; - $deferred->resolve(); - } - /** * Executes the named statement using the given parameters. * diff --git a/src/PqUnbufferedResultSet.php b/src/PqUnbufferedResultSet.php index 0e6d92d..e94c8ca 100644 --- a/src/PqUnbufferedResultSet.php +++ b/src/PqUnbufferedResultSet.php @@ -3,7 +3,6 @@ namespace Amp\Postgres; use Amp\AsyncGenerator; -use Amp\Deferred; use Amp\DisposedException; use Amp\Promise; use Amp\Sql\Result; @@ -11,28 +10,21 @@ use pq; final class PqUnbufferedResultSet implements Result { - /** @var int */ - private $numCols; - - /** @var AsyncGenerator */ + /** @var AsyncGenerator, null, null> */ private $generator; - /** @var Deferred */ - private $next; + /** @var Promise */ + private $nextResult; /** - * @param callable():Promise $fetch Function to fetch next result row. - * @param \pq\Result $result PostgreSQL result object. - * @param callable():void $release Invoked once the result has been fully consumed. + * @param callable():Promise<\pq\Result|Result|null> $fetch Function to fetch next result row. + * @param \pq\Result $result Initial PostgreSQL result object. + * @param Promise $nextResult */ - public function __construct(callable $fetch, pq\Result $result, callable $release) + public function __construct(callable $fetch, pq\Result $result, Promise $nextResult) { - $this->numCols = $result->numCols; - - $this->next = $deferred = new Deferred; - $this->generator = new AsyncGenerator(static function (callable $yield) use ( - $deferred, $release, $result, $fetch - ): \Generator { + $this->nextResult = $nextResult; + $this->generator = new AsyncGenerator(static function (callable $yield) use ($result, $fetch): \Generator { try { do { $promise = $fetch(); @@ -45,23 +37,13 @@ final class PqUnbufferedResultSet implements Result while (($result = yield $promise) instanceof pq\Result) { $promise = $fetch(); } - } finally { - if ($result instanceof Result) { - $deferred->resolve($result); - return; - } - - // Only release if there was no next result set. - $release(); - - $deferred->resolve(null); } }); } public function getNextResult(): Promise { - return $this->next->promise(); + return $this->nextResult; } /**