mirror of
https://github.com/danog/postgres.git
synced 2024-11-30 04:29:12 +01:00
Simplify statement preparing and storage
This commit is contained in:
parent
6791ece329
commit
1269216bee
@ -1,9 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
namespace Amp\Postgres\Internal;
|
|
||||||
|
|
||||||
class PqStatementStorage extends StatementStorage
|
|
||||||
{
|
|
||||||
/** @var \pq\Statement */
|
|
||||||
public $statement;
|
|
||||||
}
|
|
@ -1,16 +0,0 @@
|
|||||||
<?php
|
|
||||||
|
|
||||||
namespace Amp\Postgres\Internal;
|
|
||||||
|
|
||||||
use Amp\Struct;
|
|
||||||
|
|
||||||
class StatementStorage
|
|
||||||
{
|
|
||||||
use Struct;
|
|
||||||
|
|
||||||
/** @var |null */
|
|
||||||
public $promise;
|
|
||||||
|
|
||||||
/** @var int */
|
|
||||||
public $count = 1;
|
|
||||||
}
|
|
@ -50,7 +50,7 @@ final class PgSqlHandle implements Handle
|
|||||||
/** @var callable */
|
/** @var callable */
|
||||||
private $unlisten;
|
private $unlisten;
|
||||||
|
|
||||||
/** @var Internal\StatementStorage[] */
|
/** @var Promise[] */
|
||||||
private $statements = [];
|
private $statements = [];
|
||||||
|
|
||||||
/** @var int */
|
/** @var int */
|
||||||
@ -312,12 +312,6 @@ final class PgSqlHandle implements Handle
|
|||||||
|
|
||||||
\assert(isset($this->statements[$name]), "Named statement not found when deallocating");
|
\assert(isset($this->statements[$name]), "Named statement not found when deallocating");
|
||||||
|
|
||||||
$storage = $this->statements[$name];
|
|
||||||
|
|
||||||
if (--$storage->count) {
|
|
||||||
return new Success;
|
|
||||||
}
|
|
||||||
|
|
||||||
unset($this->statements[$name]);
|
unset($this->statements[$name]);
|
||||||
|
|
||||||
return $this->query(\sprintf("DEALLOCATE %s", $name));
|
return $this->query(\sprintf("DEALLOCATE %s", $name));
|
||||||
@ -368,21 +362,17 @@ final class PgSqlHandle implements Handle
|
|||||||
$name = Handle::STATEMENT_NAME_PREFIX . \sha1($modifiedSql);
|
$name = Handle::STATEMENT_NAME_PREFIX . \sha1($modifiedSql);
|
||||||
|
|
||||||
if (isset($this->statements[$name])) {
|
if (isset($this->statements[$name])) {
|
||||||
$storage = $this->statements[$name];
|
return $this->statements[$name];
|
||||||
++$storage->count;
|
|
||||||
|
|
||||||
if ($storage->promise) {
|
|
||||||
return $storage->promise;
|
|
||||||
}
|
|
||||||
|
|
||||||
return new Success(new PgSqlStatement($this, $name, $sql, $names));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->statements[$name] = $storage = new Internal\StatementStorage;
|
return $this->statements[$name] = call(function () use ($name, $names, $sql, $modifiedSql) {
|
||||||
|
try {
|
||||||
$promise = $storage->promise = call(function () use ($name, $names, $sql, $modifiedSql) {
|
/** @var resource $result PostgreSQL result resource. */
|
||||||
/** @var resource $result PostgreSQL result resource. */
|
$result = yield from $this->send("pg_send_prepare", $name, $modifiedSql);
|
||||||
$result = yield from $this->send("pg_send_prepare", $name, $modifiedSql);
|
} catch (\Throwable $exception) {
|
||||||
|
unset($this->statements[$name]);
|
||||||
|
throw $exception;
|
||||||
|
}
|
||||||
|
|
||||||
switch (\pg_result_status($result, \PGSQL_STATUS_LONG)) {
|
switch (\pg_result_status($result, \PGSQL_STATUS_LONG)) {
|
||||||
case \PGSQL_COMMAND_OK:
|
case \PGSQL_COMMAND_OK:
|
||||||
@ -391,8 +381,8 @@ final class PgSqlHandle implements Handle
|
|||||||
case \PGSQL_NONFATAL_ERROR:
|
case \PGSQL_NONFATAL_ERROR:
|
||||||
case \PGSQL_FATAL_ERROR:
|
case \PGSQL_FATAL_ERROR:
|
||||||
$diagnostics = [];
|
$diagnostics = [];
|
||||||
foreach (self::DIAGNOSTIC_CODES as $fieldCode => $desciption) {
|
foreach (self::DIAGNOSTIC_CODES as $fieldCode => $description) {
|
||||||
$diagnostics[$desciption] = \pg_result_error_field($result, $fieldCode);
|
$diagnostics[$description] = \pg_result_error_field($result, $fieldCode);
|
||||||
}
|
}
|
||||||
throw new QueryExecutionError(\pg_result_error($result), $diagnostics);
|
throw new QueryExecutionError(\pg_result_error($result), $diagnostics);
|
||||||
|
|
||||||
@ -405,15 +395,6 @@ final class PgSqlHandle implements Handle
|
|||||||
// @codeCoverageIgnoreEnd
|
// @codeCoverageIgnoreEnd
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
$promise->onResolve(function ($exception) use ($storage, $name) {
|
|
||||||
if ($exception) {
|
|
||||||
unset($this->statements[$name]);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
$storage->promise = null;
|
|
||||||
});
|
|
||||||
return $promise;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -38,8 +38,11 @@ final class PqHandle implements Handle
|
|||||||
/** @var \Amp\Emitter[] */
|
/** @var \Amp\Emitter[] */
|
||||||
private $listeners;
|
private $listeners;
|
||||||
|
|
||||||
/** @var @return PromiseInternal\PqStatementStorage[] */
|
/** @var Promise[] */
|
||||||
private $statements = [];
|
private $statementPromises = [];
|
||||||
|
|
||||||
|
/** @var \pq\Statement[] */
|
||||||
|
private $statementHandles = [];
|
||||||
|
|
||||||
/** @var callable */
|
/** @var callable */
|
||||||
private $fetch;
|
private $fetch;
|
||||||
@ -202,10 +205,10 @@ final class PqHandle implements Handle
|
|||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
$handle = $method(...$args);
|
|
||||||
|
|
||||||
$this->deferred = $this->busy = new Deferred;
|
$this->deferred = $this->busy = new Deferred;
|
||||||
|
|
||||||
|
$handle = $method(...$args);
|
||||||
|
|
||||||
Loop::enable($this->poll);
|
Loop::enable($this->poll);
|
||||||
if (!$this->handle->flush()) {
|
if (!$this->handle->flush()) {
|
||||||
Loop::enable($this->await);
|
Loop::enable($this->await);
|
||||||
@ -311,11 +314,9 @@ final class PqHandle implements Handle
|
|||||||
*/
|
*/
|
||||||
public function statementExecute(string $name, array $params): Promise
|
public function statementExecute(string $name, array $params): Promise
|
||||||
{
|
{
|
||||||
\assert(isset($this->statements[$name]), "Named statement not found when executing");
|
\assert(isset($this->statementHandles[$name]), "Named statement not found when executing");
|
||||||
|
|
||||||
$statement = $this->statements[$name]->statement;
|
return new Coroutine($this->send([$this->statementHandles[$name], "execAsync"], $params));
|
||||||
|
|
||||||
return new Coroutine($this->send([$statement, "execAsync"], $params));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -331,17 +332,15 @@ final class PqHandle implements Handle
|
|||||||
return new Success; // Connection dead.
|
return new Success; // Connection dead.
|
||||||
}
|
}
|
||||||
|
|
||||||
\assert(isset($this->statements[$name]), "Named statement not found when deallocating");
|
\assert(
|
||||||
|
isset($this->statementPromises[$name], $this->statementHandles[$name]),
|
||||||
|
"Named statement not found when deallocating"
|
||||||
|
);
|
||||||
|
|
||||||
$storage = $this->statements[$name];
|
$statement = $this->statementHandles[$name];
|
||||||
|
unset($this->statementPromises[$name], $this->statementHandles[$name]);
|
||||||
|
|
||||||
if (--$storage->count) {
|
return new Coroutine($this->send([$statement, "deallocateAsync"]));
|
||||||
return new Success;
|
|
||||||
}
|
|
||||||
|
|
||||||
unset($this->statements[$name]);
|
|
||||||
|
|
||||||
return new Coroutine($this->send([$storage->statement, "deallocateAsync"]));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -384,33 +383,22 @@ final class PqHandle implements Handle
|
|||||||
|
|
||||||
$name = Handle::STATEMENT_NAME_PREFIX . \sha1($modifiedSql);
|
$name = Handle::STATEMENT_NAME_PREFIX . \sha1($modifiedSql);
|
||||||
|
|
||||||
if (isset($this->statements[$name])) {
|
if (isset($this->statementPromises[$name])) {
|
||||||
$storage = $this->statements[$name];
|
return $this->statementPromises[$name];
|
||||||
++$storage->count;
|
|
||||||
|
|
||||||
if ($storage->promise) {
|
|
||||||
return $storage->promise;
|
|
||||||
}
|
|
||||||
|
|
||||||
return new Success(new PqStatement($this, $name, $sql, $names));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
$this->statements[$name] = $storage = new Internal\PqStatementStorage;
|
return $this->statementPromises[$name] = call(function () use ($names, $name, $sql, $modifiedSql) {
|
||||||
|
try {
|
||||||
$promise = $storage->promise = call(function () use ($storage, $names, $name, $sql, $modifiedSql) {
|
$statement = yield from $this->send([$this->handle, "prepareAsync"], $name, $modifiedSql);
|
||||||
$statement = yield from $this->send([$this->handle, "prepareAsync"], $name, $modifiedSql);
|
} catch (\Throwable $exception) {
|
||||||
$storage->statement = $statement;
|
unset($this->statementPromises[$name]);
|
||||||
return new PqStatement($this, $name, $sql, $names);
|
throw $exception;
|
||||||
});
|
|
||||||
$promise->onResolve(function ($exception) use ($storage, $name) {
|
|
||||||
if ($exception) {
|
|
||||||
unset($this->statements[$name]);
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
$storage->promise = null;
|
$this->statementHandles[$name] = $statement;
|
||||||
|
|
||||||
|
return new PqStatement($this, $name, $sql, $names);
|
||||||
});
|
});
|
||||||
return $promise;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -7,7 +7,7 @@ use Amp\Sql\Statement;
|
|||||||
|
|
||||||
final class PqStatement implements Statement
|
final class PqStatement implements Statement
|
||||||
{
|
{
|
||||||
/** @var @return PromisePqHandle */
|
/** @var PqHandle */
|
||||||
private $handle;
|
private $handle;
|
||||||
|
|
||||||
/** @var string */
|
/** @var string */
|
||||||
|
Loading…
Reference in New Issue
Block a user