mirror of
https://github.com/danog/postgres.git
synced 2024-11-30 04:29:12 +01:00
Drop Connection and Pool interfaces
Connection is now an abstract class and Pool is a concrete class. Removed AggregatePool as well.
This commit is contained in:
parent
4b1c56a2b1
commit
a77e0ec22c
@ -1,176 +0,0 @@
|
||||
<?php
|
||||
|
||||
namespace Amp\Postgres;
|
||||
|
||||
use Amp\CallableMaker;
|
||||
use Amp\CancellationToken;
|
||||
use Amp\Deferred;
|
||||
use Amp\Promise;
|
||||
use function Amp\call;
|
||||
|
||||
abstract class AbstractConnection implements Connection {
|
||||
use CallableMaker;
|
||||
|
||||
/** @var \Amp\Postgres\Handle */
|
||||
private $handle;
|
||||
|
||||
/** @var \Amp\Deferred|null Used to only allow one transaction at a time. */
|
||||
private $busy;
|
||||
|
||||
/** @var callable */
|
||||
private $release;
|
||||
|
||||
/**
|
||||
* @param string $connectionString
|
||||
* @param \Amp\CancellationToken $token
|
||||
*
|
||||
* @return \Amp\Promise<\Amp\Postgres\Connection>
|
||||
*/
|
||||
abstract public static function connect(string $connectionString, CancellationToken $token = null): Promise;
|
||||
|
||||
/**
|
||||
* @param \Amp\Postgres\Handle $handle
|
||||
*/
|
||||
public function __construct(Handle $handle) {
|
||||
$this->handle = $handle;
|
||||
$this->release = $this->callableFromInstanceMethod("release");
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function isAlive(): bool {
|
||||
return $this->handle->isAlive();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function lastUsedAt(): int {
|
||||
return $this->handle->lastUsedAt();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function close() {
|
||||
$this->handle->close();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $methodName Method to execute.
|
||||
* @param mixed ...$args Arguments to pass to function.
|
||||
*
|
||||
* @return \Amp\Promise
|
||||
*
|
||||
* @throws \Amp\Postgres\FailureException
|
||||
*/
|
||||
private function send(string $methodName, ...$args): Promise {
|
||||
if ($this->busy) {
|
||||
return call(function () use ($methodName, $args) {
|
||||
while ($this->busy) {
|
||||
yield $this->busy->promise();
|
||||
}
|
||||
|
||||
return yield ([$this->handle, $methodName])(...$args);
|
||||
});
|
||||
}
|
||||
|
||||
return ([$this->handle, $methodName])(...$args);
|
||||
}
|
||||
|
||||
/**
|
||||
* Releases the transaction lock.
|
||||
*/
|
||||
private function release() {
|
||||
\assert($this->busy !== null);
|
||||
|
||||
$deferred = $this->busy;
|
||||
$this->busy = null;
|
||||
$deferred->resolve();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function query(string $sql): Promise {
|
||||
return $this->send("query", $sql);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function execute(string $sql, array $params = []): Promise {
|
||||
return $this->send("execute", $sql, $params);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function prepare(string $sql): Promise {
|
||||
return $this->send("prepare", $sql);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function notify(string $channel, string $payload = ""): Promise {
|
||||
return $this->send("notify", $channel, $payload);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function listen(string $channel): Promise {
|
||||
return $this->send("listen", $channel);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function transaction(int $isolation = Transaction::COMMITTED): Promise {
|
||||
return call(function () use ($isolation) {
|
||||
switch ($isolation) {
|
||||
case Transaction::UNCOMMITTED:
|
||||
yield $this->handle->query("BEGIN TRANSACTION ISOLATION LEVEL READ UNCOMMITTED");
|
||||
break;
|
||||
|
||||
case Transaction::COMMITTED:
|
||||
yield $this->handle->query("BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED");
|
||||
break;
|
||||
|
||||
case Transaction::REPEATABLE:
|
||||
yield $this->handle->query("BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ");
|
||||
break;
|
||||
|
||||
case Transaction::SERIALIZABLE:
|
||||
yield $this->handle->query("BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE");
|
||||
break;
|
||||
|
||||
default:
|
||||
throw new \Error("Invalid transaction type");
|
||||
}
|
||||
|
||||
$this->busy = new Deferred;
|
||||
|
||||
$transaction = new Transaction($this->handle, $isolation);
|
||||
$transaction->onDestruct($this->release);
|
||||
return $transaction;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function quoteString(string $data): string {
|
||||
return $this->handle->quoteString($data);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function quoteName(string $name): string {
|
||||
return $this->handle->quoteName($name);
|
||||
}
|
||||
}
|
@ -1,410 +0,0 @@
|
||||
<?php
|
||||
|
||||
namespace Amp\Postgres;
|
||||
|
||||
use Amp\CallableMaker;
|
||||
use Amp\Coroutine;
|
||||
use Amp\Deferred;
|
||||
use Amp\Loop;
|
||||
use Amp\Promise;
|
||||
use function Amp\call;
|
||||
|
||||
abstract class AbstractPool implements Pool {
|
||||
use CallableMaker;
|
||||
|
||||
const DEFAULT_IDLE_TIMEOUT = 60;
|
||||
|
||||
/** @var \SplQueue */
|
||||
private $idle;
|
||||
|
||||
/** @var \SplObjectStorage */
|
||||
private $connections;
|
||||
|
||||
/** @var \Amp\Promise|null */
|
||||
private $promise;
|
||||
|
||||
/** @var \Amp\Deferred|null */
|
||||
private $deferred;
|
||||
|
||||
/** @var \Amp\Postgres\Connection|\Amp\Promise|null Connection used for notification listening. */
|
||||
private $listeningConnection;
|
||||
|
||||
/** @var int Number of listeners on listening connection. */
|
||||
private $listenerCount = 0;
|
||||
|
||||
/** @var callable */
|
||||
private $push;
|
||||
|
||||
/** @var int */
|
||||
private $pending = 0;
|
||||
|
||||
/** @var bool */
|
||||
private $resetConnections = true;
|
||||
|
||||
/** @var int */
|
||||
private $idleTimeout = self::DEFAULT_IDLE_TIMEOUT;
|
||||
|
||||
/** @var string */
|
||||
private $timeoutWatcher;
|
||||
|
||||
/** @var bool */
|
||||
private $closed = false;
|
||||
|
||||
/**
|
||||
* @return \Amp\Promise<\Amp\Postgres\Connection>
|
||||
*
|
||||
* @throws \Amp\Postgres\FailureException
|
||||
*/
|
||||
abstract protected function createConnection(): Promise;
|
||||
|
||||
public function __construct() {
|
||||
$this->connections = $connections = new \SplObjectStorage;
|
||||
$this->idle = $idle = new \SplQueue;
|
||||
$this->push = $this->callableFromInstanceMethod("push");
|
||||
|
||||
$idleTimeout = &$this->idleTimeout;
|
||||
|
||||
$this->timeoutWatcher = Loop::repeat(1000, static function () use (&$idleTimeout, $connections, $idle) {
|
||||
$now = \time();
|
||||
while (!$idle->isEmpty()) {
|
||||
/** @var \Amp\Postgres\Connection $connection */
|
||||
$connection = $idle->bottom();
|
||||
|
||||
if ($connection->lastUsedAt() + $idleTimeout > $now) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Remove connection from pool.
|
||||
$idle->shift();
|
||||
$connections->detach($connection);
|
||||
$connection->close();
|
||||
}
|
||||
});
|
||||
|
||||
Loop::unreference($this->timeoutWatcher);
|
||||
}
|
||||
|
||||
public function __destruct() {
|
||||
Loop::cancel($this->timeoutWatcher);
|
||||
}
|
||||
|
||||
public function resetConnections(bool $reset = true) {
|
||||
$this->resetConnections = $reset;
|
||||
}
|
||||
|
||||
public function setIdleTimeout(int $timeout) {
|
||||
if ($timeout < 0) {
|
||||
throw new \Error("Timeout must be greater than or equal to 0");
|
||||
}
|
||||
|
||||
$this->idleTimeout = $timeout;
|
||||
|
||||
if ($this->idleTimeout > 0) {
|
||||
Loop::enable($this->timeoutWatcher);
|
||||
} else {
|
||||
Loop::disable($this->timeoutWatcher);
|
||||
}
|
||||
}
|
||||
|
||||
public function close() {
|
||||
$this->closed = true;
|
||||
foreach ($this->connections as $connection) {
|
||||
$connection->close();
|
||||
}
|
||||
$this->idle = new \SplQueue;
|
||||
$this->connections = new \SplObjectStorage;
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function extractConnection(): Promise {
|
||||
return call(function () {
|
||||
$connection = yield from $this->pop();
|
||||
$this->connections->detach($connection);
|
||||
return $connection;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function getConnectionCount(): int {
|
||||
return $this->connections->count();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function getIdleConnectionCount(): int {
|
||||
return $this->idle->count();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param \Amp\Postgres\Connection $connection
|
||||
*
|
||||
* @throws \Error if the connection is already part of this pool or if the connection is dead.
|
||||
*/
|
||||
protected function addConnection(Connection $connection) {
|
||||
if (isset($this->connections[$connection])) {
|
||||
throw new \Error("Connection is already a part of this pool");
|
||||
}
|
||||
|
||||
if (!$connection->isAlive()) {
|
||||
throw new \Error("The connection is dead");
|
||||
}
|
||||
|
||||
$this->connections->attach($connection);
|
||||
$this->idle->push($connection);
|
||||
|
||||
if ($this->deferred instanceof Deferred) {
|
||||
$this->deferred->resolve($connection);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return \Generator
|
||||
*
|
||||
* @resolve \Amp\Postgres\Connection
|
||||
*/
|
||||
private function pop(): \Generator {
|
||||
while ($this->promise !== null && $this->connections->count() + $this->pending >= $this->getMaxConnections()) {
|
||||
yield $this->promise; // Prevent simultaneous connection creation when connection count is at maximum - 1.
|
||||
}
|
||||
|
||||
while ($this->idle->isEmpty()) { // While loop to ensure an idle connection is available after promises below are resolved.
|
||||
if ($this->connections->count() + $this->pending >= $this->getMaxConnections()) {
|
||||
// All possible connections busy, so wait until one becomes available.
|
||||
try {
|
||||
$this->deferred = new Deferred;
|
||||
yield $this->promise = $this->deferred->promise(); // May be resolved with defunct connection.
|
||||
} finally {
|
||||
$this->deferred = null;
|
||||
$this->promise = null;
|
||||
}
|
||||
} else {
|
||||
// Max connection count has not been reached, so open another connection.
|
||||
++$this->pending;
|
||||
try {
|
||||
$connection = yield $this->createConnection();
|
||||
} finally {
|
||||
--$this->pending;
|
||||
}
|
||||
|
||||
$this->connections->attach($connection);
|
||||
return $connection;
|
||||
}
|
||||
}
|
||||
|
||||
// Shift a connection off the idle queue.
|
||||
$connection = $this->idle->shift();
|
||||
|
||||
if ($this->resetConnections) {
|
||||
yield $connection->query("RESET ALL");
|
||||
}
|
||||
|
||||
return $connection;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param \Amp\Postgres\Connection $connection
|
||||
*
|
||||
* @throws \Error If the connection is not part of this pool.
|
||||
*/
|
||||
private function push(Connection $connection) {
|
||||
\assert(isset($this->connections[$connection]), 'Connection is not part of this pool');
|
||||
|
||||
if ($connection->isAlive()) {
|
||||
$this->idle->push($connection);
|
||||
} else {
|
||||
$this->connections->detach($connection);
|
||||
}
|
||||
|
||||
if ($this->deferred instanceof Deferred) {
|
||||
$this->deferred->resolve($connection);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function query(string $sql): Promise {
|
||||
if ($this->closed) {
|
||||
throw new \Error("The pool has been closed");
|
||||
}
|
||||
|
||||
return call(function () use ($sql) {
|
||||
/** @var \Amp\Postgres\Connection $connection */
|
||||
$connection = yield from $this->pop();
|
||||
|
||||
try {
|
||||
$result = yield $connection->query($sql);
|
||||
} catch (\Throwable $exception) {
|
||||
$this->push($connection);
|
||||
throw $exception;
|
||||
}
|
||||
|
||||
if ($result instanceof Operation) {
|
||||
$result->onDestruct(function () use ($connection) {
|
||||
$this->push($connection);
|
||||
});
|
||||
} else {
|
||||
$this->push($connection);
|
||||
}
|
||||
|
||||
return $result;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function execute(string $sql, array $params = []): Promise {
|
||||
if ($this->closed) {
|
||||
throw new \Error("The pool has been closed");
|
||||
}
|
||||
|
||||
return call(function () use ($sql, $params) {
|
||||
/** @var \Amp\Postgres\Connection $connection */
|
||||
$connection = yield from $this->pop();
|
||||
|
||||
try {
|
||||
$result = yield $connection->execute($sql, $params);
|
||||
} catch (\Throwable $exception) {
|
||||
$this->push($connection);
|
||||
throw $exception;
|
||||
}
|
||||
|
||||
if ($result instanceof Operation) {
|
||||
$result->onDestruct(function () use ($connection) {
|
||||
$this->push($connection);
|
||||
});
|
||||
} else {
|
||||
$this->push($connection);
|
||||
}
|
||||
|
||||
return $result;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function prepare(string $sql): Promise {
|
||||
if ($this->closed) {
|
||||
throw new \Error("The pool has been closed");
|
||||
}
|
||||
|
||||
return call(function () use ($sql) {
|
||||
/** @var \Amp\Postgres\Connection $connection */
|
||||
$connection = yield from $this->pop();
|
||||
|
||||
try {
|
||||
/** @var \Amp\Postgres\Statement $statement */
|
||||
$statement = yield $connection->prepare($sql);
|
||||
} catch (\Throwable $exception) {
|
||||
$this->push($connection);
|
||||
throw $exception;
|
||||
}
|
||||
|
||||
$statement->onDestruct(function () use ($connection) {
|
||||
$this->push($connection);
|
||||
});
|
||||
|
||||
return $statement;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function notify(string $channel, string $payload = ""): Promise {
|
||||
if ($this->closed) {
|
||||
throw new \Error("The pool has been closed");
|
||||
}
|
||||
|
||||
return call(function () use ($channel, $payload) {
|
||||
/** @var \Amp\Postgres\Connection $connection */
|
||||
$connection = yield from $this->pop();
|
||||
|
||||
try {
|
||||
$result = yield $connection->notify($channel, $payload);
|
||||
} finally {
|
||||
$this->push($connection);
|
||||
}
|
||||
|
||||
return $result;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function listen(string $channel): Promise {
|
||||
if ($this->closed) {
|
||||
throw new \Error("The pool has been closed");
|
||||
}
|
||||
|
||||
return call(function () use ($channel) {
|
||||
++$this->listenerCount;
|
||||
|
||||
if ($this->listeningConnection === null) {
|
||||
$this->listeningConnection = new Coroutine($this->pop());
|
||||
}
|
||||
|
||||
if ($this->listeningConnection instanceof Promise) {
|
||||
$this->listeningConnection = yield $this->listeningConnection;
|
||||
}
|
||||
|
||||
try {
|
||||
/** @var \Amp\Postgres\Listener $listener */
|
||||
$listener = yield $this->listeningConnection->listen($channel);
|
||||
} catch (\Throwable $exception) {
|
||||
if (--$this->listenerCount === 0) {
|
||||
$connection = $this->listeningConnection;
|
||||
$this->listeningConnection = null;
|
||||
$this->push($connection);
|
||||
}
|
||||
throw $exception;
|
||||
}
|
||||
|
||||
$listener->onDestruct(function () {
|
||||
if (--$this->listenerCount === 0) {
|
||||
$connection = $this->listeningConnection;
|
||||
$this->listeningConnection = null;
|
||||
$this->push($connection);
|
||||
}
|
||||
});
|
||||
|
||||
return $listener;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function transaction(int $isolation = Transaction::COMMITTED): Promise {
|
||||
if ($this->closed) {
|
||||
throw new \Error("The pool has been closed");
|
||||
}
|
||||
|
||||
return call(function () use ($isolation) {
|
||||
/** @var \Amp\Postgres\Connection $connection */
|
||||
$connection = yield from $this->pop();
|
||||
|
||||
try {
|
||||
/** @var \Amp\Postgres\Transaction $transaction */
|
||||
$transaction = yield $connection->transaction($isolation);
|
||||
} catch (\Throwable $exception) {
|
||||
$this->push($connection);
|
||||
throw $exception;
|
||||
}
|
||||
|
||||
$transaction->onDestruct(function () use ($connection) {
|
||||
$this->push($connection);
|
||||
});
|
||||
|
||||
return $transaction;
|
||||
});
|
||||
}
|
||||
}
|
@ -1,34 +0,0 @@
|
||||
<?php
|
||||
|
||||
namespace Amp\Postgres;
|
||||
|
||||
use Amp\Promise;
|
||||
|
||||
class AggregatePool extends AbstractPool {
|
||||
/**
|
||||
* @param \Amp\Postgres\Connection $connection
|
||||
*/
|
||||
public function addConnection(Connection $connection) {
|
||||
parent::addConnection($connection);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
protected function createConnection(): Promise {
|
||||
throw new PoolError("Creating connections is not available in an aggregate pool");
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function getMaxConnections(): int {
|
||||
$count = $this->getConnectionCount();
|
||||
|
||||
if (!$count) {
|
||||
throw new PoolError("No connections in aggregate pool");
|
||||
}
|
||||
|
||||
return $count;
|
||||
}
|
||||
}
|
@ -2,5 +2,175 @@
|
||||
|
||||
namespace Amp\Postgres;
|
||||
|
||||
interface Connection extends Link, Handle {
|
||||
use Amp\CallableMaker;
|
||||
use Amp\CancellationToken;
|
||||
use Amp\Deferred;
|
||||
use Amp\Promise;
|
||||
use function Amp\call;
|
||||
|
||||
abstract class Connection implements Handle, Link {
|
||||
use CallableMaker;
|
||||
|
||||
/** @var \Amp\Postgres\Handle */
|
||||
private $handle;
|
||||
|
||||
/** @var \Amp\Deferred|null Used to only allow one transaction at a time. */
|
||||
private $busy;
|
||||
|
||||
/** @var callable */
|
||||
private $release;
|
||||
|
||||
/**
|
||||
* @param string $connectionString
|
||||
* @param \Amp\CancellationToken $token
|
||||
*
|
||||
* @return \Amp\Promise<\Amp\Postgres\Connection>
|
||||
*/
|
||||
abstract public static function connect(string $connectionString, CancellationToken $token = null): Promise;
|
||||
|
||||
/**
|
||||
* @param \Amp\Postgres\Handle $handle
|
||||
*/
|
||||
public function __construct(Handle $handle) {
|
||||
$this->handle = $handle;
|
||||
$this->release = $this->callableFromInstanceMethod("release");
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function isAlive(): bool {
|
||||
return $this->handle->isAlive();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function lastUsedAt(): int {
|
||||
return $this->handle->lastUsedAt();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function close() {
|
||||
$this->handle->close();
|
||||
}
|
||||
|
||||
/**
|
||||
* @param string $methodName Method to execute.
|
||||
* @param mixed ...$args Arguments to pass to function.
|
||||
*
|
||||
* @return \Amp\Promise
|
||||
*
|
||||
* @throws \Amp\Postgres\FailureException
|
||||
*/
|
||||
private function send(string $methodName, ...$args): Promise {
|
||||
if ($this->busy) {
|
||||
return call(function () use ($methodName, $args) {
|
||||
while ($this->busy) {
|
||||
yield $this->busy->promise();
|
||||
}
|
||||
|
||||
return yield ([$this->handle, $methodName])(...$args);
|
||||
});
|
||||
}
|
||||
|
||||
return ([$this->handle, $methodName])(...$args);
|
||||
}
|
||||
|
||||
/**
|
||||
* Releases the transaction lock.
|
||||
*/
|
||||
private function release() {
|
||||
\assert($this->busy !== null);
|
||||
|
||||
$deferred = $this->busy;
|
||||
$this->busy = null;
|
||||
$deferred->resolve();
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function query(string $sql): Promise {
|
||||
return $this->send("query", $sql);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function execute(string $sql, array $params = []): Promise {
|
||||
return $this->send("execute", $sql, $params);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function prepare(string $sql): Promise {
|
||||
return $this->send("prepare", $sql);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function notify(string $channel, string $payload = ""): Promise {
|
||||
return $this->send("notify", $channel, $payload);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function listen(string $channel): Promise {
|
||||
return $this->send("listen", $channel);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function transaction(int $isolation = Transaction::COMMITTED): Promise {
|
||||
return call(function () use ($isolation) {
|
||||
switch ($isolation) {
|
||||
case Transaction::UNCOMMITTED:
|
||||
yield $this->handle->query("BEGIN TRANSACTION ISOLATION LEVEL READ UNCOMMITTED");
|
||||
break;
|
||||
|
||||
case Transaction::COMMITTED:
|
||||
yield $this->handle->query("BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED");
|
||||
break;
|
||||
|
||||
case Transaction::REPEATABLE:
|
||||
yield $this->handle->query("BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ");
|
||||
break;
|
||||
|
||||
case Transaction::SERIALIZABLE:
|
||||
yield $this->handle->query("BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE");
|
||||
break;
|
||||
|
||||
default:
|
||||
throw new \Error("Invalid transaction type");
|
||||
}
|
||||
|
||||
$this->busy = new Deferred;
|
||||
|
||||
$transaction = new Transaction($this->handle, $isolation);
|
||||
$transaction->onDestruct($this->release);
|
||||
return $transaction;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function quoteString(string $data): string {
|
||||
return $this->handle->quoteString($data);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function quoteName(string $name): string {
|
||||
return $this->handle->quoteName($name);
|
||||
}
|
||||
}
|
||||
|
@ -1,46 +0,0 @@
|
||||
<?php
|
||||
|
||||
namespace Amp\Postgres;
|
||||
|
||||
use Amp\Promise;
|
||||
|
||||
class ConnectionPool extends AbstractPool {
|
||||
const DEFAULT_MAX_CONNECTIONS = 100;
|
||||
|
||||
/** @var string */
|
||||
private $connectionString;
|
||||
|
||||
/** @var int */
|
||||
private $maxConnections;
|
||||
|
||||
/**
|
||||
* @param string $connectionString
|
||||
* @param int $maxConnections
|
||||
*
|
||||
* @throws \Error If $maxConnections is less than 1.
|
||||
*/
|
||||
public function __construct(string $connectionString, int $maxConnections = self::DEFAULT_MAX_CONNECTIONS) {
|
||||
parent::__construct();
|
||||
|
||||
$this->connectionString = $connectionString;
|
||||
|
||||
$this->maxConnections = $maxConnections;
|
||||
if ($this->maxConnections < 1) {
|
||||
throw new \Error("Pool must contain at least one connection");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
protected function createConnection(): Promise {
|
||||
return connect($this->connectionString);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function getMaxConnections(): int {
|
||||
return $this->maxConnections;
|
||||
}
|
||||
}
|
@ -9,7 +9,7 @@ use Amp\Loop;
|
||||
use Amp\NullCancellationToken;
|
||||
use Amp\Promise;
|
||||
|
||||
class PgSqlConnection extends AbstractConnection {
|
||||
class PgSqlConnection extends Connection {
|
||||
/**
|
||||
* @param string $connectionString
|
||||
* @param \Amp\CancellationToken $token
|
||||
|
408
lib/Pool.php
408
lib/Pool.php
@ -2,39 +2,417 @@
|
||||
|
||||
namespace Amp\Postgres;
|
||||
|
||||
use Amp\CallableMaker;
|
||||
use Amp\Coroutine;
|
||||
use Amp\Deferred;
|
||||
use Amp\Loop;
|
||||
use Amp\Promise;
|
||||
use function Amp\call;
|
||||
|
||||
class Pool implements Link {
|
||||
use CallableMaker;
|
||||
|
||||
const DEFAULT_MAX_CONNECTIONS = 100;
|
||||
const DEFAULT_IDLE_TIMEOUT = 60;
|
||||
|
||||
/** @var string */
|
||||
private $connectionString;
|
||||
|
||||
/** @var int */
|
||||
private $maxConnections;
|
||||
|
||||
/** @var \SplQueue */
|
||||
private $idle;
|
||||
|
||||
/** @var \SplObjectStorage */
|
||||
private $connections;
|
||||
|
||||
/** @var \Amp\Promise|null */
|
||||
private $promise;
|
||||
|
||||
/** @var \Amp\Deferred|null */
|
||||
private $deferred;
|
||||
|
||||
/** @var \Amp\Postgres\Connection|\Amp\Promise|null Connection used for notification listening. */
|
||||
private $listeningConnection;
|
||||
|
||||
/** @var int Number of listeners on listening connection. */
|
||||
private $listenerCount = 0;
|
||||
|
||||
/** @var callable */
|
||||
private $push;
|
||||
|
||||
/** @var int */
|
||||
private $pending = 0;
|
||||
|
||||
/** @var bool */
|
||||
private $resetConnections = true;
|
||||
|
||||
/** @var int */
|
||||
private $idleTimeout = self::DEFAULT_IDLE_TIMEOUT;
|
||||
|
||||
/** @var string */
|
||||
private $timeoutWatcher;
|
||||
|
||||
/** @var bool */
|
||||
private $closed = false;
|
||||
|
||||
public function __construct(string $connectionString, int $maxConnections = self::DEFAULT_MAX_CONNECTIONS) {
|
||||
$this->connectionString = $connectionString;
|
||||
|
||||
$this->maxConnections = $maxConnections;
|
||||
if ($this->maxConnections < 1) {
|
||||
throw new \Error("Pool must contain at least one connection");
|
||||
}
|
||||
|
||||
$this->connections = $connections = new \SplObjectStorage;
|
||||
$this->idle = $idle = new \SplQueue;
|
||||
$this->push = $this->callableFromInstanceMethod("push");
|
||||
|
||||
$idleTimeout = &$this->idleTimeout;
|
||||
|
||||
$this->timeoutWatcher = Loop::repeat(1000, static function () use (&$idleTimeout, $connections, $idle) {
|
||||
$now = \time();
|
||||
while (!$idle->isEmpty()) {
|
||||
/** @var \Amp\Postgres\Connection $connection */
|
||||
$connection = $idle->bottom();
|
||||
|
||||
if ($connection->lastUsedAt() + $idleTimeout > $now) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Close connection and remove it from the pool.
|
||||
$idle->shift();
|
||||
$connections->detach($connection);
|
||||
$connection->close();
|
||||
}
|
||||
});
|
||||
|
||||
Loop::unreference($this->timeoutWatcher);
|
||||
}
|
||||
|
||||
public function __destruct() {
|
||||
Loop::cancel($this->timeoutWatcher);
|
||||
}
|
||||
|
||||
public function resetConnections(bool $reset = true) {
|
||||
$this->resetConnections = $reset;
|
||||
}
|
||||
|
||||
public function setIdleTimeout(int $timeout) {
|
||||
if ($timeout < 0) {
|
||||
throw new \Error("Timeout must be greater than or equal to 0");
|
||||
}
|
||||
|
||||
$this->idleTimeout = $timeout;
|
||||
|
||||
if ($this->idleTimeout > 0) {
|
||||
Loop::enable($this->timeoutWatcher);
|
||||
} else {
|
||||
Loop::disable($this->timeoutWatcher);
|
||||
}
|
||||
}
|
||||
|
||||
public function close() {
|
||||
$this->closed = true;
|
||||
foreach ($this->connections as $connection) {
|
||||
$connection->close();
|
||||
}
|
||||
$this->idle = new \SplQueue;
|
||||
$this->connections = new \SplObjectStorage;
|
||||
}
|
||||
|
||||
interface Pool extends Link {
|
||||
/**
|
||||
* Extracts an idle connection from the pool. The connection is completely removed from the pool and cannot be
|
||||
* put back into the pool. Useful for operations where connection state must be changed.
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function extractConnection(): Promise {
|
||||
return call(function () {
|
||||
$connection = yield from $this->pop();
|
||||
$this->connections->detach($connection);
|
||||
return $connection;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @return \Amp\Promise<\Amp\Postgres\Connection>
|
||||
*
|
||||
* @return \Amp\Promise<\Amp\Mysql\Connection>
|
||||
* @throws \Amp\Postgres\FailureException
|
||||
*/
|
||||
public function extractConnection(): Promise;
|
||||
protected function createConnection(string $connectionString): Promise {
|
||||
return connect($connectionString);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return int Current number of connections in the pool.
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function getConnectionCount(): int;
|
||||
public function getConnectionCount(): int {
|
||||
return $this->connections->count();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return int Current number of idle connections in the pool.
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function getIdleConnectionCount(): int;
|
||||
public function getIdleConnectionCount(): int {
|
||||
return $this->idle->count();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return int Maximum number of connections.
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function getMaxConnections(): int;
|
||||
public function getMaxConnections(): int {
|
||||
return $this->maxConnections;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param bool $reset True to automatically RESET ALL on connections in the pool before using them.
|
||||
* @return \Generator
|
||||
*
|
||||
* @resolve \Amp\Postgres\Connection
|
||||
*/
|
||||
public function resetConnections(bool $reset);
|
||||
private function pop(): \Generator {
|
||||
while ($this->promise !== null && $this->connections->count() + $this->pending >= $this->getMaxConnections()) {
|
||||
yield $this->promise; // Prevent simultaneous connection creation when connection count is at maximum - 1.
|
||||
}
|
||||
|
||||
while ($this->idle->isEmpty()) { // While loop to ensure an idle connection is available after promises below are resolved.
|
||||
if ($this->connections->count() + $this->pending >= $this->getMaxConnections()) {
|
||||
// All possible connections busy, so wait until one becomes available.
|
||||
try {
|
||||
$this->deferred = new Deferred;
|
||||
yield $this->promise = $this->deferred->promise(); // May be resolved with defunct connection.
|
||||
} finally {
|
||||
$this->deferred = null;
|
||||
$this->promise = null;
|
||||
}
|
||||
} else {
|
||||
// Max connection count has not been reached, so open another connection.
|
||||
++$this->pending;
|
||||
try {
|
||||
$connection = yield $this->createConnection($this->connectionString);
|
||||
if (!$connection instanceof Connection) {
|
||||
throw new \Error(\sprintf(
|
||||
"%s::createConnection() must resolve to an instance of %s",
|
||||
static::class,
|
||||
Connection::class
|
||||
));
|
||||
}
|
||||
} finally {
|
||||
--$this->pending;
|
||||
}
|
||||
|
||||
$this->connections->attach($connection);
|
||||
return $connection;
|
||||
}
|
||||
}
|
||||
|
||||
// Shift a connection off the idle queue.
|
||||
$connection = $this->idle->shift();
|
||||
|
||||
if ($this->resetConnections) {
|
||||
yield $connection->query("RESET ALL");
|
||||
}
|
||||
|
||||
return $connection;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param int $timeout Number of seconds before idle connections are removed from the pool. Use 0 for no timeout.
|
||||
* @param \Amp\Postgres\Connection $connection
|
||||
*
|
||||
* @throws \Error If the connection is not part of this pool.
|
||||
*/
|
||||
public function setIdleTimeout(int $timeout);
|
||||
private function push(Connection $connection) {
|
||||
\assert(isset($this->connections[$connection]), 'Connection is not part of this pool');
|
||||
|
||||
if ($connection->isAlive()) {
|
||||
$this->idle->push($connection);
|
||||
} else {
|
||||
$this->connections->detach($connection);
|
||||
}
|
||||
|
||||
if ($this->deferred instanceof Deferred) {
|
||||
$this->deferred->resolve($connection);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function query(string $sql): Promise {
|
||||
if ($this->closed) {
|
||||
throw new \Error("The pool has been closed");
|
||||
}
|
||||
|
||||
return call(function () use ($sql) {
|
||||
/** @var \Amp\Postgres\Connection $connection */
|
||||
$connection = yield from $this->pop();
|
||||
|
||||
try {
|
||||
$result = yield $connection->query($sql);
|
||||
} catch (\Throwable $exception) {
|
||||
$this->push($connection);
|
||||
throw $exception;
|
||||
}
|
||||
|
||||
if ($result instanceof Operation) {
|
||||
$result->onDestruct(function () use ($connection) {
|
||||
$this->push($connection);
|
||||
});
|
||||
} else {
|
||||
$this->push($connection);
|
||||
}
|
||||
|
||||
return $result;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function execute(string $sql, array $params = []): Promise {
|
||||
if ($this->closed) {
|
||||
throw new \Error("The pool has been closed");
|
||||
}
|
||||
|
||||
return call(function () use ($sql, $params) {
|
||||
/** @var \Amp\Postgres\Connection $connection */
|
||||
$connection = yield from $this->pop();
|
||||
|
||||
try {
|
||||
$result = yield $connection->execute($sql, $params);
|
||||
} catch (\Throwable $exception) {
|
||||
$this->push($connection);
|
||||
throw $exception;
|
||||
}
|
||||
|
||||
if ($result instanceof Operation) {
|
||||
$result->onDestruct(function () use ($connection) {
|
||||
$this->push($connection);
|
||||
});
|
||||
} else {
|
||||
$this->push($connection);
|
||||
}
|
||||
|
||||
return $result;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function prepare(string $sql): Promise {
|
||||
if ($this->closed) {
|
||||
throw new \Error("The pool has been closed");
|
||||
}
|
||||
|
||||
return call(function () use ($sql) {
|
||||
/** @var \Amp\Postgres\Connection $connection */
|
||||
$connection = yield from $this->pop();
|
||||
|
||||
try {
|
||||
/** @var \Amp\Postgres\Statement $statement */
|
||||
$statement = yield $connection->prepare($sql);
|
||||
} catch (\Throwable $exception) {
|
||||
$this->push($connection);
|
||||
throw $exception;
|
||||
}
|
||||
|
||||
$statement->onDestruct(function () use ($connection) {
|
||||
$this->push($connection);
|
||||
});
|
||||
|
||||
return $statement;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function notify(string $channel, string $payload = ""): Promise {
|
||||
if ($this->closed) {
|
||||
throw new \Error("The pool has been closed");
|
||||
}
|
||||
|
||||
return call(function () use ($channel, $payload) {
|
||||
/** @var \Amp\Postgres\Connection $connection */
|
||||
$connection = yield from $this->pop();
|
||||
|
||||
try {
|
||||
$result = yield $connection->notify($channel, $payload);
|
||||
} finally {
|
||||
$this->push($connection);
|
||||
}
|
||||
|
||||
return $result;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function listen(string $channel): Promise {
|
||||
if ($this->closed) {
|
||||
throw new \Error("The pool has been closed");
|
||||
}
|
||||
|
||||
return call(function () use ($channel) {
|
||||
++$this->listenerCount;
|
||||
|
||||
if ($this->listeningConnection === null) {
|
||||
$this->listeningConnection = new Coroutine($this->pop());
|
||||
}
|
||||
|
||||
if ($this->listeningConnection instanceof Promise) {
|
||||
$this->listeningConnection = yield $this->listeningConnection;
|
||||
}
|
||||
|
||||
try {
|
||||
/** @var \Amp\Postgres\Listener $listener */
|
||||
$listener = yield $this->listeningConnection->listen($channel);
|
||||
} catch (\Throwable $exception) {
|
||||
if (--$this->listenerCount === 0) {
|
||||
$connection = $this->listeningConnection;
|
||||
$this->listeningConnection = null;
|
||||
$this->push($connection);
|
||||
}
|
||||
throw $exception;
|
||||
}
|
||||
|
||||
$listener->onDestruct(function () {
|
||||
if (--$this->listenerCount === 0) {
|
||||
$connection = $this->listeningConnection;
|
||||
$this->listeningConnection = null;
|
||||
$this->push($connection);
|
||||
}
|
||||
});
|
||||
|
||||
return $listener;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritdoc}
|
||||
*/
|
||||
public function transaction(int $isolation = Transaction::COMMITTED): Promise {
|
||||
if ($this->closed) {
|
||||
throw new \Error("The pool has been closed");
|
||||
}
|
||||
|
||||
return call(function () use ($isolation) {
|
||||
/** @var \Amp\Postgres\Connection $connection */
|
||||
$connection = yield from $this->pop();
|
||||
|
||||
try {
|
||||
/** @var \Amp\Postgres\Transaction $transaction */
|
||||
$transaction = yield $connection->transaction($isolation);
|
||||
} catch (\Throwable $exception) {
|
||||
$this->push($connection);
|
||||
throw $exception;
|
||||
}
|
||||
|
||||
$transaction->onDestruct(function () use ($connection) {
|
||||
$this->push($connection);
|
||||
});
|
||||
|
||||
return $transaction;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
@ -10,7 +10,7 @@ use Amp\NullCancellationToken;
|
||||
use Amp\Promise;
|
||||
use pq;
|
||||
|
||||
class PqConnection extends AbstractConnection {
|
||||
class PqConnection extends Connection {
|
||||
/**
|
||||
* @param string $connectionString
|
||||
* @param \Amp\CancellationToken $token
|
||||
|
@ -34,6 +34,6 @@ function connect(string $connectionString, CancellationToken $token = null): Pro
|
||||
*
|
||||
* @return \Amp\Postgres\Pool
|
||||
*/
|
||||
function pool(string $connectionString, int $maxConnections = ConnectionPool::DEFAULT_MAX_CONNECTIONS): Pool {
|
||||
return new ConnectionPool($connectionString, $maxConnections);
|
||||
function pool(string $connectionString, int $maxConnections = Pool::DEFAULT_MAX_CONNECTIONS): Pool {
|
||||
return new Pool($connectionString, $maxConnections);
|
||||
}
|
||||
|
@ -1,73 +0,0 @@
|
||||
<?php
|
||||
|
||||
namespace Amp\Postgres\Test;
|
||||
|
||||
use Amp\Postgres\AggregatePool;
|
||||
use Amp\Postgres\Connection;
|
||||
use Amp\Postgres\Pool;
|
||||
|
||||
class AggregatePoolTest extends AbstractPoolTest {
|
||||
/**
|
||||
* @param array $connections
|
||||
*
|
||||
* @return \PHPUnit_Framework_MockObject_MockObject|\Amp\Postgres\Pool
|
||||
*/
|
||||
protected function createPool(array $connections): Pool {
|
||||
$mock = $this->getMockBuilder(AggregatePool::class)
|
||||
->setConstructorArgs([0])
|
||||
->setMethods(['createConnection'])
|
||||
->getMock();
|
||||
|
||||
$mock->method('createConnection')
|
||||
->will($this->returnCallback(function () {
|
||||
$this->fail('The createConnection() method should not be called.');
|
||||
}));
|
||||
|
||||
foreach ($connections as $connection) {
|
||||
$mock->addConnection($connection);
|
||||
}
|
||||
|
||||
return $mock;
|
||||
}
|
||||
|
||||
public function testGetMaxConnections() {
|
||||
$pool = $this->createPool([$this->createConnection()]);
|
||||
$this->assertSame(1, $pool->getMaxConnections());
|
||||
$pool->addConnection($this->createConnection());
|
||||
$this->assertSame(2, $pool->getMaxConnections());
|
||||
}
|
||||
|
||||
public function testGetConnectionCount() {
|
||||
$pool = $this->createPool([$this->createConnection(), $this->createConnection()]);
|
||||
$this->assertSame(2, $pool->getConnectionCount());
|
||||
}
|
||||
|
||||
public function testGetIdleConnectionCount() {
|
||||
$pool = $this->createPool([$this->createConnection(), $this->createConnection()]);
|
||||
$this->assertSame(2, $pool->getIdleConnectionCount());
|
||||
$promise = $pool->query("SELECT 1");
|
||||
$this->assertSame(1, $pool->getIdleConnectionCount());
|
||||
}
|
||||
|
||||
/**
|
||||
* @expectedException \Error
|
||||
* @expectedExceptionMessage Connection is already a part of this pool
|
||||
*/
|
||||
public function testDoubleAddConnection() {
|
||||
$pool = $this->createPool([]);
|
||||
$connection = $this->createConnection();
|
||||
$pool->addConnection($connection);
|
||||
$pool->addConnection($connection);
|
||||
}
|
||||
|
||||
/**
|
||||
* @expectedException \Error
|
||||
* @expectedExceptionMessage The connection is dead
|
||||
*/
|
||||
public function testAddDeadConnection() {
|
||||
$pool = $this->createPool([]);
|
||||
$connection = $this->createMock(Connection::class);
|
||||
$connection->method('isAlive')->willReturn(false);
|
||||
$pool->addConnection($connection);
|
||||
}
|
||||
}
|
@ -4,7 +4,6 @@ namespace Amp\Postgres\Test;
|
||||
|
||||
use Amp\Delayed;
|
||||
use Amp\Loop;
|
||||
use Amp\Postgres\ConnectionPool;
|
||||
use Amp\Postgres\Pool;
|
||||
use Amp\Promise;
|
||||
use Amp\Success;
|
||||
@ -16,7 +15,7 @@ class ConnectionPoolTest extends AbstractPoolTest {
|
||||
* @return \PHPUnit_Framework_MockObject_MockObject|\Amp\Postgres\Pool
|
||||
*/
|
||||
protected function createPool(array $connections): Pool {
|
||||
$mock = $this->getMockBuilder(ConnectionPool::class)
|
||||
$mock = $this->getMockBuilder(Pool::class)
|
||||
->setConstructorArgs(['connection string', \count($connections)])
|
||||
->setMethods(['createConnection'])
|
||||
->getMock();
|
||||
@ -35,12 +34,12 @@ class ConnectionPoolTest extends AbstractPoolTest {
|
||||
* @expectedExceptionMessage Pool must contain at least one connection
|
||||
*/
|
||||
public function testInvalidMaxConnections() {
|
||||
$pool = new ConnectionPool('connection string', 0);
|
||||
$pool = new Pool('connection string', 0);
|
||||
}
|
||||
|
||||
public function testIdleConnectionsRemovedAfterTimeout() {
|
||||
Loop::run(function () {
|
||||
$pool = new ConnectionPool('host=localhost user=postgres');
|
||||
$pool = new Pool('host=localhost user=postgres');
|
||||
$pool->setIdleTimeout(2);
|
||||
$count = 3;
|
||||
|
||||
|
@ -2,22 +2,43 @@
|
||||
|
||||
namespace Amp\Postgres\Test;
|
||||
|
||||
use Amp\Postgres\AggregatePool;
|
||||
use Amp\Postgres\Link;
|
||||
use Amp\Postgres\PgSqlConnection;
|
||||
use Amp\Postgres\Pool;
|
||||
use Amp\Promise;
|
||||
use Amp\Success;
|
||||
|
||||
/**
|
||||
* @requires extension pgsql
|
||||
*/
|
||||
class PgSqlPoolTest extends AbstractLinkTest {
|
||||
const POOL_SIZE = 3;
|
||||
|
||||
/** @var resource[] PostgreSQL connection resources. */
|
||||
protected $handles = [];
|
||||
|
||||
public function createLink(string $connectionString): Link {
|
||||
$pool = new AggregatePool;
|
||||
for ($i = 0; $i < self::POOL_SIZE; ++$i) {
|
||||
$this->handles[] = \pg_connect($connectionString, \PGSQL_CONNECT_FORCE_NEW);
|
||||
}
|
||||
|
||||
$handle = \pg_connect($connectionString, \PGSQL_CONNECT_FORCE_NEW);
|
||||
$socket = \pg_socket($handle);
|
||||
$pool = $this->getMockBuilder(Pool::class)
|
||||
->setConstructorArgs(['connection string', \count($this->handles)])
|
||||
->setMethods(['createConnection'])
|
||||
->getMock();
|
||||
|
||||
$pool->method('createConnection')
|
||||
->will($this->returnCallback(function (): Promise {
|
||||
static $count = 0;
|
||||
if (!isset($this->handles[$count])) {
|
||||
$this->fail("createConnection called too many times");
|
||||
}
|
||||
$handle = $this->handles[$count];
|
||||
++$count;
|
||||
return new Success(new PgSqlConnection($handle, \pg_socket($handle)));
|
||||
}));
|
||||
|
||||
$handle = \reset($this->handles);
|
||||
|
||||
\pg_query($handle, "DROP TABLE IF EXISTS test");
|
||||
|
||||
@ -35,17 +56,6 @@ class PgSqlPoolTest extends AbstractLinkTest {
|
||||
}
|
||||
}
|
||||
|
||||
$this->handles[] = $handle;
|
||||
|
||||
$pool->addConnection(new PgSqlConnection($handle, $socket));
|
||||
|
||||
$handle = \pg_connect($connectionString, \PGSQL_CONNECT_FORCE_NEW);
|
||||
$socket = \pg_socket($handle);
|
||||
|
||||
$this->handles[] = $handle;
|
||||
|
||||
$pool->addConnection(new PgSqlConnection($handle, $socket));
|
||||
|
||||
return $pool;
|
||||
}
|
||||
|
||||
@ -56,5 +66,9 @@ class PgSqlPoolTest extends AbstractLinkTest {
|
||||
|
||||
\pg_query($this->handles[0], "ROLLBACK");
|
||||
\pg_query($this->handles[0], "DROP TABLE test");
|
||||
|
||||
foreach ($this->handles as $handle) {
|
||||
\pg_close($handle);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2,23 +2,45 @@
|
||||
|
||||
namespace Amp\Postgres\Test;
|
||||
|
||||
use Amp\Postgres\AggregatePool;
|
||||
use Amp\Postgres\Link;
|
||||
use Amp\Postgres\Pool;
|
||||
use Amp\Postgres\PqConnection;
|
||||
use Amp\Promise;
|
||||
use Amp\Success;
|
||||
|
||||
/**
|
||||
* @requires extension pq
|
||||
*/
|
||||
class PqPoolTest extends AbstractLinkTest {
|
||||
const POOL_SIZE = 3;
|
||||
|
||||
/** @var \pq\Connection[] */
|
||||
protected $handles = [];
|
||||
|
||||
public function createLink(string $connectionString): Link {
|
||||
$pool = new AggregatePool;
|
||||
for ($i = 0; $i < self::POOL_SIZE; ++$i) {
|
||||
$this->handles[] = $handle = new \pq\Connection($connectionString);
|
||||
$handle->nonblocking = true;
|
||||
$handle->unbuffered = true;
|
||||
}
|
||||
|
||||
$handle = new \pq\Connection($connectionString);
|
||||
$handle->nonblocking = true;
|
||||
$handle->unbuffered = true;
|
||||
$pool = $this->getMockBuilder(Pool::class)
|
||||
->setConstructorArgs(['connection string', \count($this->handles)])
|
||||
->setMethods(['createConnection'])
|
||||
->getMock();
|
||||
|
||||
$pool->method('createConnection')
|
||||
->will($this->returnCallback(function (): Promise {
|
||||
static $count = 0;
|
||||
if (!isset($this->handles[$count])) {
|
||||
$this->fail("createConnection called too many times");
|
||||
}
|
||||
$handle = $this->handles[$count];
|
||||
++$count;
|
||||
return new Success(new PqConnection($handle));
|
||||
}));
|
||||
|
||||
$handle = \reset($this->handles);
|
||||
|
||||
$handle->exec("DROP TABLE IF EXISTS test");
|
||||
|
||||
@ -36,18 +58,6 @@ class PqPoolTest extends AbstractLinkTest {
|
||||
}
|
||||
}
|
||||
|
||||
$this->handles[] = $handle;
|
||||
|
||||
$pool->addConnection(new PqConnection($handle));
|
||||
|
||||
$handle = new \pq\Connection($connectionString);
|
||||
$handle->nonblocking = true;
|
||||
$handle->unbuffered = true;
|
||||
|
||||
$this->handles[] = $handle;
|
||||
|
||||
$pool->addConnection(new PqConnection($handle));
|
||||
|
||||
return $pool;
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user