From 93f5a9cf1f8d80a76a90208eebd79280f0ea2657 Mon Sep 17 00:00:00 2001 From: Aaron Piotrowski Date: Mon, 1 Apr 2019 19:15:30 -0500 Subject: [PATCH] Restore backpressure to unbuffered results --- src/PqUnbufferedResultSet.php | 29 +++++++++++++++++++++++++---- 1 file changed, 25 insertions(+), 4 deletions(-) diff --git a/src/PqUnbufferedResultSet.php b/src/PqUnbufferedResultSet.php index 8b8df62..be205d9 100644 --- a/src/PqUnbufferedResultSet.php +++ b/src/PqUnbufferedResultSet.php @@ -5,6 +5,7 @@ namespace Amp\Postgres; use Amp\Producer; use Amp\Promise; use pq; +use function Amp\asyncCall; final class PqUnbufferedResultSet implements ResultSet { @@ -17,6 +18,9 @@ final class PqUnbufferedResultSet implements ResultSet /** @var array|object Last row emitted. */ private $currentRow; + /** @var bool */ + private $destroyed = false; + /** * @param callable(): $fetch Function to fetch next result row. * @param \pq\Result $result PostgreSQL result object. @@ -25,19 +29,36 @@ final class PqUnbufferedResultSet implements ResultSet { $this->numCols = $result->numCols; - $this->producer = new Producer(static function (callable $emit) use ($release, $result, $fetch) { + $this->producer = new Producer(static function (callable $emit) use (&$destroyed, $release, $result, $fetch) { try { do { $result->autoConvert = pq\Result::CONV_SCALAR | pq\Result::CONV_ARRAY; - $emit($result); - $result = yield $fetch(); + $next = $fetch(); + yield $emit($result); + $result = yield $next; } while ($result instanceof pq\Result); } finally { + $destroyed = true; $release(); } }); } + public function __destruct() + { + if ($this->destroyed) { + return; + } + + asyncCall(function () { + try { + while (yield $this->producer->advance()); + } catch (\Throwable $exception) { + // Ignore iterator failure when destroying. + } + }); + } + /** * {@inheritdoc} */ @@ -58,7 +79,7 @@ final class PqUnbufferedResultSet implements ResultSet } $result = $this->producer->getCurrent(); - \assert($result instanceof \pq\Result); + \assert($result instanceof pq\Result); return $this->currentRow = $result->fetchRow(pq\Result::FETCH_ASSOC); }