1
0
mirror of https://github.com/danog/postgres.git synced 2025-01-23 05:41:19 +01:00
postgres/lib/PqConnection.php

83 lines
2.7 KiB
PHP
Raw Normal View History

2016-12-29 23:21:17 -06:00
<?php
2016-09-14 09:27:39 -05:00
namespace Amp\Postgres;
use Amp\{ Deferred, TimeoutException };
2016-11-15 11:06:21 -06:00
use Interop\Async\{ Loop, Promise };
2016-09-14 09:27:39 -05:00
use pq;
/**
 * Postgres connection implemented on top of the pq extension (pq\Connection).
 */
class PqConnection extends AbstractConnection {
    /**
     * Starts an asynchronous connection attempt and returns a promise for the resulting connection object.
     *
     * @param string $connectionString Connection string handed unchanged to pq\Connection.
     * @param int|null $timeout Maximum time to wait for the connection to be established, or null to wait
     *     without a limit. The value is passed as-is to \Amp\timeout() (presumably milliseconds; confirm
     *     against the Amp version in use).
     *
     * @return \Interop\Async\Promise<\Amp\Postgres\PqConnection>
     *
     * @throws \Amp\Postgres\FailureException If the pq\Connection object could not be created.
     */
    public static function connect(string $connectionString, int $timeout = null): Promise {
        try {
            $connection = new pq\Connection($connectionString, pq\Connection::ASYNC);
        } catch (pq\Exception $exception) {
            throw new FailureException("Could not connect to PostgreSQL server", 0, $exception);
        }

        $connection->resetAsync();
        $connection->nonblocking = true;
        $connection->unbuffered = true;

        $deferred = new Deferred;

        // $poll and $await are captured by reference: the watcher identifiers are only assigned after the
        // closure has been created, but the closure needs them to stop polling once the attempt is settled.
        $callback = function ($watcher, $resource) use (&$poll, &$await, $connection, $deferred) {
            try {
                switch ($connection->poll()) {
                    case pq\Connection::POLLING_READING:
                        return; // Connection not ready, poll again.

                    case pq\Connection::POLLING_WRITING:
                        return; // Still writing...

                    case pq\Connection::POLLING_FAILED:
                        throw new FailureException("Could not connect to PostgreSQL server");

                    case pq\Connection::POLLING_OK:
                        Loop::cancel($poll);
                        Loop::cancel($await);
                        $deferred->resolve(new self($connection));
                        return;
                }
            } catch (\Throwable $exception) {
                // Any failure while polling ends the attempt: stop both watchers and fail the promise.
                Loop::cancel($poll);
                Loop::cancel($await);
                $deferred->fail($exception);
            }
        };

        // The same callback drives the handshake for both readability and writability of the socket.
        $poll = Loop::onReadable($connection->socket, $callback);
        $await = Loop::onWritable($connection->socket, $callback);

        if ($timeout === null) {
            return $deferred->promise();
        }

        // \Amp\timeout() is what actually fails the promise with a TimeoutException once the limit expires;
        // the capture handler then stops polling the socket before re-throwing that exception to the caller.
        return \Amp\capture(
            \Amp\timeout($deferred->promise(), $timeout),
            TimeoutException::class,
            function (\Throwable $exception) use ($poll, $await) {
                Loop::cancel($poll);
                Loop::cancel($await);
                throw $exception;
            }
        );
    }

    /**
     * Wraps an established pq connection handle in a PqExecutor and passes it to the parent connection.
     *
     * @param \pq\Connection $handle
     */
    public function __construct(pq\Connection $handle) {
        parent::__construct(new PqExecutor($handle));
    }
}