1
0
mirror of https://github.com/danog/postgres.git synced 2024-11-26 20:15:02 +01:00

Use decorators instead of Operation

This commit is contained in:
Aaron Piotrowski 2018-07-05 00:11:37 -05:00
parent d9f9071e94
commit 667a94531c
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
20 changed files with 874 additions and 627 deletions

View File

@ -182,7 +182,7 @@ abstract class Connection implements Link, Handle
*
* @throws FailureException
*/
final public function transaction(int $isolation = Transaction::ISOLATION_COMMITTED): Promise
final public function transaction(int $isolation = ConnectionTransaction::ISOLATION_COMMITTED): Promise
{
if (! $this->handle) {
throw new FailureException('Not connected');
@ -190,19 +190,19 @@ abstract class Connection implements Link, Handle
return call(function () use ($isolation) {
switch ($isolation) {
case Transaction::ISOLATION_UNCOMMITTED:
case ConnectionTransaction::ISOLATION_UNCOMMITTED:
yield $this->handle->query("BEGIN TRANSACTION ISOLATION LEVEL READ UNCOMMITTED");
break;
case Transaction::ISOLATION_COMMITTED:
case ConnectionTransaction::ISOLATION_COMMITTED:
yield $this->handle->query("BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED");
break;
case Transaction::ISOLATION_REPEATABLE:
case ConnectionTransaction::ISOLATION_REPEATABLE:
yield $this->handle->query("BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ");
break;
case Transaction::ISOLATION_SERIALIZABLE:
case ConnectionTransaction::ISOLATION_SERIALIZABLE:
yield $this->handle->query("BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE");
break;
@ -212,9 +212,7 @@ abstract class Connection implements Link, Handle
$this->busy = new Deferred;
$transaction = new Transaction($this->handle, $isolation);
$transaction->onDestruct($this->release);
return $transaction;
return new ConnectionTransaction($this->handle, $isolation, $this->release);
});
}

View File

@ -0,0 +1,91 @@
<?php
namespace Amp\Postgres;
use Amp\Iterator;
use Amp\Promise;
final class ConnectionListener implements Listener
{
/** @var \Amp\Iterator */
private $iterator;
/** @var string */
private $channel;
/** @var callable|null */
private $unlisten;
/**
* @param \Amp\Iterator $iterator Iterator emitting notificatons on the channel.
* @param string $channel Channel name.
* @param callable(string $channel): $unlisten Function invoked to unlisten from the channel.
*/
public function __construct(Iterator $iterator, string $channel, callable $unlisten)
{
$this->iterator = $iterator;
$this->channel = $channel;
$this->unlisten = $unlisten;
}
public function __destruct()
{
if ($this->unlisten) {
$this->unlisten(); // Invokes $this->queue->complete().
}
}
/**
* {@inheritdoc}
*/
public function advance(): Promise
{
return $this->iterator->advance();
}
/**
* {@inheritdoc}
*
* @return Notification
*/
public function getCurrent(): Notification
{
return $this->iterator->getCurrent();
}
/**
* @return string Channel name.
*/
public function getChannel(): string
{
return $this->channel;
}
/**
* @return bool
*/
public function isListening(): bool
{
return $this->unlisten !== null;
}
/**
* Unlistens from the channel. No more values will be emitted from this listener.
*
* @return Promise<\Amp\Sql\CommandResult>
*
* @throws \Error If this method was previously invoked.
*/
public function unlisten(): Promise
{
if (!$this->unlisten) {
throw new \Error("Already unlistened on this channel");
}
/** @var Promise $promise */
$promise = ($this->unlisten)($this->channel);
$this->unlisten = null;
return $promise;
}
}

View File

@ -0,0 +1,334 @@
<?php
namespace Amp\Postgres;
use Amp\Promise;
use Amp\Sql\Transaction as SqlTransaction;
use Amp\Sql\TransactionError;
use function Amp\call;
final class ConnectionTransaction implements Handle, Transaction
{
/** @var Handle|null */
private $handle;
/** @var int */
private $isolation;
/** @var Internal\ReferenceQueue */
private $queue;
/** @var ConnectionListener[] */
private $listeners = [];
/**
* @param Handle $handle
* @param int $isolation
* @param callable $release
*
* @throws \Error If the isolation level is invalid.
*/
public function __construct(Handle $handle, int $isolation = SqlTransaction::ISOLATION_COMMITTED, callable $release)
{
switch ($isolation) {
case SqlTransaction::ISOLATION_UNCOMMITTED:
case SqlTransaction::ISOLATION_COMMITTED:
case SqlTransaction::ISOLATION_REPEATABLE:
case SqlTransaction::ISOLATION_SERIALIZABLE:
$this->isolation = $isolation;
break;
default:
throw new \Error("Isolation must be a valid transaction isolation level");
}
$this->handle = $handle;
$this->queue = new Internal\ReferenceQueue;
$listeners =& $this->listeners;
$this->queue->onDestruct(static function () use (&$listeners) {
foreach ($listeners as $listener) {
if ($listener->isListening()) {
$listener->unlisten();
}
}
});
}
public function __destruct()
{
if ($this->handle && $this->handle->isAlive()) {
$this->rollback(); // Invokes $this->queue->complete().
}
}
/**
* {@inheritdoc}
*/
public function lastUsedAt(): int
{
return $this->handle->lastUsedAt();
}
/**
* {@inheritdoc}
*
* Closes and commits all changes in the transaction.
*/
public function close()
{
if ($this->handle) {
$this->commit(); // Invokes $this->queue->unreference().
}
}
/**
* {@inheritdoc}
*/
public function isAlive(): bool
{
return $this->handle && $this->handle->isAlive();
}
/**
* @return bool True if the transaction is active, false if it has been committed or rolled back.
*/
public function isActive(): bool
{
return $this->handle !== null;
}
/**
* @return int
*/
public function getIsolationLevel(): int
{
return $this->isolation;
}
/**
* {@inheritdoc}
*
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function query(string $sql): Promise
{
if ($this->handle === null) {
throw new TransactionError("The transaction has been committed or rolled back");
}
$this->queue->reference();
$promise = $this->handle->query($sql);
$promise->onResolve(function ($exception, $result) {
if ($result instanceof Operation) {
$result->onDestruct([$this->queue, "unreference"]);
return;
}
$this->queue->unreference();
});
return $promise;
}
/**
* {@inheritdoc}
*
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function prepare(string $sql): Promise
{
if ($this->handle === null) {
throw new TransactionError("The transaction has been committed or rolled back");
}
$this->queue->reference();
$promise = $this->handle->prepare($sql);
$promise->onResolve(function ($exception, $statement) {
if ($statement instanceof Operation) {
$statement->onDestruct([$this->queue, "unreference"]);
return;
}
$this->queue->unreference();
});
return $promise;
}
/**
* {@inheritdoc}
*
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function execute(string $sql, array $params = []): Promise
{
if ($this->handle === null) {
throw new TransactionError("The transaction has been committed or rolled back");
}
$this->queue->reference();
$promise = $this->handle->execute($sql, $params);
$promise->onResolve(function ($exception, $result) {
if ($result instanceof Operation) {
$result->onDestruct([$this->queue, "unreference"]);
return;
}
$this->queue->unreference();
});
return $promise;
}
/**
* {@inheritdoc}
*
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function notify(string $channel, string $payload = ""): Promise
{
if ($this->handle === null) {
throw new TransactionError("The transaction has been committed or rolled back");
}
return $this->handle->notify($channel, $payload);
}
/**
* Commits the transaction and makes it inactive.
*
* @return Promise<\Amp\Sql\CommandResult>
*
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function commit(): Promise
{
if ($this->handle === null) {
throw new TransactionError("The transaction has been committed or rolled back");
}
$promise = $this->handle->query("COMMIT");
$this->handle = null;
$promise->onResolve([$this->queue, "unreference"]);
return $promise;
}
/**
* Rolls back the transaction and makes it inactive.
*
* @return Promise<\Amp\Sql\CommandResult>
*
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function rollback(): Promise
{
if ($this->handle === null) {
throw new TransactionError("The transaction has been committed or rolled back");
}
$promise = $this->handle->query("ROLLBACK");
$this->handle = null;
$promise->onResolve([$this->queue, "unreference"]);
return $promise;
}
/**
* Creates a savepoint with the given identifier.
*
* @param string $identifier Savepoint identifier.
*
* @return Promise<\Amp\Sql\CommandResult>
*
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function createSavepoint(string $identifier): Promise
{
return $this->query("SAVEPOINT " . $this->quoteName($identifier));
}
/**
* Rolls back to the savepoint with the given identifier.
*
* @param string $identifier Savepoint identifier.
*
* @return Promise<\Amp\Sql\CommandResult>
*
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function rollbackTo(string $identifier): Promise
{
return $this->query("ROLLBACK TO " . $this->quoteName($identifier));
}
/**
* Releases the savepoint with the given identifier.
*
* @param string $identifier Savepoint identifier.
*
* @return Promise<\Amp\Sql\CommandResult>
*
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function releaseSavepoint(string $identifier): Promise
{
return $this->query("RELEASE SAVEPOINT " . $this->quoteName($identifier));
}
/**
* {@inheritdoc}
*
* Listeners automatically unlisten when the transaction is committed or rolled back.
*
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function listen(string $channel): Promise
{
if ($this->handle === null) {
throw new TransactionError("The transaction has been committed or rolled back");
}
return call(function () use ($channel) {
$listener = yield $this->handle->listen($channel);
$this->listeners[] = $listener;
return $listener;
});
}
/**
* {@inheritdoc}
*
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function quoteString(string $data): string
{
if ($this->handle === null) {
throw new TransactionError("The transaction has been committed or rolled back");
}
return $this->handle->quoteString($data);
}
/**
* {@inheritdoc}
*
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function quoteName(string $name): string
{
if ($this->handle === null) {
throw new TransactionError("The transaction has been committed or rolled back");
}
return $this->handle->quoteName($name);
}
}

View File

@ -2,28 +2,6 @@
namespace Amp\Postgres;
interface Handle extends Executor
interface Handle extends Executor, Quoter
{
/**
* Quotes (escapes) the given string for use as a string literal or identifier in a query. This method wraps the
* string in single quotes, so additional quotes should not be added in the query.
*
* @param string $data Unquoted data.
*
* @return string Quoted string wrapped in single quotes.
*
* @throws \Error If the connection to the database has been closed.
*/
public function quoteString(string $data): string;
/**
* Quotes (escapes) the given string for use as a name or identifier in a query.
*
* @param string $name Unquoted identifier.
*
* @return string Quoted identifier.
*
* @throws \Error If the connection to the database has been closed.
*/
public function quoteName(string $name): string;
}

View File

@ -4,83 +4,18 @@ namespace Amp\Postgres;
use Amp\Iterator;
use Amp\Promise;
use Amp\Sql\Operation;
final class Listener implements Iterator, Operation
interface Listener extends Iterator
{
/** @var \Amp\Iterator */
private $iterator;
/** @var string */
private $channel;
/** @var callable|null */
private $unlisten;
/** @var Internal\ReferenceQueue */
private $queue;
/**
* @param \Amp\Iterator $iterator Iterator emitting notificatons on the channel.
* @param string $channel Channel name.
* @param callable(string $channel): $unlisten Function invoked to unlisten from the channel.
*/
public function __construct(Iterator $iterator, string $channel, callable $unlisten)
{
$this->iterator = $iterator;
$this->channel = $channel;
$this->unlisten = $unlisten;
$this->queue = new Internal\ReferenceQueue;
}
public function __destruct()
{
if ($this->unlisten) {
$this->unlisten(); // Invokes $this->queue->complete().
}
}
/**
* {@inheritdoc}
*/
public function onDestruct(callable $onComplete)
{
$this->queue->onDestruct($onComplete);
}
/**
* {@inheritdoc}
*/
public function advance(): Promise
{
return $this->iterator->advance();
}
/**
* {@inheritdoc}
*
* @return Notification
*/
public function getCurrent(): Notification
{
return $this->iterator->getCurrent();
}
/**
* @return string Channel name.
*/
public function getChannel(): string
{
return $this->channel;
}
public function getChannel(): string;
/**
* @return bool
*/
public function isListening(): bool
{
return $this->unlisten !== null;
}
public function isListening(): bool;
/**
* Unlistens from the channel. No more values will be emitted from this listener.
@ -89,16 +24,5 @@ final class Listener implements Iterator, Operation
*
* @throws \Error If this method was previously invoked.
*/
public function unlisten(): Promise
{
if (!$this->unlisten) {
throw new \Error("Already unlistened on this channel");
}
/** @var Promise $promise */
$promise = ($this->unlisten)($this->channel);
$this->unlisten = null;
$promise->onResolve([$this->queue, "unreference"]);
return $promise;
}
public function unlisten(): Promise;
}

View File

@ -189,7 +189,7 @@ final class PgSqlHandle implements Handle
*/
public function isAlive(): bool
{
return $this->handle !== null;
return \is_resource($this->handle);
}
/**
@ -448,7 +448,7 @@ final class PgSqlHandle implements Handle
}
Loop::enable($this->poll);
return new Listener($emitter->iterate(), $channel, $this->unlisten);
return new ConnectionListener($emitter->iterate(), $channel, $this->unlisten);
});
}

View File

@ -3,10 +3,9 @@
namespace Amp\Postgres;
use Amp\Promise;
use Amp\Sql\Operation;
use Amp\Sql\Statement;
final class PgSqlStatement implements Statement, Operation
final class PgSqlStatement implements Statement
{
/** @var PgSqlHandle */
private $handle;
@ -38,14 +37,12 @@ final class PgSqlStatement implements Statement, Operation
$this->name = $name;
$this->sql = $sql;
$this->params = $params;
$this->queue = new Internal\ReferenceQueue;
$this->lastUsedAt = \time();
}
public function __destruct()
{
$this->handle->statementDeallocate($this->name);
$this->queue->unreference();
}
/** {@inheritdoc} */
@ -71,10 +68,4 @@ final class PgSqlStatement implements Statement, Operation
{
return $this->handle->statementExecute($this->name, Internal\replaceNamedParams($params, $this->params));
}
/** {@inheritdoc} */
public function onDestruct(callable $onDestruct)
{
$this->queue->onDestruct($onDestruct);
}
}

