mirror of
https://github.com/danog/postgres.git
synced 2024-12-12 09:29:57 +01:00
421 lines
13 KiB
PHP
421 lines
13 KiB
PHP
<?php
|
|
|
|
namespace Amp\Postgres;
|
|
|
|
use Amp\CallableMaker;
|
|
use Amp\Deferred;
|
|
use Amp\Emitter;
|
|
use Amp\Loop;
|
|
use Amp\Promise;
|
|
use Amp\Success;
|
|
use function Amp\call;
|
|
|
|
class PgSqlHandle implements Handle {
|
|
use CallableMaker;
|
|
|
|
const DIAGNOSTIC_CODES = [
|
|
\PGSQL_DIAG_SEVERITY => "severity",
|
|
\PGSQL_DIAG_SQLSTATE => "sqlstate",
|
|
\PGSQL_DIAG_MESSAGE_PRIMARY => "message_primary",
|
|
\PGSQL_DIAG_MESSAGE_DETAIL => "message_detail",
|
|
\PGSQL_DIAG_MESSAGE_HINT => "message_hint",
|
|
\PGSQL_DIAG_STATEMENT_POSITION => "statement_position",
|
|
\PGSQL_DIAG_INTERNAL_POSITION => "internal_position",
|
|
\PGSQL_DIAG_INTERNAL_QUERY => "internal_query",
|
|
\PGSQL_DIAG_CONTEXT => "context",
|
|
\PGSQL_DIAG_SOURCE_FILE => "source_file",
|
|
\PGSQL_DIAG_SOURCE_LINE => "source_line",
|
|
\PGSQL_DIAG_SOURCE_FUNCTION => "source_function",
|
|
];
|
|
|
|
/** @var resource PostgreSQL connection handle. */
|
|
private $handle;
|
|
|
|
/** @var \Amp\Deferred|null */
|
|
private $deferred;
|
|
|
|
/** @var string */
|
|
private $poll;
|
|
|
|
/** @var string */
|
|
private $await;
|
|
|
|
/** @var callable */
|
|
private $executeCallback;
|
|
|
|
/** @var callable */
|
|
private $deallocateCallback;
|
|
|
|
/** @var \Amp\Emitter[] */
|
|
private $listeners = [];
|
|
|
|
/** @var callable */
|
|
private $unlisten;
|
|
|
|
/** @var \Amp\Postgres\Internal\StatementStorage[] */
|
|
private $statements = [];
|
|
|
|
/**
|
|
* Connection constructor.
|
|
*
|
|
* @param resource $handle PostgreSQL connection handle.
|
|
* @param resource $socket PostgreSQL connection stream socket.
|
|
*/
|
|
public function __construct($handle, $socket) {
|
|
$this->handle = $handle;
|
|
|
|
$handle = &$this->handle;
|
|
$deferred = &$this->deferred;
|
|
$listeners = &$this->listeners;
|
|
|
|
$this->poll = Loop::onReadable($socket, static function ($watcher) use (&$deferred, &$listeners, &$handle) {
|
|
if (!\pg_consume_input($handle)) {
|
|
$handle = null; // Marks connection as dead.
|
|
Loop::disable($watcher);
|
|
|
|
$exception = new ConnectionException(\pg_last_error($handle));
|
|
|
|
foreach ($listeners as $listener) {
|
|
$listener->fail($exception);
|
|
}
|
|
|
|
if ($deferred !== null) {
|
|
$deferred->fail($exception);
|
|
}
|
|
|
|
return;
|
|
}
|
|
|
|
while ($result = \pg_get_notify($handle, \PGSQL_ASSOC)) {
|
|
$channel = $result["message"];
|
|
|
|
if (!isset($listeners[$channel])) {
|
|
continue;
|
|
}
|
|
|
|
$notification = new Notification;
|
|
$notification->channel = $channel;
|
|
$notification->pid = $result["pid"];
|
|
$notification->payload = $result["payload"];
|
|
$listeners[$channel]->emit($notification);
|
|
}
|
|
|
|
if ($deferred === null) {
|
|
return; // No active query, only notification listeners.
|
|
}
|
|
|
|
if (\pg_connection_busy($handle)) {
|
|
return;
|
|
}
|
|
|
|
$deferred->resolve(\pg_get_result($handle));
|
|
|
|
if (!$deferred && empty($listeners)) {
|
|
Loop::disable($watcher);
|
|
}
|
|
});
|
|
|
|
$this->await = Loop::onWritable($socket, static function ($watcher) use (&$deferred, &$listeners, &$handle) {
|
|
$flush = \pg_flush($handle);
|
|
if ($flush === 0) {
|
|
return; // Not finished sending data, listen again.
|
|
}
|
|
|
|
Loop::disable($watcher);
|
|
|
|
if ($flush === false) {
|
|
$exception = new ConnectionException(\pg_last_error($handle));
|
|
$handle = null; // Marks connection as dead.
|
|
|
|
foreach ($listeners as $listener) {
|
|
$listener->fail($exception);
|
|
}
|
|
|
|
if ($deferred !== null) {
|
|
$deferred->fail($exception);
|
|
}
|
|
}
|
|
});
|
|
|
|
Loop::disable($this->poll);
|
|
Loop::disable($this->await);
|
|
|
|
$this->executeCallback = $this->callableFromInstanceMethod("sendExecute");
|
|
$this->deallocateCallback = $this->callableFromInstanceMethod("sendDeallocate");
|
|
$this->unlisten = $this->callableFromInstanceMethod("unlisten");
|
|
}
|
|
|
|
/**
|
|
* Frees Io watchers from loop.
|
|
*/
|
|
public function __destruct() {
|
|
if (\is_resource($this->handle)) {
|
|
\pg_close($this->handle);
|
|
}
|
|
|
|
Loop::cancel($this->poll);
|
|
Loop::cancel($this->await);
|
|
}
|
|
|
|
/**
|
|
* {@inheritdoc}
|
|
*/
|
|
public function isAlive(): bool {
|
|
return $this->handle !== null;
|
|
}
|
|
|
|
/**
|
|
* @param callable $function Function name to execute.
|
|
* @param mixed ...$args Arguments to pass to function.
|
|
*
|
|
* @return \Generator
|
|
*
|
|
* @resolve resource
|
|
*
|
|
* @throws \Amp\Postgres\FailureException
|
|
*/
|
|
private function send(callable $function, ...$args): \Generator {
|
|
while ($this->deferred) {
|
|
try {
|
|
yield $this->deferred->promise();
|
|
} catch (\Throwable $exception) {
|
|
// Ignore failure from another operation.
|
|
}
|
|
}
|
|
|
|
if (!\is_resource($this->handle)) {
|
|
throw new ConnectionException("The connection to the database has been lost");
|
|
}
|
|
|
|
$result = $function($this->handle, ...$args);
|
|
|
|
if ($result === false) {
|
|
throw new FailureException(\pg_last_error($this->handle));
|
|
}
|
|
|
|
$this->deferred = new Deferred;
|
|
|
|
Loop::enable($this->poll);
|
|
if (0 === $result) {
|
|
Loop::enable($this->await);
|
|
}
|
|
|
|
try {
|
|
$result = yield $this->deferred->promise();
|
|
} finally {
|
|
$this->deferred = null;
|
|
}
|
|
|
|
return $result;
|
|
}
|
|
|
|
/**
|
|
* @param resource $result PostgreSQL result resource.
|
|
*
|
|
* @return \Amp\Postgres\CommandResult|\Amp\Postgres\ResultSet
|
|
*
|
|
* @throws \Amp\Postgres\FailureException
|
|
* @throws \Amp\Postgres\QueryError
|
|
*/
|
|
private function createResult($result) {
|
|
switch (\pg_result_status($result, \PGSQL_STATUS_LONG)) {
|
|
case \PGSQL_EMPTY_QUERY:
|
|
throw new QueryError("Empty query string");
|
|
|
|
case \PGSQL_COMMAND_OK:
|
|
return new PgSqlCommandResult($result);
|
|
|
|
case \PGSQL_TUPLES_OK:
|
|
return new PgSqlResultSet($result);
|
|
|
|
case \PGSQL_NONFATAL_ERROR:
|
|
case \PGSQL_FATAL_ERROR:
|
|
$diagnostics = [];
|
|
foreach (self::DIAGNOSTIC_CODES as $fieldCode => $desciption) {
|
|
$diagnostics[$desciption] = \pg_result_error_field($result, $fieldCode);
|
|
}
|
|
throw new QueryExecutionError(\pg_result_error($result), $diagnostics);
|
|
|
|
case \PGSQL_BAD_RESPONSE:
|
|
throw new FailureException(\pg_result_error($result));
|
|
|
|
default:
|
|
// @codeCoverageIgnoreStart
|
|
throw new FailureException("Unknown result status");
|
|
// @codeCoverageIgnoreEnd
|
|
}
|
|
}
|
|
|
|
private function sendExecute(string $name, array $params): Promise {
|
|
return call(function () use ($name, $params) {
|
|
return $this->createResult(yield from $this->send("pg_send_execute", $name, $params));
|
|
});
|
|
}
|
|
|
|
private function sendDeallocate(string $name) {
|
|
\assert(isset($this->statements[$name]), "Named statement not found when deallocating");
|
|
|
|
$storage = $this->statements[$name];
|
|
|
|
if (--$storage->count) {
|
|
return;
|
|
}
|
|
|
|
unset($this->statements[$name]);
|
|
|
|
Promise\rethrow($this->query(\sprintf("DEALLOCATE %s", $name)));
|
|
}
|
|
|
|
/**
|
|
* {@inheritdoc}
|
|
*/
|
|
public function query(string $sql): Promise {
|
|
return call(function () use ($sql) {
|
|
return $this->createResult(yield from $this->send("pg_send_query", $sql));
|
|
});
|
|
}
|
|
|
|
/**
|
|
* {@inheritdoc}
|
|
*/
|
|
public function execute(string $sql, array $params = []): Promise {
|
|
return call(function () use ($sql, $params) {
|
|
return $this->createResult(yield from $this->send("pg_send_query_params", $sql, $params));
|
|
});
|
|
}
|
|
|
|
/**
|
|
* {@inheritdoc}
|
|
*/
|
|
public function prepare(string $sql): Promise {
|
|
$name = self::STATEMENT_NAME_PREFIX . \sha1($sql);
|
|
|
|
if (isset($this->statements[$name])) {
|
|
$storage = $this->statements[$name];
|
|
++$storage->count;
|
|
|
|
if ($storage->promise) {
|
|
return $storage->promise;
|
|
}
|
|
|
|
return new Success(new PgSqlStatement($name, $sql, $this->executeCallback, $this->deallocateCallback));
|
|
}
|
|
|
|
$this->statements[$name] = $storage = new Internal\StatementStorage;
|
|
|
|
$promise = $storage->promise = call(function () use ($name, $sql) {
|
|
/** @var resource $result PostgreSQL result resource. */
|
|
$result = yield from $this->send("pg_send_prepare", $name, $sql);
|
|
|
|
switch (\pg_result_status($result, \PGSQL_STATUS_LONG)) {
|
|
case \PGSQL_COMMAND_OK:
|
|
return new PgSqlStatement($name, $sql, $this->executeCallback, $this->deallocateCallback);
|
|
|
|
case \PGSQL_NONFATAL_ERROR:
|
|
case \PGSQL_FATAL_ERROR:
|
|
$diagnostics = [];
|
|
foreach (self::DIAGNOSTIC_CODES as $fieldCode => $desciption) {
|
|
$diagnostics[$desciption] = \pg_result_error_field($result, $fieldCode);
|
|
}
|
|
throw new QueryExecutionError(\pg_result_error($result), $diagnostics);
|
|
|
|
case \PGSQL_BAD_RESPONSE:
|
|
throw new FailureException(\pg_result_error($result));
|
|
|
|
default:
|
|
// @codeCoverageIgnoreStart
|
|
throw new FailureException("Unknown result status");
|
|
// @codeCoverageIgnoreEnd
|
|
}
|
|
});
|
|
$promise->onResolve(function ($exception) use ($storage, $name) {
|
|
if ($exception) {
|
|
unset($this->statements[$name]);
|
|
return;
|
|
}
|
|
|
|
$storage->promise = null;
|
|
});
|
|
return $promise;
|
|
}
|
|
|
|
/**
|
|
* {@inheritdoc}
|
|
*/
|
|
public function notify(string $channel, string $payload = ""): Promise {
|
|
if ($payload === "") {
|
|
return $this->query(\sprintf("NOTIFY %s", $this->quoteName($channel)));
|
|
}
|
|
|
|
return $this->query(\sprintf("NOTIFY %s, %s", $this->quoteName($channel), $this->quoteString($payload)));
|
|
}
|
|
|
|
/**
|
|
* {@inheritdoc}
|
|
*/
|
|
public function listen(string $channel): Promise {
|
|
return call(function () use ($channel) {
|
|
if (isset($this->listeners[$channel])) {
|
|
throw new QueryError(\sprintf("Already listening on channel '%s'", $channel));
|
|
}
|
|
|
|
$this->listeners[$channel] = $emitter = new Emitter;
|
|
|
|
try {
|
|
yield $this->query(\sprintf("LISTEN %s", $this->quoteName($channel)));
|
|
} catch (\Throwable $exception) {
|
|
unset($this->listeners[$channel]);
|
|
throw $exception;
|
|
}
|
|
|
|
Loop::enable($this->poll);
|
|
return new Listener($emitter->iterate(), $channel, $this->unlisten);
|
|
});
|
|
}
|
|
|
|
/**
|
|
* @param string $channel
|
|
*
|
|
* @return \Amp\Promise
|
|
*
|
|
* @throws \Error
|
|
*/
|
|
private function unlisten(string $channel): Promise {
|
|
\assert(isset($this->listeners[$channel]), "Not listening on that channel");
|
|
|
|
$emitter = $this->listeners[$channel];
|
|
unset($this->listeners[$channel]);
|
|
|
|
if (!\is_resource($this->handle)) {
|
|
$promise = new Success; // Connection already closed.
|
|
} else {
|
|
$promise = $this->query(\sprintf("UNLISTEN %s", $this->quoteName($channel)));
|
|
}
|
|
|
|
$promise->onResolve([$emitter, "complete"]);
|
|
return $promise;
|
|
}
|
|
|
|
/**
|
|
* {@inheritdoc}
|
|
*/
|
|
public function quoteString(string $data): string {
|
|
if (!\is_resource($this->handle)) {
|
|
throw new ConnectionException("The connection to the database has been lost");
|
|
}
|
|
|
|
return \pg_escape_literal($this->handle, $data);
|
|
}
|
|
|
|
/**
|
|
* {@inheritdoc}
|
|
*/
|
|
public function quoteName(string $name): string {
|
|
if (!\is_resource($this->handle)) {
|
|
throw new ConnectionException("The connection to the database has been lost");
|
|
}
|
|
|
|
return \pg_escape_identifier($this->handle, $name);
|
|
}
|
|
}
|