1
0
mirror of https://github.com/danog/postgres.git synced 2024-12-13 18:07:31 +01:00
postgres/lib/PgSqlConnection.php

81 lines
2.5 KiB
PHP
Raw Normal View History

2016-12-30 06:21:17 +01:00
<?php
2016-09-14 16:27:39 +02:00
namespace Amp\Postgres;
use Amp\CancellationToken;
use Amp\Deferred;
use Amp\Failure;
use Amp\Loop;
use Amp\NullCancellationToken;
use Amp\Promise;
2016-09-14 16:27:39 +02:00
class PgSqlConnection extends AbstractConnection {
/**
* @param string $connectionString
2017-06-05 06:42:18 +02:00
* @param \Amp\CancellationToken $token
2016-09-14 16:27:39 +02:00
*
* @return \Amp\Promise<\Amp\Postgres\PgSqlConnection>
2016-09-14 16:27:39 +02:00
*/
2017-06-05 06:42:18 +02:00
public static function connect(string $connectionString, CancellationToken $token = null): Promise {
2016-09-14 16:27:39 +02:00
if (!$connection = @\pg_connect($connectionString, \PGSQL_CONNECT_ASYNC | \PGSQL_CONNECT_FORCE_NEW)) {
return new Failure(new FailureException("Failed to create connection resource"));
2016-09-14 16:27:39 +02:00
}
2017-05-16 06:28:37 +02:00
2016-09-14 16:27:39 +02:00
if (\pg_connection_status($connection) === \PGSQL_CONNECTION_BAD) {
return new Failure(new FailureException(\pg_last_error($connection)));
2016-09-14 16:27:39 +02:00
}
2017-05-16 06:28:37 +02:00
2016-09-14 16:27:39 +02:00
if (!$socket = \pg_socket($connection)) {
return new Failure(new FailureException("Failed to access connection socket"));
2016-09-14 16:27:39 +02:00
}
2017-05-16 06:28:37 +02:00
2016-09-14 16:27:39 +02:00
$deferred = new Deferred;
2017-05-16 06:28:37 +02:00
$callback = function ($watcher, $resource) use ($connection, $deferred) {
switch (\pg_connect_poll($connection)) {
case \PGSQL_POLLING_READING:
return; // Connection not ready, poll again.
case \PGSQL_POLLING_WRITING:
return; // Still writing...
case \PGSQL_POLLING_FAILED:
2017-06-05 06:42:18 +02:00
$deferred->fail(new FailureException(\pg_last_error($connection)));
return;
case \PGSQL_POLLING_OK:
$deferred->resolve(new self($connection, $resource));
return;
2016-09-14 16:27:39 +02:00
}
};
2017-05-16 06:28:37 +02:00
2016-09-14 16:27:39 +02:00
$poll = Loop::onReadable($socket, $callback);
$await = Loop::onWritable($socket, $callback);
$promise = $deferred->promise();
2017-06-05 06:42:18 +02:00
$token = $token ?? new NullCancellationToken;
$id = $token->subscribe([$deferred, "fail"]);
2017-06-05 06:42:18 +02:00
$promise->onResolve(function ($exception) use ($connection, $poll, $await, $id, $token) {
if ($exception) {
\pg_close($connection);
}
2017-06-05 06:42:18 +02:00
$token->unsubscribe($id);
Loop::cancel($poll);
Loop::cancel($await);
});
return $promise;
2016-09-14 16:27:39 +02:00
}
2017-05-16 06:28:37 +02:00
2016-09-14 16:27:39 +02:00
/**
* @param resource $handle PostgreSQL connection handle.
* @param resource $socket PostgreSQL connection stream socket.
*/
public function __construct($handle, $socket) {
parent::__construct(new PgSqlHandle($handle, $socket));
2016-09-14 16:27:39 +02:00
}
}