*/ abstract public static function connect(string $connectionString, CancellationToken $token = null): Promise; /** * @param $executor; */ public function __construct(Executor $executor) { $this->executor = $executor; $this->release = $this->callableFromInstanceMethod("release"); } /** * @param string $methodName Method to execute. * @param mixed ...$args Arguments to pass to function. * * @return \Amp\Promise * * @throws \Amp\Postgres\FailureException * @throws \Amp\Postgres\PendingOperationError */ private function send(string $methodName, ...$args): \Generator { while ($this->busy) { yield $this->busy->promise(); } $this->busy = new Deferred; try { return $this->executor->{$methodName}(...$args); } finally { $this->release(); } } /** * Releases the transaction lock. */ private function release() { $deferred = $this->busy; $this->busy = null; $deferred->resolve(); } /** * {@inheritdoc} */ public function query(string $sql): Promise { return new Coroutine($this->send("query", $sql)); } /** * {@inheritdoc} */ public function execute(string $sql, ...$params): Promise { return new Coroutine($this->send("execute", $sql, ...$params)); } /** * {@inheritdoc} */ public function prepare(string $sql): Promise { return new Coroutine($this->send("prepare", $sql)); } /** * {@inheritdoc} */ public function notify(string $channel, string $payload = ""): Promise { return new Coroutine($this->send("notify", $channel, $payload)); } /** * {@inheritdoc} */ public function listen(string $channel): Promise { return new Coroutine($this->send("listen", $channel)); } /** * {@inheritdoc} */ public function transaction(int $isolation = Transaction::COMMITTED): Promise { return call(function () use ($isolation) { while ($this->busy) { yield $this->busy->promise(); } $this->busy = new Deferred; switch ($isolation) { case Transaction::UNCOMMITTED: yield $this->executor->query("BEGIN TRANSACTION ISOLATION LEVEL READ UNCOMMITTED"); break; case Transaction::COMMITTED: yield $this->executor->query("BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED"); break; case Transaction::REPEATABLE: yield $this->executor->query("BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ"); break; case Transaction::SERIALIZABLE: yield $this->executor->query("BEGIN TRANSACTION ISOLATION LEVEL SERIALIZABLE"); break; default: throw new \Error("Invalid transaction type"); } $transaction = new Transaction($this->executor, $isolation); $transaction->onComplete($this->release); return $transaction; }); } }