From f16daf9c1c0858ae755b2adcffd8d480dbbe9c5d Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Sun, 17 Mar 2024 20:50:54 +0100 Subject: [PATCH] Cleanup --- src/Driver/DriverArray.php | 13 +- src/Internal/Driver/MysqlArray.php | 352 ++++++++++---------------- src/Internal/Driver/PostgresArray.php | 48 ++-- src/Internal/Driver/RedisArray.php | 9 +- src/Serializer/ByteaSerializer.php | 3 +- src/Settings/Mysql.php | 4 +- src/Settings/Postgres.php | 1 + src/Settings/SqlSettings.php | 3 + src/ValueType.php | 2 +- 9 files changed, 175 insertions(+), 260 deletions(-) diff --git a/src/Driver/DriverArray.php b/src/Driver/DriverArray.php index 64c4ca5..80d13c6 100644 --- a/src/Driver/DriverArray.php +++ b/src/Driver/DriverArray.php @@ -21,10 +21,7 @@ namespace danog\AsyncOrm\Driver; use danog\AsyncOrm\DbArray; use danog\AsyncOrm\FieldConfig; use danog\AsyncOrm\Serializer; -use danog\AsyncOrm\Serializer\Igbinary; -use danog\AsyncOrm\Serializer\Native; -use danog\AsyncOrm\Serializer\Passthrough; -use danog\AsyncOrm\ValueType; +use danog\AsyncOrm\Settings\DriverSettings; use function Amp\async; use function Amp\Future\await; @@ -52,13 +49,9 @@ abstract class DriverArray extends DbArray ) { return $previous; } + \assert($config->settings instanceof DriverSettings); - $instance = new static($config, match ($config->annotation->valueType) { - ValueType::BEST => \extension_loaded('igbinary') ? new Igbinary : new Native, - ValueType::IGBINARY => new Igbinary, - ValueType::SERIALIZE => new Native, - default => new Passthrough - }); + $instance = new static($config, $config->settings->serializer); if ($previous === null) { return $instance; diff --git a/src/Internal/Driver/MysqlArray.php b/src/Internal/Driver/MysqlArray.php index af2388e..e9cc668 100644 --- a/src/Internal/Driver/MysqlArray.php +++ b/src/Internal/Driver/MysqlArray.php @@ -18,14 +18,18 @@ namespace danog\AsyncOrm\Internal\Driver; +use Amp\Mysql\MysqlConnectionPool; use Amp\Sql\SqlResult; use Amp\Sync\LocalKeyedMutex; +use AssertionError; use danog\AsyncOrm\Driver\Mysql; use danog\AsyncOrm\Driver\SqlArray; -use danog\AsyncOrm\Exception; -use danog\AsyncOrm\Logger; -use danog\AsyncOrm\Settings\Database\Mysql as DatabaseMysql; +use danog\AsyncOrm\FieldConfig; +use danog\AsyncOrm\KeyType; +use danog\AsyncOrm\Serializer; +use danog\AsyncOrm\ValueType; use PDO; +use Revolt\EventLoop; use Webmozart\Assert\Assert; /** @@ -39,52 +43,143 @@ use Webmozart\Assert\Assert; */ final class MysqlArray extends SqlArray { + /** @var array */ + private static array $connections = []; + + private static ?LocalKeyedMutex $mutex = null; + // We're forced to use quoting (just like PDO does internally when using prepares) because native MySQL prepares are extremely slow. protected PDO $pdo; - /** - * Initialize on startup. - */ - public function initStartup(): void + public function __construct(FieldConfig $config, Serializer $serializer) { - $this->setTable($this->table); - $this->initConnection($this->dbSettings); - } + $settings = $config->settings; + \assert($settings instanceof \danog\AsyncOrm\Settings\Mysql); - /** - * Prepare statements. - * - * @param SqlArray::SQL_* $type - */ - protected function getSqlQuery(int $type): string - { - switch ($type) { - case SqlArray::SQL_GET: - return "SELECT `value` FROM `{$this->table}` WHERE `key` = :index LIMIT 1"; - case SqlArray::SQL_SET: - return " - REPLACE INTO `{$this->table}` - SET `key` = :index, `value` = :value - "; - case SqlArray::SQL_UNSET: - return " - DELETE FROM `{$this->table}` - WHERE `key` = :index - "; - case SqlArray::SQL_COUNT: - return " - SELECT count(`key`) as `count` FROM `{$this->table}` - "; - case SqlArray::SQL_ITERATE: - return " - SELECT `key`, `value` FROM `{$this->table}` - "; - case SqlArray::SQL_CLEAR: - return " - DELETE FROM `{$this->table}` - "; + self::$mutex ??= new LocalKeyedMutex; + $dbKey = $settings->getDbIdentifier(); + $lock = self::$mutex->acquire($dbKey); + + try { + if (!isset(self::$connections[$dbKey])) { + $db = $settings->config->getDatabase(); + $connection = new MysqlConnectionPool($settings->config->withDatabase(null)); + $connection->query(" + CREATE DATABASE IF NOT EXISTS `{$db}` + CHARACTER SET 'utf8mb4' + COLLATE 'utf8mb4_general_ci' + "); + try { + $max = (int) $connection->query("SHOW VARIABLES LIKE 'max_connections'")->fetchRow()['Value']; + if ($max < 100000) { + $connection->query("SET GLOBAL max_connections = 100000"); + } + } catch (\Throwable) { + } + $connection->close(); + + $host = $settings->config->getHost(); + $port = $settings->config->getPort(); + if (!\extension_loaded('pdo_mysql')) { + throw new AssertionError("PDO is needed for the mysql backend!"); + } + + $pdo = new PDO( + $host[0] === '/' + ? "mysql:unix_socket={$host};charset=UTF8" + : "mysql:host={$host};port={$port};charset=UTF8", + $settings->config->getUser(), + $settings->config->getPassword(), + ); + + self::$connections[$dbKey] = [ + new MysqlConnectionPool($settings->config, $settings->maxConnections, $settings->idleTimeout), + $pdo, + ]; + } + } finally { + EventLoop::queue($lock->release(...)); + } + + [$db, $pdo] = self::$connections[$dbKey]; + $this->pdo = $pdo; + + parent::__construct( + $config, + $serializer, + $db, + "SELECT `value` FROM `{$config->table}` WHERE `key` = :index LIMIT 1", + " + REPLACE INTO `{$config->table}` + SET `key` = :index, `value` = :value + ", + " + DELETE FROM `{$config->table}` + WHERE `key` = :index + ", + " + SELECT count(`key`) as `count` FROM `{$config->table}` + ", + " + SELECT `key`, `value` FROM `{$config->table}` + ", + " + DELETE FROM `{$config->table}` + " + ); + + $keyType = match ($config->annotation->keyType) { + KeyType::STRING_OR_INT => "VARCHAR(255)", + KeyType::STRING => "VARCHAR(255)", + KeyType::INT => "BIGINT", + }; + $valueType = match ($config->annotation->valueType) { + ValueType::INT => "BIGINT", + ValueType::STRING => "VARCHAR(255)", + default => "MEDIUMBLOB", + }; + + $this->db->query(" + CREATE TABLE IF NOT EXISTS `{$config->table}` + ( + `key` $keyType PRIMARY KEY NOT NULL, + `value` $valueType NOT NULL, + ) + ENGINE = InnoDB + CHARACTER SET 'utf8mb4' + COLLATE 'utf8mb4_general_ci' + "); + + $result = $this->db->query("DESCRIBE `{$config->table}`"); + while ($column = $result->fetchRow()) { + ['Field' => $key, 'Type' => $type, 'Null' => $null] = $column; + $type = \strtoupper($type); + if (\str_starts_with($type, 'BIGINT')) { + $type = 'BIGINT'; + } + if ($key === 'key') { + $expected = $keyType; + } elseif ($key === 'value') { + $expected = $valueType; + } else { + $this->db->query("ALTER TABLE `{$config->table}` DROP `$key`"); + } + if ($expected !== $type || $null !== 'NO') { + $this->db->query("ALTER TABLE `{$config->table}` MODIFY `$key` $expected NOT NULL"); + } + } + + if ($settings->optimizeIfWastedGtMb !== null) { + $database = $settings->config->getDatabase(); + $result = $this->db->prepare("SELECT data_free FROM information_schema.tables WHERE table_schema=? AND table_name=?") + ->execute([$database, $config->table]) + ->fetchRow(); + Assert::notNull($result); + $result = $result['data_free'] ?? $result['DATA_FREE']; + if (($result >> 20) > $settings->optimizeIfWastedGtMb) { + $this->db->query("OPTIMIZE TABLE `{$config->table}`"); + } } - throw new Exception("An invalid statement type $type was provided!"); } /** @@ -101,177 +196,6 @@ final class MysqlArray extends SqlArray return $this->db->query($sql); } - /** @var array */ - private static array $connections = []; - - private static ?LocalKeyedMutex $mutex = null; - /** @return list{MysqlConnectionPool, \PDO} */ - public static function getConnection(DatabaseMysql $settings): array - { - self::$mutex ??= new LocalKeyedMutex; - $dbKey = $settings->getDbIdentifier(); - $lock = self::$mutex->acquire($dbKey); - - try { - if (!isset(self::$connections[$dbKey])) { - $host = \str_replace(['tcp://', 'unix://'], '', $settings->getUri()); - if ($host[0] === '/') { - $port = 0; - } else { - $host = \explode(':', $host, 2); - if (\count($host) === 2) { - [$host, $port] = $host; - } else { - $host = $host[0]; - $port = MysqlConfig::DEFAULT_PORT; - } - } - $config = new MysqlConfig( - host: $host, - port: (int) $port, - user: $settings->getUsername(), - password: $settings->getPassword(), - database: $settings->getDatabase() - ); - - self::createDb($config); - - $host = $config->getHost(); - $port = $config->getPort(); - if (!\extension_loaded('pdo_mysql')) { - throw Exception::extension('pdo_mysql'); - } - - try { - $pdo = new PDO( - $host[0] === '/' - ? "mysql:unix_socket={$host};charset=UTF8" - : "mysql:host={$host};port={$port};charset=UTF8", - $settings->getUsername(), - $settings->getPassword(), - ); - } catch (PDOException $e) { - $config = $config->withPassword(null); - try { - $pdo = new PDO( - $host[0] === '/' - ? "mysql:unix_socket={$host};charset=UTF8" - : "mysql:host={$host};port={$port};charset=UTF8", - $settings->getUsername(), - ); - } catch (\Throwable) { - throw $e; - } - } - - self::$connections[$dbKey] = [ - new MysqlConnectionPool($config, $settings->getMaxConnections(), $settings->getIdleTimeout()), - $pdo, - ]; - } - } finally { - EventLoop::queue($lock->release(...)); - } - - return self::$connections[$dbKey]; - } - - private static function createDb(MysqlConfig $config): void - { - try { - $db = $config->getDatabase(); - $connection = new MysqlConnectionPool($config->withDatabase(null)); - $connection->query(" - CREATE DATABASE IF NOT EXISTS `{$db}` - CHARACTER SET 'utf8mb4' - COLLATE 'utf8mb4_general_ci' - "); - try { - $max = (int) $connection->query("SHOW VARIABLES LIKE 'max_connections'")->fetchRow()['Value']; - if ($max < 100000) { - $connection->query("SET GLOBAL max_connections = 100000"); - } - } catch (Throwable) { - } - $connection->close(); - } catch (Throwable $e) { - Logger::log("An error occurred while trying to create the database: ".$e->getMessage(), Logger::ERROR); - } - } - - /** - * Initialize connection. - */ - public function initConnection(DatabaseMysql $settings): void - { - if (isset($this->db)) { - return; - } - [$this->db, $this->pdo] = self::getConnection($settings); - } - - /** - * Create table for property. - */ - protected function prepareTable(): void - { - //Logger::log("Creating/checking table {$this->table}", Logger::WARNING); - \assert($this->dbSettings instanceof DatabaseMysql); - $keyType = $this->dbSettings->keyType; - $valueType = $this->dbSettings->valueType; - - $this->db->query(" - CREATE TABLE IF NOT EXISTS mgmt - ( - `tableName` VARCHAR(255) NOT NULL, - `versionInfo` LONGBLOB NOT NULL, - PRIMARY KEY (`tableName`) - ) - ENGINE = InnoDB - CHARACTER SET 'utf8mb4' - COLLATE 'utf8mb4_general_ci' - "); - - $version = $this->db->prepare("SELECT version FROM mgmt WHERE tableName=?")->execute([$this->table])->fetchRow()['versionInfo'] ?? null; - - if ($version === null) { - $this->db->query(" - CREATE TABLE IF NOT EXISTS `{$this->table}` - ( - `key` $keyType NOT NULL, - `value` LONGBLOB NOT NULL, - PRIMARY KEY (`key`) - ) - ENGINE = InnoDB - CHARACTER SET 'utf8mb4' - COLLATE 'utf8mb4_general_ci' - "); - } - if ($version < 1) { - } - if ($version < 2) { - $this->db->query("ALTER TABLE `{$this->table}` MODIFY `key` BIGINT"); - $this->db->query("ALTER TABLE `{$this->table}` DROP `ts`"); - } - $this->db->prepare("REPLACE INTO mgmt SET version=? WHERE tableName=?")->execute([self::V, $this->table]); - - if ($this->dbSettings->getOptimizeIfWastedGtMb() !== null) { - try { - $database = $this->dbSettings->getDatabase(); - $result = $this->db->prepare("SELECT data_free FROM information_schema.tables WHERE table_schema=? AND table_name=?") - ->execute([$database, $this->table]) - ->fetchRow(); - Assert::notNull($result); - $result = $result['data_free'] ?? $result['DATA_FREE']; - if (($result >> 20) > $this->dbSettings->getOptimizeIfWastedGtMb()) { - $this->db->query("OPTIMIZE TABLE `{$this->table}`"); - } - } catch (\Throwable $e) { - Logger::log("An error occurred while optimizing the table: $e", Logger::ERROR); - } - } - } - protected function importFromTable(string $fromTable): void { $this->db->query(" diff --git a/src/Internal/Driver/PostgresArray.php b/src/Internal/Driver/PostgresArray.php index c0757c1..17b032e 100644 --- a/src/Internal/Driver/PostgresArray.php +++ b/src/Internal/Driver/PostgresArray.php @@ -18,7 +18,6 @@ namespace danog\AsyncOrm\Internal\Driver; -use Amp\Postgres\PostgresConfig; use Amp\Postgres\PostgresConnectionPool; use Amp\Sync\LocalKeyedMutex; use danog\AsyncOrm\Driver\SqlArray; @@ -55,29 +54,9 @@ class PostgresArray extends SqlArray try { if (!isset(self::$connections[$dbKey])) { - $host = \str_replace(['tcp://', 'unix://'], '', $settings->uri); - if ($host[0] === '/') { - $port = 0; - } else { - $host = \explode(':', $host, 2); - if (\count($host) === 2) { - [$host, $port] = $host; - } else { - $host = $host[0]; - $port = PostgresConfig::DEFAULT_PORT; - } - } - $config = new PostgresConfig( - host: $host, - port: (int) $port, - user: $settings->username, - password: $settings->password, - database: $settings->database - ); - - $db = $config->getDatabase(); - $user = $config->getUser(); - $connection = new PostgresConnectionPool($config->withDatabase(null)); + $db = $settings->config->getDatabase(); + $user = $settings->config->getUser(); + $connection = new PostgresConnectionPool($settings->config->withDatabase(null)); $result = $connection->query("SELECT * FROM pg_database WHERE datname = '{$db}'"); @@ -91,7 +70,7 @@ class PostgresArray extends SqlArray } $connection->close(); - self::$connections[$dbKey] = new PostgresConnectionPool($config, $settings->maxConnections, $settings->idleTimeoutgetIdleTimeout()); + self::$connections[$dbKey] = new PostgresConnectionPool($settings->config, $settings->maxConnections, $settings->idleTimeoutgetIdleTimeout()); } } finally { EventLoop::queue($lock->release(...)); @@ -122,6 +101,25 @@ class PostgresArray extends SqlArray ); "); + $result = $this->db->query("DESCRIBE \"bytea_{$config->table}\""); + while ($column = $result->fetchRow()) { + ['Field' => $key, 'Type' => $type, 'Null' => $null] = $column; + $type = \strtoupper($type); + if (\str_starts_with($type, 'BIGINT')) { + $type = 'BIGINT'; + } + if ($key === 'key') { + $expected = $keyType; + } elseif ($key === 'value') { + $expected = $valueType; + } else { + $this->db->query("ALTER TABLE \"bytea_{$config->table}\" DROP \"$key\""); + } + if ($expected !== $type || $null !== 'NO') { + $this->db->query("ALTER TABLE \"bytea_{$config->table}\" MODIFY \"$key\" $expected NOT NULL"); + } + } + parent::__construct( $config, $serializer, diff --git a/src/Internal/Driver/RedisArray.php b/src/Internal/Driver/RedisArray.php index 84999ac..22deca1 100644 --- a/src/Internal/Driver/RedisArray.php +++ b/src/Internal/Driver/RedisArray.php @@ -20,8 +20,6 @@ namespace danog\AsyncOrm\Internal\Driver; use Amp\Redis\Connection\ReconnectingRedisLink; use Amp\Redis\RedisClient; -use Amp\Redis\RedisConfig; -use Amp\Serialization\PassthroughSerializer; use Amp\Sync\LocalKeyedMutex; use danog\AsyncOrm\Driver\DriverArray; use danog\AsyncOrm\FieldConfig; @@ -62,14 +60,11 @@ final class RedisArray extends DriverArray \assert($config->settings instanceof Redis); $dbKey = $config->settings->getDbIdentifier(); $lock = self::$mutex->acquire($dbKey); + \assert($config->settings instanceof Redis); try { if (!isset(self::$connections[$dbKey])) { - $config = RedisConfig::fromUri($config->settings->uri) - ->withPassword($config->settings->password) - ->withDatabase($config->settings->database); - - self::$connections[$dbKey] = new RedisClient(new ReconnectingRedisLink(createRedisConnector($config))); + self::$connections[$dbKey] = new RedisClient(new ReconnectingRedisLink(createRedisConnector($config->settings->config))); self::$connections[$dbKey]->ping(); } } finally { diff --git a/src/Serializer/ByteaSerializer.php b/src/Serializer/ByteaSerializer.php index 5739a74..7e44871 100644 --- a/src/Serializer/ByteaSerializer.php +++ b/src/Serializer/ByteaSerializer.php @@ -26,8 +26,7 @@ final class ByteaSerializer implements Serializer { public function __construct( private readonly Serializer $inner - ) - { + ) { } public function serialize(mixed $value): mixed { diff --git a/src/Settings/Mysql.php b/src/Settings/Mysql.php index f9df81f..d9816f0 100644 --- a/src/Settings/Mysql.php +++ b/src/Settings/Mysql.php @@ -24,6 +24,8 @@ use danog\AsyncOrm\Serializer; * MySQL backend settings. * * MariaDb 10.2+ or Mysql 5.6+ required. + * + * @extends SqlSettings */ final readonly class Mysql extends SqlSettings { @@ -53,7 +55,7 @@ final readonly class Mysql extends SqlSettings ) { parent::__construct($config, $serializer, $cacheTtl, $maxConnections, $idleTimeout); } - + public function getDriverClass(): string { return MysqlArray::class; diff --git a/src/Settings/Postgres.php b/src/Settings/Postgres.php index 96c7ba1..5b9078d 100644 --- a/src/Settings/Postgres.php +++ b/src/Settings/Postgres.php @@ -22,6 +22,7 @@ use danog\AsyncOrm\Internal\Driver\PostgresArray; /** * Postgres backend settings. + * @extends SqlSettings */ final readonly class Postgres extends SqlSettings { diff --git a/src/Settings/SqlSettings.php b/src/Settings/SqlSettings.php index dae15bb..580886a 100644 --- a/src/Settings/SqlSettings.php +++ b/src/Settings/SqlSettings.php @@ -21,6 +21,8 @@ use danog\AsyncOrm\Serializer; /** * Generic SQL db backend settings. + * + * @template T as SqlConfig */ abstract readonly class SqlSettings extends DriverSettings { @@ -58,6 +60,7 @@ abstract readonly class SqlSettings extends DriverSettings * @param int<1, max> $idleTimeout Idle timeout */ public function __construct( + /** @var T */ public readonly SqlConfig $config, Serializer $serializer, int $cacheTtl = self::DEFAULT_CACHE_TTL, diff --git a/src/ValueType.php b/src/ValueType.php index e591936..6a9a482 100644 --- a/src/ValueType.php +++ b/src/ValueType.php @@ -35,7 +35,7 @@ enum ValueType: string case OBJECT = 'object'; /** * Values of any type, serialized as specified in the settings. - * + * * Using MIXED worsens performances, please use STRING, INT or OBJECT whenever possible. */ case MIXED = 'object';