.
 *
 * @author    Daniil Gentili
 * @copyright 2016-2023 Daniil Gentili
 * @license   https://opensource.org/licenses/AGPL-3.0 AGPLv3
 * @link https://docs.madelineproto.xyz MadelineProto documentation
 */

namespace danog\MadelineProto;

use Amp\CancelledException;
use Amp\DeferredCancellation;
use Amp\DeferredFuture;
use Amp\Future;
use Amp\Ipc\Sync\ChannelledSocket;
use Amp\TimeoutException;
use AssertionError;
use danog\AsyncOrm\DbArrayBuilder;
use danog\AsyncOrm\Driver\DriverArray;
use danog\AsyncOrm\KeyType;
use danog\AsyncOrm\ValueType;
use danog\MadelineProto\Ipc\Server;
use danog\MadelineProto\Settings\Database\DriverDatabaseAbstract;
use Revolt\EventLoop;
use Throwable;

use const LOCK_EX;

use function Amp\File\exists;
use function Amp\Ipc\connect;

/**
 * Manages serialization of the MadelineProto instance.
 *
 * @internal
 */
abstract class Serialization
{
    /**
     * Unserialize session.
     *
     * Logic for deserialization is as follows.
     * - If the session is unlocked
     *   - Try starting IPC server:
     *     - Fetch light state
     *     - If don't need event handler
     *       - Unlock
     *       - Fork
     *       - Lock (fork)
     *       - Deserialize full (fork)
     *       - Start IPC server (fork)
     *       - Store IPC state (fork)
     *     - If need event handler
     *       - If have event handler class
     *         - Deserialize full
     *         - Start IPC server
     *         - Store IPC state
     *       - Else Fallthrough
     *   - Wait for a new IPC state (tryConnect makes at most 25 attempts, waiting up to one second after each), then fail
     *   - Execute original request via IPC
     *
     * - If the session is locked
     *   - In parallel (concurrent):
     *     - The IPC server should be running, connect
     *     - Try starting full session
     *       - Fetch light state
     *       - If don't need event handler
     *         - Wait lock
     *         - Unlock
     *         - Fork
     *         - Lock (fork)
     *         - Deserialize full (fork)
     *         - Start IPC server (fork)
     *         - Store IPC state (fork)
     *       - If need event handler and have event handler class
     *         - Wait lock
     *         - Deserialize full
     *         - Start IPC server
     *         - Store IPC state
     *   - Wait for a new IPC session (tryConnect makes at most 25 attempts, waiting up to one second after each), then fail
     *   - Execute original request via IPC
     *
     * - If receiving a startAndLoop or setEventHandler request on an IPC session:
     *   - Shutdown remote IPC server
     *   - Deserialize full
     *   - Start IPC server
     *   - Store IPC state
     *
     * The first element of the result is either the result of an IPC connection
     * attempt (see tryConnect()) or the unserialized APIWrapper (null when no
     * session file exists and nothing could be loaded); the second element is the
     * unlock callback, set only when this call still holds the exclusive lock.
     *
     * @param SessionPaths     $session   Session name
     * @param SettingsAbstract $settings  Settings
     * @param bool             $forceFull Whether to force full session deserialization
     *
     * @throws AssertionError When the event handler class is required but not loadable (only with $forceFull),
     *                        or when the session file exists but its light state could not be read.
     * @throws Exception      When the session exists but could not be extracted from the database.
     *
     * @internal
     *
     * @return array{0: (ChannelledSocket|APIWrapper|Throwable|null|0), 1: (callable|null)}
     */
    public static function unserialize(SessionPaths $session, SettingsAbstract $settings, bool $forceFull = false): array
    {
        // After one second of waiting, tell the user why we seem to be stuck;
        // from then on repeat a reminder every five seconds.
        // Both watchers are unreferenced so that they never keep the event loop alive on their own.
        $warningId = EventLoop::delay(1, static function () use (&$warningId): void {
            if (isset($_GET['MadelineSelfRestart'])) {
                Logger::log("MadelineProto self-restarted successfully!");
            } else {
                Logger::log('It seems like the session is busy.');
                Logger::log('Telegram does not support starting multiple instances of the same session, make sure no other instance of the session is running.');
                $warningId = EventLoop::repeat(5, static fn () => Logger::log('Still waiting for exclusive session lock...'));
                EventLoop::unreference($warningId);
            }
        });
        EventLoop::unreference($warningId);

        // null = not fetched yet, false = fetch attempted by the callback below and failed.
        $lightState = null;
        $cancelFlock = new DeferredCancellation;
        $cancelIpc = new DeferredFuture;
        $canContinue = true;
        $ipcSocket = null;

        // NOTE(review): the last argument is presumably invoked by Tools::flock when the
        // lock cannot be taken right away - confirm against Tools::flock.
        // It is omitted when a full deserialization is forced.
        $unlock = Tools::flock(
            $session->getLockPath(),
            LOCK_EX,
            1,
            $cancelFlock->getCancellation(),
            $forceFull ? null : static function () use ($session, &$cancelFlock, $cancelIpc, &$canContinue, &$ipcSocket, &$lightState): void {
                // Cancels the pending lock acquisition, at most once.
                $cancelFull = static function () use (&$cancelFlock): void {
                    if ($cancelFlock !== null) {
                        $copy = $cancelFlock;
                        $cancelFlock = null;
                        $copy->cancel();
                    }
                };

                // Concurrently with the IPC connection attempt, read the light state to
                // find out whether an IPC server can be started from this process at all.
                EventLoop::queue(static function () use ($session, $cancelFull, &$canContinue, &$lightState): void {
                    try {
                        $lightState = $session->getLightState();
                        if (!$lightState->canStartIpc()) {
                            $canContinue = false;
                            $cancelFull();
                        }
                    } catch (Throwable) {
                        // Remember the failure so that the fetch is not retried later.
                        $lightState = false;
                    }
                });

                // A successful connection cancels the lock acquisition through $cancelFull.
                $ipcSocket = self::tryConnect($session->getIpcPath(), $cancelIpc->getFuture(), $cancelFull);
            },
        );
        EventLoop::cancel($warningId);

        if (!$unlock) {
            // Canceled, don't have lock
            return $ipcSocket;
        }
        if (!$canContinue) {
            // Have lock, can't use it
            Logger::log("Session has event handler, but it's not started.", Logger::ERROR);
            Logger::log("We don't have access to the event handler class, so we can't start it.", Logger::ERROR);
            Logger::log('Please start the event handler or unset it to use the IPC server.', Logger::ERROR);
            $unlock();
            return $ipcSocket;
        }

        try {
            $lightState ??= $session->getLightState();
        } catch (Throwable) {
            // Best effort: a missing or unreadable light state is handled below.
        }
        if ($lightState === false) {
            // The concurrent fetch already failed: treat the light state as unavailable
            // instead of tripping the assertion below when assertions are enabled.
            $lightState = null;
        }
        \assert($lightState === null || $lightState instanceof LightState);

        $exists = exists($session->getSessionPath());

        if ($lightState && !$forceFull) {
            if (!$class = $lightState->getEventHandler()) {
                // Unlock and fork
                $unlock();
                $monitor = Server::startMe($session);
                // Forward the outcome of the server startup to any pending connection attempt.
                EventLoop::queue(static function () use ($cancelIpc, $monitor): void {
                    try {
                        $cancelIpc->complete($monitor->await());
                    } catch (Throwable $e) {
                        $cancelIpc->error($e);
                    }
                });
                return $ipcSocket ?? self::tryConnect($session->getIpcPath(), $cancelIpc->getFuture());
            } elseif (!class_exists($class)) {
                // Have lock, can't use it
                $unlock();
                Logger::log("Session has event handler (class $class), but it's not started.", Logger::ERROR);
                Logger::log("We don't have access to the event handler class, so we can't start it.", Logger::ERROR);
                Logger::log('Please start the event handler or unset it to use the IPC server.', Logger::ERROR);
                return $ipcSocket ?? self::tryConnect(
                    $session->getIpcPath(),
                    $cancelIpc->getFuture(),
                    customE: new AssertionError("Please make sure the $class class is in scope, or that the event handler is running (in a separate process or in the current process)."),
                );
            } elseif (is_subclass_of($class, EventHandler::class)) {
                EventHandler::cachePlugins($class);
            }
        } elseif ($lightState) {
            $class = $lightState->getEventHandler();
            if ($class && !class_exists($class)) {
                // Have lock, can't use it
                $unlock();
                Logger::log("Session has event handler, but it's not started.", Logger::ERROR);
                Logger::log("We don't have access to the event handler class, so we can't start it.", Logger::ERROR);
                Logger::log('Please start the event handler or unset it to use the IPC server.', Logger::ERROR);
                throw new AssertionError("Please make sure the $class class is in scope, or that the event handler is running (in a separate process or in the current process).");
            } elseif ($class && is_subclass_of($class, EventHandler::class)) {
                EventHandler::cachePlugins($class);
            }
        } elseif ($exists) {
            // Release the lock before bailing out, like the other error branches do.
            $unlock();
            throw new AssertionError("Could not read the lightstate file, check logs!");
        }

        // While the session is being loaded, make sure the lock is released on shutdown.
        // From here on $unlock is the logging wrapper around the raw unlock callback.
        $tempId = Shutdown::addCallback($unlock = static function () use ($unlock): void {
            Logger::log('Unlocking exclusive session lock!');
            $unlock();
            Logger::log('Unlocked exclusive session lock!');
        });
        Logger::log('Got exclusive session lock!');

        $unserialized = $exists ? $session->unserialize() : null;

        // The session file may hold only a database reference (a DriverArray or a table
        // name) instead of the instance itself, and a missing file may still be backed
        // by a database when an ephemeral filesystem prefix is configured.
        if ($unserialized instanceof DriverArray || \is_string($unserialized) || !$exists) {
            if ($settings instanceof Settings) {
                $settings = $settings->getDb();
            }
            if ($settings instanceof DriverDatabaseAbstract
                && $prefix = $settings->getEphemeralFilesystemPrefix()
            ) {
                $tableName = "{$prefix}_MTProto_session";
            } elseif ($unserialized instanceof DriverArray) {
                // Reads the protected "table" property through the array cast.
                $tableName = ((array) $unserialized)["\0*\0table"];
            } else {
                $tableName = $unserialized;
            }
            $unserialized = null;
            if ($tableName !== null && $settings instanceof DriverDatabaseAbstract) {
                Logger::log('Extracting session from database...');
                $unserialized = (new DbArrayBuilder(
                    $tableName,
                    $settings->getOrmSettings(),
                    KeyType::STRING,
                    ValueType::SCALAR,
                ))->build()->get('data');
            }
            if (!$unserialized && $exists) {
                // The shutdown callback registered above stays in place on this path.
                throw new Exception('Could not extract session from database!');
            }
        }
        \assert($unserialized instanceof APIWrapper || $unserialized === null);

        // Loading is over: the caller now owns the (wrapped) unlock callback.
        Shutdown::removeCallback($tempId);
        return [$unserialized, $unlock];
    }

