1
0
mirror of https://github.com/danog/postgres.git synced 2024-12-02 09:27:54 +01:00

Enabled unbuffered queries, but remove backpressure

This commit is contained in:
Aaron Piotrowski 2018-03-29 23:11:28 -05:00
parent 61ea8388af
commit d91f8054ba
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB
4 changed files with 29 additions and 22 deletions

View File

@ -27,9 +27,7 @@ final class PqConnection extends Connection {
} }
$connection->nonblocking = true; $connection->nonblocking = true;
$connection->unbuffered = true;
// Disabling unbuffered results for now as there appears to be a bug when unbuffered results contain arrays.
// $connection->unbuffered = true;
$deferred = new Deferred; $deferred = new Deferred;

View File

@ -262,7 +262,7 @@ final class PqHandle implements Handle {
} }
if (!$result) { if (!$result) {
return null; // Connection closing, end result set. throw new ConnectionException("Connection closed");
} }
switch ($result->status) { switch ($result->status) {
@ -298,7 +298,7 @@ final class PqHandle implements Handle {
* @throws \Amp\Postgres\FailureException * @throws \Amp\Postgres\FailureException
*/ */
public function statementExecute(string $name, array $params): Promise { public function statementExecute(string $name, array $params): Promise {
\assert(isset($this->statements[$name]), "Named statement not found when deallocating"); \assert(isset($this->statements[$name]), "Named statement not found when executing");
$statement = $this->statements[$name]->statement; $statement = $this->statements[$name]->statement;

View File

@ -2,7 +2,6 @@
namespace Amp\Postgres; namespace Amp\Postgres;
use Amp\Coroutine;
use Amp\Producer; use Amp\Producer;
use Amp\Promise; use Amp\Promise;
use pq; use pq;
@ -35,9 +34,8 @@ final class PqUnbufferedResultSet implements ResultSet, Operation {
try { try {
do { do {
$result->autoConvert = pq\Result::CONV_SCALAR | pq\Result::CONV_ARRAY; $result->autoConvert = pq\Result::CONV_SCALAR | pq\Result::CONV_ARRAY;
$next = $fetch(); // Request next result before current is consumed.
yield $emit($result); yield $emit($result);
$result = yield $next; $result = yield $fetch();
} while ($result instanceof pq\Result); } while ($result instanceof pq\Result);
} finally { } finally {
$queue->unreference(); $queue->unreference();
@ -45,12 +43,6 @@ final class PqUnbufferedResultSet implements ResultSet, Operation {
}); });
} }
public function __destruct() {
if (!$this->queue->isReferenced()) { // Producer above did not complete, so consume remaining results.
Promise\rethrow(new Coroutine($this->dispose()));
}
}
/** /**
* {@inheritdoc} * {@inheritdoc}
*/ */
@ -84,14 +76,6 @@ final class PqUnbufferedResultSet implements ResultSet, Operation {
} }
} }
private function dispose(): \Generator {
try {
while (yield $this->producer->advance()); // Discard unused result rows.
} catch (\Throwable $exception) {
// Ignore failure while discarding results.
}
}
/** /**
* @return int Number of fields (columns) in each result set. * @return int Number of fields (columns) in each result set.
*/ */

View File

@ -330,6 +330,31 @@ abstract class AbstractLinkTest extends TestCase {
}); });
} }
public function testPrepareThenExecuteWithUnconsumedTupleResult() {
Loop::run(function () {
/** @var \Amp\Postgres\Statement $statement */
$statement = yield $this->connection->prepare("SELECT * FROM test");
/** @var \Amp\Postgres\ResultSet $result */
$result = yield $statement->execute();
$this->assertInstanceOf(ResultSet::class, $result);
/** @var \Amp\Postgres\ResultSet $result */
$result = yield $statement->execute();
$this->assertInstanceOf(ResultSet::class, $result);
$data = $this->getData();
for ($i = 0; yield $result->advance(ResultSet::FETCH_OBJECT); ++$i) {
$row = $result->getCurrent();
$this->assertSame($data[$i][0], $row->domain);
$this->assertSame($data[$i][1], $row->tld);
}
});
}
public function testExecute() { public function testExecute() {
Loop::run(function () { Loop::run(function () {
$data = $this->getData()[0]; $data = $this->getData()[0];