diff --git a/lib/PqExecutor.php b/lib/PqExecutor.php index 6ee7266..d999927 100644 --- a/lib/PqExecutor.php +++ b/lib/PqExecutor.php @@ -2,7 +2,7 @@ namespace Amp\Postgres; -use Amp\{ CallableMaker, Coroutine, Deferred, Emitter, function pipe }; +use Amp\{ CallableMaker, Coroutine, Deferred, Emitter }; use AsyncInterop\{ Loop, Promise }; use pq; @@ -82,7 +82,7 @@ class PqExecutor implements Executor { Loop::disable($this->await); $this->send = $this->callableFromInstanceMethod("send"); - $this->fetch = $this->callableFromInstanceMethod("fetch"); + $this->fetch = \Amp\coroutine($this->callableFromInstanceMethod("fetch")); $this->unlisten = $this->callableFromInstanceMethod("unlisten"); $this->release = $this->callableFromInstanceMethod("release"); } @@ -248,7 +248,7 @@ class PqExecutor implements Executor { $emitter->emit($notification); })); - return pipe($promise, function () use ($emitter, $channel): Listener { + return \Amp\pipe($promise, function () use ($emitter, $channel): Listener { $this->listeners[$channel] = $emitter; Loop::enable($this->poll); return new Listener($emitter->stream(), $channel, $this->unlisten); diff --git a/lib/PqUnbufferedResult.php b/lib/PqUnbufferedResult.php index bc8cc56..263cfbd 100644 --- a/lib/PqUnbufferedResult.php +++ b/lib/PqUnbufferedResult.php @@ -2,7 +2,7 @@ namespace Amp\Postgres; -use Amp\{ Coroutine, Producer }; +use Amp\Producer; use pq; class PqUnbufferedResult extends TupleResult implements Operation { @@ -12,7 +12,7 @@ class PqUnbufferedResult extends TupleResult implements Operation { private $numCols; /** - * @param callable(): \Generator $fetch Coroutine function to fetch next result row. + * @param callable(): \AsyncInterop\Promise $fetch Function to fetch next result row. * @param \pq\Result $result PostgreSQL result object. */ public function __construct(callable $fetch, pq\Result $result) { @@ -21,7 +21,7 @@ class PqUnbufferedResult extends TupleResult implements Operation { $count = 0; try { do { - $next = new Coroutine($fetch()); // Request next result before current is consumed. + $next = $fetch(); // Request next result before current is consumed. ++$count; yield $emit($result->fetchRow(pq\Result::FETCH_ASSOC)); $result = yield $next;