mirror of
https://github.com/danog/MadelineProto.git
synced 2024-11-30 10:19:00 +01:00
Merge pull request #901 from danog/sql_optimize
Use queries + PDO::quotes instead of prepared statements. Reduce sql cpu load.
This commit is contained in:
commit
157a0c5bc0
@ -60,7 +60,8 @@
|
||||
"danog/phpdoc": "^0.1.7"
|
||||
},
|
||||
"suggest": {
|
||||
"ext-libtgvoip": "Install the php-libtgvoip extension to make phone calls (https://github.com/danog/php-libtgvoip)"
|
||||
"ext-libtgvoip": "Install the php-libtgvoip extension to make phone calls (https://github.com/danog/php-libtgvoip)",
|
||||
"ext-pdo": "Install pdo extension to support database used as cache"
|
||||
},
|
||||
"authors": [{
|
||||
"name": "Daniil Gentili",
|
||||
|
@ -15,7 +15,6 @@ use danog\MadelineProto\Settings\Database\Mysql as DatabaseMysql;
|
||||
class MysqlArray extends SqlArray
|
||||
{
|
||||
protected DatabaseMysql $dbSettings;
|
||||
private Pool $db;
|
||||
|
||||
// Legacy
|
||||
protected array $settings;
|
||||
@ -35,38 +34,36 @@ class MysqlArray extends SqlArray
|
||||
*
|
||||
* @param SqlArray::STATEMENT_* $type
|
||||
*
|
||||
* @return Promise
|
||||
* @return string
|
||||
*/
|
||||
protected function prepareStatements(int $type): Promise
|
||||
protected function getSqlQuery(int $type): string
|
||||
{
|
||||
switch ($type) {
|
||||
case SqlArray::STATEMENT_GET:
|
||||
return $this->db->prepare(
|
||||
"SELECT `value` FROM `{$this->table}` WHERE `key` = :index LIMIT 1"
|
||||
);
|
||||
case SqlArray::STATEMENT_SET:
|
||||
return $this->db->prepare("
|
||||
INSERT INTO `{$this->table}`
|
||||
SET `key` = :index, `value` = :value
|
||||
ON DUPLICATE KEY UPDATE `value` = :value
|
||||
");
|
||||
case SqlArray::STATEMENT_UNSET:
|
||||
return $this->db->prepare("
|
||||
DELETE FROM `{$this->table}`
|
||||
WHERE `key` = :index
|
||||
");
|
||||
case SqlArray::STATEMENT_COUNT:
|
||||
return $this->db->prepare("
|
||||
SELECT count(`key`) as `count` FROM `{$this->table}`
|
||||
");
|
||||
case SqlArray::STATEMENT_ITERATE:
|
||||
return $this->db->prepare("
|
||||
SELECT `key`, `value` FROM `{$this->table}`
|
||||
");
|
||||
case SqlArray::STATEMENT_CLEAR:
|
||||
return $this->db->prepare("
|
||||
DELETE FROM `{$this->table}`
|
||||
");
|
||||
case SqlArray::SQL_GET:
|
||||
return "SELECT `value` FROM `{$this->table}` WHERE `key` = :index LIMIT 1";
|
||||
case SqlArray::SQL_SET:
|
||||
return "
|
||||
INSERT INTO `{$this->table}`
|
||||
SET `key` = :index, `value` = :value
|
||||
ON DUPLICATE KEY UPDATE `value` = :value
|
||||
";
|
||||
case SqlArray::SQL_UNSET:
|
||||
return "
|
||||
DELETE FROM `{$this->table}`
|
||||
WHERE `key` = :index
|
||||
";
|
||||
case SqlArray::SQL_COUNT:
|
||||
return "
|
||||
SELECT count(`key`) as `count` FROM `{$this->table}`
|
||||
";
|
||||
case SqlArray::SQL_ITERATE:
|
||||
return "
|
||||
SELECT `key`, `value` FROM `{$this->table}`
|
||||
";
|
||||
case SqlArray::SQL_CLEAR:
|
||||
return "
|
||||
DELETE FROM `{$this->table}`
|
||||
";
|
||||
}
|
||||
throw new Exception("An invalid statement type $type was provided!");
|
||||
}
|
||||
@ -97,6 +94,12 @@ class MysqlArray extends SqlArray
|
||||
*/
|
||||
public function initConnection($settings): \Generator
|
||||
{
|
||||
$urlArray = parse_url($settings->getUri());
|
||||
$this->pdo = new \PDO(
|
||||
"mysql:host={$urlArray['host']};port={$urlArray['port']};charset=UTF8",
|
||||
$settings->getUsername(),
|
||||
$settings->getPassword()
|
||||
);
|
||||
if (!isset($this->db)) {
|
||||
$this->db = yield from Mysql::getConnection($settings);
|
||||
}
|
||||
|
@ -17,7 +17,6 @@ use danog\MadelineProto\Settings\Database\Postgres as DatabasePostgres;
|
||||
class PostgresArray extends SqlArray
|
||||
{
|
||||
public DatabasePostgres $dbSettings;
|
||||
private Pool $db;
|
||||
|
||||
// Legacy
|
||||
protected array $settings;
|
||||
@ -27,39 +26,37 @@ class PostgresArray extends SqlArray
|
||||
*
|
||||
* @param SqlArray::STATEMENT_* $type
|
||||
*
|
||||
* @return Promise
|
||||
* @return string
|
||||
*/
|
||||
protected function prepareStatements(int $type): Promise
|
||||
protected function getSqlQuery(int $type): string
|
||||
{
|
||||
switch ($type) {
|
||||
case SqlArray::STATEMENT_GET:
|
||||
return $this->db->prepare(
|
||||
"SELECT value FROM \"{$this->table}\" WHERE key = :index LIMIT 1",
|
||||
);
|
||||
case SqlArray::STATEMENT_SET:
|
||||
return $this->db->prepare("
|
||||
case SqlArray::SQL_GET:
|
||||
return "SELECT value FROM \"{$this->table}\" WHERE key = :index LIMIT 1";
|
||||
case SqlArray::SQL_SET:
|
||||
return "
|
||||
INSERT INTO \"{$this->table}\"
|
||||
(key,value)
|
||||
VALUES (:index, :value)
|
||||
ON CONFLICT (key) DO UPDATE SET value = :value
|
||||
");
|
||||
case SqlArray::STATEMENT_UNSET:
|
||||
return $this->db->prepare("
|
||||
";
|
||||
case SqlArray::SQL_UNSET:
|
||||
return "
|
||||
DELETE FROM \"{$this->table}\"
|
||||
WHERE key = :index
|
||||
");
|
||||
case SqlArray::STATEMENT_COUNT:
|
||||
return $this->db->prepare("
|
||||
";
|
||||
case SqlArray::SQL_COUNT:
|
||||
return "
|
||||
SELECT count(key) as count FROM \"{$this->table}\"
|
||||
");
|
||||
case SqlArray::STATEMENT_ITERATE:
|
||||
return $this->db->prepare("
|
||||
";
|
||||
case SqlArray::SQL_ITERATE:
|
||||
return "
|
||||
SELECT key, value FROM \"{$this->table}\"
|
||||
");
|
||||
case SqlArray::STATEMENT_CLEAR:
|
||||
return $this->db->prepare("
|
||||
";
|
||||
case SqlArray::SQL_CLEAR:
|
||||
return "
|
||||
DELETE FROM \"{$this->table}\"
|
||||
");
|
||||
";
|
||||
}
|
||||
throw new Exception("An invalid statement type $type was provided!");
|
||||
}
|
||||
@ -81,6 +78,12 @@ class PostgresArray extends SqlArray
|
||||
*/
|
||||
public function initConnection($settings): \Generator
|
||||
{
|
||||
$urlArray = parse_url($settings->getUri());
|
||||
$this->pdo = new \PDO(
|
||||
"postgre:host={$urlArray['host']};port={$urlArray['port']};charset=UTF8",
|
||||
$settings->getUsername(),
|
||||
$settings->getPassword()
|
||||
);
|
||||
if (!isset($this->db)) {
|
||||
$this->db = yield from Postgres::getConnection($settings);
|
||||
}
|
||||
@ -130,7 +133,7 @@ class PostgresArray extends SqlArray
|
||||
$this->setCache($index, $value);
|
||||
|
||||
$request = $this->execute(
|
||||
self::STATEMENT_SET,
|
||||
$this->getSqlQuery(self::SQL_SET),
|
||||
[
|
||||
'index' => $index,
|
||||
'value' => new ByteA(\serialize($value)),
|
||||
|
@ -4,8 +4,9 @@ namespace danog\MadelineProto\Db;
|
||||
|
||||
use Amp\Producer;
|
||||
use Amp\Promise;
|
||||
use Amp\Sql\CommandResult;
|
||||
use Amp\Sql\Pool;
|
||||
use Amp\Sql\ResultSet;
|
||||
use Amp\Sql\Statement;
|
||||
use Amp\Success;
|
||||
use danog\MadelineProto\Logger;
|
||||
|
||||
@ -16,19 +17,16 @@ use function Amp\call;
|
||||
*/
|
||||
abstract class SqlArray extends DriverArray
|
||||
{
|
||||
/**
|
||||
* Statement array.
|
||||
*
|
||||
* @var Statement[]
|
||||
*/
|
||||
private array $statements = [];
|
||||
protected Pool $db;
|
||||
//Pdo driver used for value quoting, to prevent sql injections.
|
||||
protected \PDO $pdo;
|
||||
|
||||
protected const STATEMENT_GET = 0;
|
||||
protected const STATEMENT_SET = 1;
|
||||
protected const STATEMENT_UNSET = 2;
|
||||
protected const STATEMENT_COUNT = 3;
|
||||
protected const STATEMENT_ITERATE = 4;
|
||||
protected const STATEMENT_CLEAR = 5;
|
||||
protected const SQL_GET = 0;
|
||||
protected const SQL_SET = 1;
|
||||
protected const SQL_UNSET = 2;
|
||||
protected const SQL_COUNT = 3;
|
||||
protected const SQL_ITERATE = 4;
|
||||
protected const SQL_CLEAR = 5;
|
||||
|
||||
|
||||
/**
|
||||
@ -36,9 +34,9 @@ abstract class SqlArray extends DriverArray
|
||||
*
|
||||
* @param SqlArray::STATEMENT_* $type
|
||||
*
|
||||
* @return Promise
|
||||
* @return string
|
||||
*/
|
||||
abstract protected function prepareStatements(int $type): Promise;
|
||||
abstract protected function getSqlQuery(int $type): string;
|
||||
|
||||
/**
|
||||
* Get value from row.
|
||||
@ -52,10 +50,7 @@ abstract class SqlArray extends DriverArray
|
||||
public function getIterator(): Producer
|
||||
{
|
||||
return new Producer(function (callable $emit) {
|
||||
if (!isset($this->statements[self::STATEMENT_ITERATE])) {
|
||||
$this->statements[self::STATEMENT_ITERATE] = yield $this->prepareStatements(self::STATEMENT_ITERATE);
|
||||
}
|
||||
$request = yield $this->statements[self::STATEMENT_ITERATE]->execute();
|
||||
$request = yield from $this->executeRaw($this->getSqlQuery(self::SQL_ITERATE));
|
||||
|
||||
while (yield $request->advance()) {
|
||||
$row = $request->getCurrent();
|
||||
@ -106,7 +101,7 @@ abstract class SqlArray extends DriverArray
|
||||
$this->unsetCache($index);
|
||||
|
||||
return $this->execute(
|
||||
self::STATEMENT_UNSET,
|
||||
$this->getSqlQuery(self::SQL_UNSET),
|
||||
['index' => $index]
|
||||
);
|
||||
}
|
||||
@ -122,7 +117,7 @@ abstract class SqlArray extends DriverArray
|
||||
public function count(): Promise
|
||||
{
|
||||
return call(function () {
|
||||
$row = yield $this->execute(self::STATEMENT_COUNT);
|
||||
$row = yield $this->execute($this->getSqlQuery(self::SQL_COUNT));
|
||||
return $row[0]['count'] ?? 0;
|
||||
});
|
||||
}
|
||||
@ -134,7 +129,7 @@ abstract class SqlArray extends DriverArray
|
||||
*/
|
||||
public function clear(): Promise
|
||||
{
|
||||
return $this->execute(self::STATEMENT_CLEAR);
|
||||
return $this->execute($this->getSqlQuery(self::SQL_CLEAR));
|
||||
}
|
||||
|
||||
public function offsetGet($offset): Promise
|
||||
@ -144,7 +139,7 @@ abstract class SqlArray extends DriverArray
|
||||
return $cached;
|
||||
}
|
||||
|
||||
$row = yield $this->execute(self::STATEMENT_GET, ['index' => $offset]);
|
||||
$row = yield $this->execute($this->getSqlQuery(self::SQL_GET), ['index' => $offset]);
|
||||
|
||||
if ($value = $this->getValue($row)) {
|
||||
$this->setCache($offset, $value);
|
||||
@ -176,7 +171,7 @@ abstract class SqlArray extends DriverArray
|
||||
$this->setCache($index, $value);
|
||||
|
||||
$request = $this->execute(
|
||||
self::STATEMENT_SET,
|
||||
$this->getSqlQuery(self::SQL_SET),
|
||||
[
|
||||
'index' => $index,
|
||||
'value' => \serialize($value),
|
||||
@ -192,34 +187,18 @@ abstract class SqlArray extends DriverArray
|
||||
/**
|
||||
* Perform async request to db.
|
||||
*
|
||||
* @param int $stmt
|
||||
* @param string $sql
|
||||
* @param array $params
|
||||
*
|
||||
* @psalm-param self::STATEMENT_* $stmt
|
||||
*
|
||||
* @return Promise
|
||||
* @return Promise<array>
|
||||
* @throws \Throwable
|
||||
*/
|
||||
protected function execute(int $stmt, array $params = []): Promise
|
||||
protected function execute(string $sql, array $params = []): Promise
|
||||
{
|
||||
return call(function () use ($stmt, $params) {
|
||||
if (
|
||||
!empty($params['index'])
|
||||
&& !\mb_check_encoding($params['index'], 'UTF-8')
|
||||
) {
|
||||
$params['index'] = \mb_convert_encoding($params['index'], 'UTF-8');
|
||||
}
|
||||
|
||||
if (!isset($this->statements[$stmt])) {
|
||||
$this->statements[$stmt] = yield $this->prepareStatements($stmt);
|
||||
}
|
||||
try {
|
||||
$request = yield $this->statements[$stmt]->execute($params);
|
||||
} catch (\Throwable $e) {
|
||||
Logger::log($e->getMessage(), Logger::ERROR);
|
||||
return [];
|
||||
}
|
||||
|
||||
return call(function () use ($sql, $params) {
|
||||
$request = yield from $this->executeRaw($sql, $params);
|
||||
$result = [];
|
||||
if ($request instanceof ResultSet) {
|
||||
while (yield $request->advance()) {
|
||||
@ -229,4 +208,37 @@ abstract class SqlArray extends DriverArray
|
||||
return $result;
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Return raw query result.
|
||||
*
|
||||
* @param string $sql
|
||||
* @param array $params
|
||||
*
|
||||
* @return \Generator<CommandResult|ResultSet>
|
||||
*/
|
||||
protected function executeRaw(string $sql, array $params = []): \Generator
|
||||
{
|
||||
if (
|
||||
!empty($params['index'])
|
||||
&& !\mb_check_encoding($params['index'], 'UTF-8')
|
||||
) {
|
||||
$params['index'] = \mb_convert_encoding($params['index'], 'UTF-8');
|
||||
}
|
||||
|
||||
try {
|
||||
foreach ($params as $key => $value) {
|
||||
$value = $this->pdo->quote($value);
|
||||
$sql = str_replace(":$key", $value, $sql);
|
||||
}
|
||||
|
||||
$request = yield $this->db->query($sql);
|
||||
} catch (\Throwable $e) {
|
||||
Logger::log($e->getMessage(), Logger::ERROR);
|
||||
return [];
|
||||
}
|
||||
|
||||
return $request;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -20,6 +20,7 @@
|
||||
namespace danog\MadelineProto\MTProtoTools;
|
||||
|
||||
use Amp\Promise;
|
||||
use danog\Loop\Loop;
|
||||
use danog\MadelineProto\Db\DbArray;
|
||||
use danog\MadelineProto\Db\DbPropertiesTrait;
|
||||
use danog\MadelineProto\MTProto;
|
||||
@ -88,13 +89,16 @@ class MinDatabase implements TLCallback
|
||||
$this->clean = true;
|
||||
return;
|
||||
}
|
||||
$iterator = $this->db->getIterator();
|
||||
while (yield $iterator->advance()) {
|
||||
[$id, $origin] = $iterator->getCurrent();
|
||||
if (!isset($origin['peer']) || $origin['peer'] === $id) {
|
||||
$this->db->offsetUnset($id);
|
||||
\Amp\Loop::defer(function() {
|
||||
$iterator = $this->db->getIterator();
|
||||
while (yield $iterator->advance()) {
|
||||
[$id, $origin] = $iterator->getCurrent();
|
||||
if (!isset($origin['peer']) || $origin['peer'] === $id) {
|
||||
$this->db->offsetUnset($id);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
public function getMethodCallbacks(): array
|
||||
{
|
||||
|
Loading…
Reference in New Issue
Block a user