1
0
mirror of https://github.com/danog/postgres.git synced 2024-12-02 09:27:54 +01:00

Refactor where unbuffered result is released

This commit is contained in:
Aaron Piotrowski 2020-05-22 21:54:02 -05:00
parent 47275f7cdb
commit ff2e0c600b
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
2 changed files with 32 additions and 45 deletions

View File

@ -182,7 +182,7 @@ final class PqHandle implements Handle
}
}
if ($this->handle === null) {
if (!$this->handle) {
throw new ConnectionException("The connection to the database has been closed");
}
@ -216,6 +216,14 @@ final class PqHandle implements Handle
return $result;
}
/**
* @param pq\Result $result
* @param string|null $sql
*
* @return Result
*
* @throws FailureException
*/
private function makeResult(pq\Result $result, ?string $sql): Result
{
switch ($result->status) {
@ -235,7 +243,7 @@ final class PqHandle implements Handle
return new Coroutine($this->fetch($sql));
},
$result,
\Closure::fromCallable([$this, 'release'])
$this->busy->promise()
);
case pq\Result::NONFATAL_ERROR:
@ -268,8 +276,8 @@ final class PqHandle implements Handle
private function fetch(string $sql): \Generator
{
if ($this->handle === null) {
return null;
if (!$this->handle) {
throw new ConnectionException("Connection closed");
}
if (!$this->handle->busy) { // Results buffered.
@ -295,7 +303,16 @@ final class PqHandle implements Handle
switch ($result->status) {
case pq\Result::TUPLES_OK: // End of result set.
return $this->fetchNextResult($sql);
$deferred = $this->busy;
$this->busy = null;
try {
$deferred->resolve($this->fetchNextResult($sql));
} catch (\Throwable $exception) {
$deferred->fail($exception);
}
return null;
case pq\Result::SINGLE_TUPLE:
return $result;
@ -305,18 +322,6 @@ final class PqHandle implements Handle
}
}
private function release(): void
{
\assert(
$this->busy instanceof Deferred && $this->busy !== $this->deferred,
"Connection in invalid state when releasing"
);
$deferred = $this->busy;
$this->busy = null;
$deferred->resolve();
}
/**
* Executes the named statement using the given parameters.
*

View File

@ -3,7 +3,6 @@
namespace Amp\Postgres;
use Amp\AsyncGenerator;
use Amp\Deferred;
use Amp\DisposedException;
use Amp\Promise;
use Amp\Sql\Result;
@ -11,28 +10,21 @@ use pq;
final class PqUnbufferedResultSet implements Result
{
/** @var int */
private $numCols;
/** @var AsyncGenerator */
/** @var AsyncGenerator<array<string, mixed>, null, null> */
private $generator;
/** @var Deferred */
private $next;
/** @var Promise<Result|null> */
private $nextResult;
/**
* @param callable():Promise<pq\Result|ResultSet|null> $fetch Function to fetch next result row.
* @param \pq\Result $result PostgreSQL result object.
* @param callable():void $release Invoked once the result has been fully consumed.
* @param callable():Promise<\pq\Result|Result|null> $fetch Function to fetch next result row.
* @param \pq\Result $result Initial PostgreSQL result object.
* @param Promise<Result|null> $nextResult
*/
public function __construct(callable $fetch, pq\Result $result, callable $release)
public function __construct(callable $fetch, pq\Result $result, Promise $nextResult)
{
$this->numCols = $result->numCols;
$this->next = $deferred = new Deferred;
$this->generator = new AsyncGenerator(static function (callable $yield) use (
$deferred, $release, $result, $fetch
): \Generator {
$this->nextResult = $nextResult;
$this->generator = new AsyncGenerator(static function (callable $yield) use ($result, $fetch): \Generator {
try {
do {
$promise = $fetch();
@ -45,23 +37,13 @@ final class PqUnbufferedResultSet implements Result
while (($result = yield $promise) instanceof pq\Result) {
$promise = $fetch();
}
} finally {
if ($result instanceof Result) {
$deferred->resolve($result);
return;
}
// Only release if there was no next result set.
$release();
$deferred->resolve(null);
}
});
}
public function getNextResult(): Promise
{
return $this->next->promise();
return $this->nextResult;
}
/**