This commit is contained in:
Daniil Gentili 2024-03-17 20:50:54 +01:00
parent ad9463728e
commit f16daf9c1c
9 changed files with 175 additions and 260 deletions

View File

@ -21,10 +21,7 @@ namespace danog\AsyncOrm\Driver;
use danog\AsyncOrm\DbArray;
use danog\AsyncOrm\FieldConfig;
use danog\AsyncOrm\Serializer;
use danog\AsyncOrm\Serializer\Igbinary;
use danog\AsyncOrm\Serializer\Native;
use danog\AsyncOrm\Serializer\Passthrough;
use danog\AsyncOrm\ValueType;
use danog\AsyncOrm\Settings\DriverSettings;
use function Amp\async;
use function Amp\Future\await;
@ -52,13 +49,9 @@ abstract class DriverArray extends DbArray
) {
return $previous;
}
\assert($config->settings instanceof DriverSettings);
$instance = new static($config, match ($config->annotation->valueType) {
ValueType::BEST => \extension_loaded('igbinary') ? new Igbinary : new Native,
ValueType::IGBINARY => new Igbinary,
ValueType::SERIALIZE => new Native,
default => new Passthrough
});
$instance = new static($config, $config->settings->serializer);
if ($previous === null) {
return $instance;

View File

@ -18,14 +18,18 @@
namespace danog\AsyncOrm\Internal\Driver;
use Amp\Mysql\MysqlConnectionPool;
use Amp\Sql\SqlResult;
use Amp\Sync\LocalKeyedMutex;
use AssertionError;
use danog\AsyncOrm\Driver\Mysql;
use danog\AsyncOrm\Driver\SqlArray;
use danog\AsyncOrm\Exception;
use danog\AsyncOrm\Logger;
use danog\AsyncOrm\Settings\Database\Mysql as DatabaseMysql;
use danog\AsyncOrm\FieldConfig;
use danog\AsyncOrm\KeyType;
use danog\AsyncOrm\Serializer;
use danog\AsyncOrm\ValueType;
use PDO;
use Revolt\EventLoop;
use Webmozart\Assert\Assert;
/**
@ -39,52 +43,143 @@ use Webmozart\Assert\Assert;
*/
final class MysqlArray extends SqlArray
{
/** @var array<list{MysqlConnectionPool, \PDO}> */
private static array $connections = [];
private static ?LocalKeyedMutex $mutex = null;
// We're forced to use quoting (just like PDO does internally when using prepares) because native MySQL prepares are extremely slow.
protected PDO $pdo;
/**
* Initialize on startup.
*/
public function initStartup(): void
public function __construct(FieldConfig $config, Serializer $serializer)
{
$this->setTable($this->table);
$this->initConnection($this->dbSettings);
$settings = $config->settings;
\assert($settings instanceof \danog\AsyncOrm\Settings\Mysql);
self::$mutex ??= new LocalKeyedMutex;
$dbKey = $settings->getDbIdentifier();
$lock = self::$mutex->acquire($dbKey);
try {
if (!isset(self::$connections[$dbKey])) {
$db = $settings->config->getDatabase();
$connection = new MysqlConnectionPool($settings->config->withDatabase(null));
$connection->query("
CREATE DATABASE IF NOT EXISTS `{$db}`
CHARACTER SET 'utf8mb4'
COLLATE 'utf8mb4_general_ci'
");
try {
$max = (int) $connection->query("SHOW VARIABLES LIKE 'max_connections'")->fetchRow()['Value'];
if ($max < 100000) {
$connection->query("SET GLOBAL max_connections = 100000");
}
} catch (\Throwable) {
}
$connection->close();
$host = $settings->config->getHost();
$port = $settings->config->getPort();
if (!\extension_loaded('pdo_mysql')) {
throw new AssertionError("PDO is needed for the mysql backend!");
}
/**
* Prepare statements.
*
* @param SqlArray::SQL_* $type
*/
protected function getSqlQuery(int $type): string
{
switch ($type) {
case SqlArray::SQL_GET:
return "SELECT `value` FROM `{$this->table}` WHERE `key` = :index LIMIT 1";
case SqlArray::SQL_SET:
return "
REPLACE INTO `{$this->table}`
SET `key` = :index, `value` = :value
";
case SqlArray::SQL_UNSET:
return "
DELETE FROM `{$this->table}`
WHERE `key` = :index
";
case SqlArray::SQL_COUNT:
return "
SELECT count(`key`) as `count` FROM `{$this->table}`
";
case SqlArray::SQL_ITERATE:
return "
SELECT `key`, `value` FROM `{$this->table}`
";
case SqlArray::SQL_CLEAR:
return "
DELETE FROM `{$this->table}`
";
$pdo = new PDO(
$host[0] === '/'
? "mysql:unix_socket={$host};charset=UTF8"
: "mysql:host={$host};port={$port};charset=UTF8",
$settings->config->getUser(),
$settings->config->getPassword(),
);
self::$connections[$dbKey] = [
new MysqlConnectionPool($settings->config, $settings->maxConnections, $settings->idleTimeout),
$pdo,
];
}
} finally {
EventLoop::queue($lock->release(...));
}
[$db, $pdo] = self::$connections[$dbKey];
$this->pdo = $pdo;
parent::__construct(
$config,
$serializer,
$db,
"SELECT `value` FROM `{$config->table}` WHERE `key` = :index LIMIT 1",
"
REPLACE INTO `{$config->table}`
SET `key` = :index, `value` = :value
",
"
DELETE FROM `{$config->table}`
WHERE `key` = :index
",
"
SELECT count(`key`) as `count` FROM `{$config->table}`
",
"
SELECT `key`, `value` FROM `{$config->table}`
",
"
DELETE FROM `{$config->table}`
"
);
$keyType = match ($config->annotation->keyType) {
KeyType::STRING_OR_INT => "VARCHAR(255)",
KeyType::STRING => "VARCHAR(255)",
KeyType::INT => "BIGINT",
};
$valueType = match ($config->annotation->valueType) {
ValueType::INT => "BIGINT",
ValueType::STRING => "VARCHAR(255)",
default => "MEDIUMBLOB",
};
$this->db->query("
CREATE TABLE IF NOT EXISTS `{$config->table}`
(
`key` $keyType PRIMARY KEY NOT NULL,
`value` $valueType NOT NULL,
)
ENGINE = InnoDB
CHARACTER SET 'utf8mb4'
COLLATE 'utf8mb4_general_ci'
");
$result = $this->db->query("DESCRIBE `{$config->table}`");
while ($column = $result->fetchRow()) {
['Field' => $key, 'Type' => $type, 'Null' => $null] = $column;
$type = \strtoupper($type);
if (\str_starts_with($type, 'BIGINT')) {
$type = 'BIGINT';
}
if ($key === 'key') {
$expected = $keyType;
} elseif ($key === 'value') {
$expected = $valueType;
} else {
$this->db->query("ALTER TABLE `{$config->table}` DROP `$key`");
}
if ($expected !== $type || $null !== 'NO') {
$this->db->query("ALTER TABLE `{$config->table}` MODIFY `$key` $expected NOT NULL");
}
}
if ($settings->optimizeIfWastedGtMb !== null) {
$database = $settings->config->getDatabase();
$result = $this->db->prepare("SELECT data_free FROM information_schema.tables WHERE table_schema=? AND table_name=?")
->execute([$database, $config->table])
->fetchRow();
Assert::notNull($result);
$result = $result['data_free'] ?? $result['DATA_FREE'];
if (($result >> 20) > $settings->optimizeIfWastedGtMb) {
$this->db->query("OPTIMIZE TABLE `{$config->table}`");
}
}
throw new Exception("An invalid statement type $type was provided!");
}
/**
@ -101,177 +196,6 @@ final class MysqlArray extends SqlArray
return $this->db->query($sql);
}
/** @var array<list{MysqlConnectionPool, \PDO}> */
private static array $connections = [];
private static ?LocalKeyedMutex $mutex = null;
/** @return list{MysqlConnectionPool, \PDO} */
public static function getConnection(DatabaseMysql $settings): array
{
self::$mutex ??= new LocalKeyedMutex;
$dbKey = $settings->getDbIdentifier();
$lock = self::$mutex->acquire($dbKey);
try {
if (!isset(self::$connections[$dbKey])) {
$host = \str_replace(['tcp://', 'unix://'], '', $settings->getUri());
if ($host[0] === '/') {
$port = 0;
} else {
$host = \explode(':', $host, 2);
if (\count($host) === 2) {
[$host, $port] = $host;
} else {
$host = $host[0];
$port = MysqlConfig::DEFAULT_PORT;
}
}
$config = new MysqlConfig(
host: $host,
port: (int) $port,
user: $settings->getUsername(),
password: $settings->getPassword(),
database: $settings->getDatabase()
);
self::createDb($config);
$host = $config->getHost();
$port = $config->getPort();
if (!\extension_loaded('pdo_mysql')) {
throw Exception::extension('pdo_mysql');
}
try {
$pdo = new PDO(
$host[0] === '/'
? "mysql:unix_socket={$host};charset=UTF8"
: "mysql:host={$host};port={$port};charset=UTF8",
$settings->getUsername(),
$settings->getPassword(),
);
} catch (PDOException $e) {
$config = $config->withPassword(null);
try {
$pdo = new PDO(
$host[0] === '/'
? "mysql:unix_socket={$host};charset=UTF8"
: "mysql:host={$host};port={$port};charset=UTF8",
$settings->getUsername(),
);
} catch (\Throwable) {
throw $e;
}
}
self::$connections[$dbKey] = [
new MysqlConnectionPool($config, $settings->getMaxConnections(), $settings->getIdleTimeout()),
$pdo,
];
}
} finally {
EventLoop::queue($lock->release(...));
}
return self::$connections[$dbKey];
}
private static function createDb(MysqlConfig $config): void
{
try {
$db = $config->getDatabase();
$connection = new MysqlConnectionPool($config->withDatabase(null));
$connection->query("
CREATE DATABASE IF NOT EXISTS `{$db}`
CHARACTER SET 'utf8mb4'
COLLATE 'utf8mb4_general_ci'
");
try {
$max = (int) $connection->query("SHOW VARIABLES LIKE 'max_connections'")->fetchRow()['Value'];
if ($max < 100000) {
$connection->query("SET GLOBAL max_connections = 100000");
}
} catch (Throwable) {
}
$connection->close();
} catch (Throwable $e) {
Logger::log("An error occurred while trying to create the database: ".$e->getMessage(), Logger::ERROR);
}
}
/**
* Initialize connection.
*/
public function initConnection(DatabaseMysql $settings): void
{
if (isset($this->db)) {
return;
}
[$this->db, $this->pdo] = self::getConnection($settings);
}
/**
* Create table for property.
*/
protected function prepareTable(): void
{
//Logger::log("Creating/checking table {$this->table}", Logger::WARNING);
\assert($this->dbSettings instanceof DatabaseMysql);
$keyType = $this->dbSettings->keyType;
$valueType = $this->dbSettings->valueType;
$this->db->query("
CREATE TABLE IF NOT EXISTS mgmt
(
`tableName` VARCHAR(255) NOT NULL,
`versionInfo` LONGBLOB NOT NULL,
PRIMARY KEY (`tableName`)
)
ENGINE = InnoDB
CHARACTER SET 'utf8mb4'
COLLATE 'utf8mb4_general_ci'
");
$version = $this->db->prepare("SELECT version FROM mgmt WHERE tableName=?")->execute([$this->table])->fetchRow()['versionInfo'] ?? null;
if ($version === null) {
$this->db->query("
CREATE TABLE IF NOT EXISTS `{$this->table}`
(
`key` $keyType NOT NULL,
`value` LONGBLOB NOT NULL,
PRIMARY KEY (`key`)
)
ENGINE = InnoDB
CHARACTER SET 'utf8mb4'
COLLATE 'utf8mb4_general_ci'
");
}
if ($version < 1) {
}
if ($version < 2) {
$this->db->query("ALTER TABLE `{$this->table}` MODIFY `key` BIGINT");
$this->db->query("ALTER TABLE `{$this->table}` DROP `ts`");
}
$this->db->prepare("REPLACE INTO mgmt SET version=? WHERE tableName=?")->execute([self::V, $this->table]);
if ($this->dbSettings->getOptimizeIfWastedGtMb() !== null) {
try {
$database = $this->dbSettings->getDatabase();
$result = $this->db->prepare("SELECT data_free FROM information_schema.tables WHERE table_schema=? AND table_name=?")
->execute([$database, $this->table])
->fetchRow();
Assert::notNull($result);
$result = $result['data_free'] ?? $result['DATA_FREE'];
if (($result >> 20) > $this->dbSettings->getOptimizeIfWastedGtMb()) {
$this->db->query("OPTIMIZE TABLE `{$this->table}`");
}
} catch (\Throwable $e) {
Logger::log("An error occurred while optimizing the table: $e", Logger::ERROR);
}
}
}
protected function importFromTable(string $fromTable): void
{
$this->db->query("

View File

@ -18,7 +18,6 @@
namespace danog\AsyncOrm\Internal\Driver;
use Amp\Postgres\PostgresConfig;
use Amp\Postgres\PostgresConnectionPool;
use Amp\Sync\LocalKeyedMutex;
use danog\AsyncOrm\Driver\SqlArray;
@ -55,29 +54,9 @@ class PostgresArray extends SqlArray
try {
if (!isset(self::$connections[$dbKey])) {
$host = \str_replace(['tcp://', 'unix://'], '', $settings->uri);
if ($host[0] === '/') {
$port = 0;
} else {
$host = \explode(':', $host, 2);
if (\count($host) === 2) {
[$host, $port] = $host;
} else {
$host = $host[0];
$port = PostgresConfig::DEFAULT_PORT;
}
}
$config = new PostgresConfig(
host: $host,
port: (int) $port,
user: $settings->username,
password: $settings->password,
database: $settings->database
);
$db = $config->getDatabase();
$user = $config->getUser();
$connection = new PostgresConnectionPool($config->withDatabase(null));
$db = $settings->config->getDatabase();
$user = $settings->config->getUser();
$connection = new PostgresConnectionPool($settings->config->withDatabase(null));
$result = $connection->query("SELECT * FROM pg_database WHERE datname = '{$db}'");
@ -91,7 +70,7 @@ class PostgresArray extends SqlArray
}
$connection->close();
self::$connections[$dbKey] = new PostgresConnectionPool($config, $settings->maxConnections, $settings->idleTimeoutgetIdleTimeout());
self::$connections[$dbKey] = new PostgresConnectionPool($settings->config, $settings->maxConnections, $settings->idleTimeoutgetIdleTimeout());
}
} finally {
EventLoop::queue($lock->release(...));
@ -122,6 +101,25 @@ class PostgresArray extends SqlArray
);
");
$result = $this->db->query("DESCRIBE \"bytea_{$config->table}\"");
while ($column = $result->fetchRow()) {
['Field' => $key, 'Type' => $type, 'Null' => $null] = $column;
$type = \strtoupper($type);
if (\str_starts_with($type, 'BIGINT')) {
$type = 'BIGINT';
}
if ($key === 'key') {
$expected = $keyType;
} elseif ($key === 'value') {
$expected = $valueType;
} else {
$this->db->query("ALTER TABLE \"bytea_{$config->table}\" DROP \"$key\"");
}
if ($expected !== $type || $null !== 'NO') {
$this->db->query("ALTER TABLE \"bytea_{$config->table}\" MODIFY \"$key\" $expected NOT NULL");
}
}
parent::__construct(
$config,
$serializer,

View File

@ -20,8 +20,6 @@ namespace danog\AsyncOrm\Internal\Driver;
use Amp\Redis\Connection\ReconnectingRedisLink;
use Amp\Redis\RedisClient;
use Amp\Redis\RedisConfig;
use Amp\Serialization\PassthroughSerializer;
use Amp\Sync\LocalKeyedMutex;
use danog\AsyncOrm\Driver\DriverArray;
use danog\AsyncOrm\FieldConfig;
@ -62,14 +60,11 @@ final class RedisArray extends DriverArray
\assert($config->settings instanceof Redis);
$dbKey = $config->settings->getDbIdentifier();
$lock = self::$mutex->acquire($dbKey);
\assert($config->settings instanceof Redis);
try {
if (!isset(self::$connections[$dbKey])) {
$config = RedisConfig::fromUri($config->settings->uri)
->withPassword($config->settings->password)
->withDatabase($config->settings->database);
self::$connections[$dbKey] = new RedisClient(new ReconnectingRedisLink(createRedisConnector($config)));
self::$connections[$dbKey] = new RedisClient(new ReconnectingRedisLink(createRedisConnector($config->settings->config)));
self::$connections[$dbKey]->ping();
}
} finally {

View File

@ -26,8 +26,7 @@ final class ByteaSerializer implements Serializer
{
public function __construct(
private readonly Serializer $inner
)
{
) {
}
public function serialize(mixed $value): mixed
{

View File

@ -24,6 +24,8 @@ use danog\AsyncOrm\Serializer;
* MySQL backend settings.
*
* MariaDb 10.2+ or Mysql 5.6+ required.
*
* @extends SqlSettings<MysqlConfig>
*/
final readonly class Mysql extends SqlSettings
{

View File

@ -22,6 +22,7 @@ use danog\AsyncOrm\Internal\Driver\PostgresArray;
/**
* Postgres backend settings.
* @extends SqlSettings<PostgresConfig>
*/
final readonly class Postgres extends SqlSettings
{

View File

@ -21,6 +21,8 @@ use danog\AsyncOrm\Serializer;
/**
* Generic SQL db backend settings.
*
* @template T as SqlConfig
*/
abstract readonly class SqlSettings extends DriverSettings
{
@ -58,6 +60,7 @@ abstract readonly class SqlSettings extends DriverSettings
* @param int<1, max> $idleTimeout Idle timeout
*/
public function __construct(
/** @var T */
public readonly SqlConfig $config,
Serializer $serializer,
int $cacheTtl = self::DEFAULT_CACHE_TTL,