mirror of
https://github.com/danog/postgres.git
synced 2024-12-14 10:28:01 +01:00
218 lines
5.9 KiB
PHP
218 lines
5.9 KiB
PHP
<?php declare(strict_types = 1);
|
|
|
|
namespace Amp\Postgres;
|
|
|
|
use Amp\{ Coroutine, Deferred };
|
|
use Interop\Async\Awaitable;
|
|
|
|
abstract class AbstractPool implements Pool {
|
|
/** @var \SplQueue */
|
|
private $idle;
|
|
|
|
/** @var \SplQueue */
|
|
private $busy;
|
|
|
|
/** @var \SplObjectStorage */
|
|
private $connections;
|
|
|
|
/** @var \Amp\Deferred|\Interop\Async\Awaitable|null */
|
|
private $awaitable;
|
|
|
|
/**
|
|
* @return \Interop\Async\Awaitable<\Amp\Postgres\Connection>
|
|
*
|
|
* @throws \Amp\Postgres\FailureException
|
|
*/
|
|
abstract protected function createConnection(): Awaitable;
|
|
|
|
public function __construct() {
|
|
$this->connections = new \SplObjectStorage();
|
|
$this->idle = new \SplQueue();
|
|
$this->busy = new \SplQueue();
|
|
}
|
|
|
|
/**
|
|
* {@inheritdoc}
|
|
*/
|
|
public function getConnectionCount(): int {
|
|
return $this->connections->count();
|
|
}
|
|
|
|
/**
|
|
* {@inheritdoc}
|
|
*/
|
|
public function getIdleConnectionCount(): int {
|
|
return $this->idle->count();
|
|
}
|
|
|
|
/**
|
|
* @param \Amp\Postgres\Connection $connection
|
|
*/
|
|
protected function addConnection(Connection $connection) {
|
|
if (isset($this->connections[$connection])) {
|
|
return;
|
|
}
|
|
|
|
$this->connections->attach($connection);
|
|
$this->idle->push($connection);
|
|
}
|
|
|
|
/**
|
|
* @coroutine
|
|
*
|
|
* @return \Generator
|
|
*
|
|
* @resolve \Amp\Postgres\Connection
|
|
*/
|
|
private function pop(): \Generator {
|
|
while (null !== $this->awaitable) {
|
|
try {
|
|
yield $this->awaitable; // Prevent simultaneous connection creation.
|
|
} catch (\Throwable $exception) {
|
|
// Ignore failure or cancellation of other operations.
|
|
}
|
|
}
|
|
|
|
if ($this->idle->isEmpty()) {
|
|
try {
|
|
if ($this->connections->count() >= $this->getMaxConnections()) {
|
|
// All possible connections busy, so wait until one becomes available.
|
|
$this->awaitable = new Deferred;
|
|
yield $this->awaitable->getAwaitable();
|
|
} else {
|
|
// Max connection count has not been reached, so open another connection.
|
|
$this->awaitable = $this->createConnection();
|
|
$this->addConnection(yield $this->awaitable);
|
|
}
|
|
} finally {
|
|
$this->awaitable = null;
|
|
}
|
|
}
|
|
|
|
// Shift a connection off the idle queue.
|
|
return $this->idle->shift();
|
|
}
|
|
|
|
/**
|
|
* @param \Amp\Postgres\Connection $connection
|
|
*
|
|
* @throws \Error If the connection is not part of this pool.
|
|
*/
|
|
private function push(Connection $connection) {
|
|
if (!isset($this->connections[$connection])) {
|
|
throw new \Error('Connection is not part of this pool');
|
|
}
|
|
|
|
$this->idle->push($connection);
|
|
|
|
if ($this->awaitable instanceof Deferred) {
|
|
$this->awaitable->resolve($connection);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* {@inheritdoc}
|
|
*/
|
|
public function query(string $sql): Awaitable {
|
|
return new Coroutine($this->doQuery($sql));
|
|
}
|
|
|
|
private function doQuery(string $sql): \Generator {
|
|
/** @var \Amp\Postgres\Connection $connection */
|
|
$connection = yield from $this->pop();
|
|
|
|
try {
|
|
$result = yield $connection->query($sql);
|
|
} catch (\Throwable $exception) {
|
|
$this->push($connection);
|
|
throw $exception;
|
|
}
|
|
|
|
if ($result instanceof Operation) {
|
|
$result->onComplete(function () use ($connection) {
|
|
$this->push($connection);
|
|
});
|
|
} else {
|
|
$this->push($connection);
|
|
}
|
|
|
|
return $result;
|
|
}
|
|
|
|
/**
|
|
* {@inheritdoc}
|
|
*/
|
|
public function execute(string $sql, ...$params): Awaitable {
|
|
return new Coroutine($this->doExecute($sql, $params));
|
|
}
|
|
|
|
private function doExecute(string $sql, array $params): \Generator {
|
|
/** @var \Amp\Postgres\Connection $connection */
|
|
$connection = yield from $this->pop();
|
|
|
|
try {
|
|
$result = yield $connection->execute($sql, ...$params);
|
|
} catch (\Throwable $exception) {
|
|
$this->push($connection);
|
|
throw $exception;
|
|
}
|
|
|
|
if ($result instanceof Operation) {
|
|
$result->onComplete(function () use ($connection) {
|
|
$this->push($connection);
|
|
});
|
|
} else {
|
|
$this->push($connection);
|
|
}
|
|
|
|
return $result;
|
|
}
|
|
|
|
/**
|
|
* {@inheritdoc}
|
|
*/
|
|
public function prepare(string $sql): Awaitable {
|
|
return new Coroutine($this->doPrepare($sql));
|
|
}
|
|
|
|
private function doPrepare(string $sql): \Generator {
|
|
/** @var \Amp\Postgres\Connection $connection */
|
|
$connection = yield from $this->pop();
|
|
|
|
try {
|
|
/** @var \Amp\Postgres\Statement $statement */
|
|
$statement = yield $connection->prepare($sql);
|
|
} finally {
|
|
$this->push($connection);
|
|
}
|
|
|
|
return $statement;
|
|
}
|
|
|
|
/**
|
|
* {@inheritdoc}
|
|
*/
|
|
public function transaction(int $isolation = Transaction::COMMITTED): Awaitable {
|
|
return new Coroutine($this->doTransaction($isolation));
|
|
}
|
|
|
|
private function doTransaction(int $isolation = Transaction::COMMITTED): \Generator {
|
|
/** @var \Amp\Postgres\Connection $connection */
|
|
$connection = yield from $this->pop();
|
|
|
|
try {
|
|
/** @var \Amp\Postgres\Transaction $transaction */
|
|
$transaction = yield $connection->transaction($isolation);
|
|
} catch (\Throwable $exception) {
|
|
$this->push($connection);
|
|
throw $exception;
|
|
}
|
|
|
|
$transaction->onComplete(function () use ($connection) {
|
|
$this->push($connection);
|
|
});
|
|
|
|
return $transaction;
|
|
}
|
|
}
|