1
0
mirror of https://github.com/danog/postgres.git synced 2024-11-27 04:24:45 +01:00
postgres/lib/PqExecutor.php

265 lines
7.5 KiB
PHP
Raw Normal View History

2016-09-14 16:27:39 +02:00
<?php declare(strict_types = 1);
namespace Amp\Postgres;
2016-09-19 18:12:32 +02:00
use Amp\{ CallableMaker, Coroutine, Deferred, Postponed, function pipe };
2016-09-14 16:27:39 +02:00
use Interop\Async\{ Awaitable, Loop };
use pq;
/**
 * Executor that runs queries on a pq\Connection without blocking, using event
 * loop watchers on the connection socket to flush requests and collect results.
 *
 * Only one operation uses the connection at a time; further calls wait on the
 * $busy deferred until the connection is released.
 */
class PqExecutor implements Executor {
    use CallableMaker;

    /** @var \pq\Connection PostgreSQL connection object. */
    private $handle;

    /** @var \Amp\Deferred|null Deferred for the result currently being awaited, null when none is pending. */
    private $delayed;

    /** @var \Amp\Deferred|null Deferred held while an operation occupies the connection, null when idle. */
    private $busy;

    /** @var string Identifier of the readable watcher that polls the connection for results. */
    private $poll;

    /** @var string Identifier of the writable watcher that flushes pending outgoing data. */
    private $await;

    /** @var \Amp\Postponed[] Notification emitters, keyed by channel name. */
    private $listeners = [];

    /** @var callable Bound reference to send(). */
    private $send;

    /** @var callable Bound reference to fetch(). */
    private $fetch;

    /** @var callable Bound reference to unlisten(). */
    private $unlisten;

    /** @var callable Bound reference to release(). */
    private $release;

    /**
     * Connection constructor.
     *
     * Registers the socket watchers (initially disabled) and creates the
     * callables that are handed to statement, result and listener objects.
     *
     * @param \pq\Connection $handle
     */
    public function __construct(pq\Connection $handle) {
        $this->handle = $handle;

        // The watcher closures are static (not bound to $this); they reach the
        // pending deferred through a reference to the property instead.
        $deferred = &$this->delayed;

        $this->poll = Loop::onReadable($this->handle->socket, static function ($watcher) use (&$deferred, $handle) {
            if ($handle->poll() === pq\Connection::POLLING_FAILED) {
                $deferred->fail(new FailureException($handle->errorMessage));
                return;
            }

            // Only hand over a result once the connection reports it is no longer busy.
            if (!$handle->busy) {
                $deferred->resolve($handle->getResult());
            }
        });

        $this->await = Loop::onWritable($this->handle->socket, static function ($watcher) use ($handle) {
            if (!$handle->flush()) {
                return; // Not finished sending data, listen again.
            }

            Loop::disable($watcher);
        });

        // Both watchers stay disabled until an operation needs them.
        Loop::disable($this->poll);
        Loop::disable($this->await);

        $this->send = $this->callableFromInstanceMethod("send");
        $this->fetch = $this->callableFromInstanceMethod("fetch");
        $this->unlisten = $this->callableFromInstanceMethod("unlisten");
        $this->release = $this->callableFromInstanceMethod("release");
    }

    /**
     * Frees Io watchers from loop.
     */
    public function __destruct() {
        Loop::cancel($this->poll);
        Loop::cancel($this->await);
    }

