. * * @author Daniil Gentili * @copyright 2016-2023 Daniil Gentili * @license https://opensource.org/licenses/AGPL-3.0 AGPLv3 * @link https://docs.madelineproto.xyz MadelineProto documentation */ namespace danog\MadelineProto; use Amp\CancelledException; use Amp\DeferredCancellation; use Amp\DeferredFuture; use Amp\Future; use Amp\Ipc\Sync\ChannelledSocket; use Amp\TimeoutException; use AssertionError; use danog\MadelineProto\Db\DbPropertiesFactory; use danog\MadelineProto\Db\DriverArray; use danog\MadelineProto\Ipc\Server; use danog\MadelineProto\Settings\DatabaseAbstract; use Revolt\EventLoop; use Throwable; use const LOCK_EX; use function Amp\File\exists; use function Amp\Ipc\connect; /** * Manages serialization of the MadelineProto instance. * * @internal */ abstract class Serialization { /** * Header for session files. */ public const PHP_HEADER = 'getSessionPath())) { // No session exists yet, lock for when we create it return [null, Tools::flock($session->getLockPath(), LOCK_EX, 1)]; } //Logger::log('Waiting for exclusive session lock...'); $warningId = EventLoop::delay(1, static function () use (&$warningId): void { if (isset($_GET['MadelineSelfRestart'])) { Logger::log("MadelineProto self-restarted successfully!"); } else { Logger::log('It seems like the session is busy.'); Logger::log('Telegram does not support starting multiple instances of the same session, make sure no other instance of the session is running.'); $warningId = EventLoop::repeat(5, fn () => Logger::log('Still waiting for exclusive session lock...')); EventLoop::unreference($warningId); } }); EventLoop::unreference($warningId); $lightState = null; $cancelFlock = new DeferredCancellation; $cancelIpc = new DeferredFuture; $canContinue = true; $ipcSocket = null; $unlock = Tools::flock($session->getLockPath(), LOCK_EX, 1, $cancelFlock->getCancellation(), $forceFull ? 
null : static function () use ($session, &$cancelFlock, $cancelIpc, &$canContinue, &$ipcSocket, &$lightState): void { $cancelFull = static function () use (&$cancelFlock): void { if ($cancelFlock !== null) { $copy = $cancelFlock; $cancelFlock = null; $copy->cancel(); } }; EventLoop::queue(function () use ($session, $cancelFull, &$canContinue, &$lightState): void { try { $lightState = $session->getLightState(); if (!$lightState->canStartIpc()) { $canContinue = false; $cancelFull(); } } catch (Throwable) { $lightState = false; } }); $ipcSocket = self::tryConnect($session->getIpcPath(), $cancelIpc->getFuture(), $cancelFull); }); EventLoop::cancel($warningId); if (!$unlock) { // Canceled, don't have lock return $ipcSocket; } if (!$canContinue) { // Have lock, can't use it Logger::log("Session has event handler, but it's not started.", Logger::ERROR); Logger::log("We don't have access to the event handler class, so we can't start it.", Logger::ERROR); Logger::log('Please start the event handler or unset it to use the IPC server.', Logger::ERROR); $unlock(); return $ipcSocket; } try { /** @var LightState */ $lightState ??= $session->getLightState(); } catch (Throwable) { } if ($lightState && !$forceFull) { if (!$class = $lightState->getEventHandler()) { // Unlock and fork $unlock(); $monitor = Server::startMe($session); EventLoop::queue(function () use ($cancelIpc, $monitor): void { try { $cancelIpc->complete($monitor->await()); } catch (\Throwable $e) { $cancelIpc->error($e); } }); return $ipcSocket ?? self::tryConnect($session->getIpcPath(), $cancelIpc->getFuture()); } elseif (!\class_exists($class)) { // Have lock, can't use it $unlock(); Logger::log("Session has event handler (class $class), but it's not started.", Logger::ERROR); Logger::log("We don't have access to the event handler class, so we can't start it.", Logger::ERROR); Logger::log('Please start the event handler or unset it to use the IPC server.', Logger::ERROR); return $ipcSocket ?? 
self::tryConnect($session->getIpcPath(), $cancelIpc->getFuture(), customE: new AssertionError("Please make sure the $class class is in scope, or that the event handler is running (in a separate process or in the current process).")); } elseif (\is_subclass_of($class, EventHandler::class)) { EventHandler::cachePlugins($class); } } else { if (!$lightState) { throw new AssertionError("Could not read the lightstate file, check logs!"); } $class = $lightState->getEventHandler(); if ($class && !\class_exists($class)) { // Have lock, can't use it $unlock(); Logger::log("Session has event handler, but it's not started.", Logger::ERROR); Logger::log("We don't have access to the event handler class, so we can't start it.", Logger::ERROR); Logger::log('Please start the event handler or unset it to use the IPC server.', Logger::ERROR); throw new AssertionError("Please make sure the $class class is in scope, or that the event handler is running (in a separate process or in the current process)."); } elseif ($class && \is_subclass_of($class, EventHandler::class)) { EventHandler::cachePlugins($class); } } $tempId = Shutdown::addCallback($unlock = static function () use ($unlock): void { Logger::log('Unlocking exclusive session lock!'); $unlock(); Logger::log('Unlocked exclusive session lock!'); }); Logger::log('Got exclusive session lock!'); $unserialized = $session->unserialize(); if ($unserialized instanceof DriverArray) { Logger::log('Extracting session from database...'); if ($settings instanceof Settings) { $settings = $settings->getDb(); } if ($settings instanceof DatabaseAbstract) { $tableName = (string) $unserialized; $unserialized = DbPropertiesFactory::get( $settings, $tableName, ['enableCache' => false], $unserialized, ); } else { $unserialized->initStartup(); } $unserialized = $unserialized['data']; if (!$unserialized) { throw new Exception('Could not extract session from database!'); } } if ($unserialized === false) { throw new 
Exception(Lang::$current_lang['deserialization_error']); } Shutdown::removeCallback($tempId); return [$unserialized, $unlock]; }

    /**
     * Repeatedly attempt to open the IPC socket of a session.
     *
     * Makes up to 25 connection attempts. After every failed attempt the
     * method awaits $cancelConnect, bounded by a 1.0 timeout cancellation,
     * before trying again.
     *
     * @param string                    $ipcPath       Path of the IPC socket to connect to
     * @param Future<(Throwable|null)>  $cancelConnect Cancellation token: when it resolves to a Throwable, the attempts stop and that Throwable is returned
     * @param null|callable(): void     $cancelFull    Cancellation token source, invoked once the connection succeeds (can trigger cancellation of full unserialization)
     * @param null|Throwable            $customE       Value returned in slot 0 when every attempt fails; when null, 0 is returned instead
     *
     * @return array{0: (ChannelledSocket|Throwable|0), 1: null}
     */
    public static function tryConnect(string $ipcPath, Future $cancelConnect, ?callable $cancelFull = null, ?Throwable $customE = null): array
    {
        for ($attempt = 0; $attempt < 25; ++$attempt) {
            Logger::log('MadelineProto is starting, please wait...');
            if (\PHP_OS_FAMILY === 'Windows') {
                Logger::log(Lang::$current_lang['windows_warning']);
            }

            try {
                // Drop any cached stat information for the socket path before connecting.
                \clearstatcache(true, $ipcPath);
                $connection = connect($ipcPath);
                Logger::log('Connected to IPC socket!');
                if ($cancelFull !== null) {
                    $cancelFull();
                }
                return [$connection, null];
            } catch (Throwable $failure) {
                $reason = $failure->getMessage();
                // A missing endpoint is not logged; any other failure is.
                if ($reason !== 'The endpoint does not exist!') {
                    Logger::log($reason.' while connecting to IPC socket');
                }
            }

            try {
                $outcome = $cancelConnect->await(Tools::getTimeoutCancellation(1.0));
                if ($outcome instanceof Throwable) {
                    return [$outcome, null];
                }
                if ($outcome) {
                    // The token resolved to a truthy non-error value: swap in a
                    // future that is never completed here, so that the following
                    // iterations only wait for the timeout.
                    $cancelConnect = (new DeferredFuture)->getFuture();
                }
            } catch (CancelledException $cancelled) {
                // Only a timeout is swallowed; any other cancellation is rethrown.
                if (!($cancelled->getPrevious() instanceof TimeoutException)) {
                    throw $cancelled;
                }
            }
        }

        return [$customE ?? 0, null];
    }
}