1
0
mirror of https://github.com/danog/postgres.git synced 2024-11-30 04:29:12 +01:00

Remove listen() from Transaction

Created new interface Receiver for the listen() method that Transaction does not implement. Fixed Transaction implementation.
This commit is contained in:
Aaron Piotrowski 2018-07-11 17:47:30 -05:00
parent fdf1868cc1
commit 989fdb7def
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
11 changed files with 63 additions and 163 deletions

View File

@ -9,6 +9,7 @@ use Amp\Promise;
use Amp\Sql\ConnectionConfig;
use Amp\Sql\FailureException;
use Amp\Sql\Link;
use Amp\Sql\Transaction;
use function Amp\call;
abstract class Connection implements Link, Handle
@ -182,7 +183,7 @@ abstract class Connection implements Link, Handle
*
* @throws FailureException
*/
final public function transaction(int $isolation = ConnectionTransaction::ISOLATION_COMMITTED): Promise
final public function transaction(int $isolation = Transaction::ISOLATION_COMMITTED): Promise
{
if (! $this->handle) {
throw new FailureException('Not connected');
@ -190,19 +191,19 @@ abstract class Connection implements Link, Handle
return call(function () use ($isolation) {
switch ($isolation) {
case ConnectionTransaction::ISOLATION_UNCOMMITTED:
case Transaction::ISOLATION_UNCOMMITTED:
yield $this->handle->query("BEGIN TRANSACTION ISOLATION LEVEL READ UNCOMMITTED");
break;
case ConnectionTransaction::ISOLATION_COMMITTED:
case Transaction::ISOLATION_COMMITTED:
yield $this->handle->query("BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED");
break;
case ConnectionTransaction::ISOLATION_REPEATABLE:
case Transaction::ISOLATION_REPEATABLE:
yield $this->handle->query("BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ");
break;
case ConnectionTransaction::ISOLATION_SERIALIZABLE:
case Transaction::ISOLATION_SERIALIZABLE:
yield $this->handle->query("BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE");
break;

View File

@ -7,7 +7,7 @@ use Amp\Sql\Transaction as SqlTransaction;
use Amp\Sql\TransactionError;
use function Amp\call;
final class ConnectionTransaction implements Handle, Transaction
final class ConnectionTransaction implements Transaction
{
/** @var Handle|null */
private $handle;
@ -15,11 +15,11 @@ final class ConnectionTransaction implements Handle, Transaction
/** @var int */
private $isolation;
/** @var Internal\ReferenceQueue */
private $queue;
/** @var callable */
private $release;
/** @var ConnectionListener[] */
private $listeners = [];
/** @var int */
private $refCount = 1;
/**
* @param Handle $handle
@ -43,22 +43,19 @@ final class ConnectionTransaction implements Handle, Transaction
}
$this->handle = $handle;
$this->queue = new Internal\ReferenceQueue;
$listeners =& $this->listeners;
$this->queue->onDestruct(static function () use (&$listeners) {
foreach ($listeners as $listener) {
if ($listener->isListening()) {
$listener->unlisten();
}
$refCount =& $this->refCount;
$this->release = static function () use (&$refCount, $release) {
if (--$refCount === 0) {
$release();
}
});
};
}
public function __destruct()
{
if ($this->handle && $this->handle->isAlive()) {
$this->rollback(); // Invokes $this->queue->complete().
$this->rollback(); // Invokes $this->release callback.
}
}
@ -78,7 +75,7 @@ final class ConnectionTransaction implements Handle, Transaction
public function close()
{
if ($this->handle) {
$this->commit(); // Invokes $this->queue->unreference().
$this->commit(); // Invokes $this->release callback.
}
}
@ -117,20 +114,16 @@ final class ConnectionTransaction implements Handle, Transaction
throw new TransactionError("The transaction has been committed or rolled back");
}
$this->queue->reference();
return call(function () use ($sql) {
$result = yield $this->handle->query($sql);
$promise = $this->handle->query($sql);
$promise->onResolve(function ($exception, $result) {
if ($result instanceof Operation) {
$result->onDestruct([$this->queue, "unreference"]);
return;
if ($result instanceof ResultSet) {
++$this->refCount;
return new PooledResultSet($result, $this->release);
}
$this->queue->unreference();
return $result;
});
return $promise;
}
/**
@ -144,20 +137,10 @@ final class ConnectionTransaction implements Handle, Transaction
throw new TransactionError("The transaction has been committed or rolled back");
}
$this->queue->reference();
$promise = $this->handle->prepare($sql);
$promise->onResolve(function ($exception, $statement) {
if ($statement instanceof Operation) {
$statement->onDestruct([$this->queue, "unreference"]);
return;
}
$this->queue->unreference();
return call(function () use ($sql) {
$statement = yield $this->handle->query($sql);
return new PooledStatement($statement, $this->release);
});
return $promise;
}
/**
@ -171,20 +154,16 @@ final class ConnectionTransaction implements Handle, Transaction
throw new TransactionError("The transaction has been committed or rolled back");
}
$this->queue->reference();
return call(function () use ($sql, $params) {
$result = yield $this->handle->execute($sql, $params);
$promise = $this->handle->execute($sql, $params);
$promise->onResolve(function ($exception, $result) {
if ($result instanceof Operation) {
$result->onDestruct([$this->queue, "unreference"]);
return;
if ($result instanceof ResultSet) {
++$this->refCount;
return new PooledResultSet($result, $this->release);
}
$this->queue->unreference();
return $result;
});
return $promise;
}
@ -217,7 +196,7 @@ final class ConnectionTransaction implements Handle, Transaction
$promise = $this->handle->query("COMMIT");
$this->handle = null;
$promise->onResolve([$this->queue, "unreference"]);
$promise->onResolve($this->release);
return $promise;
}
@ -237,7 +216,7 @@ final class ConnectionTransaction implements Handle, Transaction
$promise = $this->handle->query("ROLLBACK");
$this->handle = null;
$promise->onResolve([$this->queue, "unreference"]);
$promise->onResolve($this->release);
return $promise;
}
@ -284,26 +263,6 @@ final class ConnectionTransaction implements Handle, Transaction
return $this->query("RELEASE SAVEPOINT " . $this->quoteName($identifier));
}
/**
* {@inheritdoc}
*
* Listeners automatically unlisten when the transaction is committed or rolled back.
*
* @throws TransactionError If the transaction has been committed or rolled back.
*/
public function listen(string $channel): Promise
{
if ($this->handle === null) {
throw new TransactionError("The transaction has been committed or rolled back");
}
return call(function () use ($channel) {
$listener = yield $this->handle->listen($channel);
$this->listeners[] = $listener;
return $listener;
});
}
/**
* {@inheritdoc}
*

View File

@ -7,8 +7,6 @@ use Amp\Sql\Executor as SqlExecutor;
interface Executor extends SqlExecutor
{
const STATEMENT_NAME_PREFIX = "amp_";
/**
* @param string $channel Channel name.
* @param string $payload Notification payload.
@ -19,15 +17,4 @@ interface Executor extends SqlExecutor
* @throws \Amp\Sql\ConnectionException If the connection to the database is lost.
*/
public function notify(string $channel, string $payload = ""): Promise;
/**
* @param string $channel Channel name.
*
* @return Promise<Listener>
*
* @throws \Amp\Sql\FailureException If the operation fails due to unexpected condition.
* @throws \Amp\Sql\ConnectionException If the connection to the database is lost.
* @throws \Amp\Sql\QueryError If the operation fails due to an error in the query (such as a syntax error).
*/
public function listen(string $channel): Promise;
}

View File

@ -2,6 +2,7 @@
namespace Amp\Postgres;
interface Handle extends Executor, Quoter
interface Handle extends Receiver, Quoter
{
const STATEMENT_NAME_PREFIX = "amp_";
}

View File

@ -1,61 +0,0 @@
<?php
namespace Amp\Postgres\Internal;
use Amp\Loop;
final class ReferenceQueue
{
/** @var callable[]|null */
private $onDestruct = [];
/** @var int */
private $refCount = 1;
public function onDestruct(callable $onDestruct)
{
if (!$this->refCount) {
try {
$onDestruct();
} catch (\Throwable $exception) {
Loop::defer(function () use ($exception) {
throw $exception; // Rethrow to event loop error handler.
});
}
return;
}
$this->onDestruct[] = $onDestruct;
}
public function reference()
{
\assert($this->refCount, "The reference queue has already been fully unreferenced and destroyed");
++$this->refCount;
}
public function unreference()
{
\assert($this->refCount, "The reference queue has already been fully unreferenced and destroyed");
if (--$this->refCount) {
return;
}
foreach ($this->onDestruct as $callback) {
try {
$callback();
} catch (\Throwable $exception) {
Loop::defer(function () use ($exception) {
throw $exception; // Rethrow to event loop error handler.
});
}
}
$this->onDestruct = null;
}
public function isReferenced(): bool
{
return (bool) $this->refCount;
}
}

View File

@ -4,6 +4,6 @@ namespace Amp\Postgres;
use Amp\Sql\Link as SqlLink;
interface Link extends Executor, SqlLink
interface Link extends Receiver, SqlLink
{
}

View File

@ -365,7 +365,7 @@ final class PgSqlHandle implements Handle
$modifiedSql = Internal\parseNamedParams($sql, $names);
$name = self::STATEMENT_NAME_PREFIX . \sha1($modifiedSql);
$name = Handle::STATEMENT_NAME_PREFIX . \sha1($modifiedSql);
if (isset($this->statements[$name])) {
$storage = $this->statements[$name];

View File

@ -74,15 +74,6 @@ final class PooledTransaction implements Transaction
return $this->transaction->notify($channel, $payload);
}
public function listen(string $channel): Promise
{
if (!$this->transaction) {
throw new TransactionError("The transaction has been committed or rolled back");
}
return $this->transaction->listen($channel);
}
public function isAlive(): bool
{
return $this->transaction && $this->transaction->isAlive();

View File

@ -382,7 +382,7 @@ final class PqHandle implements Handle
$modifiedSql = Internal\parseNamedParams($sql, $names);
$name = self::STATEMENT_NAME_PREFIX . \sha1($modifiedSql);
$name = Handle::STATEMENT_NAME_PREFIX . \sha1($modifiedSql);
if (isset($this->statements[$name])) {
$storage = $this->statements[$name];

19
src/Receiver.php Normal file
View File

@ -0,0 +1,19 @@
<?php
namespace Amp\Postgres;
use Amp\Promise;
interface Receiver extends Executor
{
/**
* @param string $channel Channel name.
*
* @return Promise<Listener>
*
* @throws \Amp\Sql\FailureException If the operation fails due to unexpected condition.
* @throws \Amp\Sql\ConnectionException If the connection to the database is lost.
* @throws \Amp\Sql\QueryError If the operation fails due to an error in the query (such as a syntax error).
*/
public function listen(string $channel): Promise;
}

View File

@ -4,6 +4,9 @@ namespace Amp\Postgres;
use Amp\Sql\Transaction as SqlTransaction;
interface Transaction extends Quoter, SqlTransaction
/**
* Note that notifications sent during a transaction are not delivered until the transaction has been committed.
*/
interface Transaction extends Executor, Quoter, SqlTransaction
{
}