View File

@ -6,6 +6,11 @@ use Amp\Coroutine;
use Amp\Promise;
use Amp\Sql\AbstractPool;
use Amp\Sql\Connector;
use Amp\Sql\Pool as SqlPool;
use Amp\Sql\ResultSet as SqlResultSet;
use Amp\Sql\Statement as SqlStatement;
use Amp\Sql\StatementPool as SqlStatementPool;
use Amp\Sql\Transaction as SqlTransaction;
use function Amp\call;
final class Pool extends AbstractPool implements Link
@ -27,6 +32,28 @@ final class Pool extends AbstractPool implements Link
return connector();
}
protected function createStatement(SqlStatement $statement, callable $release): SqlStatement
{
return new PooledStatement($statement, $release);
}
protected function createStatementPool(SqlPool $pool, SqlStatement $statement, callable $prepare): SqlStatementPool
{
return new StatementPool($pool, $statement, $prepare);
}
protected function createTransaction(SqlTransaction $transaction, callable $release): SqlTransaction
{
\assert($transaction instanceof Transaction);
return new PooledTransaction($transaction, $release);
}
protected function createResultSet(SqlResultSet $resultSet, callable $release): SqlResultSet
{
\assert($resultSet instanceof ResultSet);
return new PooledResultSet($resultSet, $release);
}
/**
* @param bool $reset True to automatically execute RESET ALL on a connection before it is used by the pool.
*/
@ -94,15 +121,13 @@ final class Pool extends AbstractPool implements Link
throw $exception;
}
$listener->onDestruct(function () {
return new PooledListener($listener, function () {
if (--$this->listenerCount === 0) {
$connection = $this->listeningConnection;
$this->listeningConnection = null;
$this->push($connection);
}
});
return $listener;
});
}
}

