2016-12-30 06:21:17 +01:00
|
|
|
<?php
|
2016-09-14 16:27:39 +02:00
|
|
|
|
|
|
|
namespace Amp\Postgres;
|
|
|
|
|
2017-08-01 07:38:12 +02:00
|
|
|
use Amp\CallableMaker;
|
2017-06-21 05:17:53 +02:00
|
|
|
use Amp\Coroutine;
|
|
|
|
use Amp\Deferred;
|
|
|
|
use Amp\Promise;
|
2017-12-04 04:50:28 +01:00
|
|
|
use function Amp\call;
|
2016-09-14 16:27:39 +02:00
|
|
|
|
|
|
|
/**
 * Base class for connection pools.
 *
 * The pool keeps every connection it owns in {@see $connections} and the subset that is currently free in
 * {@see $idle}. Operations borrow a connection with pop(), run on it, and hand it back with push(). Concrete
 * pools supply createConnection() (and getMaxConnections(), which this class calls but does not define).
 */
abstract class AbstractPool implements Pool {
    use CallableMaker;

    /** @var \SplQueue Connections that are free to be handed out by pop(). */
    private $idle;

    /** @var \SplObjectStorage Every connection owned by the pool, idle or in use. */
    private $connections;

    /** @var \Amp\Promise|null Promise that pop() is currently waiting on (connection creation or a free connection). */
    private $promise;

    /** @var \Amp\Deferred|null Resolved by addConnection()/push() to wake a pop() call waiting for a connection. */
    private $deferred;

    /** @var \Amp\Postgres\Connection|\Amp\Promise|null Connection used for notification listening. */
    private $listeningConnection;

    /** @var int Number of listeners on listening connection. */
    private $listenerCount = 0;

    /** @var callable Callable wrapping the private push() method. */
    private $push;

    /** @var int Number of connections being created by pop() that are not yet attached to the pool. */
    private $pending = 0;

    /**
     * @return \Amp\Promise<\Amp\Postgres\Connection>
     *
     * @throws \Amp\Postgres\FailureException
     */
    abstract protected function createConnection(): Promise;

    public function __construct() {
        $this->connections = new \SplObjectStorage;
        $this->idle = new \SplQueue;
        $this->push = $this->callableFromInstanceMethod("push");
    }

    /**
     * {@inheritdoc}
     */
    public function extractConnection(): Promise {
        return call(function () {
            $connection = yield from $this->pop();

            // The connection is removed from the pool's bookkeeping entirely; it is never pushed back.
            $this->connections->detach($connection);

            return $connection;
        });
    }

    /**
     * {@inheritdoc}
     */
    public function getConnectionCount(): int {
        return $this->connections->count();
    }

    /**
     * {@inheritdoc}
     */
    public function getIdleConnectionCount(): int {
        return $this->idle->count();
    }

    /**
     * Attaches a connection to the pool and marks it idle, waking a waiting pop() call if there is one.
     *
     * @param \Amp\Postgres\Connection $connection
     *
     * @throws \Error if the connection is already part of this pool or if the connection is dead.
     */
    protected function addConnection(Connection $connection) {
        if (isset($this->connections[$connection])) {
            throw new \Error("Connection is already a part of this pool");
        }

        if (!$connection->isAlive()) {
            throw new \Error("The connection is dead");
        }

        $this->connections->attach($connection);
        $this->idle->push($connection);

        if ($this->deferred instanceof Deferred) {
            $this->deferred->resolve($connection);
        }
    }

    /**
     * Obtains an idle connection, opening a new one when the pool is below its maximum size or waiting for one
     * to be returned when it is not.
     *
     * @return \Generator
     *
     * @resolve \Amp\Postgres\Connection
     */
    private function pop(): \Generator {
        while ($this->promise !== null && $this->connections->count() + $this->pending >= $this->getMaxConnections()) {
            yield $this->promise; // Prevent simultaneous connection creation when connection count is at maximum - 1.
        }

        while ($this->idle->isEmpty()) { // While loop to ensure an idle connection is available after promises below are resolved.
            try {
                if ($this->connections->count() + $this->pending >= $this->getMaxConnections()) {
                    // All possible connections busy, so wait until one becomes available.
                    $this->deferred = new Deferred;
                    yield $this->promise = $this->deferred->promise(); // May be resolved with defunct connection.
                } else {
                    // Max connection count has not been reached, so open another connection. The connection is
                    // counted as pending while it is being created so concurrent pop() calls see it in the
                    // limit checks above and cannot push the pool past its maximum.
                    ++$this->pending;

                    try {
                        $this->promise = $this->createConnection();
                        $connection = yield $this->promise;
                    } finally {
                        // Stop counting it as pending before it is attached (or after creation failed), so it
                        // is never counted twice.
                        --$this->pending;
                    }

                    $this->addConnection($connection);
                }
            } finally {
                $this->deferred = null;
                $this->promise = null;
            }
        }

        // Shift a connection off the idle queue.
        return $this->idle->shift();
    }

