diff --git a/src/Connection.php b/src/Connection.php index 4aaa06f..3f26476 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -182,7 +182,7 @@ abstract class Connection implements Link, Handle * * @throws FailureException */ - final public function transaction(int $isolation = Transaction::ISOLATION_COMMITTED): Promise + final public function transaction(int $isolation = ConnectionTransaction::ISOLATION_COMMITTED): Promise { if (! $this->handle) { throw new FailureException('Not connected'); @@ -190,19 +190,19 @@ abstract class Connection implements Link, Handle return call(function () use ($isolation) { switch ($isolation) { - case Transaction::ISOLATION_UNCOMMITTED: + case ConnectionTransaction::ISOLATION_UNCOMMITTED: yield $this->handle->query("BEGIN TRANSACTION ISOLATION LEVEL READ UNCOMMITTED"); break; - case Transaction::ISOLATION_COMMITTED: + case ConnectionTransaction::ISOLATION_COMMITTED: yield $this->handle->query("BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED"); break; - case Transaction::ISOLATION_REPEATABLE: + case ConnectionTransaction::ISOLATION_REPEATABLE: yield $this->handle->query("BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ"); break; - case Transaction::ISOLATION_SERIALIZABLE: + case ConnectionTransaction::ISOLATION_SERIALIZABLE: yield $this->handle->query("BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE"); break; @@ -212,9 +212,7 @@ abstract class Connection implements Link, Handle $this->busy = new Deferred; - $transaction = new Transaction($this->handle, $isolation); - $transaction->onDestruct($this->release); - return $transaction; + return new ConnectionTransaction($this->handle, $isolation, $this->release); }); } diff --git a/src/ConnectionListener.php b/src/ConnectionListener.php new file mode 100644 index 0000000..9f16281 --- /dev/null +++ b/src/ConnectionListener.php @@ -0,0 +1,91 @@ +iterator = $iterator; + $this->channel = $channel; + $this->unlisten = $unlisten; + } + + public function __destruct() + { + if ($this->unlisten) { + $this->unlisten(); // Invokes $this->queue->complete(). + } + } + + /** + * {@inheritdoc} + */ + public function advance(): Promise + { + return $this->iterator->advance(); + } + + /** + * {@inheritdoc} + * + * @return Notification + */ + public function getCurrent(): Notification + { + return $this->iterator->getCurrent(); + } + + /** + * @return string Channel name. + */ + public function getChannel(): string + { + return $this->channel; + } + + /** + * @return bool + */ + public function isListening(): bool + { + return $this->unlisten !== null; + } + + /** + * Unlistens from the channel. No more values will be emitted from this listener. + * + * @return Promise<\Amp\Sql\CommandResult> + * + * @throws \Error If this method was previously invoked. + */ + public function unlisten(): Promise + { + if (!$this->unlisten) { + throw new \Error("Already unlistened on this channel"); + } + + /** @var Promise $promise */ + $promise = ($this->unlisten)($this->channel); + $this->unlisten = null; + + return $promise; + } +} diff --git a/src/ConnectionTransaction.php b/src/ConnectionTransaction.php new file mode 100644 index 0000000..fb635a7 --- /dev/null +++ b/src/ConnectionTransaction.php @@ -0,0 +1,334 @@ +isolation = $isolation; + break; + + default: + throw new \Error("Isolation must be a valid transaction isolation level"); + } + + $this->handle = $handle; + $this->queue = new Internal\ReferenceQueue; + + $listeners =& $this->listeners; + $this->queue->onDestruct(static function () use (&$listeners) { + foreach ($listeners as $listener) { + if ($listener->isListening()) { + $listener->unlisten(); + } + } + }); + } + + public function __destruct() + { + if ($this->handle && $this->handle->isAlive()) { + $this->rollback(); // Invokes $this->queue->complete(). + } + } + + /** + * {@inheritdoc} + */ + public function lastUsedAt(): int + { + return $this->handle->lastUsedAt(); + } + + /** + * {@inheritdoc} + * + * Closes and commits all changes in the transaction. + */ + public function close() + { + if ($this->handle) { + $this->commit(); // Invokes $this->queue->unreference(). + } + } + + /** + * {@inheritdoc} + */ + public function isAlive(): bool + { + return $this->handle && $this->handle->isAlive(); + } + + /** + * @return bool True if the transaction is active, false if it has been committed or rolled back. + */ + public function isActive(): bool + { + return $this->handle !== null; + } + + /** + * @return int + */ + public function getIsolationLevel(): int + { + return $this->isolation; + } + + /** + * {@inheritdoc} + * + * @throws TransactionError If the transaction has been committed or rolled back. + */ + public function query(string $sql): Promise + { + if ($this->handle === null) { + throw new TransactionError("The transaction has been committed or rolled back"); + } + + $this->queue->reference(); + + $promise = $this->handle->query($sql); + + $promise->onResolve(function ($exception, $result) { + if ($result instanceof Operation) { + $result->onDestruct([$this->queue, "unreference"]); + return; + } + + $this->queue->unreference(); + }); + + return $promise; + } + + /** + * {@inheritdoc} + * + * @throws TransactionError If the transaction has been committed or rolled back. + */ + public function prepare(string $sql): Promise + { + if ($this->handle === null) { + throw new TransactionError("The transaction has been committed or rolled back"); + } + + $this->queue->reference(); + + $promise = $this->handle->prepare($sql); + + $promise->onResolve(function ($exception, $statement) { + if ($statement instanceof Operation) { + $statement->onDestruct([$this->queue, "unreference"]); + return; + } + + $this->queue->unreference(); + }); + + return $promise; + } + + /** + * {@inheritdoc} + * + * @throws TransactionError If the transaction has been committed or rolled back. + */ + public function execute(string $sql, array $params = []): Promise + { + if ($this->handle === null) { + throw new TransactionError("The transaction has been committed or rolled back"); + } + + $this->queue->reference(); + + $promise = $this->handle->execute($sql, $params); + + $promise->onResolve(function ($exception, $result) { + if ($result instanceof Operation) { + $result->onDestruct([$this->queue, "unreference"]); + return; + } + + $this->queue->unreference(); + }); + + return $promise; + } + + + /** + * {@inheritdoc} + * + * @throws TransactionError If the transaction has been committed or rolled back. + */ + public function notify(string $channel, string $payload = ""): Promise + { + if ($this->handle === null) { + throw new TransactionError("The transaction has been committed or rolled back"); + } + + return $this->handle->notify($channel, $payload); + } + + /** + * Commits the transaction and makes it inactive. + * + * @return Promise<\Amp\Sql\CommandResult> + * + * @throws TransactionError If the transaction has been committed or rolled back. + */ + public function commit(): Promise + { + if ($this->handle === null) { + throw new TransactionError("The transaction has been committed or rolled back"); + } + + $promise = $this->handle->query("COMMIT"); + $this->handle = null; + $promise->onResolve([$this->queue, "unreference"]); + + return $promise; + } + + /** + * Rolls back the transaction and makes it inactive. + * + * @return Promise<\Amp\Sql\CommandResult> + * + * @throws TransactionError If the transaction has been committed or rolled back. + */ + public function rollback(): Promise + { + if ($this->handle === null) { + throw new TransactionError("The transaction has been committed or rolled back"); + } + + $promise = $this->handle->query("ROLLBACK"); + $this->handle = null; + $promise->onResolve([$this->queue, "unreference"]); + + return $promise; + } + + /** + * Creates a savepoint with the given identifier. + * + * @param string $identifier Savepoint identifier. + * + * @return Promise<\Amp\Sql\CommandResult> + * + * @throws TransactionError If the transaction has been committed or rolled back. + */ + public function createSavepoint(string $identifier): Promise + { + return $this->query("SAVEPOINT " . $this->quoteName($identifier)); + } + + /** + * Rolls back to the savepoint with the given identifier. + * + * @param string $identifier Savepoint identifier. + * + * @return Promise<\Amp\Sql\CommandResult> + * + * @throws TransactionError If the transaction has been committed or rolled back. + */ + public function rollbackTo(string $identifier): Promise + { + return $this->query("ROLLBACK TO " . $this->quoteName($identifier)); + } + + /** + * Releases the savepoint with the given identifier. + * + * @param string $identifier Savepoint identifier. + * + * @return Promise<\Amp\Sql\CommandResult> + * + * @throws TransactionError If the transaction has been committed or rolled back. + */ + public function releaseSavepoint(string $identifier): Promise + { + return $this->query("RELEASE SAVEPOINT " . $this->quoteName($identifier)); + } + + /** + * {@inheritdoc} + * + * Listeners automatically unlisten when the transaction is committed or rolled back. + * + * @throws TransactionError If the transaction has been committed or rolled back. + */ + public function listen(string $channel): Promise + { + if ($this->handle === null) { + throw new TransactionError("The transaction has been committed or rolled back"); + } + + return call(function () use ($channel) { + $listener = yield $this->handle->listen($channel); + $this->listeners[] = $listener; + return $listener; + }); + } + + /** + * {@inheritdoc} + * + * @throws TransactionError If the transaction has been committed or rolled back. + */ + public function quoteString(string $data): string + { + if ($this->handle === null) { + throw new TransactionError("The transaction has been committed or rolled back"); + } + + return $this->handle->quoteString($data); + } + + /** + * {@inheritdoc} + * + * @throws TransactionError If the transaction has been committed or rolled back. + */ + public function quoteName(string $name): string + { + if ($this->handle === null) { + throw new TransactionError("The transaction has been committed or rolled back"); + } + + return $this->handle->quoteName($name); + } +} diff --git a/src/Handle.php b/src/Handle.php index b393511..ad17d6f 100644 --- a/src/Handle.php +++ b/src/Handle.php @@ -2,28 +2,6 @@ namespace Amp\Postgres; -interface Handle extends Executor +interface Handle extends Executor, Quoter { - /** - * Quotes (escapes) the given string for use as a string literal or identifier in a query. This method wraps the - * string in single quotes, so additional quotes should not be added in the query. - * - * @param string $data Unquoted data. - * - * @return string Quoted string wrapped in single quotes. - * - * @throws \Error If the connection to the database has been closed. - */ - public function quoteString(string $data): string; - - /** - * Quotes (escapes) the given string for use as a name or identifier in a query. - * - * @param string $name Unquoted identifier. - * - * @return string Quoted identifier. - * - * @throws \Error If the connection to the database has been closed. - */ - public function quoteName(string $name): string; } diff --git a/src/Listener.php b/src/Listener.php index 8f15665..3006791 100644 --- a/src/Listener.php +++ b/src/Listener.php @@ -4,83 +4,18 @@ namespace Amp\Postgres; use Amp\Iterator; use Amp\Promise; -use Amp\Sql\Operation; -final class Listener implements Iterator, Operation +interface Listener extends Iterator { - /** @var \Amp\Iterator */ - private $iterator; - - /** @var string */ - private $channel; - - /** @var callable|null */ - private $unlisten; - - /** @var Internal\ReferenceQueue */ - private $queue; - - /** - * @param \Amp\Iterator $iterator Iterator emitting notificatons on the channel. - * @param string $channel Channel name. - * @param callable(string $channel): $unlisten Function invoked to unlisten from the channel. - */ - public function __construct(Iterator $iterator, string $channel, callable $unlisten) - { - $this->iterator = $iterator; - $this->channel = $channel; - $this->unlisten = $unlisten; - $this->queue = new Internal\ReferenceQueue; - } - - public function __destruct() - { - if ($this->unlisten) { - $this->unlisten(); // Invokes $this->queue->complete(). - } - } - - /** - * {@inheritdoc} - */ - public function onDestruct(callable $onComplete) - { - $this->queue->onDestruct($onComplete); - } - - /** - * {@inheritdoc} - */ - public function advance(): Promise - { - return $this->iterator->advance(); - } - - /** - * {@inheritdoc} - * - * @return Notification - */ - public function getCurrent(): Notification - { - return $this->iterator->getCurrent(); - } - /** * @return string Channel name. */ - public function getChannel(): string - { - return $this->channel; - } + public function getChannel(): string; /** * @return bool */ - public function isListening(): bool - { - return $this->unlisten !== null; - } + public function isListening(): bool; /** * Unlistens from the channel. No more values will be emitted from this listener. @@ -89,16 +24,5 @@ final class Listener implements Iterator, Operation * * @throws \Error If this method was previously invoked. */ - public function unlisten(): Promise - { - if (!$this->unlisten) { - throw new \Error("Already unlistened on this channel"); - } - - /** @var Promise $promise */ - $promise = ($this->unlisten)($this->channel); - $this->unlisten = null; - $promise->onResolve([$this->queue, "unreference"]); - return $promise; - } + public function unlisten(): Promise; } diff --git a/src/PgSqlHandle.php b/src/PgSqlHandle.php index 061c4b5..35b5551 100644 --- a/src/PgSqlHandle.php +++ b/src/PgSqlHandle.php @@ -189,7 +189,7 @@ final class PgSqlHandle implements Handle */ public function isAlive(): bool { - return $this->handle !== null; + return \is_resource($this->handle); } /** @@ -448,7 +448,7 @@ final class PgSqlHandle implements Handle } Loop::enable($this->poll); - return new Listener($emitter->iterate(), $channel, $this->unlisten); + return new ConnectionListener($emitter->iterate(), $channel, $this->unlisten); }); } diff --git a/src/PgSqlStatement.php b/src/PgSqlStatement.php index aa9fdf3..3d6790e 100644 --- a/src/PgSqlStatement.php +++ b/src/PgSqlStatement.php @@ -3,10 +3,9 @@ namespace Amp\Postgres; use Amp\Promise; -use Amp\Sql\Operation; use Amp\Sql\Statement; -final class PgSqlStatement implements Statement, Operation +final class PgSqlStatement implements Statement { /** @var PgSqlHandle */ private $handle; @@ -38,14 +37,12 @@ final class PgSqlStatement implements Statement, Operation $this->name = $name; $this->sql = $sql; $this->params = $params; - $this->queue = new Internal\ReferenceQueue; $this->lastUsedAt = \time(); } public function __destruct() { $this->handle->statementDeallocate($this->name); - $this->queue->unreference(); } /** {@inheritdoc} */ @@ -71,10 +68,4 @@ final class PgSqlStatement implements Statement, Operation { return $this->handle->statementExecute($this->name, Internal\replaceNamedParams($params, $this->params)); } - - /** {@inheritdoc} */ - public function onDestruct(callable $onDestruct) - { - $this->queue->onDestruct($onDestruct); - } } diff --git a/src/Pool.php b/src/Pool.php index 9640874..1cbac83 100644 --- a/src/Pool.php +++ b/src/Pool.php @@ -6,6 +6,11 @@ use Amp\Coroutine; use Amp\Promise; use Amp\Sql\AbstractPool; use Amp\Sql\Connector; +use Amp\Sql\Pool as SqlPool; +use Amp\Sql\ResultSet as SqlResultSet; +use Amp\Sql\Statement as SqlStatement; +use Amp\Sql\StatementPool as SqlStatementPool; +use Amp\Sql\Transaction as SqlTransaction; use function Amp\call; final class Pool extends AbstractPool implements Link @@ -27,6 +32,28 @@ final class Pool extends AbstractPool implements Link return connector(); } + protected function createStatement(SqlStatement $statement, callable $release): SqlStatement + { + return new PooledStatement($statement, $release); + } + + protected function createStatementPool(SqlPool $pool, SqlStatement $statement, callable $prepare): SqlStatementPool + { + return new StatementPool($pool, $statement, $prepare); + } + + protected function createTransaction(SqlTransaction $transaction, callable $release): SqlTransaction + { + \assert($transaction instanceof Transaction); + return new PooledTransaction($transaction, $release); + } + + protected function createResultSet(SqlResultSet $resultSet, callable $release): SqlResultSet + { + \assert($resultSet instanceof ResultSet); + return new PooledResultSet($resultSet, $release); + } + /** * @param bool $reset True to automatically execute RESET ALL on a connection before it is used by the pool. */ @@ -94,15 +121,13 @@ final class Pool extends AbstractPool implements Link throw $exception; } - $listener->onDestruct(function () { + return new PooledListener($listener, function () { if (--$this->listenerCount === 0) { $connection = $this->listeningConnection; $this->listeningConnection = null; $this->push($connection); } }); - - return $listener; }); } } diff --git a/src/PooledListener.php b/src/PooledListener.php new file mode 100644 index 0000000..5fdd2cc --- /dev/null +++ b/src/PooledListener.php @@ -0,0 +1,66 @@ +listener = $listener; + $this->release = $release; + + if (!$this->listener->isListening()) { + ($this->release)(); + $this->release = null; + } + } + + public function __destruct() + { + if ($this->listener->isListening()) { + $this->unlisten(); // Invokes $this->release callback. + } + } + + public function advance(): Promise + { + return $this->listener->advance(); + } + + public function getCurrent() + { + return $this->listener->getCurrent(); + } + + public function getChannel(): string + { + return $this->listener->getChannel(); + } + + public function isListening(): bool + { + return $this->listener->isListening(); + } + + public function unlisten(): Promise + { + if (!$this->release) { + throw new \Error("Already unlistened on this channel"); + } + + $promise = $this->listener->unlisten(); + $promise->onResolve($this->release); + + $this->release = null; + + return $promise; + } +} diff --git a/src/PooledResultSet.php b/src/PooledResultSet.php new file mode 100644 index 0000000..465d40b --- /dev/null +++ b/src/PooledResultSet.php @@ -0,0 +1,40 @@ +result = $result; + $this->release = $release; + } + + public function __destruct() + { + ($this->release)(); + } + + public function advance(int $type = self::FETCH_ASSOC): Promise + { + return $this->result->advance($type); + } + + public function getCurrent() + { + return $this->result->getCurrent(); + } + + public function numFields(): int + { + return $this->result->numFields(); + } +} diff --git a/src/PooledStatement.php b/src/PooledStatement.php new file mode 100644 index 0000000..0e482b3 --- /dev/null +++ b/src/PooledStatement.php @@ -0,0 +1,51 @@ +statement = $statement; + $this->release = $release; + + if (!$this->statement->isAlive()) { + ($this->release)(); + $this->release = null; + } + } + + public function __destruct() + { + ($this->release)(); + } + + public function execute(array $params = []): Promise + { + return $this->statement->execute($params); + } + + public function isAlive(): bool + { + return $this->statement->isAlive(); + } + + public function getQuery(): string + { + return $this->statement->getQuery(); + } + + public function lastUsedAt(): int + { + return $this->statement->lastUsedAt(); + } +} diff --git a/src/PooledTransaction.php b/src/PooledTransaction.php new file mode 100644 index 0000000..f6699b6 --- /dev/null +++ b/src/PooledTransaction.php @@ -0,0 +1,198 @@ +transaction = $transaction; + $this->release = $release; + + if (!$this->transaction->isActive()) { + ($this->release)(); + $this->transaction = null; + $this->release = null; + } + } + + public function __destruct() + { + if ($this->transaction && $this->transaction->isActive()) { + $this->close(); // Invokes $this->release callback. + } + } + + public function query(string $sql): Promise + { + if (!$this->transaction) { + throw new TransactionError("The transaction has been committed or rolled back"); + } + + return $this->transaction->query($sql); + } + + public function prepare(string $sql): Promise + { + if (!$this->transaction) { + throw new TransactionError("The transaction has been committed or rolled back"); + } + + return $this->transaction->prepare($sql); + } + + public function execute(string $sql, array $params = []): Promise + { + if (!$this->transaction) { + throw new TransactionError("The transaction has been committed or rolled back"); + } + + return $this->transaction->execute($sql, $params); + } + + public function notify(string $channel, string $payload = ""): Promise + { + if (!$this->transaction) { + throw new TransactionError("The transaction has been committed or rolled back"); + } + + return $this->transaction->notify($channel, $payload); + } + + public function listen(string $channel): Promise + { + if (!$this->transaction) { + throw new TransactionError("The transaction has been committed or rolled back"); + } + + return $this->transaction->listen($channel); + } + + public function isAlive(): bool + { + return $this->transaction && $this->transaction->isAlive(); + } + + public function lastUsedAt(): int + { + if (!$this->transaction) { + throw new TransactionError("The transaction has been committed or rolled back"); + } + + return $this->transaction->lastUsedAt(); + } + + public function close() + { + if (!$this->transaction) { + return; + } + + $promise = $this->transaction->commit(); + $promise->onResolve($this->release); + + $this->transaction = null; + } + + public function quoteString(string $data): string + { + if (!$this->transaction) { + throw new TransactionError("The transaction has been committed or rolled back"); + } + + return $this->transaction->quoteString($data); + } + + public function quoteName(string $name): string + { + if (!$this->transaction) { + throw new TransactionError("The transaction has been committed or rolled back"); + } + + return $this->transaction->quoteName($name); + } + + public function getIsolationLevel(): int + { + if (!$this->transaction) { + throw new TransactionError("The transaction has been committed or rolled back"); + } + + return $this->transaction->getIsolationLevel(); + } + + public function isActive(): bool + { + return $this->transaction && $this->transaction->isActive(); + } + + public function commit(): Promise + { + if (!$this->transaction) { + throw new TransactionError("The transaction has been committed or rolled back"); + } + + $promise = $this->transaction->commit(); + $promise->onResolve($this->release); + + $this->transaction = null; + + return $promise; + } + + public function rollback(): Promise + { + if (!$this->transaction) { + throw new TransactionError("The transaction has been committed or rolled back"); + } + + $promise = $this->transaction->rollback(); + $promise->onResolve($this->release); + + $this->transaction = null; + + return $promise; + } + + public function createSavepoint(string $identifier): Promise + { + if (!$this->transaction) { + throw new TransactionError("The transaction has been committed or rolled back"); + } + + return $this->transaction->createSavepoint($identifier); + } + + public function rollbackTo(string $identifier): Promise + { + if (!$this->transaction) { + throw new TransactionError("The transaction has been committed or rolled back"); + } + + return $this->transaction->rollbackTo($identifier); + } + + public function releaseSavepoint(string $identifier): Promise + { + if (!$this->transaction) { + throw new TransactionError("The transaction has been committed or rolled back"); + } + + return $this->transaction->releaseSavepoint($identifier); + } +} diff --git a/src/PqHandle.php b/src/PqHandle.php index 74cc27b..087c905 100644 --- a/src/PqHandle.php +++ b/src/PqHandle.php @@ -238,8 +238,7 @@ final class PqHandle implements Handle case pq\Result::SINGLE_TUPLE: $this->busy = new Deferred; - $result = new PqUnbufferedResultSet($this->fetch, $result); - $result->onDestruct($this->release); + $result = new PqUnbufferedResultSet($this->fetch, $result, $this->release); return $result; case pq\Result::NONFATAL_ERROR: @@ -452,7 +451,7 @@ final class PqHandle implements Handle } Loop::enable($this->poll); - return new Listener($emitter->iterate(), $channel, $this->unlisten); + return new ConnectionListener($emitter->iterate(), $channel, $this->unlisten); }); } diff --git a/src/PqStatement.php b/src/PqStatement.php index 1788e44..82c410c 100644 --- a/src/PqStatement.php +++ b/src/PqStatement.php @@ -3,10 +3,9 @@ namespace Amp\Postgres; use Amp\Promise; -use Amp\Sql\Operation; use Amp\Sql\Statement; -final class PqStatement implements Statement, Operation +final class PqStatement implements Statement { /** @var @return PromisePqHandle */ private $handle; @@ -17,9 +16,6 @@ final class PqStatement implements Statement, Operation /** @var string */ private $sql; - /** @var @return PromiseInternal\ReferenceQueue */ - private $queue; - /** @var array */ private $params; @@ -27,7 +23,7 @@ final class PqStatement implements Statement, Operation private $lastUsedAt; /** - * @param @return PromisePqHandle $handle + * @param PqHandle $handle * @param string $name Statement name. * @param string $sql Original prepared SQL query. * @param string[] $params Parameter indices to parameter names. @@ -38,14 +34,12 @@ final class PqStatement implements Statement, Operation $this->name = $name; $this->params = $params; $this->sql = $sql; - $this->queue = new Internal\ReferenceQueue; $this->lastUsedAt = \time(); } public function __destruct() { $this->handle->statementDeallocate($this->name); - $this->queue->unreference(); } /** {@inheritdoc} */ @@ -72,10 +66,4 @@ final class PqStatement implements Statement, Operation $this->lastUsedAt = \time(); return $this->handle->statementExecute($this->name, Internal\replaceNamedParams($params, $this->params)); } - - /** {@inheritdoc} */ - public function onDestruct(callable $onDestruct) - { - $this->queue->onDestruct($onDestruct); - } } diff --git a/src/PqUnbufferedResultSet.php b/src/PqUnbufferedResultSet.php index 0794ab0..d6e0fa3 100644 --- a/src/PqUnbufferedResultSet.php +++ b/src/PqUnbufferedResultSet.php @@ -20,19 +20,15 @@ final class PqUnbufferedResultSet implements ResultSet /** @var int Next row fetch type. */ private $type = self::FETCH_ASSOC; - /** @var Internal\ReferenceQueue */ - private $queue; - /** * @param callable(): $fetch Function to fetch next result row. * @param \pq\Result $result PostgreSQL result object. */ - public function __construct(callable $fetch, pq\Result $result) + public function __construct(callable $fetch, pq\Result $result, callable $release) { $this->numCols = $result->numCols; - $this->queue = $queue = new Internal\ReferenceQueue; - $this->producer = new Producer(static function (callable $emit) use ($queue, $result, $fetch) { + $this->producer = new Producer(static function (callable $emit) use ($release, $result, $fetch) { try { do { $result->autoConvert = pq\Result::CONV_SCALAR | pq\Result::CONV_ARRAY; @@ -40,7 +36,7 @@ final class PqUnbufferedResultSet implements ResultSet $result = yield $fetch(); } while ($result instanceof pq\Result); } finally { - $queue->unreference(); + $release(); } }); } @@ -87,12 +83,4 @@ final class PqUnbufferedResultSet implements ResultSet { return $this->numCols; } - - /** - * {@inheritdoc} - */ - public function onDestruct(callable $onComplete) - { - $this->queue->onDestruct($onComplete); - } } diff --git a/src/Quoter.php b/src/Quoter.php new file mode 100644 index 0000000..43a41dc --- /dev/null +++ b/src/Quoter.php @@ -0,0 +1,29 @@ +isolation = $isolation; - break; - - default: - throw new \Error("Isolation must be a valid transaction isolation level"); - } - - $this->handle = $handle; - $this->queue = new Internal\ReferenceQueue; - - $listeners =& $this->listeners; - $this->queue->onDestruct(static function () use (&$listeners) { - foreach ($listeners as $listener) { - if ($listener->isListening()) { - $listener->unlisten(); - } - } - }); - } - - public function __destruct() - { - if ($this->handle) { - $this->rollback(); // Invokes $this->queue->complete(). - } - } - - /** - * {@inheritdoc} - */ - public function lastUsedAt(): int - { - return $this->handle->lastUsedAt(); - } - - /** - * {@inheritdoc} - * - * Closes and commits all changes in the transaction. - */ - public function close() - { - if ($this->handle) { - $this->commit(); // Invokes $this->queue->unreference(). - } - } - - /** - * {@inheritdoc} - */ - public function onDestruct(callable $onComplete) - { - $this->queue->onDestruct($onComplete); - } - - /** - * {@inheritdoc} - */ - public function isAlive(): bool - { - return $this->handle && $this->handle->isAlive(); - } - - /** - * @return bool True if the transaction is active, false if it has been committed or rolled back. - */ - public function isActive(): bool - { - return $this->handle !== null; - } - - /** - * @return int - */ - public function getIsolationLevel(): int - { - return $this->isolation; - } - - /** - * {@inheritdoc} - * - * @throws TransactionError If the transaction has been committed or rolled back. - */ - public function query(string $sql): Promise - { - if ($this->handle === null) { - throw new TransactionError("The transaction has been committed or rolled back"); - } - - $this->queue->reference(); - - $promise = $this->handle->query($sql); - - $promise->onResolve(function ($exception, $result) { - if ($result instanceof Operation) { - $result->onDestruct([$this->queue, "unreference"]); - return; - } - - $this->queue->unreference(); - }); - - return $promise; - } - - /** - * {@inheritdoc} - * - * @throws TransactionError If the transaction has been committed or rolled back. - */ - public function prepare(string $sql): Promise - { - if ($this->handle === null) { - throw new TransactionError("The transaction has been committed or rolled back"); - } - - $this->queue->reference(); - - $promise = $this->handle->prepare($sql); - - $promise->onResolve(function ($exception, $statement) { - if ($statement instanceof Operation) { - $statement->onDestruct([$this->queue, "unreference"]); - return; - } - - $this->queue->unreference(); - }); - - return $promise; - } - - /** - * {@inheritdoc} - * - * @throws TransactionError If the transaction has been committed or rolled back. - */ - public function execute(string $sql, array $params = []): Promise - { - if ($this->handle === null) { - throw new TransactionError("The transaction has been committed or rolled back"); - } - - $this->queue->reference(); - - $promise = $this->handle->execute($sql, $params); - - $promise->onResolve(function ($exception, $result) { - if ($result instanceof Operation) { - $result->onDestruct([$this->queue, "unreference"]); - return; - } - - $this->queue->unreference(); - }); - - return $promise; - } - - - /** - * {@inheritdoc} - * - * @throws TransactionError If the transaction has been committed or rolled back. - */ - public function notify(string $channel, string $payload = ""): Promise - { - if ($this->handle === null) { - throw new TransactionError("The transaction has been committed or rolled back"); - } - - return $this->handle->notify($channel, $payload); - } - - /** - * Commits the transaction and makes it inactive. - * - * @return Promise<\Amp\Sql\CommandResult> - * - * @throws TransactionError If the transaction has been committed or rolled back. - */ - public function commit(): Promise - { - if ($this->handle === null) { - throw new TransactionError("The transaction has been committed or rolled back"); - } - - $promise = $this->handle->query("COMMIT"); - $this->handle = null; - $promise->onResolve([$this->queue, "unreference"]); - - return $promise; - } - - /** - * Rolls back the transaction and makes it inactive. - * - * @return Promise<\Amp\Sql\CommandResult> - * - * @throws TransactionError If the transaction has been committed or rolled back. - */ - public function rollback(): Promise - { - if ($this->handle === null) { - throw new TransactionError("The transaction has been committed or rolled back"); - } - - $promise = $this->handle->query("ROLLBACK"); - $this->handle = null; - $promise->onResolve([$this->queue, "unreference"]); - - return $promise; - } - - /** - * Creates a savepoint with the given identifier. - * - * @param string $identifier Savepoint identifier. - * - * @return Promise<\Amp\Sql\CommandResult> - * - * @throws TransactionError If the transaction has been committed or rolled back. - */ - public function createSavepoint(string $identifier): Promise - { - return $this->query("SAVEPOINT " . $this->quoteName($identifier)); - } - - /** - * Rolls back to the savepoint with the given identifier. - * - * @param string $identifier Savepoint identifier. - * - * @return Promise<\Amp\Sql\CommandResult> - * - * @throws TransactionError If the transaction has been committed or rolled back. - */ - public function rollbackTo(string $identifier): Promise - { - return $this->query("ROLLBACK TO " . $this->quoteName($identifier)); - } - - /** - * Releases the savepoint with the given identifier. - * - * @param string $identifier Savepoint identifier. - * - * @return Promise<\Amp\Sql\CommandResult> - * - * @throws TransactionError If the transaction has been committed or rolled back. - */ - public function releaseSavepoint(string $identifier): Promise - { - return $this->query("RELEASE SAVEPOINT " . $this->quoteName($identifier)); - } - - /** - * {@inheritdoc} - * - * Listeners automatically unlisten when the transaction is committed or rolled back. - * - * @throws TransactionError If the transaction has been committed or rolled back. - */ - public function listen(string $channel): Promise - { - if ($this->handle === null) { - throw new TransactionError("The transaction has been committed or rolled back"); - } - - return call(function () use ($channel) { - $listener = yield $this->handle->listen($channel); - $this->listeners[] = $listener; - return $listener; - }); - } - - /** - * {@inheritdoc} - * - * @throws TransactionError If the transaction has been committed or rolled back. - */ - public function quoteString(string $data): string - { - if ($this->handle === null) { - throw new TransactionError("The transaction has been committed or rolled back"); - } - - return $this->handle->quoteString($data); - } - - /** - * {@inheritdoc} - * - * @throws TransactionError If the transaction has been committed or rolled back. - */ - public function quoteName(string $name): string - { - if ($this->handle === null) { - throw new TransactionError("The transaction has been committed or rolled back"); - } - - return $this->handle->quoteName($name); - } } diff --git a/test/PoolTest.php b/test/PoolTest.php deleted file mode 100644 index 1515809..0000000 --- a/test/PoolTest.php +++ /dev/null @@ -1,55 +0,0 @@ -setIdleTimeout(2); - $count = 3; - - $promises = []; - for ($i = 0; $i < $count; ++$i) { - $promises[] = $pool->query("SELECT $i"); - } - - $results = yield $promises; - - /** @var \Amp\Postgres\ResultSet $result */ - foreach ($results as $result) { - while (yield $result->advance()); // Consume results to free connection - } - - $this->assertSame($count, $pool->getConnectionCount()); - - yield new Delayed(1000); - - $this->assertSame($count, $pool->getConnectionCount()); - - $result = yield $pool->query("SELECT $i"); - while (yield $result->advance()); // Consume results to free connection - - yield new Delayed(1000); - - $this->assertSame(1, $pool->getConnectionCount()); - }); - } -} diff --git a/test/PooledStatementTest.php b/test/PooledStatementTest.php deleted file mode 100644 index 0a81ceb..0000000 --- a/test/PooledStatementTest.php +++ /dev/null @@ -1,80 +0,0 @@ -createMock(Statement::class); - $statement->method('getQuery') - ->willReturn('SELECT 1'); - $statement->method('lastUsedAt') - ->willReturn(\time()); - $statement->expects($this->once()) - ->method('execute'); - - $pooledStatement = new PooledStatement($pool, $statement, $this->createCallback(0)); - - $this->assertTrue($pooledStatement->isAlive()); - $this->assertSame(\time(), $pooledStatement->lastUsedAt()); - - yield new Delayed(1500); // Give timeout watcher enough time to execute. - - $pooledStatement->execute(); - - $this->assertTrue($pooledStatement->isAlive()); - $this->assertSame(\time(), $pooledStatement->lastUsedAt()); - }); - } - - public function testIdleStatementsRemovedAfterTimeout() - { - Loop::run(function () { - $pool = new Pool(new ConnectionConfig('host=localhost user=postgres')); - - $statement = $this->createMock(Statement::class); - $statement->method('getQuery') - ->willReturn('SELECT 1'); - $statement->method('lastUsedAt') - ->willReturn(0); - $statement->expects($this->never()) - ->method('execute'); - - $prepare = function () { - $statement = $this->createMock(Statement::class); - $statement->expects($this->once()) - ->method('execute') - ->willReturn(new Success($this->createMock(ResultSet::class))); - return new Success($statement); - }; - - $pooledStatement = new PooledStatement($pool, $statement, $prepare); - - $this->assertTrue($pooledStatement->isAlive()); - $this->assertSame(\time(), $pooledStatement->lastUsedAt()); - - yield new Delayed(1500); // Give timeout watcher enough time to execute and remove mock statement object. - - $result = yield $pooledStatement->execute(); - - $this->assertInstanceOf(ResultSet::class, $result); - - $this->assertTrue($pooledStatement->isAlive()); - $this->assertSame(\time(), $pooledStatement->lastUsedAt()); - }); - } -}