66
src/PooledListener.php Normal file
View File

@ -0,0 +1,66 @@
<?php
namespace Amp\Postgres;
use Amp\Promise;
class PooledListener implements Listener
{
/** @var Listener */
private $listener;
/** @var callable|null */
private $release;
public function __construct(Listener $listener, callable $release)
{
$this->listener = $listener;
$this->release = $release;
if (!$this->listener->isListening()) {
($this->release)();
$this->release = null;
}
}
public function __destruct()
{
if ($this->listener->isListening()) {
$this->unlisten(); // Invokes $this->release callback.
}
}
public function advance(): Promise
{
return $this->listener->advance();
}
public function getCurrent()
{
return $this->listener->getCurrent();
}
public function getChannel(): string
{
return $this->listener->getChannel();
}
public function isListening(): bool
{
return $this->listener->isListening();
}
public function unlisten(): Promise
{
if (!$this->release) {
throw new \Error("Already unlistened on this channel");
}
$promise = $this->listener->unlisten();
$promise->onResolve($this->release);
$this->release = null;
return $promise;
}
}

40
src/PooledResultSet.php Normal file
View File

@ -0,0 +1,40 @@
<?php
namespace Amp\Postgres;
use Amp\Promise;
class PooledResultSet implements ResultSet
{
/** @var ResultSet */
private $result;
/** @var callable|null */
private $release;
public function __construct(ResultSet $result, callable $release)
{
$this->result = $result;
$this->release = $release;
}
public function __destruct()
{
($this->release)();
}
public function advance(int $type = self::FETCH_ASSOC): Promise
{
return $this->result->advance($type);
}
public function getCurrent()
{
return $this->result->getCurrent();
}
public function numFields(): int
{
return $this->result->numFields();
}
}