    /**
     * Try connecting to IPC socket.
     *
     * Makes at most 25 connection attempts. After each failed attempt it waits up to
     * one second on $cancelConnect; a Throwable delivered through that future aborts
     * the attempts and is returned as the first element of the result.
     *
     * @param string                   $ipcPath       IPC path
     * @param Future<(Throwable|null)> $cancelConnect Cancelation token (triggers cancellation of connection)
     * @param null|callable(): void    $cancelFull    Cancelation token source (can trigger cancellation of full unserialization)
     * @param null|Throwable           $customE       Returned as the first element, instead of 0, when every attempt fails
     *
     * @throws CancelledException When awaiting $cancelConnect is cancelled for a reason other than the one-second timeout.
     *
     * @return array{0: (ChannelledSocket|Throwable|0), 1: null}
     */
    public static function tryConnect(string $ipcPath, Future $cancelConnect, ?callable $cancelFull = null, ?Throwable $customE = null): array
    {
        for ($x = 0; $x < 25; $x++) {
            Logger::log('MadelineProto is starting, please wait...');
            if (\PHP_OS_FAMILY === 'Windows') {
                Logger::log(Lang::$current_lang['windows_warning']);
            }
            try {
                clearstatcache(true, $ipcPath);
                $socket = connect($ipcPath);
                Logger::log('Connected to IPC socket!');
                if ($cancelFull) {
                    // Connected: a full unserialization is no longer needed.
                    $cancelFull();
                }
                return [$socket, null];
            } catch (Throwable $e) {
                $message = $e->getMessage();
                // A missing endpoint is expected while the server is still starting.
                if ($message !== 'The endpoint does not exist!') {
                    Logger::log("$message while connecting to IPC socket");
                }
            }
            try {
                if ($res = $cancelConnect->await(Tools::getTimeoutCancellation(1.0))) {
                    if ($res instanceof Throwable) {
                        return [$res, null];
                    }
                    // Truthy non-error result: swap in a future that nothing completes, so
                    // the remaining iterations just wait out the one-second timeout.
                    $cancelConnect = (new DeferredFuture)->getFuture();
                }
                // NOTE(review): a falsy result leaves the completed future in place, so
                // later iterations presumably do not wait here - confirm this is intended.
            } catch (CancelledException $e) {
                // Only the timeout is expected here; any other cancellation is propagated.
                if (!$e->getPrevious() instanceof TimeoutException) {
                    throw $e;
                }
            }
        }
        return [$customE ?? 0, null];
    }
}