numCols = $result->numCols;
        // Keep a local reference to the queue: the producer closure below is
        // static, so it receives the queue through `use` rather than via $this.
        $this->queue = $queue = new Internal\CompletionQueue;

        parent::__construct($this->producer = new Producer(static function (callable $emit) use ($queue, $result, $fetch) {
            try {
                do {
                    $next = $fetch(); // Request next result before current is consumed.
                    // Emit the current row as an associative array.
                    yield $emit($result->fetchRow(pq\Result::FETCH_ASSOC));
                    // Replace the current result with whatever $next produced.
                    $result = yield $next;
                } while ($result instanceof pq\Result); // Stop once the value is no longer a pq\Result.
            } finally {
                // Mark the queue complete no matter how the loop above exits.
                $queue->complete();
            }
        }));
    }

    /**
     * If the completion queue has not been marked complete, starts a coroutine
     * running dispose() to consume the rows that were never read.
     */
    public function __destruct()
    {
        if (!$this->queue->isComplete()) { // Producer above did not complete, so consume remaining results.
            Promise\rethrow(new Coroutine($this->dispose()));
        }
    }

    /**
     * Repeatedly advances the producer until advance() yields a falsy value,
     * ignoring the rows themselves. Any \Throwable raised while doing so is
     * caught and deliberately ignored.
     *
     * @return \Generator Generator wrapped in a Coroutine by __destruct().
     */
    private function dispose(): \Generator
    {
        try {
            while (yield $this->producer->advance()); // Discard unused result rows.
        } catch (\Throwable $exception) {
            // Ignore failure while discarding results.
        }
    }

    /**
     * @return int Number of fields (columns) in each result set.
     */
    public function numFields(): int
    {
        return $this->numCols;
    }

    /**
     * {@inheritdoc}
     */
    public function onComplete(callable $onComplete)
    {
        // Registration is delegated to the completion queue.
        $this->queue->onComplete($onComplete);
    }
}