51
src/PooledStatement.php Normal file
View File

@ -0,0 +1,51 @@
<?php
namespace Amp\Postgres;
use Amp\Promise;
use Amp\Sql\Statement;
class PooledStatement implements Statement
{
/** @var Statement */
private $statement;
/** @var callable|null */
private $release;
public function __construct(Statement $statement, callable $release)
{
$this->statement = $statement;
$this->release = $release;
if (!$this->statement->isAlive()) {
($this->release)();
$this->release = null;
}
}
public function __destruct()
{
($this->release)();
}
public function execute(array $params = []): Promise
{
return $this->statement->execute($params);
}
public function isAlive(): bool
{
return $this->statement->isAlive();
}
public function getQuery(): string
{
return $this->statement->getQuery();
}
public function lastUsedAt(): int
{
return $this->statement->lastUsedAt();
}
}

198
src/PooledTransaction.php Normal file
View File

@ -0,0 +1,198 @@
<?php
namespace Amp\Postgres;
use Amp\Promise;
use Amp\Sql\TransactionError;
class PooledTransaction implements Transaction
{
/** @var Transaction|null */
private $transaction;
/** @var callable|null */
private $release;
/**
* PooledTransaction constructor.
*
* @param Transaction $transaction
* @param callable $release
*/
public function __construct(Transaction $transaction, callable $release)
{
$this->transaction = $transaction;
$this->release = $release;
if (!$this->transaction->isActive()) {
($this->release)();
$this->transaction = null;
$this->release = null;
}
}
public function __destruct()
{
if ($this->transaction && $this->transaction->isActive()) {
$this->close(); // Invokes $this->release callback.
}
}
public function query(string $sql): Promise
{
if (!$this->transaction) {
throw new TransactionError("The transaction has been committed or rolled back");
}
return $this->transaction->query($sql);
}
public function prepare(string $sql): Promise
{
if (!$this->transaction) {
throw new TransactionError("The transaction has been committed or rolled back");
}
return $this->transaction->prepare($sql);
}
public function execute(string $sql, array $params = []): Promise
{
if (!$this->transaction) {
throw new TransactionError("The transaction has been committed or rolled back");
}
return $this->transaction->execute($sql, $params);
}
public function notify(string $channel, string $payload = ""): Promise
{
if (!$this->transaction) {
throw new TransactionError("The transaction has been committed or rolled back");
}
return $this->transaction->notify($channel, $payload);
}
public function listen(string $channel): Promise
{
if (!$this->transaction) {
throw new TransactionError("The transaction has been committed or rolled back");
}
return $this->transaction->listen($channel);
}
public function isAlive(): bool
{
return $this->transaction && $this->transaction->isAlive();
}
public function lastUsedAt(): int
{
if (!$this->transaction) {
throw new TransactionError("The transaction has been committed or rolled back");
}
return $this->transaction->lastUsedAt();
}
public function close()
{
if (!$this->transaction) {
return;
}
$promise = $this->transaction->commit();
$promise->onResolve($this->release);
$this->transaction = null;
}
public function quoteString(string $data): string
{
if (!$this->transaction) {
throw new TransactionError("The transaction has been committed or rolled back");
}
return $this->transaction->quoteString($data);
}
public function quoteName(string $name): string
{
if (!$this->transaction) {
throw new TransactionError("The transaction has been committed or rolled back");
}
return $this->transaction->quoteName($name);
}
public function getIsolationLevel(): int
{
if (!$this->transaction) {
throw new TransactionError("The transaction has been committed or rolled back");
}
return $this->transaction->getIsolationLevel();
}
public function isActive(): bool
{
return $this->transaction && $this->transaction->isActive();
}
public function commit(): Promise
{
if (!$this->transaction) {
throw new TransactionError("The transaction has been committed or rolled back");
}
$promise = $this->transaction->commit();
$promise->onResolve($this->release);
$this->transaction = null;
return $promise;
}
public function rollback(): Promise
{
if (!$this->transaction) {
throw new TransactionError("The transaction has been committed or rolled back");
}
$promise = $this->transaction->rollback();
$promise->onResolve($this->release);
$this->transaction = null;
return $promise;
}
public function createSavepoint(string $identifier): Promise
{
if (!$this->transaction) {
throw new TransactionError("The transaction has been committed or rolled back");
}
return $this->transaction->createSavepoint($identifier);
}
public function rollbackTo(string $identifier): Promise
{
if (!$this->transaction) {
throw new TransactionError("The transaction has been committed or rolled back");
}
return $this->transaction->rollbackTo($identifier);
}
public function releaseSavepoint(string $identifier): Promise
{
if (!$this->transaction) {
throw new TransactionError("The transaction has been committed or rolled back");
}
return $this->transaction->releaseSavepoint($identifier);
}
}

