1
0
mirror of https://github.com/danog/postgres.git synced 2024-11-30 04:29:12 +01:00

Fix simultaneous requests to prepare the same query

This commit is contained in:
Aaron Piotrowski 2018-10-30 11:12:14 -05:00
parent 01f9b5c1c6
commit b7fa576952
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB

View File

@ -13,7 +13,6 @@ use Amp\Sql\Pool as SqlPool;
use Amp\Sql\ResultSet as SqlResultSet; use Amp\Sql\ResultSet as SqlResultSet;
use Amp\Sql\Statement as SqlStatement; use Amp\Sql\Statement as SqlStatement;
use Amp\Sql\Transaction as SqlTransaction; use Amp\Sql\Transaction as SqlTransaction;
use Amp\Success;
use cash\LRUCache; use cash\LRUCache;
use function Amp\call; use function Amp\call;
@ -64,14 +63,18 @@ final class Pool extends ConnectionPool implements Link
$this->statementWatcher = Loop::repeat(1000, static function () use (&$idleTimeout, $statements) { $this->statementWatcher = Loop::repeat(1000, static function () use (&$idleTimeout, $statements) {
$now = \time(); $now = \time();
foreach ($statements as $hash => $statement) { foreach ($statements as $sql => $statement) {
if ($statement instanceof Promise) {
continue;
}
\assert($statement instanceof StatementPool); \assert($statement instanceof StatementPool);
if ($statement->getLastUsedAt() + $idleTimeout > $now) { if ($statement->getLastUsedAt() + $idleTimeout > $now) {
return; return;
} }
$statements->remove($hash); $statements->remove($sql);
} }
}); });
@ -143,18 +146,34 @@ final class Pool extends ConnectionPool implements Link
throw new \Error("The pool has been closed"); throw new \Error("The pool has been closed");
} }
return call(function () use ($sql) {
if ($this->statements->containsKey($sql)) { if ($this->statements->containsKey($sql)) {
$statement = $this->statements->get($sql); $statement = $this->statements->get($sql);
\assert($statement instanceof SqlStatement);
if ($statement instanceof Promise) {
$statement = yield $statement; // Wait for prior request to resolve.
}
\assert($statement instanceof StatementPool);
if ($statement->isAlive()) { if ($statement->isAlive()) {
return new Success($statement); return $statement;
} }
} }
return call(function () use ($sql) { $promise = parent::prepare($sql);
$statement = yield parent::prepare($sql); $this->statements->put($sql, $promise); // Insert promise into queue so subsequent requests get promise.
\assert($statement instanceof SqlStatement);
$this->statements->put($sql, $statement); try {
$statement = yield $promise;
\assert($statement instanceof StatementPool);
} catch (\Throwable $exception) {
$this->statements->remove($sql);
throw $exception;
}
$this->statements->put($sql, $statement); // Replace promise in queue with statement object.
return $statement; return $statement;
}); });
} }