1
0
mirror of https://github.com/danog/postgres.git synced 2024-12-11 08:59:42 +01:00
This commit is contained in:
Aaron Piotrowski 2020-05-22 00:00:03 -05:00
parent 98214c247c
commit 0743e60583
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
9 changed files with 98 additions and 31 deletions

View File

@ -14,8 +14,7 @@ Amp\Loop::run(function () {
/** @var \Amp\Postgres\ResultSet $result */ /** @var \Amp\Postgres\ResultSet $result */
$result = yield $connection->query('SHOW ALL'); $result = yield $connection->query('SHOW ALL');
while (yield $result->advance()) { while ($row = yield $result->continue()) {
$row = $result->getCurrent();
\printf("%-35s = %s (%s)\n", $row['name'], $row['setting'], $row['description']); \printf("%-35s = %s (%s)\n", $row['name'], $row['setting'], $row['description']);
} }
}); });

View File

@ -31,8 +31,7 @@ Loop::run(function () {
return $pool->notify($channel, "Data 2"); // Send second notification. return $pool->notify($channel, "Data 2"); // Send second notification.
}); });
while (yield $listener->advance()) { while ($notification = yield $listener->continue()) {
$notification = $listener->getCurrent();
\printf( \printf(
"Received notification from PID %d on channel '%s' with payload: %s\n", "Received notification from PID %d on channel '%s' with payload: %s\n",
$notification->pid, $notification->pid,

View File

@ -3,9 +3,9 @@
require \dirname(__DIR__) . '/vendor/autoload.php'; require \dirname(__DIR__) . '/vendor/autoload.php';
use Amp\Iterator;
use Amp\Loop; use Amp\Loop;
use Amp\Postgres; use Amp\Postgres;
use Amp\Stream;
Loop::run(function () { Loop::run(function () {
$config = Postgres\ConnectionConfig::fromString('host=localhost user=postgres'); $config = Postgres\ConnectionConfig::fromString('host=localhost user=postgres');
@ -51,10 +51,9 @@ Loop::run(function () {
return $pool->notify($channel1, "Data 1.2"); return $pool->notify($channel1, "Data 1.2");
}); });
$iterator = Iterator\merge([$listener1, $listener2]); // Merge both listeners into single iterator. $stream = Stream\merge([$listener1, $listener2]); // Merge both listeners into single iterator.
while (yield $iterator->advance()) { while ($notification = yield $stream->continue()) {
$notification = $iterator->getCurrent();
\printf( \printf(
"Received notification from PID %d on channel '%s' with payload: %s\n", "Received notification from PID %d on channel '%s' with payload: %s\n",
$notification->pid, $notification->pid,

View File

@ -29,8 +29,7 @@ Amp\Loop::run(function () {
$format = "%-20s | %-10s\n"; $format = "%-20s | %-10s\n";
\printf($format, 'TLD', 'Domain'); \printf($format, 'TLD', 'Domain');
while (yield $result->advance()) { while ($row = yield $result->continue()) {
$row = $result->getCurrent();
\printf($format, $row['domain'], $row['tld']); \printf($format, $row['domain'], $row['tld']);
} }

View File

@ -25,12 +25,17 @@ final class PgSqlResultSet implements ResultSet
/** @var Internal\ArrayParser */ /** @var Internal\ArrayParser */
private $parser; private $parser;
/** @var Promise<ResultSet|null> */
private $nextResult;
/** /**
* @param resource $handle PostgreSQL result resource. * @param resource $handle PostgreSQL result resource.
* @param Promise<ResultSet|null> $nextResult
*/ */
public function __construct($handle) public function __construct($handle, Promise $nextResult)
{ {
$this->handle = $handle; $this->handle = $handle;
$this->nextResult = $nextResult;
$numFields = \pg_num_fields($this->handle); $numFields = \pg_num_fields($this->handle);
for ($i = 0; $i < $numFields; ++$i) { for ($i = 0; $i < $numFields; ++$i) {
@ -73,14 +78,14 @@ final class PgSqlResultSet implements ResultSet
return new Success($this->processRow($result)); return new Success($this->processRow($result));
} }
public function dispose() public function dispose(): void
{ {
$this->handle = null; $this->handle = null;
} }
public function getNextResultSet(): Promise public function getNextResultSet(): Promise
{ {
return new Success; return $this->nextResult;
} }
/** /**

View File

@ -16,13 +16,18 @@ final class PqBufferedResultSet implements ResultSet
/** @var int */ /** @var int */
private $position = 0; private $position = 0;
/** @var Promise<ResultSet|null> */
private $nextResult;
/** /**
* @param pq\Result $result PostgreSQL result object. * @param pq\Result $result PostgreSQL result object.
* @param Promise<ResultSet|null> $nextResult Promise for next result set.
*/ */
public function __construct(pq\Result $result) public function __construct(pq\Result $result, Promise $nextResult)
{ {
$this->result = $result; $this->result = $result;
$this->result->autoConvert = pq\Result::CONV_SCALAR | pq\Result::CONV_ARRAY; $this->result->autoConvert = pq\Result::CONV_SCALAR | pq\Result::CONV_ARRAY;
$this->nextResult = $nextResult;
} }
/** /**
@ -44,14 +49,14 @@ final class PqBufferedResultSet implements ResultSet
/** /**
* @inheritDoc * @inheritDoc
*/ */
public function dispose() public function dispose(): void
{ {
$this->result = null; $this->result = null;
} }
public function getNextResultSet(): Promise public function getNextResultSet(): Promise
{ {
return new Success; // Empty stub for now. return $this->nextResult;
} }
public function getFieldCount(): int public function getFieldCount(): int

View File

@ -169,8 +169,6 @@ final class PqHandle implements Handle
* *
* @return \Generator * @return \Generator
* *
* @resolve \Amp\Sql\CommandResult|\pq\Statement
*
* @throws FailureException * @throws FailureException
*/ */
private function send(?string $sql, callable $method, ...$args): \Generator private function send(?string $sql, callable $method, ...$args): \Generator
@ -208,19 +206,64 @@ final class PqHandle implements Handle
throw new FailureException("Unknown query result"); throw new FailureException("Unknown query result");
} }
$result = $this->makeResult($result, $sql);
if ($handle instanceof pq\Statement) {
return $handle; // Will be wrapped into a PqStatement object.
}
return $result;
// switch ($result->status) {
// case pq\Result::EMPTY_QUERY:
// throw new QueryError("Empty query string");
//
// case pq\Result::COMMAND_OK:
// if ($handle instanceof pq\Statement) {
// return $handle; // Will be wrapped into a PqStatement object.
// }
//
// return new PqCommandResult($result);
//
// case pq\Result::TUPLES_OK:
// return new PqBufferedResultSet($result);
//
// case pq\Result::SINGLE_TUPLE:
// $this->busy = new Deferred;
// $result = new PqUnbufferedResultSet(
// coroutine(\Closure::fromCallable([$this, 'fetch'])),
// $result,
// \Closure::fromCallable([$this, 'release'])
// );
// return $result;
//
// case pq\Result::NONFATAL_ERROR:
// case pq\Result::FATAL_ERROR:
// throw new QueryExecutionError($result->errorMessage, $result->diag, null, $sql ?? '');
//
// case pq\Result::BAD_RESPONSE:
// throw new FailureException($result->errorMessage);
//
// default:
// throw new FailureException("Unknown result status");
// }
}
private function makeResult(pq\Result $result, ?string $sql)
{
switch ($result->status) { switch ($result->status) {
case pq\Result::EMPTY_QUERY: case pq\Result::EMPTY_QUERY:
throw new QueryError("Empty query string"); throw new QueryError("Empty query string");
case pq\Result::COMMAND_OK: case pq\Result::COMMAND_OK:
if ($handle instanceof pq\Statement) {
return $handle; // Will be wrapped into a PqStatement object.
}
return new PqCommandResult($result); return new PqCommandResult($result);
case pq\Result::TUPLES_OK: case pq\Result::TUPLES_OK:
return new PqBufferedResultSet($result); if (!$this->handle->busy && ($next = $this->handle->getResult()) instanceof pq\Result) {
$next = new Success($this->makeResult($next, $sql));
}
return new PqBufferedResultSet($result, $next ?? new Success);
case pq\Result::SINGLE_TUPLE: case pq\Result::SINGLE_TUPLE:
$this->busy = new Deferred; $this->busy = new Deferred;
@ -268,6 +311,14 @@ final class PqHandle implements Handle
switch ($result->status) { switch ($result->status) {
case pq\Result::TUPLES_OK: // End of result set. case pq\Result::TUPLES_OK: // End of result set.
while (!$this->handle->busy && ($next = $this->handle->getResult()) instanceof pq\Result) {
if ($next->status === pq\Result::TUPLES_OK) {
continue;
}
return $this->makeResult($result, null);
}
return null; return null;
case pq\Result::SINGLE_TUPLE: case pq\Result::SINGLE_TUPLE:

View File

@ -3,9 +3,9 @@
namespace Amp\Postgres; namespace Amp\Postgres;
use Amp\AsyncGenerator; use Amp\AsyncGenerator;
use Amp\Deferred;
use Amp\DisposedException; use Amp\DisposedException;
use Amp\Promise; use Amp\Promise;
use Amp\Success;
use pq; use pq;
final class PqUnbufferedResultSet implements ResultSet final class PqUnbufferedResultSet implements ResultSet
@ -16,8 +16,11 @@ final class PqUnbufferedResultSet implements ResultSet
/** @var AsyncGenerator */ /** @var AsyncGenerator */
private $generator; private $generator;
/** @var Deferred */
private $next;
/** /**
* @param callable():Promise<pq\Result> $fetch Function to fetch next result row. * @param callable():Promise<pq\Result|ResultSet|null> $fetch Function to fetch next result row.
* @param \pq\Result $result PostgreSQL result object. * @param \pq\Result $result PostgreSQL result object.
* @param callable():void $release Invoked once the result has been fully consumed. * @param callable():void $release Invoked once the result has been fully consumed.
*/ */
@ -25,8 +28,9 @@ final class PqUnbufferedResultSet implements ResultSet
{ {
$this->numCols = $result->numCols; $this->numCols = $result->numCols;
$this->next = $deferred = new Deferred;
$this->generator = new AsyncGenerator(static function (callable $yield) use ( $this->generator = new AsyncGenerator(static function (callable $yield) use (
$release, $result, $fetch $deferred, $release, $result, $fetch
): \Generator { ): \Generator {
try { try {
do { do {
@ -37,18 +41,26 @@ final class PqUnbufferedResultSet implements ResultSet
} while ($result instanceof pq\Result); } while ($result instanceof pq\Result);
} catch (DisposedException $exception) { } catch (DisposedException $exception) {
// Discard remaining rows in the result set. // Discard remaining rows in the result set.
while ((yield $promise) instanceof pq\Result) { while (($result = yield $promise) instanceof pq\Result) {
$promise = $fetch(); $promise = $fetch();
} }
} finally { } finally {
if ($result instanceof ResultSet) {
$deferred->resolve($result);
return;
}
// Only release if there was no next result set.
$release(); $release();
$deferred->resolve(null);
} }
}); });
} }
public function getNextResultSet(): Promise public function getNextResultSet(): Promise
{ {
return new Success; // Empty stub for now. return $this->next->promise();
} }
/** /**
@ -62,7 +74,7 @@ final class PqUnbufferedResultSet implements ResultSet
/** /**
* @inheritDoc * @inheritDoc
*/ */
public function dispose() public function dispose(): void
{ {
$this->generator->dispose(); $this->generator->dispose();
} }

View File

@ -73,8 +73,6 @@ abstract class AbstractLinkTest extends AsyncTestCase
public function testMultipleQueryWithTupleResult(): \Generator public function testMultipleQueryWithTupleResult(): \Generator
{ {
$this->markTestSkipped('Unimplemented');
/** @var \Amp\Postgres\ResultSet $result */ /** @var \Amp\Postgres\ResultSet $result */
$result = yield $this->connection->query("SELECT * FROM test; SELECT * FROM test"); $result = yield $this->connection->query("SELECT * FROM test; SELECT * FROM test");