    /**
     * Coroutine that waits for the connection to become free, invokes the given
     * asynchronous connection/statement method and converts the outcome into a
     * statement or result object.
     *
     * @param callable $method Method to execute.
     * @param mixed ...$args Arguments to pass to function.
     *
     * @return \Generator
     *
     * @resolve \Amp\Postgres\PqStatement|\Amp\Postgres\PqCommandResult|\Amp\Postgres\PqBufferedResult|\Amp\Postgres\PqUnbufferedResult
     *
     * @throws \Amp\Postgres\FailureException If the request fails or the result is unusable.
     * @throws \Amp\Postgres\QueryError If the query is empty or the server reports a query error.
     */
    private function send(callable $method, ...$args): \Generator {
        // Wait until no other operation occupies the connection.
        while ($this->busy !== null) {
            yield $this->busy->getAwaitable();
        }

        $this->busy = new Deferred;

        try {
            try {
                $handle = $method(...$args);
            } catch (pq\Exception $exception) {
                // Wrap driver exceptions so callers only deal with this library's exception types.
                throw new FailureException($this->handle->errorMessage, 0, $exception);
            }

            $this->delayed = new Deferred;

            Loop::enable($this->poll);
            if (!$this->handle->flush()) {
                // Outgoing data is still pending; finish sending when the socket is writable.
                Loop::enable($this->await);
            }

            try {
                $result = yield $this->delayed->getAwaitable();
            } finally {
                $this->delayed = null;
                Loop::disable($this->poll);
                Loop::disable($this->await);
            }

            if ($handle instanceof pq\Statement) {
                return new PqStatement($handle, $this->send);
            }

            if (!$result instanceof pq\Result) {
                throw new FailureException("Unknown query result");
            }
        } finally {
            $this->release();
        }

        switch ($result->status) {
            case pq\Result::EMPTY_QUERY:
                throw new QueryError("Empty query string");

            case pq\Result::COMMAND_OK:
                return new PqCommandResult($result);

            case pq\Result::TUPLES_OK:
                return new PqBufferedResult($result);

            case pq\Result::SINGLE_TUPLE:
                $result = new PqUnbufferedResult($this->fetch, $result);
                $result->onComplete($this->release);
                // Keep the connection reserved until the unbuffered result completes.
                // NOTE(review): release() already ran in the finally block above, so waiting
                // operations may have been notified before this re-reservation — confirm ordering.
                $this->busy = new Deferred;
                return $result;

            case pq\Result::NONFATAL_ERROR:
            case pq\Result::FATAL_ERROR:
                throw new QueryError($result->errorMessage);

            case pq\Result::BAD_RESPONSE:
                throw new FailureException($result->errorMessage);

            default:
                throw new FailureException("Unknown result status");
        }
    }

    /**
     * Coroutine that obtains the next result from the connection, waiting on the
     * poll watcher when the connection is still busy.
     *
     * @return \Generator
     *
     * @resolve \pq\Result|null The next single-tuple result, or null at the end of the result set.
     *
     * @throws \Amp\Postgres\FailureException If the result has any other status.
     */
    private function fetch(): \Generator {
        if (!$this->handle->busy) { // Results buffered.
            $result = $this->handle->getResult();
        } else {
            $this->delayed = new Deferred;

            Loop::enable($this->poll);

            try {
                $result = yield $this->delayed->getAwaitable();
            } finally {
                $this->delayed = null;
                Loop::disable($this->poll);
            }
        }

        switch ($result->status) {
            case pq\Result::TUPLES_OK: // End of result set.
                return null;

            case pq\Result::SINGLE_TUPLE:
                return $result;

            default:
                throw new FailureException($result->errorMessage);
        }
    }

    /**
     * Marks the connection as free and resolves the deferred that waiting
     * operations are yielding on.
     */
    private function release() {
        // Clear the property before resolving so resumed waiters see the connection as free.
        $busy = $this->busy;
        $this->busy = null;
        $busy->resolve();
    }

    /**
     * {@inheritdoc}
     */
    public function query(string $sql): Awaitable {
        return new Coroutine($this->send([$this->handle, "execAsync"], $sql));
    }

    /**
     * {@inheritdoc}
     */
    public function execute(string $sql, ...$params): Awaitable {
        return new Coroutine($this->send([$this->handle, "execParamsAsync"], $sql, $params));
    }

    /**
     * {@inheritdoc}
     */
    public function prepare(string $sql): Awaitable {
        // The SQL text is passed as both the statement name and the query.
        return new Coroutine($this->send([$this->handle, "prepareAsync"], $sql, $sql));
    }

    /**
     * {@inheritdoc}
     */
    public function listen(string $channel): Awaitable {
        $postponed = new Postponed;

        $awaitable = new Coroutine($this->send(
            [$this->handle, "listenAsync"],
            $channel,
            // Each notification delivered to this callback is emitted as a Notification object.
            static function (string $channel, string $message, int $pid) use ($postponed) {
                $notification = new Notification;
                $notification->channel = $channel;
                $notification->pid = $pid;
                $notification->payload = $message;
                $postponed->emit($notification);
            }));

        return pipe($awaitable, function () use ($postponed, $channel) {
            // Register the emitter only after the listen request succeeded.
            $this->listeners[$channel] = $postponed;
            return new Listener($postponed->getObservable(), $channel, $this->unlisten);
        });
    }

    /**
     * Stops listening on the given channel and resolves the channel's emitter
     * once the unlisten request has finished.
     *
     * @param string $channel
     *
     * @return \Interop\Async\Awaitable
     *
     * @throws \Error If there is no registered listener for the channel.
     */
    private function unlisten(string $channel): Awaitable {
        if (!isset($this->listeners[$channel])) {
            throw new \Error("Not listening on that channel");
        }

        $postponed = $this->listeners[$channel];
        unset($this->listeners[$channel]);

        $awaitable = new Coroutine($this->send([$this->handle, "unlistenAsync"], $channel));
        $awaitable->when(function () use ($postponed) {
            $postponed->resolve();
        });

        return $awaitable;
    }
}