mirror of
https://github.com/danog/postgres.git
synced 2024-11-30 04:29:12 +01:00
Remove statement pool caching
This commit is contained in:
parent
0fcf265c41
commit
0b530983c2
@ -22,8 +22,7 @@
|
||||
"php": "^7.0",
|
||||
"amphp/amp": "^2",
|
||||
"amphp/sql": "^1",
|
||||
"amphp/sql-common": "^1",
|
||||
"cash/lrucache": "^1"
|
||||
"amphp/sql-common": "^1"
|
||||
},
|
||||
"require-dev": {
|
||||
"amphp/phpunit-util": "^1",
|
||||
|
104
src/Pool.php
104
src/Pool.php
@ -3,7 +3,6 @@
|
||||
namespace Amp\Postgres;
|
||||
|
||||
use Amp\Coroutine;
|
||||
use Amp\Loop;
|
||||
use Amp\Promise;
|
||||
use Amp\Sql\Common\ConnectionPool;
|
||||
use Amp\Sql\Common\StatementPool as SqlStatementPool;
|
||||
@ -13,7 +12,6 @@ use Amp\Sql\Pool as SqlPool;
|
||||
use Amp\Sql\ResultSet as SqlResultSet;
|
||||
use Amp\Sql\Statement as SqlStatement;
|
||||
use Amp\Sql\Transaction as SqlTransaction;
|
||||
use cash\LRUCache;
|
||||
use function Amp\call;
|
||||
|
||||
final class Pool extends ConnectionPool implements Link
|
||||
@ -27,12 +25,6 @@ final class Pool extends ConnectionPool implements Link
|
||||
/** @var bool */
|
||||
private $resetConnections;
|
||||
|
||||
/** @var string */
|
||||
private $statementWatcher;
|
||||
|
||||
/** @var LRUCache|\IteratorAggregate Least-recently-used cache of StatementPool objects. */
|
||||
private $statements;
|
||||
|
||||
/**
|
||||
* @param ConnectionConfig $config
|
||||
* @param int $maxConnections
|
||||
@ -50,45 +42,6 @@ final class Pool extends ConnectionPool implements Link
|
||||
parent::__construct($config, $maxConnections, $idleTimeout, $connector);
|
||||
|
||||
$this->resetConnections = $resetConnections;
|
||||
|
||||
$this->statements = $statements = new class($maxConnections) extends LRUCache implements \IteratorAggregate {
|
||||
public function getIterator(): \Iterator
|
||||
{
|
||||
yield from $this->data;
|
||||
}
|
||||
};
|
||||
|
||||
$this->statementWatcher = Loop::repeat(1000, static function () use (&$idleTimeout, $statements) {
|
||||
$now = \time();
|
||||
|
||||
foreach ($statements as $sql => $statement) {
|
||||
if ($statement instanceof Promise) {
|
||||
continue;
|
||||
}
|
||||
|
||||
\assert($statement instanceof StatementPool);
|
||||
|
||||
if ($statement->getLastUsedAt() + $idleTimeout > $now) {
|
||||
return;
|
||||
}
|
||||
|
||||
$statements->remove($sql);
|
||||
}
|
||||
});
|
||||
|
||||
Loop::unreference($this->statementWatcher);
|
||||
}
|
||||
|
||||
public function __destruct()
|
||||
{
|
||||
parent::__destruct();
|
||||
Loop::cancel($this->statementWatcher);
|
||||
}
|
||||
|
||||
public function close()
|
||||
{
|
||||
parent::close();
|
||||
$this->statements->clear();
|
||||
}
|
||||
|
||||
/**
|
||||
@ -133,63 +86,6 @@ final class Pool extends ConnectionPool implements Link
|
||||
return $connection;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*
|
||||
* Caches prepared statements for reuse.
|
||||
*/
|
||||
public function prepare(string $sql): Promise
|
||||
{
|
||||
if (!$this->isAlive()) {
|
||||
throw new \Error("The pool has been closed");
|
||||
}
|
||||
|
||||
return call(function () use ($sql) {
|
||||
$name = Handle::STATEMENT_NAME_PREFIX . \sha1($sql);
|
||||
|
||||
if ($this->statements->containsKey($name)) {
|
||||
$statement = $this->statements->get($name);
|
||||
|
||||
if ($statement instanceof Promise) {
|
||||
$statement = yield $statement; // Wait for prior request to resolve.
|
||||
}
|
||||
|
||||
\assert($statement instanceof StatementPool);
|
||||
|
||||
if ($statement->isAlive()) {
|
||||
return $statement;
|
||||
}
|
||||
}
|
||||
|
||||
$promise = parent::prepare($sql);
|
||||
$this->statements->put($name, $promise); // Insert promise into queue so subsequent requests get promise.
|
||||
|
||||
try {
|
||||
$statement = yield $promise;
|
||||
\assert($statement instanceof StatementPool);
|
||||
} catch (\Throwable $exception) {
|
||||
$this->statements->remove($name);
|
||||
throw $exception;
|
||||
}
|
||||
|
||||
$this->statements->put($name, $statement); // Replace promise in queue with statement object.
|
||||
|
||||
return $statement;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function execute(string $sql, array $params = []): Promise
|
||||
{
|
||||
return call(function () use ($sql, $params) {
|
||||
$statement = yield $this->prepare($sql);
|
||||
\assert($statement instanceof SqlStatement);
|
||||
return yield $statement->execute($params);
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
|
@ -1,27 +0,0 @@
|
||||
<?php
|
||||
|
||||
namespace Amp\Postgres\Test;
|
||||
|
||||
use Amp\Loop;
|
||||
use Amp\Postgres\StatementPool;
|
||||
|
||||
abstract class AbstractPoolTest extends AbstractLinkTest
|
||||
{
|
||||
public function testPrepareSameQueryReturnsSameStatementPool()
|
||||
{
|
||||
Loop::run(function () {
|
||||
$sql = "SELECT * FROM test WHERE domain=\$1";
|
||||
|
||||
/** @var StatementPool $statement1 */
|
||||
$statement1 = yield $this->connection->prepare($sql);
|
||||
|
||||
/** @var StatementPool $statement2 */
|
||||
$statement2 = yield $this->connection->prepare($sql);
|
||||
|
||||
$this->assertInstanceOf(StatementPool::class, $statement1);
|
||||
$this->assertInstanceOf(StatementPool::class, $statement2);
|
||||
|
||||
$this->assertSame($statement1, $statement2);
|
||||
});
|
||||
}
|
||||
}
|
@ -14,7 +14,7 @@ use Amp\Success;
|
||||
/**
|
||||
* @requires extension pgsql
|
||||
*/
|
||||
class PgSqlPoolTest extends AbstractPoolTest
|
||||
class PgSqlPoolTest extends AbstractLinkTest
|
||||
{
|
||||
const POOL_SIZE = 3;
|
||||
|
||||
|
@ -13,7 +13,7 @@ use Amp\Success;
|
||||
/**
|
||||
* @requires extension pq
|
||||
*/
|
||||
class PqPoolTest extends AbstractPoolTest
|
||||
class PqPoolTest extends AbstractLinkTest
|
||||
{
|
||||
const POOL_SIZE = 3;
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user