    /**
     * Returns a borrowed connection to the pool. Live connections go back on the idle queue; dead connections
     * are dropped from the pool. Either way a pop() call waiting for a connection is woken so it can re-check
     * the pool state.
     *
     * @param \Amp\Postgres\Connection $connection
     *
     * @throws \Error If the connection is not part of this pool (only when assertions are enabled).
     */
    private function push(Connection $connection) {
        \assert(isset($this->connections[$connection]), 'Connection is not part of this pool');

        if ($connection->isAlive()) {
            $this->idle->push($connection);
        } else {
            $this->connections->detach($connection);
        }

        if ($this->deferred instanceof Deferred) {
            $this->deferred->resolve($connection);
        }
    }

    /**
     * Drops one listener from the shared listening connection and returns that connection to the pool once the
     * last listener is gone.
     */
    private function releaseListener() {
        if (--$this->listenerCount === 0) {
            $connection = $this->listeningConnection;
            $this->listeningConnection = null;
            $this->push($connection);
        }
    }

    /**
     * {@inheritdoc}
     */
    public function query(string $sql): Promise {
        return call(function () use ($sql) {
            /** @var \Amp\Postgres\Connection $connection */
            $connection = yield from $this->pop();

            try {
                $result = yield $connection->query($sql);
            } catch (\Throwable $exception) {
                $this->push($connection);
                throw $exception;
            }

            if ($result instanceof Operation) {
                // The connection is only returned to the pool from the operation's onDestruct callback.
                $result->onDestruct(function () use ($connection) {
                    $this->push($connection);
                });
            } else {
                $this->push($connection);
            }

            return $result;
        });
    }

    /**
     * {@inheritdoc}
     */
    public function execute(string $sql, array $params = []): Promise {
        return call(function () use ($sql, $params) {
            /** @var \Amp\Postgres\Connection $connection */
            $connection = yield from $this->pop();

            try {
                $result = yield $connection->execute($sql, $params);
            } catch (\Throwable $exception) {
                $this->push($connection);
                throw $exception;
            }

            if ($result instanceof Operation) {
                // The connection is only returned to the pool from the operation's onDestruct callback.
                $result->onDestruct(function () use ($connection) {
                    $this->push($connection);
                });
            } else {
                $this->push($connection);
            }

            return $result;
        });
    }

    /**
     * {@inheritdoc}
     */
    public function prepare(string $sql): Promise {
        return call(function () use ($sql) {
            /** @var \Amp\Postgres\Connection $connection */
            $connection = yield from $this->pop();

            try {
                /** @var \Amp\Postgres\Statement $statement */
                $statement = yield $connection->prepare($sql);
            } catch (\Throwable $exception) {
                $this->push($connection);
                throw $exception;
            }

            // The connection is returned to the pool from the statement's onDestruct callback.
            $statement->onDestruct(function () use ($connection) {
                $this->push($connection);
            });

            return $statement;
        });
    }

    /**
     * {@inheritdoc}
     */
    public function notify(string $channel, string $payload = ""): Promise {
        return call(function () use ($channel, $payload) {
            /** @var \Amp\Postgres\Connection $connection */
            $connection = yield from $this->pop();

            try {
                $result = yield $connection->notify($channel, $payload);
            } finally {
                $this->push($connection);
            }

            return $result;
        });
    }

    /**
     * {@inheritdoc}
     */
    public function listen(string $channel): Promise {
        return call(function () use ($channel) {
            ++$this->listenerCount;

            if ($this->listeningConnection === null) {
                // First listener: start borrowing a connection. The promise is stored so listen() calls made
                // while it is pending wait on the same connection instead of borrowing another one.
                $this->listeningConnection = new Coroutine($this->pop());
            }

            if ($this->listeningConnection instanceof Promise) {
                $acquiring = $this->listeningConnection;

                try {
                    $connection = yield $acquiring;
                } catch (\Throwable $exception) {
                    // No connection was obtained: undo this listener's registration and forget the failed
                    // promise so a later listen() call starts over instead of failing with the same error.
                    --$this->listenerCount;

                    if ($this->listeningConnection === $acquiring) {
                        $this->listeningConnection = null;
                    }

                    throw $exception;
                }

                $this->listeningConnection = $connection;
            }

            try {
                /** @var \Amp\Postgres\Listener $listener */
                $listener = yield $this->listeningConnection->listen($channel);
            } catch (\Throwable $exception) {
                $this->releaseListener();
                throw $exception;
            }

            $listener->onDestruct(function () {
                $this->releaseListener();
            });

            return $listener;
        });
    }

    /**
     * {@inheritdoc}
     */
    public function transaction(int $isolation = Transaction::COMMITTED): Promise {
        return call(function () use ($isolation) {
            /** @var \Amp\Postgres\Connection $connection */
            $connection = yield from $this->pop();

            try {
                /** @var \Amp\Postgres\Transaction $transaction */
                $transaction = yield $connection->transaction($isolation);
            } catch (\Throwable $exception) {
                $this->push($connection);
                throw $exception;
            }

            // The connection is returned to the pool from the transaction's onDestruct callback.
            $transaction->onDestruct(function () use ($connection) {
                $this->push($connection);
            });

            return $transaction;
        });
    }
}
|