1
0
mirror of https://github.com/danog/MadelineProto.git synced 2024-11-26 20:54:42 +01:00

Improvements for amp v3

This commit is contained in:
Daniil Gentili 2023-01-15 16:12:12 +01:00
parent 42c6d2c0e9
commit aa36745409
15 changed files with 134 additions and 120 deletions

View File

@ -1,6 +1,6 @@
<?xml version="1.0"?>
<psalm
errorLevel="4"
errorLevel="5"
resolveFromConfigFile="true"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="https://getpsalm.org/schema/config"

View File

@ -22,7 +22,9 @@ namespace danog\MadelineProto;
use Amp\CancelledException;
use Amp\DeferredFuture;
use Amp\Future;
use Amp\Ipc\Sync\ChannelledSocket;
use Amp\SignalException;
use Amp\TimeoutCancellation;
use Amp\TimeoutException;
use danog\MadelineProto\ApiWrappers\Start;
@ -30,6 +32,7 @@ use danog\MadelineProto\ApiWrappers\Templates;
use danog\MadelineProto\Ipc\Client;
use danog\MadelineProto\Settings\Ipc as SettingsIpc;
use danog\MadelineProto\Settings\Logger as SettingsLogger;
use Revolt\EventLoop\UncaughtThrowable;
use Throwable;
use function Amp\async;
@ -149,8 +152,8 @@ final class API extends InternalDoc
/**
* Magic constructor function.
*
* @param string $session Session name
* @param array $settings Settings
* @param string $session Session name
* @param array|SettingsAbstract $settings Settings
*/
public function __construct(string $session, array|SettingsAbstract $settings = [])
{
@ -213,9 +216,15 @@ final class API extends InternalDoc
$this->API->stopIpcServer();
$this->logger->logger('Restarting to full instance: disconnecting from IPC server...');
$this->API->disconnect();
} catch (SecurityException $e) {
} catch (SecurityException|SignalException $e) {
throw $e;
} catch (Throwable $e) {
if ($e instanceof UncaughtThrowable) {
$e = $e->getPrevious();
if ($e instanceof SecurityException || $e instanceof SignalException) {
throw $e;
}
}
$this->logger->logger("Restarting to full instance: error $e");
}
$this->logger->logger('Restarting to full instance: reconnecting...');
@ -241,9 +250,15 @@ final class API extends InternalDoc
$this->logger->logger('Restarting to full instance: disconnecting from IPC server...');
$API->disconnect();
$API->unreference();
} catch (SecurityException $e) {
} catch (SecurityException|SignalException $e) {
throw $e;
} catch (Throwable $e) {
if ($e instanceof UncaughtThrowable) {
$e = $e->getPrevious();
if ($e instanceof SecurityException || $e instanceof SignalException) {
throw $e;
}
}
$this->logger->logger("Restarting to full instance: error in stop loop $e");
}
async($cb);
@ -337,6 +352,17 @@ final class API extends InternalDoc
{
$this->oldInstance = true;
}
/**
* @var array<Future<null>>
*/
private static array $destructors = [];
/**
* @internal
*/
public static function finalize(): void
{
await(self::$destructors);
}
/**
* Destruct function.
*
@ -345,12 +371,13 @@ final class API extends InternalDoc
public function __destruct()
{
if (!$this->oldInstance) {
async(function (): void {
$id = \count(self::$destructors);
self::$destructors[$id] = async(function () use ($id): void {
$this->logger->logger('Shutting down MadelineProto ('.static::class.')');
if ($this->API) {
$this->API->unreference();
}
if (isset($this->wrapper) && (!Magic::$signaled || $this->gettingApiId)) {
if (isset($this->wrapper)) {
$this->logger->logger('Prompting final serialization...');
$this->wrapper->serialize();
$this->logger->logger('Done final serialization!');
@ -358,6 +385,7 @@ final class API extends InternalDoc
if ($this->unlock) {
($this->unlock)();
}
unset(self::$destructors[$id]);
});
} elseif ($this->logger) {
$this->logger->logger('Shutting down MadelineProto (old deserialized instance of API)');
@ -395,9 +423,15 @@ final class API extends InternalDoc
try {
$this->startAndLoopAsyncInternal($eventHandler, $started);
return;
} catch (SecurityException $e) {
} catch (SecurityException|SignalException $e) {
throw $e;
} catch (Throwable $e) {
if ($e instanceof UncaughtThrowable) {
$e = $e->getPrevious();
if ($e instanceof SecurityException || $e instanceof SignalException) {
throw $e;
}
}
if (\str_starts_with($e->getMessage(), 'Could not connect to DC ')) {
throw $e;
}
@ -438,9 +472,15 @@ final class API extends InternalDoc
}
await($promises);
return;
} catch (SecurityException $e) {
} catch (SecurityException|SignalException $e) {
throw $e;
} catch (Throwable $e) {
if ($e instanceof UncaughtThrowable) {
$e = $e->getPrevious();
if ($e instanceof SecurityException || $e instanceof SignalException) {
throw $e;
}
}
$t = \time();
$errors = [$t => $errors[$t] ?? 0];
$errors[$t]++;
@ -488,9 +528,15 @@ final class API extends InternalDoc
$started = true;
/** @var API $this->API */
$this->API->loop();
} catch (SecurityException $e) {
} catch (SecurityException|SignalException $e) {
throw $e;
} catch (Throwable $e) {
if ($e instanceof UncaughtThrowable) {
$e = $e->getPrevious();
if ($e instanceof SecurityException || $e instanceof SignalException) {
throw $e;
}
}
$t = \time();
$errors = [$t => $errors[$t] ?? 0];
$errors[$t]++;

View File

@ -104,6 +104,6 @@ final class Exception extends \Exception
public static function exceptionHandler(\Throwable $exception): void
{
Logger::log($exception, Logger::FATAL_ERROR);
Magic::shutdown(1);
die(1);
}
}

