1
0
mirror of https://github.com/danog/postgres.git synced 2025-01-22 13:21:14 +01:00
postgres/lib/AbstractPool.php

289 lines
8.1 KiB
PHP
Raw Normal View History

2016-09-14 09:27:39 -05:00
<?php declare(strict_types = 1);
namespace Amp\Postgres;
use Amp\{ Coroutine, Deferred };
2016-11-15 11:06:21 -06:00
use Interop\Async\Promise;
2016-09-14 09:27:39 -05:00
abstract class AbstractPool implements Pool {
/** @var \SplQueue */
private $idle;
/** @var \SplQueue */
private $busy;
/** @var \SplObjectStorage */
private $connections;
2016-11-15 11:06:21 -06:00
/** @var \Interop\Async\Promise|null */
private $promise;
2016-09-14 23:37:09 -05:00
/** @var \Amp\Deferred|null */
private $deferred;
2016-10-05 09:52:16 -05:00
2016-11-15 11:06:21 -06:00
/** @var \Amp\Postgres\Connection|\Interop\Async\Promise|null Connection used for notification listening. */
2016-10-05 09:52:16 -05:00
private $listeningConnection;
/** @var int Number of listeners on listening connection. */
private $listenerCount = 0;
2016-09-14 09:27:39 -05:00
/**
2016-11-15 11:06:21 -06:00
* @return \Interop\Async\Promise<\Amp\Postgres\Connection>
2016-09-14 09:27:39 -05:00
*
* @throws \Amp\Postgres\FailureException
*/
2016-11-15 11:06:21 -06:00
abstract protected function createConnection(): Promise;
2016-09-14 09:27:39 -05:00
public function __construct() {
$this->connections = new \SplObjectStorage();
$this->idle = new \SplQueue();
$this->busy = new \SplQueue();
}
/**
* {@inheritdoc}
*/
public function getConnectionCount(): int {
return $this->connections->count();
}
/**
* {@inheritdoc}
*/
public function getIdleConnectionCount(): int {
return $this->idle->count();
}
/**
* @param \Amp\Postgres\Connection $connection
*/
protected function addConnection(Connection $connection) {
if (isset($this->connections[$connection])) {
return;
}
$this->connections->attach($connection);
$this->idle->push($connection);
}
/**
* @coroutine
*
* @return \Generator
*
* @resolve \Amp\Postgres\Connection
*/
private function pop(): \Generator {
2016-11-15 11:06:21 -06:00
while ($this->promise !== null) {
2016-09-14 09:27:39 -05:00
try {
2016-11-15 11:06:21 -06:00
yield $this->promise; // Prevent simultaneous connection creation.
2016-09-14 09:27:39 -05:00
} catch (\Throwable $exception) {
// Ignore failure or cancellation of other operations.
}
}
if ($this->idle->isEmpty()) {
try {
if ($this->connections->count() >= $this->getMaxConnections()) {
// All possible connections busy, so wait until one becomes available.
2016-09-14 23:37:09 -05:00
$this->deferred = new Deferred;
2016-11-15 11:06:21 -06:00
yield $this->promise = $this->deferred->promise();
2016-09-14 09:27:39 -05:00
} else {
// Max connection count has not been reached, so open another connection.
2016-11-15 11:06:21 -06:00
$this->promise = $this->createConnection();
$this->addConnection(yield $this->promise);
2016-09-14 09:27:39 -05:00
}
} finally {
2016-09-14 23:37:09 -05:00
$this->deferred = null;
2016-11-15 11:06:21 -06:00
$this->promise = null;
2016-09-14 09:27:39 -05:00
}
}
// Shift a connection off the idle queue.
return $this->idle->shift();
}
/**
* @param \Amp\Postgres\Connection $connection
*
* @throws \Error If the connection is not part of this pool.
*/
private function push(Connection $connection) {
if (!isset($this->connections[$connection])) {
throw new \Error('Connection is not part of this pool');
}
$this->idle->push($connection);
2016-09-14 23:37:09 -05:00
if ($this->deferred instanceof Deferred) {
$this->deferred->resolve($connection);
2016-09-14 09:27:39 -05:00
}
}
/**
* {@inheritdoc}
*/
2016-11-15 11:06:21 -06:00
public function query(string $sql): Promise {
2016-09-14 09:27:39 -05:00
return new Coroutine($this->doQuery($sql));
}
private function doQuery(string $sql): \Generator {
/** @var \Amp\Postgres\Connection $connection */
$connection = yield from $this->pop();
try {
$result = yield $connection->query($sql);
} catch (\Throwable $exception) {
$this->push($connection);
throw $exception;
}
if ($result instanceof Operation) {
$result->onComplete(function () use ($connection) {
$this->push($connection);
});
} else {
$this->push($connection);
}
return $result;
}
/**
* {@inheritdoc}
*/
2016-11-15 11:06:21 -06:00
public function execute(string $sql, ...$params): Promise {
2016-09-14 09:27:39 -05:00
return new Coroutine($this->doExecute($sql, $params));
}
private function doExecute(string $sql, array $params): \Generator {
/** @var \Amp\Postgres\Connection $connection */
$connection = yield from $this->pop();
try {
$result = yield $connection->execute($sql, ...$params);
} catch (\Throwable $exception) {
$this->push($connection);
throw $exception;
}
if ($result instanceof Operation) {
$result->onComplete(function () use ($connection) {
$this->push($connection);
});
} else {
$this->push($connection);
}
return $result;
}
/**
* {@inheritdoc}
*/
2016-11-15 11:06:21 -06:00
public function prepare(string $sql): Promise {
2016-09-14 09:27:39 -05:00
return new Coroutine($this->doPrepare($sql));
}
private function doPrepare(string $sql): \Generator {
/** @var \Amp\Postgres\Connection $connection */
$connection = yield from $this->pop();
try {
/** @var \Amp\Postgres\Statement $statement */
$statement = yield $connection->prepare($sql);
} finally {
$this->push($connection);
}
return $statement;
}
2016-09-19 11:12:32 -05:00
/**
* {@inheritdoc}
*/
2016-11-15 11:06:21 -06:00
public function notify(string $channel, string $payload = ""): Promise {
return new Coroutine($this->doNotify($channel, $payload));
}
private function doNotify(string $channel, string $payload): \Generator {
/** @var \Amp\Postgres\Connection $connection */
$connection = yield from $this->pop();
try {
$result = yield $connection->notify($channel, $payload);
} finally {
$this->push($connection);
}
return $result;
}
2016-09-19 11:12:32 -05:00
/**
* {@inheritdoc}
*/
2016-11-15 11:06:21 -06:00
public function listen(string $channel): Promise {
2016-09-19 11:12:32 -05:00
return new Coroutine($this->doListen($channel));
}
public function doListen(string $channel): \Generator {
2016-10-05 09:52:16 -05:00
++$this->listenerCount;
if ($this->listeningConnection === null) {
$this->listeningConnection = new Coroutine($this->pop());
}
2016-11-15 11:06:21 -06:00
if ($this->listeningConnection instanceof Promise) {
2016-10-05 09:52:16 -05:00
$this->listeningConnection = yield $this->listeningConnection;
}
2016-09-19 11:12:32 -05:00
try {
/** @var \Amp\Postgres\Listener $listener */
2016-10-05 09:52:16 -05:00
$listener = yield $this->listeningConnection->listen($channel);
} catch (\Throwable $exception) {
2016-10-05 09:52:16 -05:00
if (--$this->listenerCount === 0) {
$connection = $this->listeningConnection;
$this->listeningConnection = null;
$this->push($connection);
}
throw $exception;
2016-09-19 11:12:32 -05:00
}
2016-10-05 09:52:16 -05:00
$listener->onComplete(function () {
if (--$this->listenerCount === 0) {
$connection = $this->listeningConnection;
$this->listeningConnection = null;
$this->push($connection);
}
});
2016-09-19 11:12:32 -05:00
return $listener;
}
2016-09-14 09:27:39 -05:00
/**
* {@inheritdoc}
*/
2016-11-15 11:06:21 -06:00
public function transaction(int $isolation = Transaction::COMMITTED): Promise {
2016-09-14 09:27:39 -05:00
return new Coroutine($this->doTransaction($isolation));
}
private function doTransaction(int $isolation = Transaction::COMMITTED): \Generator {
/** @var \Amp\Postgres\Connection $connection */
$connection = yield from $this->pop();
try {
/** @var \Amp\Postgres\Transaction $transaction */
$transaction = yield $connection->transaction($isolation);
} catch (\Throwable $exception) {
$this->push($connection);
throw $exception;
}
$transaction->onComplete(function () use ($connection) {
$this->push($connection);
});
return $transaction;
}
}