1
0
mirror of https://github.com/danog/MadelineProto.git synced 2025-01-22 15:51:15 +01:00

Retry connecting normally on IPC error + better error handling and process abortion

This commit is contained in:
Daniil Gentili 2021-06-10 15:54:15 +02:00
parent ba18d2caa5
commit 06f249583f
Signed by: danog
GPG Key ID: 8C1BE3B34B230CA7
7 changed files with 59 additions and 26 deletions

View File

@ -305,7 +305,10 @@ class API extends InternalDoc
if ($unserialized === 0) {
// Timeout
throw new Exception("Could not connect to MadelineProto, please check the logs for more details.");
Logger::log("!!! Could not connect to MadelineProto, please check and report the logs for more details. !!!", Logger::FATAL_ERROR);
Logger::log("!!! Reconnecting using slower method. !!!", Logger::FATAL_ERROR);
// IPC server error, try fetching full session
return yield from $this->connectToMadelineProto($settings, true);
} elseif ($unserialized instanceof \Throwable) {
// IPC server error, try fetching full session
return yield from $this->connectToMadelineProto($settings, true);

View File

@ -2,9 +2,12 @@
namespace danog\MadelineProto\Ipc\Runner;
use Amp\Deferred;
use Amp\Process\Internal\Posix\Runner;
use Amp\Process\Internal\Windows\Runner as WindowsRunner;
use Amp\Process\ProcessInputStream;
use Amp\Promise;
use danog\MadelineProto\Exception;
use danog\MadelineProto\Logger;
use danog\MadelineProto\Magic;
use danog\MadelineProto\Tools;
@ -41,9 +44,9 @@ final class ProcessRunner extends RunnerAbstract
*
* @param string $session Session path
*
* @return void
* @return Promise<true>
*/
public static function start(string $session, int $startupId): void
public static function start(string $session, int $startupId): Promise
{
if (\PHP_SAPI === "cli") {
$binary = \PHP_BINARY;
@ -79,24 +82,31 @@ final class ProcessRunner extends RunnerAbstract
['QUERY_STRING' => \http_build_query($params)]
);
$resDeferred = new Deferred;
$runner = IS_WINDOWS ? new WindowsRunner : new Runner;
$handle = $runner->start($command, null, $envVars);
$handle->pidDeferred->promise()->onResolve(function (?\Throwable $e, ?int $pid) use ($handle, $runner) {
$handle->pidDeferred->promise()->onResolve(function (?\Throwable $e, ?int $pid) use ($handle, $runner, $resDeferred) {
if ($e) {
Logger::log("Got exception while starting process worker: $e");
$resDeferred->resolve($e);
return;
}
Tools::callFork(self::readUnref($handle->stdout));
Tools::callFork(self::readUnref($handle->stderr));
$runner->join($handle)->onResolve(function (?\Throwable $e, ?int $res) {
$runner->join($handle)->onResolve(function (?\Throwable $e, ?int $res) use ($runner, $handle, $resDeferred) {
$runner->destroy($handle);
if ($e) {
Logger::log("Got exception from process worker: $e");
$resDeferred->fail($e);
} else {
Logger::log("Process worker exited with $res!");
$resDeferred->fail(new Exception("Process worker exited with $res!"));
}
});
});
return $resDeferred->promise();
}
/**
* Unreference and read data from fd, logging results.

View File

@ -2,6 +2,8 @@
namespace danog\MadelineProto\Ipc\Runner;
use Amp\Promise;
abstract class RunnerAbstract
{
const SCRIPT_PATH = __DIR__."/entry.php";
@ -59,10 +61,10 @@ abstract class RunnerAbstract
/**
* Runner.
*
* @param string $session Session path
* @param int $startup ID
* @param string $session Session path
* @param int $startup ID
*
* @return void
* @return Promise<true>
*/
abstract public static function start(string $session, int $startupId): void;
abstract public static function start(string $session, int $startupId): Promise;
}

View File

@ -2,7 +2,10 @@
namespace danog\MadelineProto\Ipc\Runner;
use Amp\Failure;
use Amp\Parallel\Context\ContextException;
use Amp\Promise;
use Amp\Success;
use danog\MadelineProto\Exception;
use danog\MadelineProto\Logger;
use danog\MadelineProto\Magic;
@ -22,12 +25,12 @@ final class WebRunner extends RunnerAbstract
*
* @param string $session Session path
*
* @return void
* @return Promise<bool>
*/
public static function start(string $session, int $startupId): void
public static function start(string $session, int $startupId): Promise
{
if (!isset($_SERVER['SERVER_NAME'])) {
return;
return new Failure(new \Exception("Can't start the web runner!"));
}
if (!self::$runPath) {
@ -105,5 +108,7 @@ final class WebRunner extends RunnerAbstract
// Technically should use amphp/socket, but I guess it's OK to not introduce another dependency just for a socket that will be used once.
\fwrite($res, $payload);
self::$resources []= $res;
return new Success(true);
}
}

View File

@ -33,6 +33,8 @@ use danog\MadelineProto\SessionPaths;
use danog\MadelineProto\Settings\Ipc;
use danog\MadelineProto\Tools;
use function Amp\Promise\first;
/**
* IPC server.
*/
@ -100,34 +102,36 @@ class Server extends SignalLoop
{
$id = Tools::randomInt(2000000000);
$started = false;
$promises = [];
try {
Logger::log("Starting IPC server $session (process)");
ProcessRunner::start($session, $id);
$promises []= ProcessRunner::start($session, $id);
$started = true;
WebRunner::start($session, $id);
return Tools::call(self::monitor($session, $id, $started));
$promises []= WebRunner::start($session, $id);
return Tools::call(self::monitor($session, $id, $started, first($promises)));
} catch (\Throwable $e) {
Logger::log($e);
}
try {
Logger::log("Starting IPC server $session (web)");
WebRunner::start($session, $id);
$promises []= WebRunner::start($session, $id);
$started = true;
} catch (\Throwable $e) {
Logger::log($e);
}
return Tools::call(self::monitor($session, $id, $started));
return Tools::call(self::monitor($session, $id, $started, first($promises)));
}
/**
* Monitor session.
*
* @param SessionPaths $session
* @param int $id
* @param bool $started
* @param SessionPaths $session
* @param int $id
* @param bool $started
* @param Promise<bool> $cancelConnect
*
* @return \Generator
*/
private static function monitor(SessionPaths $session, int $id, bool $started): \Generator
private static function monitor(SessionPaths $session, int $id, bool $started, Promise $cancelConnect): \Generator
{
if (!$started) {
Logger::log("It looks like the server couldn't be started, trying to connect anyway...");
@ -145,7 +149,14 @@ class Server extends SignalLoop
} elseif (!$started && $count > 0 && $count > 2*($state ? 3 : 1)) {
return new Exception("We couldn't start the IPC server, please check the logs!");
}
yield Tools::sleep(0.5);
try {
yield Tools::timeoutWithDefault($cancelConnect, 500, null);
$cancelConnect = (new Deferred)->promise();
} catch (\Throwable $e) {
Logger::log("$e");
Logger::log("Could not start IPC server, please check the logs for more details!");
return $e;
}
$count++;
}
return false;

View File

@ -246,7 +246,7 @@ abstract class Serialization
public static function tryConnect(string $ipcPath, Promise $cancelConnect, ?Deferred $cancelFull = null): \Generator
{
for ($x = 0; $x < 60; $x++) {
Logger::log("Trying to connect to IPC socket...");
Logger::log("MadelineProto is starting, please wait...");
try {
\clearstatcache(true, $ipcPath);
$socket = yield connect($ipcPath);
@ -257,7 +257,9 @@ abstract class Serialization
return [$socket, null];
} catch (\Throwable $e) {
$e = $e->getMessage();
Logger::log("$e while connecting to IPC socket");
if ($e !== 'The endpoint does not exist!') {
Logger::log("$e while connecting to IPC socket");
}
}
if ($res = yield Tools::timeoutWithDefault($cancelConnect, 1000, null)) {
if ($res instanceof \Throwable) {

View File

@ -134,11 +134,11 @@ class SessionPaths
}
$headerLen = \strlen(Serialization::PHP_HEADER) + 1;
Logger::log("Waiting for shared lock of $path.lock...");
Logger::log("Waiting for shared lock of $path.lock...", Logger::ULTRA_VERBOSE);
$unlock = yield from Tools::flockGenerator("$path.lock", LOCK_SH, 0.1);
try {
Logger::log("Got shared lock of $path.lock...");
Logger::log("Got shared lock of $path.lock...", Logger::ULTRA_VERBOSE);
$file = yield open($path, 'rb');
$size = yield stat($path);