1
0
mirror of https://github.com/danog/postgres.git synced 2024-12-15 10:57:22 +01:00
postgres/lib/PqExecutor.php

344 lines
9.8 KiB
PHP
Raw Normal View History

2016-12-30 06:21:17 +01:00
<?php
2016-09-14 16:27:39 +02:00
namespace Amp\Postgres;
use Amp\CallableMaker;
use Amp\Coroutine;
use Amp\Deferred;
use Amp\Emitter;
use Amp\Loop;
use Amp\Promise;
use Amp\Success;
2016-09-14 16:27:39 +02:00
use pq;
use function Amp\call;
use function Amp\coroutine;
2016-09-14 16:27:39 +02:00
class PqExecutor implements Executor {
use CallableMaker;
2017-05-16 06:28:37 +02:00
2016-09-14 16:27:39 +02:00
/** @var \pq\Connection PostgreSQL connection object. */
private $handle;

/** @var \Amp\Deferred|null Deferred for the result currently awaited from the server; null when nothing is awaited. */
private $deferred;

/** @var \Amp\Deferred|null Set while an operation holds the connection; send() waits on it before starting. */
private $busy;

/** @var string Identifier of the readable watcher registered on the connection socket. */
private $poll;

/** @var string Identifier of the writable watcher registered on the connection socket. */
private $await;

/**
 * Initialised to an empty array so the documented type holds before the first listen() call.
 *
 * @var \Amp\Emitter[] Notification emitters indexed by channel name.
 */
private $listeners = [];

/** @var \Amp\Postgres\Internal\PqStatementStorage[] Prepared statement storage indexed by statement name. */
private $statements = [];

/** @var callable Coroutine-wrapped send() method. */
private $send;

/** @var callable Coroutine-wrapped fetch() method. */
private $fetch;

/** @var callable Callable for the private unlisten() method. */
private $unlisten;

/** @var callable Callable for the private release() method. */
private $release;

/** @var callable Callable for the private deallocate() method. */
private $deallocate;
2016-09-14 16:27:39 +02:00
/**
 * Wraps the given pq connection, registers readable/writable watchers on its socket (both start
 * disabled) and prepares the callables handed to results, statements and listeners.
 *
 * @param \pq\Connection $handle
 */
public function __construct(pq\Connection $handle) {
    $this->handle = $handle;

    // Bound by reference so the static watcher callbacks see later changes to these properties
    // without capturing $this.
    $deferred = &$this->deferred;
    $listeners = &$this->listeners;

    $this->poll = Loop::onReadable($this->handle->socket, static function ($watcher) use (&$deferred, &$listeners, $handle) {
        $status = $handle->poll();

        if ($deferred === null) {
            return; // No active query, only notification listeners.
        }

        if ($status === pq\Connection::POLLING_FAILED) {
            $deferred->fail(new FailureException($handle->errorMessage));
        } elseif (!$handle->busy) {
            $deferred->resolve($handle->getResult());
        }

        // $deferred reflects the property's current value here; send() and fetch() reset the
        // property to null once the promise they await has settled.
        if (!$deferred && !$handle->busy && empty($listeners)) {
            Loop::disable($watcher);
        }
    });

    $this->await = Loop::onWritable($this->handle->socket, static function ($watcher) use ($handle) {
        if (!$handle->flush()) {
            return; // Not finished sending data, continue polling for writability.
        }

        Loop::disable($watcher);
    });

    // Watchers are only enabled while a query is in flight or a channel is being listened on.
    Loop::disable($this->poll);
    Loop::disable($this->await);

    $this->send = coroutine($this->callableFromInstanceMethod("send"));
    $this->fetch = coroutine($this->callableFromInstanceMethod("fetch"));
    $this->unlisten = $this->callableFromInstanceMethod("unlisten");
    $this->release = $this->callableFromInstanceMethod("release");
    $this->deallocate = $this->callableFromInstanceMethod("deallocate");
}
/**
 * Cancels the readable and writable socket watchers registered by the constructor.
 */
public function __destruct() {
    foreach ([$this->poll, $this->await] as $watcherId) {
        Loop::cancel($watcherId);
    }
}
/**
 * Waits until no other operation holds the connection, invokes the given method, then awaits the
 * result delivered by the readable watcher and converts it according to its status.
 *
 * @param callable $method Method to execute.
 * @param mixed ...$args Arguments to pass to function.
 *
 * @return \Generator
 *
 * @resolve \Amp\Postgres\PqCommandResult|\Amp\Postgres\PqBufferedResult|\Amp\Postgres\PqUnbufferedResult|\pq\Statement
 *
 * @throws \Amp\Postgres\FailureException
 * @throws \Amp\Postgres\QueryError
 */
private function send(callable $method, ...$args): \Generator {
    while ($this->busy) {
        try {
            yield $this->busy->promise();
        } catch (\Throwable $ignored) {
            // Ignore failure from another operation.
        }
    }

    try {
        $returned = $method(...$args);

        // One Deferred serves both roles: it marks the connection busy and receives the result.
        $pending = new Deferred;
        $this->busy = $pending;
        $this->deferred = $pending;

        Loop::enable($this->poll);
        if (!$this->handle->flush()) {
            // Data is still queued for sending; the writable watcher keeps flushing it.
            Loop::enable($this->await);
        }

        try {
            $result = yield $pending->promise();
        } finally {
            $this->deferred = null;
        }

        if (!$result instanceof pq\Result) {
            throw new FailureException("Unknown query result");
        }
    } catch (pq\Exception $exception) {
        throw new FailureException($this->handle->errorMessage, 0, $exception);
    } finally {
        $this->busy = null;
    }

    switch ($result->status) {
        case pq\Result::EMPTY_QUERY:
            throw new QueryError("Empty query string");

        case pq\Result::COMMAND_OK:
            // A statement returned by the method will be wrapped into a PqStatement object by the caller.
            return $returned instanceof pq\Statement
                ? $returned
                : new PqCommandResult($result);

        case pq\Result::TUPLES_OK:
            return new PqBufferedResult($result);

        case pq\Result::SINGLE_TUPLE:
            // Keep the connection marked busy until the unbuffered result reports completion.
            $this->busy = new Deferred;
            $unbuffered = new PqUnbufferedResult($this->fetch, $result);
            $unbuffered->onComplete($this->release);
            return $unbuffered;

        case pq\Result::NONFATAL_ERROR:
        case pq\Result::FATAL_ERROR:
            throw new QueryError($result->errorMessage);

        case pq\Result::BAD_RESPONSE:
            throw new FailureException($result->errorMessage);

        default:
            throw new FailureException("Unknown result status");
    }
}
2017-05-16 06:28:37 +02:00
2016-09-14 16:27:39 +02:00
/**
 * Obtains the next result for a PqUnbufferedResult, waiting on the readable watcher when the
 * connection reports that it is still busy.
 *
 * @return \Generator
 *
 * @resolve \pq\Result|null The next single-tuple result, or null at the end of the result set.
 *
 * @throws \Amp\Postgres\FailureException If the result has any other status.
 */
private function fetch(): \Generator {
    if ($this->handle->busy) {
        $this->deferred = new Deferred;

        Loop::enable($this->poll);

        try {
            $result = yield $this->deferred->promise();
        } finally {
            $this->deferred = null;
        }
    } else {
        // Results buffered.
        $result = $this->handle->getResult();
    }

    switch ($result->status) {
        case pq\Result::SINGLE_TUPLE:
            return $result;

        case pq\Result::TUPLES_OK: // End of result set.
            return null;

        default:
            throw new FailureException($result->errorMessage);
    }
}
2017-05-16 06:28:37 +02:00
2016-09-14 16:27:39 +02:00
/**
 * Clears the busy marker set for an unbuffered result and resolves it so that operations waiting
 * in send() may proceed.
 */
private function release() {
    $held = $this->busy;

    \assert(
        $held instanceof Deferred && $held !== $this->deferred,
        "Connection in invalid state when releasing"
    );

    // Clear the property before resolving so resumed waiters observe a free connection.
    $this->busy = null;
    $held->resolve();
}
2017-05-16 06:28:37 +02:00
/**
 * Decrements the usage count of the named statement and, once it reaches zero, forgets the
 * statement and sends a deallocate request for it.
 *
 * @param string $name Statement name used as key in the statement storage.
 */
private function deallocate(string $name) {
    \assert(isset($this->statements[$name]), "Named statement not found when deallocating");

    $storage = $this->statements[$name];

    --$storage->count;
    if ($storage->count) {
        return; // Still referenced elsewhere.
    }

    unset($this->statements[$name]);

    // Nobody awaits this promise, so rethrow() forwards any failure instead of losing it.
    $promise = new Coroutine($this->send([$storage->statement, "deallocateAsync"]));
    Promise\rethrow($promise);
}
2016-09-14 16:27:39 +02:00
/**
 * {@inheritdoc}
 */
public function query(string $sql): Promise {
    $generator = $this->send([$this->handle, "execAsync"], $sql);

    return new Coroutine($generator);
}
/**
 * {@inheritdoc}
 */
public function execute(string $sql, ...$params): Promise {
    // The collected parameters are passed on as a single array argument.
    $generator = $this->send([$this->handle, "execParamsAsync"], $sql, $params);

    return new Coroutine($generator);
}
/**
 * {@inheritdoc}
 *
 * The statement name is derived from the SHA-1 hash of the SQL, so preparing identical SQL again
 * reuses the stored statement and only increments its usage count (decremented by deallocate()).
 */
public function prepare(string $sql): Promise {
    $name = self::STATEMENT_NAME_PREFIX . \sha1($sql);

    if (isset($this->statements[$name])) {
        $storage = $this->statements[$name];
        ++$storage->count;

        if ($storage->promise) {
            $pending = $storage->promise;

            // Do not hand out the pending promise itself: it resolves to a single PqStatement, while
            // the count above was incremented for this caller too. Presumably each PqStatement
            // triggers one deallocate() call (confirm against PqStatement), so every caller needs
            // its own instance for the count to return to zero.
            return call(function () use ($storage, $pending) {
                yield $pending; // Rethrows the failure if the in-flight prepare fails.
                return new PqStatement($storage->statement, $this->send, $this->deallocate);
            });
        }

        return new Success(new PqStatement($storage->statement, $this->send, $this->deallocate));
    }

    $this->statements[$name] = $storage = new Internal\PqStatementStorage;

    $promise = call(function () use ($storage, $name, $sql) {
        $statement = yield from $this->send([$this->handle, "prepareAsync"], $name, $sql);
        $storage->statement = $statement;
        return new PqStatement($statement, $this->send, $this->deallocate);
    });

    $storage->promise = $promise;

    $promise->onResolve(function ($exception) use ($storage, $name) {
        $storage->promise = null;

        // A failed prepare must not leave its entry behind, otherwise later calls with the same SQL
        // would build statements around a handle that was never created.
        if ($exception !== null && isset($this->statements[$name]) && $this->statements[$name] === $storage) {
            unset($this->statements[$name]);
        }
    });

    // Return the local promise: the storage property may already have been reset by the callback.
    return $promise;
}
2017-05-16 06:28:37 +02:00
/**
 * {@inheritdoc}
 */
public function notify(string $channel, string $payload = ""): Promise {
    $generator = $this->send([$this->handle, "notifyAsync"], $channel, $payload);

    return new Coroutine($generator);
}
2017-05-16 06:28:37 +02:00
2016-09-19 18:12:32 +02:00
/**
 * {@inheritdoc}
 */
public function listen(string $channel): Promise {
    return call(function () use ($channel) {
        if (isset($this->listeners[$channel])) {
            throw new QueryError(\sprintf("Already listening on channel '%s'", $channel));
        }

        $emitter = new Emitter;
        $this->listeners[$channel] = $emitter;

        // Invoked for each notification; converts it into a Notification and emits it.
        $onNotification = static function (string $channel, string $message, int $pid) use ($emitter) {
            $notification = new Notification;
            $notification->channel = $channel;
            $notification->pid = $pid;
            $notification->payload = $message;
            $emitter->emit($notification);
        };

        try {
            yield from $this->send([$this->handle, "listenAsync"], $channel, $onNotification);
        } catch (\Throwable $exception) {
            // The channel was never subscribed, so forget the emitter registered above.
            unset($this->listeners[$channel]);
            throw $exception;
        }

        // Keep the readable watcher active so notifications are received between queries.
        Loop::enable($this->poll);

        return new Listener($emitter->iterate(), $channel, $this->unlisten);
    });
}
2017-05-16 06:28:37 +02:00
2016-09-19 18:12:32 +02:00
/**
 * Stops listening on the given channel and completes its emitter once the unlisten request
 * has resolved.
 *
 * @param string $channel
 *
 * @return \Amp\Promise
 *
 * @throws \Error
 */
private function unlisten(string $channel): Promise {
    \assert(isset($this->listeners[$channel]), "Not listening on that channel");

    $emitter = $this->listeners[$channel];
    unset($this->listeners[$channel]);

    // With no query awaited and no listeners left, nothing needs the readable watcher.
    $idle = $this->deferred === null && empty($this->listeners);
    if ($idle) {
        Loop::disable($this->poll);
    }

    $promise = new Coroutine($this->send([$this->handle, "unlistenAsync"], $channel));
    $promise->onResolve([$emitter, "complete"]);

    return $promise;
}
2016-09-14 16:27:39 +02:00
}