From 1efb43cbdefff74ba24fb47e6c723a4342fe98ad Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Mon, 21 Oct 2024 15:50:43 +0000 Subject: [PATCH] Misc fixes --- src/Loop/Connection/WriteLoop.php | 20 ++++++++++---------- src/MTProto.php | 11 ++--------- src/MTProtoSession/CallHandler.php | 6 +++--- src/MTProtoTools/FilesLogic.php | 9 +-------- 4 files changed, 16 insertions(+), 30 deletions(-) diff --git a/src/Loop/Connection/WriteLoop.php b/src/Loop/Connection/WriteLoop.php index 55114791b..4484d81f8 100644 --- a/src/Loop/Connection/WriteLoop.php +++ b/src/Loop/Connection/WriteLoop.php @@ -49,24 +49,24 @@ final class WriteLoop extends Loop public const MAX_IDS = 8192; use Common { - __construct as init; + __construct as init2; } private int $pingTimeout; - private float $timeout; + private float $pollTimeout; /** * Constructor function. */ public function __construct(Connection $connection) { - $this->init($connection); + $this->init2($connection); $timeout = $this->shared->getSettings()->getPingInterval(); $this->pingTimeout = $timeout + 15; if ($this->connection->isHttp()) { - $this->timeout = (float) max(self::LONG_POLL_TIMEOUT, $timeout); + $this->pollTimeout = (float) max(self::LONG_POLL_TIMEOUT, $timeout); } else { - $this->timeout = (float) $timeout; + $this->pollTimeout = (float) $timeout; } } /** @@ -83,7 +83,7 @@ final class WriteLoop extends Loop } if (!$this->connection->pendingOutgoing && !$first) { $this->API->logger("No messages, pausing in $this...", Logger::ULTRA_VERBOSE); - return $this->timeout; + return $this->pollTimeout; } if ($please_wait) { $this->API->logger("Have to wait for handshake, pausing in $this...", Logger::ULTRA_VERBOSE); @@ -119,8 +119,8 @@ final class WriteLoop extends Loop { if ($queue = $this->connection->unencrypted_check_queue) { $this->connection->unencrypted_check_queue = []; - foreach ($queue as $msg_id => $_) { - $this->connection->methodRecall($msg_id); + foreach ($queue as $msg) { + $this->connection->methodRecall($msg); } } while ($this->connection->pendingOutgoing) { @@ -189,7 +189,7 @@ final class WriteLoop extends Loop $this->connection->objectCall('msgs_state_req', ['msg_ids' => $message_ids], $deferred); EventLoop::queue(function () use ($deferred, $message_ids): void { try { - $result = $deferred->getFuture()->await(new TimeoutCancellation($this->timeout)); + $result = $deferred->getFuture()->await(new TimeoutCancellation($this->pollTimeout)); if (\is_callable($result)) { throw $result(); } @@ -247,7 +247,7 @@ final class WriteLoop extends Loop } } //} catch (CancelledException) { - //$this->API->logger("We did not receive a response for {$this->timeout} seconds: reconnecting and exiting check loop on DC {$this->datacenter}"); + //$this->API->logger("We did not receive a response for {$this->pollTimeout} seconds: reconnecting and exiting check loop on DC {$this->datacenter}"); //EventLoop::queue($this->connection->reconnect(...)); } catch (\Throwable $e) { $this->API->logger("Got exception in check loop for DC {$this->datacenter}"); diff --git a/src/MTProto.php b/src/MTProto.php index 1bad2f58d..d5ee6ecf7 100644 --- a/src/MTProto.php +++ b/src/MTProto.php @@ -408,7 +408,6 @@ final class MTProto implements TLCallback, LoggerGetter, SettingsGetter */ #[OrmMappedArray(KeyType::STRING, ValueType::SCALAR, cacheTtl: 0, optimizeIfWastedMb: 1, tablePostfix: 'session')] public DbArray $sessionDb; - private bool $cleaned = false; /** * Returns an instance of a client by session name. @@ -1004,11 +1003,6 @@ final class MTProto implements TLCallback, LoggerGetter, SettingsGetter $this->updateQueue = $q; } - if ($this->cleaned) { - return; - } - $this->cleaned = true; - if (isset($this->channels_state)) { $this->updateState = new CombinedUpdatesState; foreach ($this->channels_state->get() as $channelId => $state) { @@ -1131,9 +1125,6 @@ final class MTProto implements TLCallback, LoggerGetter, SettingsGetter $this->initPromise = $deferred->getFuture(); try { - // Update settings from constructor - $this->updateSettings($settings); - // Setup logger $this->setupLogger(); @@ -1163,6 +1154,8 @@ final class MTProto implements TLCallback, LoggerGetter, SettingsGetter } // Reset MTProto session (not related to user session) $this->resetMTProtoSession("wakeup"); + // Update settings from constructor + $this->updateSettings($settings); // Update TL callbacks $callbacks = [$this, $this->peerDatabase]; if ($this->settings->getDb()->getEnableFileReferenceDb()) { diff --git a/src/MTProtoSession/CallHandler.php b/src/MTProtoSession/CallHandler.php index 8a9479480..a2e266d20 100644 --- a/src/MTProtoSession/CallHandler.php +++ b/src/MTProtoSession/CallHandler.php @@ -110,7 +110,7 @@ trait CallHandler return $readFuture->await(); } private LocalKeyedMutex $abstractionQueueMutex; - private ?int $drop = null; + private ?float $drop = null; /** * Call method and make sure it is asynchronously sent (generator). * @@ -182,7 +182,7 @@ trait CallHandler if (!$encrypted && $this->shared->hasTempAuthKey()) { $encrypted = true; } - $timeout = new TimeoutCancellation($this->drop ??= (float) $this->getAPI()->getSettings()->getRpc()->getRpcDropTimeout()); + $timeout = new TimeoutCancellation($this->drop ??= (float) $this->API->getSettings()->getRpc()->getRpcDropTimeout()); $cancellation = $cancellation !== null ? new CompositeCancellation($cancellation, $timeout) : $timeout; @@ -222,7 +222,7 @@ trait CallHandler { $cancellation = $args['cancellation'] ?? null; $cancellation?->throwIfRequested(); - $timeout = new TimeoutCancellation($this->drop ??= (float) $this->getAPI()->getSettings()->getRpc()->getRpcDropTimeout()); + $timeout = new TimeoutCancellation($this->drop ??= (float) $this->API->getSettings()->getRpc()->getRpcDropTimeout()); $cancellation = $cancellation !== null ? new CompositeCancellation($cancellation, $timeout) : $timeout; diff --git a/src/MTProtoTools/FilesLogic.php b/src/MTProtoTools/FilesLogic.php index 4a99c88c0..7f863fca1 100644 --- a/src/MTProtoTools/FilesLogic.php +++ b/src/MTProtoTools/FilesLogic.php @@ -240,14 +240,7 @@ trait FilesLogic if ($result->shouldServe()) { $pipe = new Pipe(1024 * 1024); [$start, $end] = $result->getServeRange(); - EventLoop::queue(function () use ($messageMedia, $pipe, $cb, $start, $end, $cancellation): void { - try { - $this->downloadToStream($messageMedia, $pipe->getSink(), $cb, $start, $end, $cancellation); - } catch (\Throwable $e) { - $this->logger->logger($e, Logger::ERROR); - } - $pipe->getSink()->close(); - }); + async($this->downloadToStream(...), $messageMedia, $pipe->getSink(), $cb, $start, $end, $cancellation)->finally($pipe->getSink()->close(...)); $body = $pipe->getSource(); } elseif (!\in_array($result->getCode(), [HttpStatus::OK, HttpStatus::PARTIAL_CONTENT], true)) { $body = $result->getCodeExplanation();