1
0
mirror of https://github.com/danog/postgres.git synced 2025-01-22 05:11:14 +01:00

Update for async-interop namespace change and Amp name changes

This commit is contained in:
Aaron Piotrowski 2017-01-18 11:05:05 -06:00
parent 9ccadb17bd
commit 1e3874856d
24 changed files with 69 additions and 68 deletions

View File

@ -3,7 +3,7 @@
namespace Amp\Postgres;
use Amp\{ CallableMaker, Coroutine, Deferred, function pipe };
use Interop\Async\Promise;
use AsyncInterop\Promise;
abstract class AbstractConnection implements Connection {
use CallableMaker;
@ -21,7 +21,7 @@ abstract class AbstractConnection implements Connection {
* @param string $connectionString
* @param int $timeout Timeout until the connection attempt fails.
*
* @return \Interop\Async\Promise<\Amp\Postgres\Connection>
* @return \AsyncInterop\Promise<\Amp\Postgres\Connection>
*/
abstract public static function connect(string $connectionString, int $timeout = null): Promise;
@ -119,7 +119,7 @@ abstract class AbstractConnection implements Connection {
throw new \Error("Invalid transaction type");
}
return pipe($promise, function (CommandResult $result) use ($isolation) {
return pipe($promise, function (CommandResult $result) use ($isolation): Transaction {
$this->busy = new Deferred;
$transaction = new Transaction($this->executor, $isolation);
$transaction->onComplete($this->release);

View File

@ -3,7 +3,7 @@
namespace Amp\Postgres;
use Amp\{ Coroutine, Deferred };
use Interop\Async\Promise;
use AsyncInterop\Promise;
abstract class AbstractPool implements Pool {
/** @var \SplQueue */
@ -15,20 +15,20 @@ abstract class AbstractPool implements Pool {
/** @var \SplObjectStorage */
private $connections;
/** @var \Interop\Async\Promise|null */
/** @var \AsyncInterop\Promise|null */
private $promise;
/** @var \Amp\Deferred|null */
private $deferred;
/** @var \Amp\Postgres\Connection|\Interop\Async\Promise|null Connection used for notification listening. */
/** @var \Amp\Postgres\Connection|\AsyncInterop\Promise|null Connection used for notification listening. */
private $listeningConnection;
/** @var int Number of listeners on listening connection. */
private $listenerCount = 0;
/**
* @return \Interop\Async\Promise<\Amp\Postgres\Connection>
* @return \AsyncInterop\Promise<\Amp\Postgres\Connection>
*
* @throws \Amp\Postgres\FailureException
*/

View File

@ -2,7 +2,7 @@
namespace Amp\Postgres;
use Interop\Async\Promise;
use AsyncInterop\Promise;
class AggregatePool extends AbstractPool {
/**

View File

@ -2,13 +2,13 @@
namespace Amp\Postgres;
use Interop\Async\Promise;
use AsyncInterop\Promise;
interface Connection extends Executor {
/**
* @param int $isolation
*
* @return \Interop\Async\Promise<\Amp\Postgres\Transaction>
* @return \AsyncInterop\Promise<\Amp\Postgres\Transaction>
*
* @throws \Amp\Postgres\FailureException
*/
@ -17,7 +17,7 @@ interface Connection extends Executor {
/**
* @param string $channel Channel name.
*
* @return \Interop\Async\Promise<\Amp\Postgres\Listener>
* @return \AsyncInterop\Promise<\Amp\Postgres\Listener>
*
* @throws \Amp\Postgres\FailureException
*/

View File

@ -2,7 +2,7 @@
namespace Amp\Postgres;
use Interop\Async\Promise;
use AsyncInterop\Promise;
class ConnectionPool extends AbstractPool {
const DEFAULT_MAX_CONNECTIONS = 100;

View File

@ -2,13 +2,13 @@
namespace Amp\Postgres;
use Interop\Async\Promise;
use AsyncInterop\Promise;
interface Executor {
/**
* @param string $sql
*
* @return \Interop\Async\Promise<\Amp\Postgres\Result>
* @return \AsyncInterop\Promise<\Amp\Postgres\Result>
*
* @throws \Amp\Postgres\FailureException
*/
@ -18,7 +18,7 @@ interface Executor {
* @param string $sql
* @param mixed ...$params
*
* @return \Interop\Async\Promise<\Amp\Postgres\Result>
* @return \AsyncInterop\Promise<\Amp\Postgres\Result>
*
* @throws \Amp\Postgres\FailureException
*/
@ -27,7 +27,7 @@ interface Executor {
/**
* @param string $sql
*
* @return \Interop\Async\Promise<\Amp\Postgres\Statement>
* @return \AsyncInterop\Promise<\Amp\Postgres\Statement>
*
* @throws \Amp\Postgres\FailureException
*/
@ -37,7 +37,7 @@ interface Executor {
* @param string $channel Channel name.
* @param string $payload Notification payload.
*
* @return \Interop\Async\Promise<\Amp\Postgres\CommandResult>
* @return \AsyncInterop\Promise<\Amp\Postgres\CommandResult>
*/
public function notify(string $channel, string $payload = ""): Promise;
}

View File

@ -2,10 +2,10 @@
namespace Amp\Postgres;
use Amp\{ Observable, Observer };
use Interop\Async\Promise;
use Amp\{ Listener as StreamListener, Stream };
use AsyncInterop\Promise;
class Listener extends Observer implements Operation {
class Listener extends StreamListener implements Operation {
use Internal\Operation;
/** @var string */
@ -15,12 +15,12 @@ class Listener extends Observer implements Operation {
private $unlisten;
/**
* @param \Amp\Observable $observable Observable emitting notificatons on the channel.
* @param \Amp\Stream $stream Stream emitting notificatons on the channel.
* @param string $channel Channel name.
* @param callable(string $channel): void $unlisten Function invoked to unlisten from the channel.
*/
public function __construct(Observable $observable, string $channel, callable $unlisten) {
parent::__construct($observable);
public function __construct(Stream $stream, string $channel, callable $unlisten) {
parent::__construct($stream);
$this->channel = $channel;
$this->unlisten = $unlisten;
}
@ -35,9 +35,10 @@ class Listener extends Observer implements Operation {
/**
* Unlistens from the channel. No more values will be emitted on theis channel.
*
* @return \Interop\Async\Promise<\Amp\Postgres\CommandResult>
* @return \AsyncInterop\Promise<\Amp\Postgres\CommandResult>
*/
public function unlisten(): Promise {
/** @var \AsyncInterop\Promise $promise */
$promise = ($this->unlisten)($this->channel);
$promise->when(function () {
$this->complete();

View File

@ -3,14 +3,14 @@
namespace Amp\Postgres;
use Amp\{ Deferred, TimeoutException };
use Interop\Async\{ Loop, Promise };
use AsyncInterop\{ Loop, Promise };
class PgSqlConnection extends AbstractConnection {
/**
* @param string $connectionString
* @param int|null $timeout
*
* @return \Interop\Async\Promise<\Amp\Postgres\PgSqlConnection>
* @return \AsyncInterop\Promise<\Amp\Postgres\PgSqlConnection>
*
* @throws \Amp\Postgres\FailureException
*/

View File

@ -2,8 +2,8 @@
namespace Amp\Postgres;
use Amp\{ CallableMaker, Coroutine, Deferred, Postponed, function pipe };
use Interop\Async\{ Loop, Promise };
use Amp\{ CallableMaker, Coroutine, Deferred, Emitter, function pipe };
use AsyncInterop\{ Loop, Promise };
class PgSqlExecutor implements Executor {
use CallableMaker;
@ -26,7 +26,7 @@ class PgSqlExecutor implements Executor {
/** @var callable */
private $createResult;
/** @var \Amp\Postponed[] */
/** @var \Amp\Emitter[] */
private $listeners = [];
/** @var callable */
@ -226,17 +226,17 @@ class PgSqlExecutor implements Executor {
*/
public function listen(string $channel): Promise {
return pipe($this->query(\sprintf("LISTEN %s", $channel)), function (CommandResult $result) use ($channel) {
$postponed = new Postponed;
$postponed = new Emitter;
$this->listeners[$channel] = $postponed;
Loop::enable($this->poll);
return new Listener($postponed->getObservable(), $channel, $this->unlisten);
return new Listener($postponed->getStream(), $channel, $this->unlisten);
});
}
/**
* @param string $channel
*
* @return \Interop\Async\Promise
* @return \AsyncInterop\Promise
*
* @throws \Error
*/

View File

@ -2,7 +2,7 @@
namespace Amp\Postgres;
use Interop\Async\Promise;
use AsyncInterop\Promise;
class PgSqlStatement implements Statement {
/** @var string */
@ -30,7 +30,7 @@ class PgSqlStatement implements Statement {
/**
* @param mixed ...$params
*
* @return \Interop\Async\Promise<\Amp\Postgres\Result>
* @return \AsyncInterop\Promise<\Amp\Postgres\Result>
*
* @throws \Amp\Postgres\FailureException If executing the statement fails.
*/

View File

@ -2,7 +2,7 @@
namespace Amp\Postgres;
use Amp\Emitter;
use Amp\Producer;
class PgSqlTupleResult extends TupleResult implements \Countable {
/** @var resource PostgreSQL result resource. */
@ -13,7 +13,7 @@ class PgSqlTupleResult extends TupleResult implements \Countable {
*/
public function __construct($handle) {
$this->handle = $handle;
parent::__construct(new Emitter(static function (callable $emit) use ($handle) {
parent::__construct(new Producer(static function (callable $emit) use ($handle) {
$count = \pg_num_rows($handle);
for ($i = 0; $i < $count; ++$i) {
$result = \pg_fetch_assoc($handle);

View File

@ -2,7 +2,7 @@
namespace Amp\Postgres;
use Amp\Emitter;
use Amp\Producer;
use pq;
class PqBufferedResult extends TupleResult implements \Countable {
@ -14,7 +14,7 @@ class PqBufferedResult extends TupleResult implements \Countable {
*/
public function __construct(pq\Result $result) {
$this->result = $result;
parent::__construct(new Emitter(static function (callable $emit) use ($result) {
parent::__construct(new Producer(static function (callable $emit) use ($result) {
for ($count = 0; $row = $result->fetchRow(pq\Result::FETCH_ASSOC); ++$count) {
yield $emit($row);
}

View File

@ -3,7 +3,7 @@
namespace Amp\Postgres;
use Amp\{ Deferred, TimeoutException };
use Interop\Async\{ Loop, Promise };
use AsyncInterop\{ Loop, Promise };
use pq;
class PqConnection extends AbstractConnection {
@ -11,7 +11,7 @@ class PqConnection extends AbstractConnection {
* @param string $connectionString
* @param int|null $timeout
*
* @return \Interop\Async\Promise<\Amp\Postgres\PgSqlConnection>
* @return \AsyncInterop\Promise<\Amp\Postgres\PgSqlConnection>
*
* @throws \Amp\Postgres\FailureException
*/

View File

@ -2,8 +2,8 @@
namespace Amp\Postgres;
use Amp\{ CallableMaker, Coroutine, Deferred, Postponed, function pipe };
use Interop\Async\{ Loop, Promise };
use Amp\{ CallableMaker, Coroutine, Deferred, Emitter, function pipe };
use AsyncInterop\{ Loop, Promise };
use pq;
class PqExecutor implements Executor {
@ -24,7 +24,7 @@ class PqExecutor implements Executor {
/** @var string */
private $await;
/** @var \Amp\Postponed[] */
/** @var \Amp\Emitter[] */
private $listeners;
/** @var callable */
@ -236,7 +236,7 @@ class PqExecutor implements Executor {
* {@inheritdoc}
*/
public function listen(string $channel): Promise {
$postponed = new Postponed;
$postponed = new Emitter;
$promise = new Coroutine($this->send(
[$this->handle, "listenAsync"],
$channel,
@ -251,14 +251,14 @@ class PqExecutor implements Executor {
return pipe($promise, function () use ($postponed, $channel) {
$this->listeners[$channel] = $postponed;
Loop::enable($this->poll);
return new Listener($postponed->getObservable(), $channel, $this->unlisten);
return new Listener($postponed->getStream(), $channel, $this->unlisten);
});
}
/**
* @param string $channel
*
* @return \Interop\Async\Promise
* @return \AsyncInterop\Promise
*
* @throws \Error
*/

View File

@ -3,7 +3,7 @@
namespace Amp\Postgres;
use Amp\{ Coroutine, function rethrow };
use Interop\Async\Promise;
use AsyncInterop\Promise;
use pq;
class PqStatement implements Statement {
@ -36,7 +36,7 @@ class PqStatement implements Statement {
/**
* @param mixed ...$params
*
* @return \Interop\Async\Promise<\Amp\Postgres\Result>
* @return \AsyncInterop\Promise<\Amp\Postgres\Result>
*
* @throws \Amp\Postgres\FailureException If executing the statement fails.
*/

View File

@ -2,7 +2,7 @@
namespace Amp\Postgres;
use Amp\{ Coroutine, Emitter };
use Amp\{ Coroutine, Producer };
use pq;
class PqUnbufferedResult extends TupleResult implements Operation {
@ -17,7 +17,7 @@ class PqUnbufferedResult extends TupleResult implements Operation {
*/
public function __construct(callable $fetch, pq\Result $result) {
$this->numCols = $result->numCols;
parent::__construct(new Emitter(function (callable $emit) use ($result, $fetch) {
parent::__construct(new Producer(function (callable $emit) use ($result, $fetch) {
$count = 0;
try {
do {

View File

@ -2,13 +2,13 @@
namespace Amp\Postgres;
use Interop\Async\Promise;
use AsyncInterop\Promise;
interface Statement {
/**
* @param mixed ...$params
*
* @return \Interop\Async\Promise<\Amp\Postgres\Result>
* @return \AsyncInterop\Promise<\Amp\Postgres\Result>
*/
public function execute(...$params): Promise;
}

View File

@ -3,7 +3,7 @@
namespace Amp\Postgres;
use Amp\Coroutine;
use Interop\Async\Promise;
use AsyncInterop\Promise;
class Transaction implements Executor, Operation {
use Internal\Operation;
@ -103,7 +103,7 @@ class Transaction implements Executor, Operation {
/**
* Commits the transaction and makes it inactive.
*
* @return \Interop\Async\Promise<\Amp\Postgres\CommandResult>
* @return \AsyncInterop\Promise<\Amp\Postgres\CommandResult>
*
* @throws \Amp\Postgres\TransactionError
*/
@ -131,7 +131,7 @@ class Transaction implements Executor, Operation {
/**
* Rolls back the transaction and makes it inactive.
*
* @return \Interop\Async\Promise<\Amp\Postgres\CommandResult>
* @return \AsyncInterop\Promise<\Amp\Postgres\CommandResult>
*
* @throws \Amp\Postgres\TransactionError
*/
@ -161,7 +161,7 @@ class Transaction implements Executor, Operation {
*
* @param string $identifier Savepoint identifier.
*
* @return \Interop\Async\Promise<\Amp\Postgres\CommandResult>
* @return \AsyncInterop\Promise<\Amp\Postgres\CommandResult>
*
* @throws \Amp\Postgres\TransactionError
*/
@ -175,7 +175,7 @@ class Transaction implements Executor, Operation {
*
* @param string $identifier Savepoint identifier.
*
* @return \Interop\Async\Promise<\Amp\Postgres\CommandResult>
* @return \AsyncInterop\Promise<\Amp\Postgres\CommandResult>
*
* @throws \Amp\Postgres\TransactionError
*/
@ -189,7 +189,7 @@ class Transaction implements Executor, Operation {
*
* @param string $identifier Savepoint identifier.
*
* @return \Interop\Async\Promise<\Amp\Postgres\CommandResult>
* @return \AsyncInterop\Promise<\Amp\Postgres\CommandResult>
*
* @throws \Amp\Postgres\TransactionError
*/

View File

@ -2,9 +2,9 @@
namespace Amp\Postgres;
use Amp\Observer;
use Amp\Listener;
abstract class TupleResult extends Observer implements Result {
abstract class TupleResult extends Listener implements Result {
/**
* Returns the number of fields (columns) in each row.
*

View File

@ -2,13 +2,13 @@
namespace Amp\Postgres;
use Interop\Async\Promise;
use AsyncInterop\Promise;
/**
* @param string $connectionString
* @param int $timeout
*
* @return \Interop\Async\Promise<\Amp\Postgres\Connection>
* @return \AsyncInterop\Promise<\Amp\Postgres\Connection>
*
* @throws \Amp\Postgres\FailureException If connecting fails.
* @throws \Error If neither ext-pgsql or pecl-pq is loaded.

View File

@ -1,10 +1,10 @@
<?php declare(strict_types = 1);
<?php
namespace Amp\Postgres\Test;
use Amp\{ Coroutine, Pause };
use Amp\Postgres\{ CommandResult, Connection, QueryError, Transaction, TransactionError, TupleResult };
use Interop\Async\Loop;
use AsyncInterop\Loop;
abstract class AbstractConnectionTest extends \PHPUnit_Framework_TestCase {
/** @var \Amp\Postgres\Connection */

View File

@ -1,10 +1,10 @@
<?php declare(strict_types = 1);
<?php
namespace Amp\Postgres\Test;
use Amp\Postgres\{ CommandResult, Connection, Statement, Transaction, TupleResult };
use Amp\Success;
use Interop\Async\Loop;
use AsyncInterop\Loop;
abstract class AbstractPoolTest extends \PHPUnit_Framework_TestCase {
/**

View File

@ -4,7 +4,7 @@ namespace Amp\Postgres\Test;
use Amp\Postgres\ConnectionPool;
use Amp\Success;
use Interop\Async\Promise;
use AsyncInterop\Promise;
class ConnectionPoolTest extends AbstractPoolTest {
/**

View File

@ -3,7 +3,7 @@
namespace Amp\Postgres\Test;
use Amp\Postgres\{ Connection, function connect };
use Interop\Async\Loop;
use AsyncInterop\Loop;
class FunctionsTest extends \PHPUnit_Framework_TestCase {
public function setUp() {