View File

@ -6,6 +6,7 @@ namespace danog\MadelineProto;
use Amp\Http\Client\HttpClientBuilder;
use Amp\Http\Client\Request;
use Amp\SignalException;
use danog\Loop\Generic\PeriodicLoop;
use Revolt\EventLoop;
use Throwable;
@ -67,7 +68,7 @@ final class GarbageCollector
if (Magic::$version !== Magic::$version_latest) {
Logger::log('!!!!!!!!!!!!! An update of MadelineProto is required, shutting down worker! !!!!!!!!!!!!!', Logger::FATAL_ERROR);
if (Magic::$isIpcWorker) {
die;
throw new SignalException('!!!!!!!!!!!!! An update of MadelineProto is required, shutting down worker! !!!!!!!!!!!!!');
}
return true;
}

View File

@ -5,18 +5,12 @@ declare(strict_types=1);
namespace danog\MadelineProto\Ipc\Runner;
use Amp\ByteStream\ReadableResourceStream;
use Amp\NullCancellation;
use Amp\Process\Internal\Posix\PosixRunner;
use Amp\Process\Internal\Posix\Runner;
use Amp\Process\Internal\Windows\WindowsRunner as WindowsWindowsRunner;
use danog\MadelineProto\Exception;
use danog\MadelineProto\Logger;
use danog\MadelineProto\Magic;
use Error;
use Throwable;
use const Amp\Process\IS_WINDOWS;
use const ARRAY_FILTER_USE_BOTH;
use const DIRECTORY_SEPARATOR;
use const PATH_SEPARATOR;
@ -52,6 +46,11 @@ final class ProcessRunner extends RunnerAbstract
/** @var string|null Cached path to located PHP binary. */
private static ?string $binaryPath = null;
/**
* Resources.
*/
private static array $resources = [];
/**
* Runner.
*
@ -95,21 +94,22 @@ final class ProcessRunner extends RunnerAbstract
['QUERY_STRING' => \http_build_query($params)],
);
$runner = IS_WINDOWS ? new WindowsWindowsRunner : new PosixRunner;
try {
$handle = $runner->start($command, new NullCancellation, null, $envVars);
} catch (\Throwable $e) {
Logger::log("Got exception while starting process worker: $e");
throw $e;
}
async(self::readUnref(...), $handle->streams->stdout);
async(self::readUnref(...), $handle->streams->stderr);
async(function () use ($runner, $handle): void {
$res = $runner->join($handle->handle);
$runner->destroy($handle->handle);
Logger::log("Process worker exited with $res!");
throw new Exception("Process worker exited with $res!");
});
self::$resources []= \proc_open(
$command,
[
["pipe", "r"],
["pipe", "w"],
["pipe", "w"],
],
$pipes,
null,
$envVars
);
$stdout = new ReadableResourceStream($pipes[1]);
$stderr = new ReadableResourceStream($pipes[2]);
async(self::readUnref(...), $stdout);
async(self::readUnref(...), $stderr);
return true;
}
/**

View File

@ -32,7 +32,7 @@ final class WebRunner extends RunnerAbstract
public static function start(string $session, int $startupId): bool
{
if (!isset($_SERVER['SERVER_NAME']) || !$_SERVER['SERVER_NAME']) {
throw new \Exception("Can't start the web runner!");
return false;
}
if (!self::$runPath) {

View File

@ -18,14 +18,17 @@ declare(strict_types=1);
* @link https://docs.madelineproto.xyz MadelineProto documentation
*/
use Amp\SignalException;
use danog\MadelineProto\API;
use danog\MadelineProto\Ipc\IpcState;
use danog\MadelineProto\Ipc\Server;
use danog\MadelineProto\Logger;
use danog\MadelineProto\Magic;
use danog\MadelineProto\SecurityException;
use danog\MadelineProto\SessionPaths;
use danog\MadelineProto\Settings\Ipc;
use danog\MadelineProto\Shutdown;
use Revolt\EventLoop\UncaughtThrowable;
use Webmozart\Assert\Assert;
(static function (): void {
@ -115,6 +118,12 @@ use Webmozart\Assert\Assert;
Logger::log('A restart was triggered!', Logger::FATAL_ERROR);
return;
} catch (Throwable $e) {
if ($e instanceof UncaughtThrowable) {
$e = $e->getPrevious();
}
if ($e instanceof SecurityException || $e instanceof SignalException) {
throw $e;
}
Logger::log((string) $e, Logger::FATAL_ERROR);
$API->report("Surfaced: $e");
}

View File

@ -20,13 +20,10 @@ declare(strict_types=1);
namespace danog\MadelineProto\Ipc;
use Amp\CancelledException;
use Amp\DeferredFuture;
use Amp\Future;
use Amp\Ipc\IpcServer;
use Amp\Ipc\Sync\ChannelledSocket;
use Amp\TimeoutCancellation;
use Amp\TimeoutException;
use danog\Loop\SignalLoop;
use danog\MadelineProto\Exception;
use danog\MadelineProto\Ipc\Runner\ProcessRunner;
@ -39,7 +36,7 @@ use danog\MadelineProto\Tools;
use Throwable;
use function Amp\async;
use function Amp\Future\awaitAny;
use function Amp\delay;
/**
* IPC server.
@ -104,31 +101,27 @@ class Server extends SignalLoop
{
$id = Tools::randomInt(2000000000);
$started = false;
$promises = [];
try {
Logger::log("Starting IPC server $session (process)");
$promises []= async(ProcessRunner::start(...), (string) $session, $id);
ProcessRunner::start((string) $session, $id);
$started = true;
$promises []= async(WebRunner::start(...), (string) $session, $id);
return async(self::monitor(...), $session, $id, $started, async(awaitAny(...), $promises));
WebRunner::start((string) $session, $id);
return async(self::monitor(...), $session, $id, $started);
} catch (Throwable $e) {
Logger::log($e);
}
try {
Logger::log("Starting IPC server $session (web)");
$promises []= async(WebRunner::start(...), (string) $session, $id);
$started = true;
$started = $started || WebRunner::start((string) $session, $id);
} catch (Throwable $e) {
Logger::log($e);
}
return async(self::monitor(...), $session, $id, $started, $promises ? async(awaitAny(...), $promises) : (new DeferredFuture)->getFuture());
return async(self::monitor(...), $session, $id, $started);
}
/**
* Monitor session.
*
* @param Future<bool> $cancelConnect
*/
private static function monitor(SessionPaths $session, int $id, bool $started, Future $cancelConnect): bool|Throwable
private static function monitor(SessionPaths $session, int $id, bool $started): bool|Throwable
{
if (!$started) {
Logger::log("It looks like the server couldn't be started, trying to connect anyway...");
@ -146,20 +139,7 @@ class Server extends SignalLoop
} elseif (!$started && $count > 0 && $count > 2*($state ? 3 : 1)) {
return new Exception("We couldn't start the IPC server, please check the logs!");
}
try {
try {
$cancelConnect->await(new TimeoutCancellation(0.5));
} catch (CancelledException $e) {
if (!$e->getPrevious() instanceof TimeoutException) {
throw $e;
}
}
$cancelConnect = (new DeferredFuture)->getFuture();
} catch (Throwable $e) {
Logger::log("$e");
Logger::log('Could not start IPC server, please check the logs for more details!');
return $e;
}
delay(0.5);
$count++;
}
return false;

View File

@ -22,6 +22,8 @@ namespace danog\MadelineProto;
use Amp\DeferredFuture;
use Amp\Loop\Driver;
use Amp\SignalCancellation;
use Amp\SignalException;
use danog\MadelineProto\TL\Conversion\Extension;
use phpseclib3\Math\BigInteger;
use ReflectionClass;
@ -181,11 +183,6 @@ final class Magic
*
*/
public static bool $zerowebhost = false;
/**
* Whether a signal was sent to the processand the system must shut down.
*
*/
public static bool $signaled = false;
/**
* Whether to suspend certain stdout log printing, when reading input.
*/
@ -222,6 +219,7 @@ final class Magic
}
if (!self::$initedLight) {
// Setup error reporting
Shutdown::init();
\set_error_handler(Exception::exceptionErrorHandler(...));
\set_exception_handler(Exception::exceptionHandler(...));
self::$isIpcWorker = \defined('MADELINE_WORKER_TYPE') ? MADELINE_WORKER_TYPE === 'madeline-ipc' : false;
@ -264,12 +262,16 @@ final class Magic
\pcntl_signal(SIGINT, fn () => null);
\pcntl_signal(SIGINT, SIG_DFL);
EventLoop::unreference(EventLoop::onSignal(SIGINT, static function (): void {
Logger::log('Got sigint', Logger::FATAL_ERROR);
Magic::shutdown(self::$isIpcWorker ? 0 : 1);
if (self::$suspendPeriodicLogging) {
self::togglePeriodicLogging();
}
throw new SignalException('SIGINT received');
}));
EventLoop::unreference(EventLoop::onSignal(SIGTERM, static function (): void {
Logger::log('Got sigterm', Logger::FATAL_ERROR);
Magic::shutdown(self::$isIpcWorker ? 0 : 1);
if (self::$suspendPeriodicLogging) {
self::togglePeriodicLogging();
}
throw new SignalException('SIGTERM received');
}));
} catch (Throwable $e) {
}
@ -287,9 +289,6 @@ final class Magic
}
self::$initedLight = true;
if ($light) {
if (!\defined('AMP_WORKER')) {
\define('AMP_WORKER', true);
}
return;
}
}
@ -368,33 +367,6 @@ final class Magic
{
return self::$can_getcwd ? \getcwd() : self::$cwd;
}
/**
* Shutdown system.
*
* @param int $code Exit code
*/
public static function shutdown(int $code = 0): void
{
self::$signaled = true;
if (\defined('STDIN')) {
getStdin()->unreference();
}
/*if ($code !== 0) {
$driver = EventLoop::get();
$reflectionClass = new ReflectionClass(Driver::class);
$reflectionProperty = $reflectionClass->getProperty('watchers');
$reflectionProperty->setAccessible(true);
foreach (\array_keys($reflectionProperty->getValue($driver)) as $key) {
$driver->unreference($key);
}
}*/
MTProto::serializeAll();
//EventLoop::stop();
if (\class_exists(Installer::class)) {
Installer::unlock();
}
die($code);
}
/**
* Toggle periodic logging.
*/