View File

@ -238,8 +238,7 @@ final class PqHandle implements Handle
case pq\Result::SINGLE_TUPLE:
$this->busy = new Deferred;
$result = new PqUnbufferedResultSet($this->fetch, $result);
$result->onDestruct($this->release);
$result = new PqUnbufferedResultSet($this->fetch, $result, $this->release);
return $result;
case pq\Result::NONFATAL_ERROR:
@ -452,7 +451,7 @@ final class PqHandle implements Handle
}
Loop::enable($this->poll);
return new Listener($emitter->iterate(), $channel, $this->unlisten);
return new ConnectionListener($emitter->iterate(), $channel, $this->unlisten);
});
}

View File

@ -3,10 +3,9 @@
namespace Amp\Postgres;
use Amp\Promise;
use Amp\Sql\Operation;
use Amp\Sql\Statement;
final class PqStatement implements Statement, Operation
final class PqStatement implements Statement
{
/** @var @return PromisePqHandle */
private $handle;
@ -17,9 +16,6 @@ final class PqStatement implements Statement, Operation
/** @var string */
private $sql;
/** @var @return PromiseInternal\ReferenceQueue */
private $queue;
/** @var array */
private $params;
@ -27,7 +23,7 @@ final class PqStatement implements Statement, Operation
private $lastUsedAt;
/**
* @param @return PromisePqHandle $handle
* @param PqHandle $handle
* @param string $name Statement name.
* @param string $sql Original prepared SQL query.
* @param string[] $params Parameter indices to parameter names.
@ -38,14 +34,12 @@ final class PqStatement implements Statement, Operation
$this->name = $name;
$this->params = $params;
$this->sql = $sql;
$this->queue = new Internal\ReferenceQueue;
$this->lastUsedAt = \time();
}
public function __destruct()
{
$this->handle->statementDeallocate($this->name);
$this->queue->unreference();
}
/** {@inheritdoc} */
@ -72,10 +66,4 @@ final class PqStatement implements Statement, Operation
$this->lastUsedAt = \time();
return $this->handle->statementExecute($this->name, Internal\replaceNamedParams($params, $this->params));
}
/** {@inheritdoc} */
public function onDestruct(callable $onDestruct)
{
$this->queue->onDestruct($onDestruct);
}
}

