1
0
mirror of https://github.com/danog/postgres.git synced 2024-11-26 12:04:50 +01:00

Restore backpressure to unbuffered results

This commit is contained in:
Aaron Piotrowski 2019-04-01 19:15:30 -05:00
parent cf43bc03ac
commit 93f5a9cf1f
No known key found for this signature in database
GPG Key ID: ADD1EF783EDE9EEB

View File

@ -5,6 +5,7 @@ namespace Amp\Postgres;
use Amp\Producer;
use Amp\Promise;
use pq;
use function Amp\asyncCall;
final class PqUnbufferedResultSet implements ResultSet
{
@ -17,6 +18,9 @@ final class PqUnbufferedResultSet implements ResultSet
/** @var array|object Last row emitted. */
private $currentRow;
/** @var bool */
private $destroyed = false;
/**
* @param callable(): $fetch Function to fetch next result row.
* @param \pq\Result $result PostgreSQL result object.
@ -25,19 +29,36 @@ final class PqUnbufferedResultSet implements ResultSet
{
$this->numCols = $result->numCols;
$this->producer = new Producer(static function (callable $emit) use ($release, $result, $fetch) {
$this->producer = new Producer(static function (callable $emit) use (&$destroyed, $release, $result, $fetch) {
try {
do {
$result->autoConvert = pq\Result::CONV_SCALAR | pq\Result::CONV_ARRAY;
$emit($result);
$result = yield $fetch();
$next = $fetch();
yield $emit($result);
$result = yield $next;
} while ($result instanceof pq\Result);
} finally {
$destroyed = true;
$release();
}
});
}
public function __destruct()
{
if ($this->destroyed) {
return;
}
asyncCall(function () {
try {
while (yield $this->producer->advance());
} catch (\Throwable $exception) {
// Ignore iterator failure when destroying.
}
});
}
/**
* {@inheritdoc}
*/
@ -58,7 +79,7 @@ final class PqUnbufferedResultSet implements ResultSet
}
$result = $this->producer->getCurrent();
\assert($result instanceof \pq\Result);
\assert($result instanceof pq\Result);
return $this->currentRow = $result->fetchRow(pq\Result::FETCH_ASSOC);
}