View File

@ -131,10 +131,6 @@ abstract class Serialization
//Logger::log('Waiting for exclusive session lock...');
$warningId = EventLoop::delay(1, static function () use (&$warningId): void {
Logger::log('It seems like the session is busy.');
/*if (\defined(\MADELINE_WORKER::class)) {
Logger::log("Exiting since we're in a worker");
Magic::shutdown(1);
}*/
Logger::log('Telegram does not support starting multiple instances of the same session, make sure no other instance of the session is running.');
$warningId = EventLoop::repeat(5, fn () => Logger::log('Still waiting for exclusive session lock...'));
EventLoop::unreference($warningId);

View File

@ -152,7 +152,8 @@ final class SessionPaths
}
$headerLen += 2;
}
$unserialized = \unserialize(($file->read(null, $size - $headerLen)) ?? '');
$d = ($file->read(null, $size - $headerLen)) ?? '';
$unserialized = \unserialize($d);
$file->close();
} finally {
$unlock();

View File

@ -140,11 +140,6 @@ final class Logger extends SettingsAbstract
*/
public function setType(int $type): self
{
if ($type === MadelineProtoLogger::NO_LOGGER) {
$type = (PHP_SAPI === 'cli' || PHP_SAPI === 'phpdbg')
? MadelineProtoLogger::ECHO_LOGGER
: MadelineProtoLogger::FILE_LOGGER;
}
$this->type = $type;
return $this;

View File

@ -20,6 +20,8 @@ declare(strict_types=1);
namespace danog\MadelineProto;
use function Amp\ByteStream\getStdin;
/**
* Class that controls script shutdown.
*/
@ -50,7 +52,25 @@ final class Shutdown
$callback();
}
self::$callbacks = [];
Magic::shutdown(0);
if (\defined('STDIN')) {
getStdin()->unreference();
}
API::finalize();
MTProto::serializeAll();
if (\class_exists(Installer::class)) {
Installer::unlock();
}
}
/**
* Register shutdown function.
*/
public static function init(): void
{
if (!self::$registered) {
\register_shutdown_function(fn () => self::shutdown());
self::$registered = true;
}
}
/**
* Add a callback for script shutdown.
@ -65,10 +85,7 @@ final class Shutdown
$id = self::$id++;
}
self::$callbacks[$id] = $callback;
if (!self::$registered) {
\register_shutdown_function(fn () => self::shutdown());
self::$registered = true;
}
self::init();
return $id;
}
/**

View File

@ -153,9 +153,6 @@ trait Loop
} while (!$this->stopLoop);
$this->logger->logger('Exiting update loop!', Logger::NOTICE);
$this->stopLoop = false;
if (isset($repeat)) {
EventLoop::cancel($repeat);
}
}
/**
* Stop update loop.

View File

@ -96,7 +96,7 @@ trait Start
$this->serialize();
return $this->fullGetSelf();
}
exit;
die;
}
private function webPhoneLogin(): void
{