View File

@ -20,19 +20,15 @@ final class PqUnbufferedResultSet implements ResultSet
/** @var int Next row fetch type. */
private $type = self::FETCH_ASSOC;
/** @var Internal\ReferenceQueue */
private $queue;
/**
* @param callable(): $fetch Function to fetch next result row.
* @param \pq\Result $result PostgreSQL result object.
*/
public function __construct(callable $fetch, pq\Result $result)
public function __construct(callable $fetch, pq\Result $result, callable $release)
{
$this->numCols = $result->numCols;
$this->queue = $queue = new Internal\ReferenceQueue;
$this->producer = new Producer(static function (callable $emit) use ($queue, $result, $fetch) {
$this->producer = new Producer(static function (callable $emit) use ($release, $result, $fetch) {
try {
do {
$result->autoConvert = pq\Result::CONV_SCALAR | pq\Result::CONV_ARRAY;
@ -40,7 +36,7 @@ final class PqUnbufferedResultSet implements ResultSet
$result = yield $fetch();
} while ($result instanceof pq\Result);
} finally {
$queue->unreference();
$release();
}
});
}
@ -87,12 +83,4 @@ final class PqUnbufferedResultSet implements ResultSet
{
return $this->numCols;
}
/**
* {@inheritdoc}
*/
public function onDestruct(callable $onComplete)
{
$this->queue->onDestruct($onComplete);
}
}

29
src/Quoter.php Normal file
View File

@ -0,0 +1,29 @@
<?php
namespace Amp\Postgres;
interface Quoter
{
/**
* Quotes (escapes) the given string for use as a string literal or identifier in a query. This method wraps the
* string in single quotes, so additional quotes should not be added in the query.
*
* @param string $data Unquoted data.
*
* @return string Quoted string wrapped in single quotes.
*
* @throws \Error If the connection to the database has been closed.
*/
public function quoteString(string $data): string;
/**
* Quotes (escapes) the given string for use as a name or identifier in a query.
*
* @param string $name Unquoted identifier.
*
* @return string Quoted identifier.
*
* @throws \Error If the connection to the database has been closed.
*/
public function quoteName(string $name): string;
}

15
src/StatementPool.php Normal file
View File

@ -0,0 +1,15 @@
<?php
namespace Amp\Postgres;
use Amp\Sql\ResultSet as SqlResultSet;
use Amp\Sql\StatementPool as SqlStatementPool;
class StatementPool extends SqlStatementPool
{
protected function createResultSet(SqlResultSet $resultSet, callable $release): SqlResultSet
{
\assert($resultSet instanceof ResultSet);
return new PooledResultSet($resultSet, $release);
}
}

View File

@ -2,341 +2,8 @@
namespace Amp\Postgres;
use Amp\Promise;
use Amp\Sql\Operation;
use Amp\Sql\Transaction as SqlTransaction;
use Amp\Sql\TransactionError;
use function Amp\call;
final class Transaction implements Handle, SqlTransaction
interface Transaction extends Quoter, SqlTransaction
{
/** @var Handle|null */
private $handle;
/** @var int */
private $isolation;
/** @var Internal\ReferenceQueue */
private $queue;
/** @var Listener[] */
private $listeners = [];
/**
* @param Handle $handle
* @param int $isolation
*
* @throws \Error If the isolation level is invalid.
*/
public function __construct(Handle $handle, int $isolation = SqlTransaction::ISOLATION_COMMITTED)
{
switch ($isolation) {
case SqlTransaction::ISOLATION_UNCOMMITTED:
case SqlTransaction::ISOLATION_COMMITTED:
case SqlTransaction::ISOLATION_REPEATABLE:
case SqlTransaction::ISOLATION_SERIALIZABLE:
$this->isolation = $isolation;
break;
default:
throw new \Error("Isolation must be a valid transaction isolation level");
}
$this->handle = $handle;
$this->queue = new Internal\ReferenceQueue;
$listeners =& $this->listeners;
$this->queue->onDestruct(static function () use (&$listeners) {
foreach ($listeners as $listener) {
if ($listener->isListening()) {
$listener->unlisten();
}
}
});
}
public function __destruct()
{
if ($this->handle) {
$this->rollback(); // Invokes $this->queue->complete().
}
}
/**
* {@inheritdoc}
*/
public function lastUsedAt(): int
{
return $this->handle->lastUsedAt();
}
/**
* {@inheritdoc}
*
* Closes and commits all changes in the transaction.
*/
public function close()
{
if ($this->handle) {
$this->commit(); // Invokes $this->queue->unreference().
}
}
/**
* {@inheritdoc}
*/
public function onDestruct(callable $onComplete)
{
$this->queue->onDestruct($onComplete);
}
/**
* {@inheritdoc}
*/
public function isAlive(): bool
{
return $this->handle && $this->handle->isAlive();
}
/**
* @return bool True if the transaction is active, false if it has been committed or rolled back.
*/
public function isActive(): bool
{
return $this->handle !== null;
}
/**
* @return int
*/
public function getIsolationLevel(): int
{
return $this->isolation;
}
/**
* {@inheritdoc}
*
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function query(string $sql): Promise
{
if ($this->handle === null) {
throw new TransactionError("The transaction has been committed or rolled back");
}
$this->queue->reference();
$promise = $this->handle->query($sql);
$promise->onResolve(function ($exception, $result) {
if ($result instanceof Operation) {
$result->onDestruct([$this->queue, "unreference"]);
return;
}
$this->queue->unreference();
});
return $promise;
}
/**
* {@inheritdoc}
*
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function prepare(string $sql): Promise
{
if ($this->handle === null) {
throw new TransactionError("The transaction has been committed or rolled back");
}
$this->queue->reference();
$promise = $this->handle->prepare($sql);
$promise->onResolve(function ($exception, $statement) {
if ($statement instanceof Operation) {
$statement->onDestruct([$this->queue, "unreference"]);
return;
}
$this->queue->unreference();
});
return $promise;
}
/**
* {@inheritdoc}
*
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function execute(string $sql, array $params = []): Promise
{
if ($this->handle === null) {
throw new TransactionError("The transaction has been committed or rolled back");
}
$this->queue->reference();
$promise = $this->handle->execute($sql, $params);
$promise->onResolve(function ($exception, $result) {
if ($result instanceof Operation) {
$result->onDestruct([$this->queue, "unreference"]);
return;
}
$this->queue->unreference();
});
return $promise;
}
/**
* {@inheritdoc}
*
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function notify(string $channel, string $payload = ""): Promise
{
if ($this->handle === null) {
throw new TransactionError("The transaction has been committed or rolled back");
}
return $this->handle->notify($channel, $payload);
}
/**
* Commits the transaction and makes it inactive.
*
* @return Promise<\Amp\Sql\CommandResult>
*
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function commit(): Promise
{
if ($this->handle === null) {
throw new TransactionError("The transaction has been committed or rolled back");
}
$promise = $this->handle->query("COMMIT");
$this->handle = null;
$promise->onResolve([$this->queue, "unreference"]);
return $promise;
}
/**
* Rolls back the transaction and makes it inactive.
*
* @return Promise<\Amp\Sql\CommandResult>
*
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function rollback(): Promise
{
if ($this->handle === null) {
throw new TransactionError("The transaction has been committed or rolled back");
}
$promise = $this->handle->query("ROLLBACK");
$this->handle = null;
$promise->onResolve([$this->queue, "unreference"]);
return $promise;
}
/**
* Creates a savepoint with the given identifier.
*
* @param string $identifier Savepoint identifier.
*
* @return Promise<\Amp\Sql\CommandResult>
*
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function createSavepoint(string $identifier): Promise
{
return $this->query("SAVEPOINT " . $this->quoteName($identifier));
}
/**
* Rolls back to the savepoint with the given identifier.
*
* @param string $identifier Savepoint identifier.
*
* @return Promise<\Amp\Sql\CommandResult>
*
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function rollbackTo(string $identifier): Promise
{
return $this->query("ROLLBACK TO " . $this->quoteName($identifier));
}
/**
* Releases the savepoint with the given identifier.
*
* @param string $identifier Savepoint identifier.
*
* @return Promise<\Amp\Sql\CommandResult>
*
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function releaseSavepoint(string $identifier): Promise
{
return $this->query("RELEASE SAVEPOINT " . $this->quoteName($identifier));
}
/**
* {@inheritdoc}
*
* Listeners automatically unlisten when the transaction is committed or rolled back.
*
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function listen(string $channel): Promise
{
if ($this->handle === null) {
throw new TransactionError("The transaction has been committed or rolled back");
}
return call(function () use ($channel) {
$listener = yield $this->handle->listen($channel);
$this->listeners[] = $listener;
return $listener;
});
}
/**
* {@inheritdoc}
*
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function quoteString(string $data): string
{
if ($this->handle === null) {
throw new TransactionError("The transaction has been committed or rolled back");
}
return $this->handle->quoteString($data);
}
/**
* {@inheritdoc}
*
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function quoteName(string $name): string
{
if ($this->handle === null) {
throw new TransactionError("The transaction has been committed or rolled back");
}
return $this->handle->quoteName($name);
}
}

View File

@ -1,55 +0,0 @@
<?php
namespace Amp\Postgres\Test;
use Amp\Delayed;
use Amp\Loop;
use Amp\Postgres\ConnectionConfig;
use Amp\Postgres\Pool;
use PHPUnit\Framework\TestCase;
class PoolTest extends TestCase
{
/**
* @expectedException \Error
* @expectedExceptionMessage Pool must contain at least one connection
*/
public function testInvalidMaxConnections()
{
new Pool(new ConnectionConfig('connection string'), 0);
}
public function testIdleConnectionsRemovedAfterTimeout()
{
Loop::run(function () {
$pool = new Pool(new ConnectionConfig('host=localhost user=postgres'));
$pool->setIdleTimeout(2);
$count = 3;
$promises = [];
for ($i = 0; $i < $count; ++$i) {
$promises[] = $pool->query("SELECT $i");
}
$results = yield $promises;
/** @var \Amp\Postgres\ResultSet $result */
foreach ($results as $result) {
while (yield $result->advance()); // Consume results to free connection
}
$this->assertSame($count, $pool->getConnectionCount());
yield new Delayed(1000);
$this->assertSame($count, $pool->getConnectionCount());
$result = yield $pool->query("SELECT $i");
while (yield $result->advance()); // Consume results to free connection
yield new Delayed(1000);
$this->assertSame(1, $pool->getConnectionCount());
});
}
}

View File

@ -1,80 +0,0 @@
<?php
namespace Amp\Postgres\Test;
use Amp\Delayed;
use Amp\Loop;
use Amp\PHPUnit\TestCase;
use Amp\Postgres\ConnectionConfig;
use Amp\Postgres\Pool;
use Amp\Postgres\ResultSet;
use Amp\Sql\PooledStatement;
use Amp\Sql\Statement;
use Amp\Success;
class PooledStatementTest extends TestCase
{
public function testActiveStatementsRemainAfterTimeout()
{
Loop::run(function () {
$pool = new Pool(new ConnectionConfig('host=localhost user=postgres'));
$statement = $this->createMock(Statement::class);
$statement->method('getQuery')
->willReturn('SELECT 1');
$statement->method('lastUsedAt')
->willReturn(\time());
$statement->expects($this->once())
->method('execute');
$pooledStatement = new PooledStatement($pool, $statement, $this->createCallback(0));
$this->assertTrue($pooledStatement->isAlive());
$this->assertSame(\time(), $pooledStatement->lastUsedAt());
yield new Delayed(1500); // Give timeout watcher enough time to execute.
$pooledStatement->execute();
$this->assertTrue($pooledStatement->isAlive());
$this->assertSame(\time(), $pooledStatement->lastUsedAt());
});
}
public function testIdleStatementsRemovedAfterTimeout()
{
Loop::run(function () {
$pool = new Pool(new ConnectionConfig('host=localhost user=postgres'));
$statement = $this->createMock(Statement::class);
$statement->method('getQuery')
->willReturn('SELECT 1');
$statement->method('lastUsedAt')
->willReturn(0);
$statement->expects($this->never())
->method('execute');
$prepare = function () {
$statement = $this->createMock(Statement::class);
$statement->expects($this->once())
->method('execute')
->willReturn(new Success($this->createMock(ResultSet::class)));
return new Success($statement);
};
$pooledStatement = new PooledStatement($pool, $statement, $prepare);
$this->assertTrue($pooledStatement->isAlive());
$this->assertSame(\time(), $pooledStatement->lastUsedAt());
yield new Delayed(1500); // Give timeout watcher enough time to execute and remove mock statement object.
$result = yield $pooledStatement->execute();
$this->assertInstanceOf(ResultSet::class, $result);
$this->assertTrue($pooledStatement->isAlive());
$this->assertSame(\time(), $pooledStatement->lastUsedAt());
});
}
}