. * * @author Daniil Gentili * @copyright 2016-2023 Daniil Gentili * @license https://opensource.org/licenses/AGPL-3.0 AGPLv3 * @link https://docs.madelineproto.xyz MadelineProto documentation */ namespace danog\MadelineProto\Ipc; use Amp\CompositeException; use Amp\DeferredFuture; use Amp\Future; use Amp\Ipc\IpcServer; use Amp\Ipc\Sync\ChannelledSocket; use danog\Loop\Loop; use danog\MadelineProto\Exception; use danog\MadelineProto\Ipc\Runner\ProcessRunner; use danog\MadelineProto\Ipc\Runner\WebRunner; use danog\MadelineProto\Logger; use danog\MadelineProto\Loop\InternalLoop; use danog\MadelineProto\SessionPaths; use danog\MadelineProto\Settings\Ipc; use danog\MadelineProto\Shutdown; use danog\MadelineProto\Tools; use Revolt\EventLoop; use Throwable; use function Amp\async; use function Amp\delay; /** * IPC server. * * @internal */ class Server extends Loop { use InternalLoop; /** * Server version. */ public const VERSION = 1; /** * Shutdown server. */ public const SHUTDOWN = 0; /** * Boolean to shut down worker, if started. */ private static bool $shutdown = false; /** * Deferred to shut down worker, if started. */ private static ?DeferredFuture $shutdownDeferred = null; /** * Boolean whether to shut down worker, if started. */ private static bool $shutdownNow = false; /** * IPC server. */ protected IpcServer $server; /** * Callback IPC server. */ private ServerCallback $callback; /** * Set IPC path. * * @param SessionPaths $session Session */ public function setIpcPath(SessionPaths $session): void { self::$shutdownDeferred ??= new DeferredFuture; $this->server = new IpcServer($session->getIpcPath()); $this->callback = new ServerCallback($this->API); $this->callback->setIpcPath($session); } public function start(): bool { return $this instanceof ServerCallback ? parent::start() : $this->callback->start() && parent::start(); } /** * Start IPC server in background. * * @param SessionPaths $session Session path */ public static function startMe(SessionPaths $session): Future { $id = Tools::randomInt(2000000000); $started = false; $e = null; try { Logger::log("Starting IPC server $session (process)"); ProcessRunner::start((string) $session, $id); $started = true; WebRunner::start((string) $session, $id); return async(self::monitor(...), $session, $id, $started, null); } catch (Throwable $e) { Logger::log($e); } try { Logger::log("Starting IPC server $session (web)"); if (WebRunner::start((string) $session, $id)) { $started = true; } } catch (Throwable $e2) { Logger::log($e2); if ($e) { $e = new CompositeException([$e, $e2]); } else { $e = $e2; } } return async(self::monitor(...), $session, $id, $started, $e); } /** * Monitor session. */ private static function monitor(SessionPaths $session, int $id, bool $started, ?\Throwable $e): bool|Throwable { if (!$started) { Logger::log("It looks like the server couldn't be started, trying to connect anyway..."); } $count = 0; while (true) { $state = $session->getIpcState(); if ($state && $state->getStartupId() === $id) { if ($e = $state->getException()) { Logger::log("IPC server got exception $e"); return $e; } Logger::log('IPC server started successfully!'); return true; } elseif (!$started && $count > 0 && $count > 2*($state ? 3 : 1)) { return new Exception("We couldn't start the IPC server, please check the logs!", previous: $e); } delay(0.5); $count++; } return false; } /** * Wait for shutdown. */ public static function waitShutdown(): void { if (self::$shutdownNow) { return; } self::$shutdownDeferred ??= new DeferredFuture; self::$shutdownDeferred->getFuture()->await(); } /** * Shutdown. */ final public function stop(): bool { $this->server->close(); if (!$this instanceof ServerCallback) { $this->callback->server->close(); } if (self::$shutdownDeferred) { self::$shutdownNow = true; $deferred = self::$shutdownDeferred; self::$shutdownDeferred = null; $deferred->complete(); } return true; } /** * Main loop. */ protected function loop(): ?float { while ($socket = $this->server->accept()) { EventLoop::queue($this->clientLoop(...), $socket); } $this->server->close(); if (isset($this->callback)) { $this->callback->server->close(); } return self::STOP; } /** * Client handler loop. * * @param ChannelledSocket $socket Client */ protected function clientLoop(ChannelledSocket $socket): void { $this->API->waitForInit(); $this->API->logger('Accepted IPC client connection!'); $id = 0; $payload = null; try { while ($payload = $socket->receive()) { EventLoop::queue($this->clientRequest(...), $socket, $id++, $payload); } } catch (Throwable $e) { Logger::log("Exception in IPC connection: $e"); } finally { EventLoop::queue(function () use ($socket, $payload): void { try { $socket->disconnect(); } catch (Throwable $e) { Logger::log("Exception during shutdown in IPC connection: $e"); } if ($payload === self::SHUTDOWN) { Shutdown::removeCallback('restarter'); $this->stop(); } }); } } /** * Handle client request. * * @param ChannelledSocket $socket Socket * @param array{0: string, 1: array|Wrapper} $payload Payload */ private function clientRequest(ChannelledSocket $socket, int $id, array $payload): void { try { if ($payload[1] instanceof Wrapper) { $wrapper = $payload[1]; $payload[1] = $this->callback->unwrap($wrapper); } $result = $this->API->{$payload[0]}(...$payload[1]); } catch (Throwable $e) { $this->API->logger("Got error while calling IPC method: $e", Logger::ERROR); $result = new ExitFailure($e); } finally { if (isset($wrapper)) { EventLoop::queue(static function () use ($wrapper): void { try { $wrapper->disconnect(); } catch (Throwable $e) { Logger::log("Exception during shutdown in IPC connection: $e"); } }); } } try { $socket->send([$id, $result]); } catch (Throwable $e) { $this->API->logger("Got error while trying to send result of {$payload[0]}: $e", Logger::ERROR); try { $socket->send([$id, new ExitFailure($e)]); } catch (Throwable $e) { $this->API->logger("Got error while trying to send error of error of {$payload[0]}: $e", Logger::ERROR); } } } /** * Get the name of the loop. */ public function __toString(): string { return 'IPC server'; } }