1
0
mirror of https://github.com/danog/postgres.git synced 2024-11-30 04:29:12 +01:00

Fix statement storage

Seems I simplified too much and made a circular reference. However, I hope to have found what was causing the issue in #19 — the refCount was being incremented when a promise for a statement was returned, inflating the number of statement references. This takes a similar approach as in 1.0.3, but with anonymous classes instead.
This commit is contained in:
Aaron Piotrowski 2019-03-26 11:18:39 -05:00
parent 9c499e1f5e
commit e460f50716
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
2 changed files with 75 additions and 27 deletions

View File

@ -10,6 +10,7 @@ use Amp\Promise;
use Amp\Sql\ConnectionException; use Amp\Sql\ConnectionException;
use Amp\Sql\FailureException; use Amp\Sql\FailureException;
use Amp\Sql\QueryError; use Amp\Sql\QueryError;
use Amp\Struct;
use Amp\Success; use Amp\Success;
use function Amp\call; use function Amp\call;
@ -50,7 +51,7 @@ final class PgSqlHandle implements Handle
/** @var callable */ /** @var callable */
private $unlisten; private $unlisten;
/** @var Promise[] */ /** @var Struct[] */
private $statements = []; private $statements = [];
/** @var int */ /** @var int */
@ -312,6 +313,12 @@ final class PgSqlHandle implements Handle
\assert(isset($this->statements[$name]), "Named statement not found when deallocating"); \assert(isset($this->statements[$name]), "Named statement not found when deallocating");
$storage = $this->statements[$name];
if (--$storage->refCount) {
return new Success;
}
unset($this->statements[$name]); unset($this->statements[$name]);
return $this->query(\sprintf("DEALLOCATE %s", $name)); return $this->query(\sprintf("DEALLOCATE %s", $name));
@ -362,16 +369,34 @@ final class PgSqlHandle implements Handle
$name = Handle::STATEMENT_NAME_PREFIX . \sha1($modifiedSql); $name = Handle::STATEMENT_NAME_PREFIX . \sha1($modifiedSql);
if (isset($this->statements[$name])) { if (isset($this->statements[$name])) {
return $this->statements[$name]; $storage = $this->statements[$name];
if ($storage->promise instanceof Promise) {
return $storage->promise;
}
++$storage->refCount; // Only increase refCount when returning a new object.
return new Success(new PgSqlStatement($this, $name, $sql, $names));
} }
return $this->statements[$name] = call(function () use ($name, $names, $sql, $modifiedSql) { $storage = new class {
use Struct;
public $refCount = 1;
public $promise;
};
$this->statements[$name] = $storage;
return $storage->promise = call(function () use ($storage, $name, $names, $sql, $modifiedSql) {
try { try {
/** @var resource $result PostgreSQL result resource. */ /** @var resource $result PostgreSQL result resource. */
$result = yield from $this->send("pg_send_prepare", $name, $modifiedSql); $result = yield from $this->send("pg_send_prepare", $name, $modifiedSql);
} catch (\Throwable $exception) { } catch (\Throwable $exception) {
unset($this->statements[$name]); unset($this->statements[$name]);
throw $exception; throw $exception;
} finally {
$storage->promise = null;
} }
switch (\pg_result_status($result, \PGSQL_STATUS_LONG)) { switch (\pg_result_status($result, \PGSQL_STATUS_LONG)) {

View File

@ -11,6 +11,7 @@ use Amp\Promise;
use Amp\Sql\ConnectionException; use Amp\Sql\ConnectionException;
use Amp\Sql\FailureException; use Amp\Sql\FailureException;
use Amp\Sql\QueryError; use Amp\Sql\QueryError;
use Amp\Struct;
use Amp\Success; use Amp\Success;
use pq; use pq;
use function Amp\call; use function Amp\call;
@ -38,11 +39,8 @@ final class PqHandle implements Handle
/** @var \Amp\Emitter[] */ /** @var \Amp\Emitter[] */
private $listeners; private $listeners;
/** @var Promise[] */ /** @var Struct[] */
private $statementPromises = []; private $statements = [];
/** @var \pq\Statement[] */
private $statementHandles = [];
/** @var callable */ /** @var callable */
private $fetch; private $fetch;
@ -314,9 +312,13 @@ final class PqHandle implements Handle
*/ */
public function statementExecute(string $name, array $params): Promise public function statementExecute(string $name, array $params): Promise
{ {
\assert(isset($this->statementHandles[$name]), "Named statement not found when executing"); \assert(isset($this->statements[$name]), "Named statement not found when executing");
return new Coroutine($this->send([$this->statementHandles[$name], "execAsync"], $params)); $storage = $this->statements[$name];
\assert($storage->statement instanceof pq\Statement, "Statement storage in invalid state");
return new Coroutine($this->send([$storage->statement, "execAsync"], $params));
} }
/** /**
@ -332,15 +334,19 @@ final class PqHandle implements Handle
return new Success; // Connection dead. return new Success; // Connection dead.
} }
\assert( \assert(isset($this->statements[$name]), "Named statement not found when deallocating");
isset($this->statementPromises[$name], $this->statementHandles[$name]),
"Named statement not found when deallocating"
);
$statement = $this->statementHandles[$name]; $storage = $this->statements[$name];
unset($this->statementPromises[$name], $this->statementHandles[$name]);
return new Coroutine($this->send([$statement, "deallocateAsync"])); if (--$storage->refCount) {
return new Success;
}
unset($this->statements[$name]);
\assert($storage->statement instanceof pq\Statement, "Statement storage in invalid state");
return new Coroutine($this->send([$storage->statement, "deallocateAsync"]));
} }
/** /**
@ -383,19 +389,36 @@ final class PqHandle implements Handle
$name = Handle::STATEMENT_NAME_PREFIX . \sha1($modifiedSql); $name = Handle::STATEMENT_NAME_PREFIX . \sha1($modifiedSql);
if (isset($this->statementPromises[$name])) { if (isset($this->statements[$name])) {
return $this->statementPromises[$name]; $storage = $this->statements[$name];
}
return $this->statementPromises[$name] = call(function () use ($names, $name, $sql, $modifiedSql) { if ($storage->promise instanceof Promise) {
try { return $storage->promise;
$statement = yield from $this->send([$this->handle, "prepareAsync"], $name, $modifiedSql);
} catch (\Throwable $exception) {
unset($this->statementPromises[$name]);
throw $exception;
} }
$this->statementHandles[$name] = $statement; ++$storage->refCount; // Only increase refCount when returning a new object.
return new Success(new PqStatement($this, $name, $sql, $names));
}
$storage = new class {
use Struct;
public $refCount = 1;
public $promise;
public $statement;
};
$this->statements[$name] = $storage;
return $storage->promise = call(function () use ($storage, $names, $name, $sql, $modifiedSql) {
try {
$storage->statement = yield from $this->send([$this->handle, "prepareAsync"], $name, $modifiedSql);
} catch (\Throwable $exception) {
unset($this->statements[$name]);
throw $exception;
} finally {
$storage->promise = null;
}
return new PqStatement($this, $name, $sql, $names); return new PqStatement($this, $name, $sql, $names);
}); });