From 0743e6058361d14289160fd276907309e1f1b70d Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Fri, 22 May 2020 00:00:03 -0500 Subject: [PATCH] WIP --- examples/basic.php | 3 +- examples/listen.php | 3 +- examples/multi-listen.php | 7 ++-- examples/transaction.php | 3 +- src/PgSqlResultSet.php | 11 ++++-- src/PqBufferedResultSet.php | 11 ++++-- src/PqHandle.php | 65 +++++++++++++++++++++++++++++++---- src/PqUnbufferedResultSet.php | 24 +++++++++---- test/AbstractLinkTest.php | 2 -- 9 files changed, 98 insertions(+), 31 deletions(-) diff --git a/examples/basic.php b/examples/basic.php index db88a22..4188c9f 100644 --- a/examples/basic.php +++ b/examples/basic.php @@ -14,8 +14,7 @@ Amp\Loop::run(function () { /** @var \Amp\Postgres\ResultSet $result */ $result = yield $connection->query('SHOW ALL'); - while (yield $result->advance()) { - $row = $result->getCurrent(); + while ($row = yield $result->continue()) { \printf("%-35s = %s (%s)\n", $row['name'], $row['setting'], $row['description']); } }); diff --git a/examples/listen.php b/examples/listen.php index 5fadd7f..5e27cc0 100644 --- a/examples/listen.php +++ b/examples/listen.php @@ -31,8 +31,7 @@ Loop::run(function () { return $pool->notify($channel, "Data 2"); // Send second notification. }); - while (yield $listener->advance()) { - $notification = $listener->getCurrent(); + while ($notification = yield $listener->continue()) { \printf( "Received notification from PID %d on channel '%s' with payload: %s\n", $notification->pid, diff --git a/examples/multi-listen.php b/examples/multi-listen.php index 2a12d06..eccad8f 100644 --- a/examples/multi-listen.php +++ b/examples/multi-listen.php @@ -3,9 +3,9 @@ require \dirname(__DIR__) . '/vendor/autoload.php'; -use Amp\Iterator; use Amp\Loop; use Amp\Postgres; +use Amp\Stream; Loop::run(function () { $config = Postgres\ConnectionConfig::fromString('host=localhost user=postgres'); @@ -51,10 +51,9 @@ Loop::run(function () { return $pool->notify($channel1, "Data 1.2"); }); - $iterator = Iterator\merge([$listener1, $listener2]); // Merge both listeners into single iterator. + $stream = Stream\merge([$listener1, $listener2]); // Merge both listeners into single iterator. - while (yield $iterator->advance()) { - $notification = $iterator->getCurrent(); + while ($notification = yield $stream->continue()) { \printf( "Received notification from PID %d on channel '%s' with payload: %s\n", $notification->pid, diff --git a/examples/transaction.php b/examples/transaction.php index 97c3503..227896c 100644 --- a/examples/transaction.php +++ b/examples/transaction.php @@ -29,8 +29,7 @@ Amp\Loop::run(function () { $format = "%-20s | %-10s\n"; \printf($format, 'TLD', 'Domain'); - while (yield $result->advance()) { - $row = $result->getCurrent(); + while ($row = yield $result->continue()) { \printf($format, $row['domain'], $row['tld']); } diff --git a/src/PgSqlResultSet.php b/src/PgSqlResultSet.php index 81ec2f5..97a22f0 100644 --- a/src/PgSqlResultSet.php +++ b/src/PgSqlResultSet.php @@ -25,12 +25,17 @@ final class PgSqlResultSet implements ResultSet /** @var Internal\ArrayParser */ private $parser; + /** @var Promise */ + private $nextResult; + /** * @param resource $handle PostgreSQL result resource. + * @param Promise $nextResult */ - public function __construct($handle) + public function __construct($handle, Promise $nextResult) { $this->handle = $handle; + $this->nextResult = $nextResult; $numFields = \pg_num_fields($this->handle); for ($i = 0; $i < $numFields; ++$i) { @@ -73,14 +78,14 @@ final class PgSqlResultSet implements ResultSet return new Success($this->processRow($result)); } - public function dispose() + public function dispose(): void { $this->handle = null; } public function getNextResultSet(): Promise { - return new Success; + return $this->nextResult; } /** diff --git a/src/PqBufferedResultSet.php b/src/PqBufferedResultSet.php index 014b208..f3aeb95 100644 --- a/src/PqBufferedResultSet.php +++ b/src/PqBufferedResultSet.php @@ -16,13 +16,18 @@ final class PqBufferedResultSet implements ResultSet /** @var int */ private $position = 0; + /** @var Promise */ + private $nextResult; + /** * @param pq\Result $result PostgreSQL result object. + * @param Promise $nextResult Promise for next result set. */ - public function __construct(pq\Result $result) + public function __construct(pq\Result $result, Promise $nextResult) { $this->result = $result; $this->result->autoConvert = pq\Result::CONV_SCALAR | pq\Result::CONV_ARRAY; + $this->nextResult = $nextResult; } /** @@ -44,14 +49,14 @@ final class PqBufferedResultSet implements ResultSet /** * @inheritDoc */ - public function dispose() + public function dispose(): void { $this->result = null; } public function getNextResultSet(): Promise { - return new Success; // Empty stub for now. + return $this->nextResult; } public function getFieldCount(): int diff --git a/src/PqHandle.php b/src/PqHandle.php index b500bbd..33fff6a 100644 --- a/src/PqHandle.php +++ b/src/PqHandle.php @@ -169,8 +169,6 @@ final class PqHandle implements Handle * * @return \Generator * - * @resolve \Amp\Sql\CommandResult|\pq\Statement - * * @throws FailureException */ private function send(?string $sql, callable $method, ...$args): \Generator @@ -208,19 +206,64 @@ final class PqHandle implements Handle throw new FailureException("Unknown query result"); } + $result = $this->makeResult($result, $sql); + + if ($handle instanceof pq\Statement) { + return $handle; // Will be wrapped into a PqStatement object. + } + + return $result; + +// switch ($result->status) { +// case pq\Result::EMPTY_QUERY: +// throw new QueryError("Empty query string"); +// +// case pq\Result::COMMAND_OK: +// if ($handle instanceof pq\Statement) { +// return $handle; // Will be wrapped into a PqStatement object. +// } +// +// return new PqCommandResult($result); +// +// case pq\Result::TUPLES_OK: +// return new PqBufferedResultSet($result); +// +// case pq\Result::SINGLE_TUPLE: +// $this->busy = new Deferred; +// $result = new PqUnbufferedResultSet( +// coroutine(\Closure::fromCallable([$this, 'fetch'])), +// $result, +// \Closure::fromCallable([$this, 'release']) +// ); +// return $result; +// +// case pq\Result::NONFATAL_ERROR: +// case pq\Result::FATAL_ERROR: +// throw new QueryExecutionError($result->errorMessage, $result->diag, null, $sql ?? ''); +// +// case pq\Result::BAD_RESPONSE: +// throw new FailureException($result->errorMessage); +// +// default: +// throw new FailureException("Unknown result status"); +// } + } + + private function makeResult(pq\Result $result, ?string $sql) + { switch ($result->status) { case pq\Result::EMPTY_QUERY: throw new QueryError("Empty query string"); case pq\Result::COMMAND_OK: - if ($handle instanceof pq\Statement) { - return $handle; // Will be wrapped into a PqStatement object. - } - return new PqCommandResult($result); case pq\Result::TUPLES_OK: - return new PqBufferedResultSet($result); + if (!$this->handle->busy && ($next = $this->handle->getResult()) instanceof pq\Result) { + $next = new Success($this->makeResult($next, $sql)); + } + + return new PqBufferedResultSet($result, $next ?? new Success); case pq\Result::SINGLE_TUPLE: $this->busy = new Deferred; @@ -268,6 +311,14 @@ final class PqHandle implements Handle switch ($result->status) { case pq\Result::TUPLES_OK: // End of result set. + while (!$this->handle->busy && ($next = $this->handle->getResult()) instanceof pq\Result) { + if ($next->status === pq\Result::TUPLES_OK) { + continue; + } + + return $this->makeResult($result, null); + } + return null; case pq\Result::SINGLE_TUPLE: diff --git a/src/PqUnbufferedResultSet.php b/src/PqUnbufferedResultSet.php index 7ba5842..078efc5 100644 --- a/src/PqUnbufferedResultSet.php +++ b/src/PqUnbufferedResultSet.php @@ -3,9 +3,9 @@ namespace Amp\Postgres; use Amp\AsyncGenerator; +use Amp\Deferred; use Amp\DisposedException; use Amp\Promise; -use Amp\Success; use pq; final class PqUnbufferedResultSet implements ResultSet @@ -16,8 +16,11 @@ final class PqUnbufferedResultSet implements ResultSet /** @var AsyncGenerator */ private $generator; + /** @var Deferred */ + private $next; + /** - * @param callable():Promise $fetch Function to fetch next result row. + * @param callable():Promise $fetch Function to fetch next result row. * @param \pq\Result $result PostgreSQL result object. * @param callable():void $release Invoked once the result has been fully consumed. */ @@ -25,8 +28,9 @@ final class PqUnbufferedResultSet implements ResultSet { $this->numCols = $result->numCols; + $this->next = $deferred = new Deferred; $this->generator = new AsyncGenerator(static function (callable $yield) use ( - $release, $result, $fetch + $deferred, $release, $result, $fetch ): \Generator { try { do { @@ -37,18 +41,26 @@ final class PqUnbufferedResultSet implements ResultSet } while ($result instanceof pq\Result); } catch (DisposedException $exception) { // Discard remaining rows in the result set. - while ((yield $promise) instanceof pq\Result) { + while (($result = yield $promise) instanceof pq\Result) { $promise = $fetch(); } } finally { + if ($result instanceof ResultSet) { + $deferred->resolve($result); + return; + } + + // Only release if there was no next result set. $release(); + + $deferred->resolve(null); } }); } public function getNextResultSet(): Promise { - return new Success; // Empty stub for now. + return $this->next->promise(); } /** @@ -62,7 +74,7 @@ final class PqUnbufferedResultSet implements ResultSet /** * @inheritDoc */ - public function dispose() + public function dispose(): void { $this->generator->dispose(); } diff --git a/test/AbstractLinkTest.php b/test/AbstractLinkTest.php index 96abd2c..b59bc8c 100644 --- a/test/AbstractLinkTest.php +++ b/test/AbstractLinkTest.php @@ -73,8 +73,6 @@ abstract class AbstractLinkTest extends AsyncTestCase public function testMultipleQueryWithTupleResult(): \Generator { - $this->markTestSkipped('Unimplemented'); - /** @var \Amp\Postgres\ResultSet $result */ $result = yield $this->connection->query("SELECT * FROM test; SELECT * FROM test");