1
0
mirror of https://github.com/danog/postgres.git synced 2025-01-22 05:11:14 +01:00

Update for Amp ^2

This commit is contained in:
Aaron Piotrowski 2017-05-15 23:14:02 -05:00
parent cdd8b8004f
commit a276d17be7
13 changed files with 120 additions and 99 deletions

View File

@ -19,7 +19,7 @@
}
],
"require": {
"amphp/amp": "dev-master as 2.0"
"amphp/amp": "^2.0"
},
"require-dev": {
"phpunit/phpunit": "^5"

View File

@ -2,7 +2,7 @@
namespace Amp\Postgres;
use Amp\{ CallableMaker, Coroutine, Deferred, Promise };
use Amp\{ CallableMaker, Coroutine, Deferred, Promise, function call };
abstract class AbstractConnection implements Connection {
use CallableMaker;
@ -117,8 +117,9 @@ abstract class AbstractConnection implements Connection {
default:
throw new \Error("Invalid transaction type");
}
return Promise\pipe($promise, function () use ($isolation): Transaction {
return call(function () use ($promise, $isolation) {
yield $promise;
$this->busy = new Deferred;
$transaction = new Transaction($this->executor, $isolation);
$transaction->onComplete($this->release);

View File

@ -2,7 +2,7 @@
namespace Amp\Postgres;
interface CommandResult extends Result {
interface CommandResult {
/**
* Returns the number of rows affected by the query.
*

View File

@ -2,11 +2,14 @@
namespace Amp\Postgres;
use Amp\{ Promise, Stream, StreamIterator };
use Amp\{ Iterator, Promise };
class Listener extends StreamIterator implements Operation {
class Listener implements Iterator, Operation {
use Internal\Operation;
/** @var \Amp\Iterator */
private $iterator;
/** @var string */
private $channel;
@ -14,16 +17,30 @@ class Listener extends StreamIterator implements Operation {
private $unlisten;
/**
* @param \Amp\Stream $stream Stream emitting notificatons on the channel.
* @param \Amp\Iterator $iterator Iterator emitting notificatons on the channel.
* @param string $channel Channel name.
* @param callable(string $channel): void $unlisten Function invoked to unlisten from the channel.
*/
public function __construct(Stream $stream, string $channel, callable $unlisten) {
parent::__construct($stream);
public function __construct(Iterator $iterator, string $channel, callable $unlisten) {
$this->iterator = $iterator;
$this->channel = $channel;
$this->unlisten = $unlisten;
}
/**
* {@inheritdoc}
*/
public function advance(): Promise {
return $this->iterator->advance();
}
/**
* {@inheritdoc}
*/
public function getCurrent() {
return $this->iterator->getCurrent();
}
/**
* @return string Channel name.
*/

View File

@ -2,7 +2,7 @@
namespace Amp\Postgres;
use Amp\{ CallableMaker, Coroutine, Deferred, Emitter, Loop, Promise, function Promise\pipe };
use Amp\{ CallableMaker, Deferred, Emitter, Loop, Promise, function call };
class PgSqlExecutor implements Executor {
use CallableMaker;
@ -22,9 +22,6 @@ class PgSqlExecutor implements Executor {
/** @var callable */
private $executeCallback;
/** @var callable */
private $createResult;
/** @var \Amp\Emitter[] */
private $listeners = [];
@ -91,7 +88,6 @@ class PgSqlExecutor implements Executor {
Loop::disable($this->poll);
Loop::disable($this->await);
$this->createResult = $this->callableFromInstanceMethod("createResult");
$this->executeCallback = $this->callableFromInstanceMethod("sendExecute");
$this->unlisten = $this->callableFromInstanceMethod("unlisten");
}
@ -154,12 +150,12 @@ class PgSqlExecutor implements Executor {
/**
* @param resource $result PostgreSQL result resource.
*
* @return \Amp\Postgres\Result
* @return \Amp\Postgres\CommandResult|\Amp\Postgres\TupleResult
*
* @throws \Amp\Postgres\FailureException
* @throws \Amp\Postgres\QueryError
*/
private function createResult($result): Result {
private function createResult($result) {
switch (\pg_result_status($result, \PGSQL_STATUS_LONG)) {
case \PGSQL_EMPTY_QUERY:
throw new QueryError("Empty query string");
@ -183,28 +179,35 @@ class PgSqlExecutor implements Executor {
}
private function sendExecute(string $name, array $params): Promise {
return pipe(new Coroutine($this->send("pg_send_execute", $name, $params)), $this->createResult);
return call(function () use ($name, $params) {
return $this->createResult(yield from $this->send("pg_send_execute", $name, $params));
});
}
/**
* {@inheritdoc}
*/
public function query(string $sql): Promise {
return pipe(new Coroutine($this->send("pg_send_query", $sql)), $this->createResult);
return call(function () use ($sql) {
return $this->createResult(yield from $this->send("pg_send_query", $sql));
});
}
/**
* {@inheritdoc}
*/
public function execute(string $sql, ...$params): Promise {
return pipe(new Coroutine($this->send("pg_send_query_params", $sql, $params)), $this->createResult);
return call(function () use ($sql, $params) {
return $this->createResult(yield from $this->send("pg_send_query_params", $sql, $params));
});
}
/**
* {@inheritdoc}
*/
public function prepare(string $sql): Promise {
return pipe(new Coroutine($this->send("pg_send_prepare", $sql, $sql)), function () use ($sql) {
return call(function () use ($sql) {
yield from $this->send("pg_send_prepare", $sql, $sql);
return new PgSqlStatement($sql, $this->executeCallback);
});
}
@ -224,11 +227,13 @@ class PgSqlExecutor implements Executor {
* {@inheritdoc}
*/
public function listen(string $channel): Promise {
return pipe($this->query(\sprintf("LISTEN %s", $channel)), function () use ($channel): Listener {
return call(function () use ($channel) {
yield $this->query(\sprintf("LISTEN %s"));
$emitter = new Emitter;
$this->listeners[$channel] = $emitter;
Loop::enable($this->poll);
return new Listener($emitter->stream(), $channel, $this->unlisten);
return new Listener($emitter->iterate(), $channel, $this->unlisten);
});
}
@ -253,7 +258,7 @@ class PgSqlExecutor implements Executor {
$promise = $this->query(\sprintf("UNLISTEN %s", $channel));
$promise->onResolve(function () use ($emitter) {
$emitter->resolve();
$emitter->complete();
});
return $promise;
}

View File

@ -4,7 +4,7 @@ namespace Amp\Postgres;
use Amp\Producer;
class PgSqlTupleResult extends TupleResult implements \Countable {
class PgSqlTupleResult extends TupleResult {
/** @var resource PostgreSQL result resource. */
private $handle;
@ -22,7 +22,6 @@ class PgSqlTupleResult extends TupleResult implements \Countable {
}
yield $emit($result);
}
return $count;
}));
}
@ -97,13 +96,6 @@ class PgSqlTupleResult extends TupleResult implements \Countable {
return \pg_field_size($this->handle, $this->filterNameOrNum($fieldNameOrNum));
}
/**
* @return int Number of rows in the result set.
*/
public function count(): int {
return $this->numRows();
}
/**
* @param int|string $fieldNameOrNum Field name or index.
*

View File

@ -5,7 +5,7 @@ namespace Amp\Postgres;
use Amp\Producer;
use pq;
class PqBufferedResult extends TupleResult implements \Countable {
class PqBufferedResult extends TupleResult {
/** @var \pq\Result */
private $result;
@ -15,10 +15,9 @@ class PqBufferedResult extends TupleResult implements \Countable {
public function __construct(pq\Result $result) {
$this->result = $result;
parent::__construct(new Producer(static function (callable $emit) use ($result) {
for ($count = 0; $row = $result->fetchRow(pq\Result::FETCH_ASSOC); ++$count) {
while ($row = $result->fetchRow(pq\Result::FETCH_ASSOC)) {
yield $emit($row);
}
return $count;
}));
}
@ -29,8 +28,4 @@ class PqBufferedResult extends TupleResult implements \Countable {
public function numFields(): int {
return $this->result->numCols;
}
public function count(): int {
return $this->numRows();
}
}

View File

@ -2,7 +2,7 @@
namespace Amp\Postgres;
use Amp\{ CallableMaker, Coroutine, Deferred, Emitter, Loop, Promise };
use Amp\{ CallableMaker, Coroutine, Deferred, Emitter, Loop, Promise, function call, function coroutine };
use pq;
class PqExecutor implements Executor {
@ -81,7 +81,7 @@ class PqExecutor implements Executor {
Loop::disable($this->await);
$this->send = $this->callableFromInstanceMethod("send");
$this->fetch = \Amp\coroutine($this->callableFromInstanceMethod("fetch"));
$this->fetch = coroutine($this->callableFromInstanceMethod("fetch"));
$this->unlisten = $this->callableFromInstanceMethod("unlisten");
$this->release = $this->callableFromInstanceMethod("release");
}
@ -235,22 +235,22 @@ class PqExecutor implements Executor {
* {@inheritdoc}
*/
public function listen(string $channel): Promise {
$emitter = new Emitter;
$promise = new Coroutine($this->send(
[$this->handle, "listenAsync"],
$channel,
static function (string $channel, string $message, int $pid) use ($emitter) {
$notification = new Notification;
$notification->channel = $channel;
$notification->pid = $pid;
$notification->payload = $message;
$emitter->emit($notification);
}));
return Promise\pipe($promise, function () use ($emitter, $channel): Listener {
return call(function () use ($channel) {
$emitter = new Emitter;
yield from $this->send(
[$this->handle, "listenAsync"],
$channel,
static function (string $channel, string $message, int $pid) use ($emitter) {
$notification = new Notification;
$notification->channel = $channel;
$notification->pid = $pid;
$notification->payload = $message;
$emitter->emit($notification);
});
$this->listeners[$channel] = $emitter;
Loop::enable($this->poll);
return new Listener($emitter->stream(), $channel, $this->unlisten);
return new Listener($emitter->iterate(), $channel, $this->unlisten);
});
}
@ -275,7 +275,7 @@ class PqExecutor implements Executor {
$promise = new Coroutine($this->send([$this->handle, "unlistenAsync"], $channel));
$promise->onResolve(function () use ($emitter) {
$emitter->resolve();
$emitter->complete();
});
return $promise;
}

View File

@ -2,13 +2,11 @@
namespace Amp\Postgres;
use Amp\{ CallableMaker, Producer };
use Amp\Producer;
use pq;
class PqUnbufferedResult extends TupleResult implements Operation {
use CallableMaker, Internal\Operation {
__destruct as private release;
}
use Internal\Operation;
/** @var int */
private $numCols;
@ -20,26 +18,18 @@ class PqUnbufferedResult extends TupleResult implements Operation {
public function __construct(callable $fetch, pq\Result $result) {
$this->numCols = $result->numCols;
parent::__construct(new Producer(function (callable $emit) use ($result, $fetch) {
$count = 0;
try {
do {
$next = $fetch(); // Request next result before current is consumed.
++$count;
yield $emit($result->fetchRow(pq\Result::FETCH_ASSOC));
$result = yield $next;
} while ($result instanceof pq\Result);
} finally {
$this->complete();
}
return $count;
}));
}
public function __destruct() {
parent::__destruct();
$this->release();
}
public function numFields(): int {
return $this->numCols;
}

View File

@ -1,5 +0,0 @@
<?php
namespace Amp\Postgres;
interface Result {}

View File

@ -2,9 +2,34 @@
namespace Amp\Postgres;
use Amp\StreamIterator;
use Amp\Iterator;
use Amp\Promise;
abstract class TupleResult implements Iterator {
/** @var \Amp\Iterator */
private $iterator;
/**
* @param \Amp\Iterator $iterator
*/
public function __construct(Iterator $iterator) {
$this->iterator = $iterator;
}
/**
* {@inheritdoc}
*/
public function advance(): Promise {
return $this->iterator->advance();
}
/**
* {@inheritdoc}
*/
public function getCurrent() {
return $this->iterator->getCurrent();
}
abstract class TupleResult extends StreamIterator implements Result {
/**
* Returns the number of fields (columns) in each row.
*

View File

@ -2,7 +2,7 @@
namespace Amp\Postgres\Test;
use Amp\{ Coroutine, Loop, Pause };
use Amp\{ Coroutine, Delayed, Loop };
use Amp\Postgres\{ CommandResult, Connection, QueryError, Transaction, TransactionError, TupleResult };
abstract class AbstractConnectionTest extends \PHPUnit_Framework_TestCase {
@ -95,7 +95,7 @@ abstract class AbstractConnectionTest extends \PHPUnit_Framework_TestCase {
$this->assertInstanceOf(TupleResult::class, $result);
$this->assertSame(2, $result->numFields());
while (yield $result->advance()) {
$row = $result->getCurrent();
$this->assertSame($data[0], $row['domain']);
@ -132,15 +132,15 @@ abstract class AbstractConnectionTest extends \PHPUnit_Framework_TestCase {
$result = yield $this->connection->query("SELECT {$value} as value");
if ($value) {
yield new Pause(100);
yield new Delayed(100);
}
while (yield $result->advance()) {
$row = $result->getCurrent();
$this->assertEquals($value, $row['value']);
}
});
Loop::run(function () use ($callback) {
yield \Amp\Promise\all([$callback(0), $callback(1)]);
});
@ -155,26 +155,26 @@ abstract class AbstractConnectionTest extends \PHPUnit_Framework_TestCase {
$result = yield $this->connection->query($query);
$data = $this->getData();
for ($i = 0; yield $result->advance(); ++$i) {
$row = $result->getCurrent();
$this->assertSame($data[$i][0], $row['domain']);
$this->assertSame($data[$i][1], $row['tld']);
}
});
try {
Loop::run(function () use ($callback) {
$failing = $callback("SELECT & FROM test");
$successful = $callback("SELECT * FROM test");
yield $successful;
yield $failing;
});
} catch (QueryError $exception) {
return;
}
$this->fail(\sprintf("Test did not throw an instance of %s", QueryError::class));
}
@ -185,7 +185,7 @@ abstract class AbstractConnectionTest extends \PHPUnit_Framework_TestCase {
$result = yield $this->connection->query("SELECT * FROM test");
$data = $this->getData();
for ($i = 0; yield $result->advance(); ++$i) {
$row = $result->getCurrent();
$this->assertSame($data[$i][0], $row['domain']);
@ -201,14 +201,14 @@ abstract class AbstractConnectionTest extends \PHPUnit_Framework_TestCase {
$result = yield $statement->execute();
$data = $this->getData();
for ($i = 0; yield $result->advance(); ++$i) {
$row = $result->getCurrent();
$this->assertSame($data[$i][0], $row['domain']);
$this->assertSame($data[$i][1], $row['tld']);
}
})());
Loop::run(function () use ($promises) {
yield \Amp\Promise\all($promises);
});
@ -223,7 +223,7 @@ abstract class AbstractConnectionTest extends \PHPUnit_Framework_TestCase {
$result = yield $statement->execute();
$data = $this->getData();
for ($i = 0; yield $result->advance(); ++$i) {
$row = $result->getCurrent();
$this->assertSame($data[$i][0], $row['domain']);
@ -236,14 +236,14 @@ abstract class AbstractConnectionTest extends \PHPUnit_Framework_TestCase {
$result = yield $this->connection->execute("SELECT * FROM test");
$data = $this->getData();
for ($i = 0; yield $result->advance(); ++$i) {
$row = $result->getCurrent();
$this->assertSame($data[$i][0], $row['domain']);
$this->assertSame($data[$i][1], $row['tld']);
}
})());
Loop::run(function () use ($promises) {
yield \Amp\Promise\all($promises);
});
@ -281,7 +281,7 @@ abstract class AbstractConnectionTest extends \PHPUnit_Framework_TestCase {
}
});
}
public function testConnect() {
Loop::run(function () {
$connect = $this->getConnectCallable();
@ -289,7 +289,7 @@ abstract class AbstractConnectionTest extends \PHPUnit_Framework_TestCase {
$this->assertInstanceOf(Connection::class, $connection);
});
}
/**
* @expectedException \Amp\Postgres\FailureException
*/
@ -299,7 +299,7 @@ abstract class AbstractConnectionTest extends \PHPUnit_Framework_TestCase {
$connection = yield $connect('host=localhost user=invalid', 100);
});
}
/**
* @expectedException \Amp\Postgres\FailureException
*/
@ -309,7 +309,7 @@ abstract class AbstractConnectionTest extends \PHPUnit_Framework_TestCase {
$connection = yield $connect('invalid connection string', 100);
});
}
/**
* @expectedException \Amp\Postgres\FailureException
*/

View File

@ -2,7 +2,7 @@
namespace Amp\Postgres\Test;
use Amp\{ Loop, Promise, Success };
use Amp\{ Loop, Promise, Success, function call };
use Amp\Postgres\{ CommandResult, Connection, Statement, Transaction, TupleResult };
abstract class AbstractPoolTest extends \PHPUnit_Framework_TestCase {
@ -175,7 +175,8 @@ abstract class AbstractPoolTest extends \PHPUnit_Framework_TestCase {
}
yield Promise\all(\array_map(function (Promise $promise) {
return Promise\pipe($promise, function (Transaction $transaction) {
return call(function () use ($promise) {
$transaction = yield $promise;
return $transaction->rollback();
});
}, $promises));