From 81b5c5e9aea371674f6a23d25c34bb7232b028d4 Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Thu, 27 Jun 2024 18:47:08 +0200 Subject: [PATCH 1/6] TAS hotfixes --- src/Loop/Connection/ReadLoop.php | 2 +- src/MTProto.php | 11 +++++++++-- src/MTProtoTools/FilesLogic.php | 9 ++++++++- src/MTProtoTools/PeerDatabase.php | 17 +++++++++++------ 4 files changed, 29 insertions(+), 10 deletions(-) diff --git a/src/Loop/Connection/ReadLoop.php b/src/Loop/Connection/ReadLoop.php index d43c83b94..d3321d2c1 100644 --- a/src/Loop/Connection/ReadLoop.php +++ b/src/Loop/Connection/ReadLoop.php @@ -71,7 +71,7 @@ final class ReadLoop extends Loop $this->API->logger("Got NothingInTheSocketException in DC {$this->datacenter}, disconnecting because we have nothing to do...", Logger::ERROR); $this->connection->disconnect(true); } else { - $this->API->logger($e); + $this->API->logger($e, Logger::ERROR); $this->API->logger("Got exception in DC {$this->datacenter}, reconnecting...", Logger::ERROR); $this->connection->reconnect(); } diff --git a/src/MTProto.php b/src/MTProto.php index d5ee6ecf7..1bad2f58d 100644 --- a/src/MTProto.php +++ b/src/MTProto.php @@ -408,6 +408,7 @@ final class MTProto implements TLCallback, LoggerGetter, SettingsGetter */ #[OrmMappedArray(KeyType::STRING, ValueType::SCALAR, cacheTtl: 0, optimizeIfWastedMb: 1, tablePostfix: 'session')] public DbArray $sessionDb; + private bool $cleaned = false; /** * Returns an instance of a client by session name. @@ -1003,6 +1004,11 @@ final class MTProto implements TLCallback, LoggerGetter, SettingsGetter $this->updateQueue = $q; } + if ($this->cleaned) { + return; + } + $this->cleaned = true; + if (isset($this->channels_state)) { $this->updateState = new CombinedUpdatesState; foreach ($this->channels_state->get() as $channelId => $state) { @@ -1125,6 +1131,9 @@ final class MTProto implements TLCallback, LoggerGetter, SettingsGetter $this->initPromise = $deferred->getFuture(); try { + // Update settings from constructor + $this->updateSettings($settings); + // Setup logger $this->setupLogger(); @@ -1154,8 +1163,6 @@ final class MTProto implements TLCallback, LoggerGetter, SettingsGetter } // Reset MTProto session (not related to user session) $this->resetMTProtoSession("wakeup"); - // Update settings from constructor - $this->updateSettings($settings); // Update TL callbacks $callbacks = [$this, $this->peerDatabase]; if ($this->settings->getDb()->getEnableFileReferenceDb()) { diff --git a/src/MTProtoTools/FilesLogic.php b/src/MTProtoTools/FilesLogic.php index 746373b71..94290bb84 100644 --- a/src/MTProtoTools/FilesLogic.php +++ b/src/MTProtoTools/FilesLogic.php @@ -240,7 +240,14 @@ trait FilesLogic if ($result->shouldServe()) { $pipe = new Pipe(1024 * 1024); [$start, $end] = $result->getServeRange(); - EventLoop::queue($this->downloadToStream(...), $messageMedia, $pipe->getSink(), $cb, $start, $end, $cancellation); + EventLoop::queue(function() use($messageMedia, $pipe, $cb, $start, $end, $cancellation) { + try { + $this->downloadToStream($messageMedia, $pipe->getSink(), $cb, $start, $end, $cancellation); + } catch (\Throwable $e) { + $this->logger->logger($e, Logger::ERROR); + } + $pipe->getSink()->close(); + }); $body = $pipe->getSource(); } elseif (!\in_array($result->getCode(), [HttpStatus::OK, HttpStatus::PARTIAL_CONTENT], true)) { $body = $result->getCodeExplanation(); diff --git a/src/MTProtoTools/PeerDatabase.php b/src/MTProtoTools/PeerDatabase.php index 34d2405b7..7482cc5ba 100644 --- a/src/MTProtoTools/PeerDatabase.php +++ b/src/MTProtoTools/PeerDatabase.php @@ -266,11 +266,13 @@ final class PeerDatabase implements TLCallback } $new = self::getUsernames($new); $old = $old ? self::getUsernames($old) : []; - $diffToRemove = array_diff($old, $new); - $diffToAdd = array_diff($new, $old); - if (!$diffToAdd && !$diffToRemove) { - return; + foreach ($old as $key => $username) { + if (!isset($this->usernames[$username])) { + unset($old[$key]); + } } + $diffToRemove = array_diff($old, $new); + $diffToAdd = array_diff($new, $diffToRemove); $lock = $this->decacheMutex->acquire(); try { foreach ($diffToRemove as $username) { @@ -410,6 +412,9 @@ final class PeerDatabase implements TLCallback return; } } + + $this->recacheChatUsername($user['id'], $existingChat, $user); + if ($existingChat != $user) { $this->API->logger("Updated user {$user['id']}", Logger::ULTRA_VERBOSE); if (($user['min'] ?? false) && !($existingChat['min'] ?? false)) { @@ -419,7 +424,7 @@ final class PeerDatabase implements TLCallback $user['access_hash'] = $existingChat['access_hash']; } } - $this->recacheChatUsername($user['id'], $existingChat, $user); + if (!$this->API->settings->getDb()->getEnablePeerInfoDb()) { $user = [ '_' => $user['_'], @@ -544,8 +549,8 @@ final class PeerDatabase implements TLCallback return; } } + $this->recacheChatUsername($bot_api_id, $existingChat, $chat); if ($existingChat != $chat) { - $this->recacheChatUsername($bot_api_id, $existingChat, $chat); $this->API->logger("Updated chat {$bot_api_id}", Logger::ULTRA_VERBOSE); if (($chat['min'] ?? false) && $existingChat && !($existingChat['min'] ?? false)) { $this->API->logger("{$bot_api_id} is min, filling missing fields", Logger::ULTRA_VERBOSE); From 9349ba71fe20742980694c279be3b6090c5bc017 Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Thu, 27 Jun 2024 19:08:00 +0200 Subject: [PATCH 2/6] Properly sync state --- psalm-baseline.xml | 19 +++++++++++++--- src/Loop/Connection/CheckLoop.php | 5 ++--- src/MTProto/MTProtoOutgoingMessage.php | 1 + src/MTProtoSession/AckHandler.php | 10 +-------- src/MTProtoSession/CallHandler.php | 4 ++-- src/MTProtoSession/ResponseHandler.php | 30 +++++++------------------- 6 files changed, 30 insertions(+), 39 deletions(-) diff --git a/psalm-baseline.xml b/psalm-baseline.xml index 0a8fac111..4d449b2c8 100644 --- a/psalm-baseline.xml +++ b/psalm-baseline.xml @@ -3372,6 +3372,10 @@ state |= self::STATE_REPLIED]]> state |= self::STATE_SENT]]> + + connection->new_outgoing]]> + connection->outgoing_messages]]> + sent]]> sent]]> @@ -3401,9 +3405,6 @@ - - new_outgoing]]> - getSent()]]> getSent()]]> @@ -3517,6 +3518,8 @@ + new_outgoing[$message_id]]]> + new_outgoing[$message_id]]]> outgoing_messages[$message_id]]]> outgoing_messages[$message_id]]]> @@ -3644,6 +3647,12 @@ getMsgId()]]> + + new_outgoing]]> + new_outgoing]]> + new_outgoing]]> + new_outgoing]]> + API->authorized_dc == $this->datacenter && $this->API->authorized === \danog\MadelineProto\API::LOGGED_IN]]> @@ -4176,6 +4185,8 @@ + + @@ -4192,6 +4203,8 @@ + + diff --git a/src/Loop/Connection/CheckLoop.php b/src/Loop/Connection/CheckLoop.php index 36ffb5dd7..94272b110 100644 --- a/src/Loop/Connection/CheckLoop.php +++ b/src/Loop/Connection/CheckLoop.php @@ -23,6 +23,7 @@ namespace danog\MadelineProto\Loop\Connection; use Amp\CancelledException; use Amp\DeferredFuture; use Amp\TimeoutCancellation; +use Amp\TimeoutException; use danog\Loop\Loop; use danog\MadelineProto\Connection; use danog\MadelineProto\Logger; @@ -99,7 +100,7 @@ final class CheckLoop extends Loop case 2: case 3: if ($message->constructor === 'msgs_state_req') { - $this->connection->gotResponseForOutgoingMessage($message); + $message->reply(static fn () => new TimeoutException("Server did not receive message")); break; } $this->API->logger("Message $message not received by server, resending...", Logger::ERROR); @@ -115,8 +116,6 @@ final class CheckLoop extends Loop } elseif ($chr & 32) { if ($message->getSent() + $this->resendTimeout < hrtime(true)) { if ($message->isCancellationRequested()) { - unset($this->connection->new_outgoing[$message_id], $this->connection->outgoing_messages[$message_id]); - $this->API->logger("Cancelling $message...", Logger::ERROR); } else { $this->API->logger("Message $message received by server and is being processed for way too long, resending request...", Logger::ERROR); diff --git a/src/MTProto/MTProtoOutgoingMessage.php b/src/MTProto/MTProtoOutgoingMessage.php index 47ca658a9..6bc3ae8e1 100644 --- a/src/MTProto/MTProtoOutgoingMessage.php +++ b/src/MTProto/MTProtoOutgoingMessage.php @@ -197,6 +197,7 @@ class MTProtoOutgoingMessage extends MTProtoMessage ); } } + unset($this->connection->new_outgoing[$this->msgId], $this->connection->outgoing_messages[$this->msgId]); $this->serializedBody = null; $this->body = null; diff --git a/src/MTProtoSession/AckHandler.php b/src/MTProtoSession/AckHandler.php index fe574bbe7..a88467d9e 100644 --- a/src/MTProtoSession/AckHandler.php +++ b/src/MTProtoSession/AckHandler.php @@ -47,14 +47,6 @@ trait AckHandler } return true; } - /** - * We have gotten a response for an outgoing message. - */ - public function gotResponseForOutgoingMessage(MTProtoOutgoingMessage $outgoingMessage): void - { - // The server acknowledges that it received my message - unset($this->new_outgoing[$outgoingMessage->getMsgId()]); - } /** * Acknowledge incoming message ID. */ @@ -121,7 +113,7 @@ trait AckHandler || ($message->getSent() + $dropTimeout < hrtime(true)) ) { Logger::log('No reply for message: ' . $message, Logger::WARNING); - $this->handleReject($message, static fn () => new TimeoutException('Request timeout')); + $message->reply(static fn () => new TimeoutException('Request timeout')); continue; } if ($message->getState() & MTProtoOutgoingMessage::STATE_REPLIED) { diff --git a/src/MTProtoSession/CallHandler.php b/src/MTProtoSession/CallHandler.php index 7194144e6..ad4e63db6 100644 --- a/src/MTProtoSession/CallHandler.php +++ b/src/MTProtoSession/CallHandler.php @@ -60,7 +60,7 @@ trait CallHandler if ($datacenter) { /** @var MTProtoOutgoingMessage */ $message = $this->outgoing_messages[$message_id]; - $this->gotResponseForOutgoingMessage($message); + unset($this->new_outgoing[$message_id]); $message->setMsgId(null); $message->setSeqNo(null); EventLoop::queue(function () use ($datacenter, $message): void { @@ -71,7 +71,7 @@ trait CallHandler /** @var MTProtoOutgoingMessage */ $message = $this->outgoing_messages[$message_id]; if (!$message->hasSeqNo()) { - $this->gotResponseForOutgoingMessage($message); + unset($this->new_outgoing[$message_id]); } EventLoop::queue($this->sendMessage(...), $message); } diff --git a/src/MTProtoSession/ResponseHandler.php b/src/MTProtoSession/ResponseHandler.php index ea2f14a6a..29b729508 100644 --- a/src/MTProtoSession/ResponseHandler.php +++ b/src/MTProtoSession/ResponseHandler.php @@ -170,14 +170,6 @@ trait ResponseHandler $this->handleMessages([$message]); } } - /** - * @param callable(): \Throwable $data - */ - private function handleReject(MTProtoOutgoingMessage $message, callable $data): void - { - $this->gotResponseForOutgoingMessage($message); - $message->reply($data); - } /** * Handle RPC response. @@ -211,7 +203,7 @@ trait ResponseHandler $exception = static fn (): \Throwable => $e; } if ($exception) { - $this->handleReject($request, $exception); + $request->reply($exception); } return; } @@ -237,7 +229,7 @@ trait ResponseHandler EventLoop::queue($this->methodRecall(...), $requestId); return; } - $this->handleReject($request, static fn () => RPCErrorException::make('Received bad_msg_notification: ' . MTProto::BAD_MSG_ERROR_CODES[$response['error_code']], $response['error_code'], $request->constructor)); + $request->reply(static fn () => RPCErrorException::make('Received bad_msg_notification: ' . MTProto::BAD_MSG_ERROR_CODES[$response['error_code']], $response['error_code'], $request->constructor)); return; } @@ -275,7 +267,6 @@ trait ResponseHandler } } } - $this->gotResponseForOutgoingMessage($request); $this->requestResponse?->inc([ 'method' => $request->constructor, @@ -283,7 +274,7 @@ trait ResponseHandler 'error_code' => '200', ]); - EventLoop::queue($request->reply(...), $response); + $request->reply($response); } /** * @param array{error_message: string, error_code: int} $response @@ -314,8 +305,8 @@ trait ResponseHandler && !$request->shouldRefreshReferences() ) { $this->API->logger("Got {$response['error_message']}, refreshing file reference and repeating method call..."); - $this->gotResponseForOutgoingMessage($request); $msgId = $request->getMsgId(); + unset($this->new_outgoing[$msgId]); $request->setRefreshReferences(true); $request->setMsgId(null); $request->setSeqNo(null); @@ -334,8 +325,8 @@ trait ResponseHandler ) ) { $this->API->logger("Resending $request due to {$response['error_message']}"); - $this->gotResponseForOutgoingMessage($request); $msgId = $request->getMsgId(); + unset($this->new_outgoing[$msgId]); $request->setSent(hrtime(true) + (5*60 * 1_000_000_000)); $request->setMsgId(null); $request->setSeqNo(null); @@ -386,8 +377,8 @@ trait ResponseHandler ) ) { $this->API->logger("Resending $request due to {$response['error_message']}"); - $this->gotResponseForOutgoingMessage($request); $msgId = $request->getMsgId(); + unset($this->new_outgoing[$msgId]); $request->setSent(hrtime(true) + (5*60 * 1_000_000_000)); $request->setMsgId(null); $request->setSeqNo(null); @@ -420,12 +411,7 @@ trait ResponseHandler case 'AUTH_KEY_UNREGISTERED': case 'AUTH_KEY_INVALID': if ($this->API->authorized !== \danog\MadelineProto\API::LOGGED_IN) { - $this->gotResponseForOutgoingMessage($request); - EventLoop::queue( - $this->handleReject(...), - $request, - static fn () => RPCErrorException::make($response['error_message'], $response['error_code'], $request->constructor) - ); + $request->reply(static fn () => RPCErrorException::make($response['error_message'], $response['error_code'], $request->constructor)); return null; } $this->session_id = null; @@ -457,8 +443,8 @@ trait ResponseHandler $limit = $request->floodWaitLimit ?? $this->API->settings->getRPC()->getFloodTimeout(); if ($seconds < $limit) { $this->API->logger("Flood, waiting $seconds seconds before repeating async call of $request...", Logger::NOTICE); - $this->gotResponseForOutgoingMessage($request); $msgId = $request->getMsgId(); + unset($this->new_outgoing[$msgId]); $request->setSent(hrtime(true) + ($seconds * 1_000_000_000)); $request->setMsgId(null); $request->setSeqNo(null); From 4570120aa1be9a50256981bdf98a91f55eec402d Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Thu, 27 Jun 2024 19:14:22 +0200 Subject: [PATCH 3/6] Cleanup --- src/Connection.php | 5 +---- src/MTProto/MTProtoOutgoingMessage.php | 4 +++- src/MTProtoSession/CallHandler.php | 4 ++-- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/src/Connection.php b/src/Connection.php index 0a00ec07e..93c7db776 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -310,10 +310,7 @@ final class Connection } foreach ($this->new_outgoing as $message_id => $message) { if ($message->unencrypted) { - if (!($message->getState() & MTProtoOutgoingMessage::STATE_REPLIED)) { - $message->reply(static fn () => new Exception('Restart because we were reconnected')); - } - unset($this->new_outgoing[$message_id], $this->outgoing_messages[$message_id]); + $message->reply(static fn () => new Exception('Restart because we were reconnected')); } } Assert::true($this->writer->start(), "Could not start writer stream"); diff --git a/src/MTProto/MTProtoOutgoingMessage.php b/src/MTProto/MTProtoOutgoingMessage.php index 6bc3ae8e1..850287ad3 100644 --- a/src/MTProto/MTProtoOutgoingMessage.php +++ b/src/MTProto/MTProtoOutgoingMessage.php @@ -197,7 +197,9 @@ class MTProtoOutgoingMessage extends MTProtoMessage ); } } - unset($this->connection->new_outgoing[$this->msgId], $this->connection->outgoing_messages[$this->msgId]); + if ($this->msgId !== null) { + unset($this->connection->new_outgoing[$this->msgId], $this->connection->outgoing_messages[$this->msgId]); + } $this->serializedBody = null; $this->body = null; diff --git a/src/MTProtoSession/CallHandler.php b/src/MTProtoSession/CallHandler.php index ad4e63db6..12b4f7ee5 100644 --- a/src/MTProtoSession/CallHandler.php +++ b/src/MTProtoSession/CallHandler.php @@ -60,7 +60,7 @@ trait CallHandler if ($datacenter) { /** @var MTProtoOutgoingMessage */ $message = $this->outgoing_messages[$message_id]; - unset($this->new_outgoing[$message_id]); + unset($this->new_outgoing[$message_id], $this->outgoing_messages[$message_id]); $message->setMsgId(null); $message->setSeqNo(null); EventLoop::queue(function () use ($datacenter, $message): void { @@ -71,7 +71,7 @@ trait CallHandler /** @var MTProtoOutgoingMessage */ $message = $this->outgoing_messages[$message_id]; if (!$message->hasSeqNo()) { - unset($this->new_outgoing[$message_id]); + unset($this->new_outgoing[$message_id], $this->outgoing_messages[$message_id]); } EventLoop::queue($this->sendMessage(...), $message); } From 605ece2eca3501d9ac3e99b55ba28ecd8f4691c3 Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Thu, 27 Jun 2024 19:47:01 +0200 Subject: [PATCH 4/6] WIP message refactoring --- psalm-baseline.xml | 4 -- src/Loop/Connection/CheckLoop.php | 12 ++-- src/Loop/Connection/WriteLoop.php | 9 ++- src/MTProto/Container.php | 19 +------ src/MTProtoSession/CallHandler.php | 76 +++++++++++++++----------- src/MTProtoSession/Reliable.php | 4 +- src/MTProtoSession/ResponseHandler.php | 74 +++++++------------------ src/MTProtoTools/FilesLogic.php | 2 +- 8 files changed, 78 insertions(+), 122 deletions(-) diff --git a/psalm-baseline.xml b/psalm-baseline.xml index 4d449b2c8..d2632a20d 100644 --- a/psalm-baseline.xml +++ b/psalm-baseline.xml @@ -3372,10 +3372,6 @@ state |= self::STATE_REPLIED]]> state |= self::STATE_SENT]]> - - connection->new_outgoing]]> - connection->outgoing_messages]]> - sent]]> sent]]> diff --git a/src/Loop/Connection/CheckLoop.php b/src/Loop/Connection/CheckLoop.php index 94272b110..68cd7182a 100644 --- a/src/Loop/Connection/CheckLoop.php +++ b/src/Loop/Connection/CheckLoop.php @@ -104,7 +104,7 @@ final class CheckLoop extends Loop break; } $this->API->logger("Message $message not received by server, resending...", Logger::ERROR); - $this->connection->methodRecall($message_id); + $this->connection->methodRecall($message); break; case 4: if ($chr & 128) { @@ -115,12 +115,8 @@ final class CheckLoop extends Loop $reply[] = $message_id; } elseif ($chr & 32) { if ($message->getSent() + $this->resendTimeout < hrtime(true)) { - if ($message->isCancellationRequested()) { - $this->API->logger("Cancelling $message...", Logger::ERROR); - } else { - $this->API->logger("Message $message received by server and is being processed for way too long, resending request...", Logger::ERROR); - $this->connection->methodRecall($message_id); - } + $this->API->logger("Message $message received by server and is being processed for way too long, resending request...", Logger::ERROR); + $this->connection->methodRecall($message); } else { $this->API->logger("Message $message received by server and is being processed, waiting...", Logger::ERROR); } @@ -146,7 +142,7 @@ final class CheckLoop extends Loop && $message->unencrypted ) { $this->API->logger("Still missing $message on DC {$this->datacenter}, resending", Logger::ERROR); - $this->connection->methodRecall($message->getMsgId()); + $this->connection->methodRecall($message); } } } diff --git a/src/Loop/Connection/WriteLoop.php b/src/Loop/Connection/WriteLoop.php index ecf07b569..61b5ffb8f 100644 --- a/src/Loop/Connection/WriteLoop.php +++ b/src/Loop/Connection/WriteLoop.php @@ -286,7 +286,7 @@ final class WriteLoop extends Loop $total_length += $actual_length; $MTmessage['bytes'] = $body_length; $messages[] = $MTmessage; - $keys[$k] = $message_id; + $keys[$k] = $message; $message->setSeqNo($MTmessage['seqno']) ->setMsgId($MTmessage['msg_id']); @@ -325,10 +325,10 @@ final class WriteLoop extends Loop if ($count > 1 || $has_seq) { $this->API->logger("Wrapping in msg_container ({$count} messages of total size {$total_length}) as encrypted message for DC {$this->datacenter}", Logger::ULTRA_VERBOSE); $message_id = $this->connection->msgIdHandler->generateMessageId(); - $this->connection->pendingOutgoing[$this->connection->pendingOutgoingKey] = new Container($this->connection, array_values($keys)); + $this->connection->pendingOutgoing[$this->connection->pendingOutgoingKey] = $ct = new Container($this->connection, $keys); $this->connection->outgoingCtr?->inc(); $this->connection->pendingOutgoingGauge?->set(\count($this->connection->pendingOutgoing)); - $keys[$this->connection->pendingOutgoingKey++] = $message_id; + $keys[$this->connection->pendingOutgoingKey++] = $ct; $message_data = $this->API->getTL()->serializeObject(['type' => ''], ['_' => 'msg_container', 'messages' => $messages], 'container'); $message_data_length = \strlen($message_data); $seq_no = $this->connection->generateOutSeqNo(false); @@ -364,8 +364,7 @@ final class WriteLoop extends Loop $this->connection->ack_queue = \array_slice($this->connection->ack_queue, $ackCount); } - foreach ($keys as $key => $message_id) { - $message = $this->connection->pendingOutgoing[$key]; + foreach ($keys as $key => $message) { unset($this->connection->pendingOutgoing[$key]); $this->connection->outgoing_messages[$message_id] = $message; if ($message->hasPromise()) { diff --git a/src/MTProto/Container.php b/src/MTProto/Container.php index ef799fb52..e0d0bedd5 100644 --- a/src/MTProto/Container.php +++ b/src/MTProto/Container.php @@ -29,28 +29,13 @@ use danog\MadelineProto\Connection; */ final class Container extends MTProtoOutgoingMessage { - /** - * Message IDs. - * - * @var list - */ - private array $ids = []; - /** * Constructor. * - * @param list $ids + * @param list $msgs */ - public function __construct(Connection $connection, array $ids) + public function __construct(Connection $connection, public readonly array $msgs) { - $this->ids = $ids; parent::__construct($connection, [], 'msg_container', '', false, false); } - /** - * Get message IDs. - */ - public function getIds(): array - { - return $this->ids; - } } diff --git a/src/MTProtoSession/CallHandler.php b/src/MTProtoSession/CallHandler.php index 12b4f7ee5..577256d76 100644 --- a/src/MTProtoSession/CallHandler.php +++ b/src/MTProtoSession/CallHandler.php @@ -21,6 +21,7 @@ declare(strict_types=1); namespace danog\MadelineProto\MTProtoSession; use Amp\DeferredFuture; +use Amp\Future; use Amp\Sync\LocalKeyedMutex; use danog\MadelineProto\DataCenterConnection; use danog\MadelineProto\Logger; @@ -45,39 +46,52 @@ trait CallHandler /** * Recall method. */ - public function methodRecall(int $message_id, ?int $datacenter = null): void + public function methodRecall(MTProtoOutgoingMessage $request, ?int $forceDatacenter = null, float|Future|null $defer = null): void { - if ($datacenter === $this->datacenter) { - $datacenter = null; - } - $message = $this->outgoing_messages[$message_id] ?? null; - $message_ids = $message instanceof Container - ? $message->getIds() - : [$message_id]; - foreach ($message_ids as $message_id) { - if (isset($this->outgoing_messages[$message_id]) - && !$this->outgoing_messages[$message_id]->canGarbageCollect()) { - if ($datacenter) { - /** @var MTProtoOutgoingMessage */ - $message = $this->outgoing_messages[$message_id]; - unset($this->new_outgoing[$message_id], $this->outgoing_messages[$message_id]); - $message->setMsgId(null); - $message->setSeqNo(null); - EventLoop::queue(function () use ($datacenter, $message): void { - $this->API->datacenter->waitGetConnection($datacenter) - ->sendMessage($message); - }); - } else { - /** @var MTProtoOutgoingMessage */ - $message = $this->outgoing_messages[$message_id]; - if (!$message->hasSeqNo()) { - unset($this->new_outgoing[$message_id], $this->outgoing_messages[$message_id]); - } - EventLoop::queue($this->sendMessage(...), $message); - } - } else { - $this->API->logger('Could not resend '.($this->outgoing_messages[$message_id] ?? $message_id)); + $id = $request->getMsgId(); + unset($this->outgoing_messages[$id], $this->new_outgoing[$id]); + if ($request instanceof Container) { + foreach ($request->msgs as $msg) { + $this->methodRecall($msg, $forceDatacenter, $defer); } + return; + } + if ($request->isCancellationRequested()) { + return; + } + if (\is_float($defer)) { + $d = new DeferredFuture; + $id = EventLoop::delay($defer, $d->complete(...)); + $request->cancellation?->subscribe(static fn () => EventLoop::cancel($id)); + $defer = $d; + return; + } + $prev = $request->previousQueuedMessage; + if (!$prev->hasReply()) { + $prev->getResultPromise()->finally( + fn () => $this->methodRecall($request, $this->datacenter, $defer) + ); + return; + } + if ($defer) { + $defer->finally( + fn () => $this->methodRecall($request, $this->datacenter) + ); + return; + } + $datacenter = $forceDatacenter ?? $this->datacenter; + if ($forceDatacenter !== null) { + /** @var MTProtoOutgoingMessage */ + $request->setMsgId(null); + $request->setSeqNo(null); + } + if ($datacenter === $this->datacenter) { + EventLoop::queue($this->sendMessage(...), $request); + } else { + EventLoop::queue(function () use ($datacenter, $request): void { + $this->API->datacenter->waitGetConnection($datacenter) + ->sendMessage($request); + }); } } /** diff --git a/src/MTProtoSession/Reliable.php b/src/MTProtoSession/Reliable.php index 07ff16461..8b34fd071 100644 --- a/src/MTProtoSession/Reliable.php +++ b/src/MTProtoSession/Reliable.php @@ -64,7 +64,9 @@ trait Reliable } if ($ok) { foreach ($content['msg_ids'] as $msg_id) { - $this->methodRecall($msg_id); + if (isset($this->outgoing_messages[$msg_id])) { + $this->methodRecall($this->outgoing_messages[$msg_id]); + } } } else { $this->sendMsgsStateInfo($content['msg_ids'], $current_msg_id); diff --git a/src/MTProtoSession/ResponseHandler.php b/src/MTProtoSession/ResponseHandler.php index 29b729508..964c582ce 100644 --- a/src/MTProtoSession/ResponseHandler.php +++ b/src/MTProtoSession/ResponseHandler.php @@ -20,6 +20,7 @@ declare(strict_types=1); namespace danog\MadelineProto\MTProtoSession; +use Amp\DeferredFuture; use Amp\SignalException; use danog\BetterPrometheus\BetterHistogram; use danog\Loop\Loop; @@ -43,6 +44,8 @@ use Throwable; use const PHP_EOL; +use function Amp\async; + /** * Manages responses. * @@ -212,12 +215,10 @@ trait ResponseHandler switch ($response['error_code']) { case 48: $this->shared->getTempAuthKey()->setServerSalt($response['new_server_salt']); - $this->methodRecall($requestId); + $this->methodRecall($request); return; case 20: - $request->setMsgId(null); - $request->setSeqNo(null); - $this->methodRecall($requestId); + $this->methodRecall($request, $this->datacenter); return; case 16: case 17: @@ -225,8 +226,9 @@ trait ResponseHandler $this->API->logger('Set time delta to ' . $this->time_delta, Logger::WARNING); $this->API->resetMTProtoSession("time delta update"); $this->shared->setTempAuthKey(null); - EventLoop::queue($this->shared->initAuthorization(...)); - EventLoop::queue($this->methodRecall(...), $requestId); + $d = new DeferredFuture; + async($this->shared->initAuthorization(...))->finally($d->complete(...)); + $this->methodRecall($request, $this->datacenter, $d->getFuture()); return; } $request->reply(static fn () => RPCErrorException::make('Received bad_msg_notification: ' . MTProto::BAD_MSG_ERROR_CODES[$response['error_code']], $response['error_code'], $request->constructor)); @@ -305,12 +307,7 @@ trait ResponseHandler && !$request->shouldRefreshReferences() ) { $this->API->logger("Got {$response['error_message']}, refreshing file reference and repeating method call..."); - $msgId = $request->getMsgId(); - unset($this->new_outgoing[$msgId]); - $request->setRefreshReferences(true); - $request->setMsgId(null); - $request->setSeqNo(null); - $this->methodRecall($msgId); + $this->methodRecall($request, $this->datacenter); return null; } @@ -325,28 +322,13 @@ trait ResponseHandler ) ) { $this->API->logger("Resending $request due to {$response['error_message']}"); - $msgId = $request->getMsgId(); - unset($this->new_outgoing[$msgId]); - $request->setSent(hrtime(true) + (5*60 * 1_000_000_000)); - $request->setMsgId(null); - $request->setSeqNo(null); - $prev = $request->previousQueuedMessage; - if ($prev->hasReply()) { - $this->methodRecall($msgId); - } else { - $prev->getResultPromise()->finally( - fn () => $this->methodRecall($msgId) - ); - } + $this->methodRecall($request, $this->datacenter); return null; } if ((($response['error_code'] === -503 || $response['error_message'] === '-503') && !\in_array($request->constructor, ['messages.getBotCallbackAnswer', 'messages.getInlineBotResults'], true)) || (\in_array($response['error_message'], ['MSGID_DECREASE_RETRY', 'HISTORY_GET_FAILED', 'RPC_CONNECT_FAILED', 'RPC_CALL_FAIL', 'RPC_MCGET_FAIL', 'PERSISTENT_TIMESTAMP_OUTDATED', 'RPC_MCGET_FAIL', 'no workers running', 'No workers running'], true))) { $this->API->logger("Resending $request in 1 second due to {$response['error_message']}"); - $msgId = $request->getMsgId(); - $request->setMsgId(null); - $request->setSeqNo(null); - EventLoop::delay(1.0, fn () => $this->methodRecall($msgId)); + $this->methodRecall($request, $this->datacenter, 1.0); return null; } return static fn () => RPCErrorException::make($response['error_message'], $response['error_code'], $request->constructor); @@ -377,20 +359,7 @@ trait ResponseHandler ) ) { $this->API->logger("Resending $request due to {$response['error_message']}"); - $msgId = $request->getMsgId(); - unset($this->new_outgoing[$msgId]); - $request->setSent(hrtime(true) + (5*60 * 1_000_000_000)); - $request->setMsgId(null); - $request->setSeqNo(null); - \assert($msgId !== null); - $prev = $request->previousQueuedMessage; - if ($prev->hasReply()) { - $this->methodRecall($msgId); - } else { - $prev->getResultPromise()->finally( - fn () => $this->methodRecall($msgId) - ); - } + $this->methodRecall($request, $this->datacenter); return null; } return static fn () => RPCErrorException::make($response['error_message'], $response['error_code'], $request->constructor); @@ -427,14 +396,16 @@ trait ResponseHandler $this->API->logout(); return static fn () => new SignalException(sprintf(Lang::$current_lang['account_banned'], $phone)); } - EventLoop::queue($this->shared->initAuthorization(...)); - EventLoop::queue($this->methodRecall(...), $request->getMsgId()); + $deferred = new DeferredFuture; + async($this->shared->initAuthorization(...))->finally($deferred->complete(...)); + $this->methodRecall($request, $this->datacenter, $deferred->getFuture()); return null; case 'AUTH_KEY_PERM_EMPTY': $this->API->logger('Temporary auth key not bound, resetting temporary auth key...', Logger::ERROR); $this->shared->setTempAuthKey(null); - EventLoop::queue($this->shared->initAuthorization(...)); - EventLoop::queue($this->methodRecall(...), $request->getMsgId()); + $deferred = new DeferredFuture; + async($this->shared->initAuthorization(...))->finally($deferred->complete(...)); + $this->methodRecall($request, $this->datacenter, $deferred->getFuture()); return null; } return static fn () => RPCErrorException::make($response['error_message'], $response['error_code'], $request->constructor); @@ -443,14 +414,7 @@ trait ResponseHandler $limit = $request->floodWaitLimit ?? $this->API->settings->getRPC()->getFloodTimeout(); if ($seconds < $limit) { $this->API->logger("Flood, waiting $seconds seconds before repeating async call of $request...", Logger::NOTICE); - $msgId = $request->getMsgId(); - unset($this->new_outgoing[$msgId]); - $request->setSent(hrtime(true) + ($seconds * 1_000_000_000)); - $request->setMsgId(null); - $request->setSeqNo(null); - \assert($msgId !== null); - $id = EventLoop::delay((float) $seconds, fn () => $this->methodRecall($msgId)); - $request->cancellation?->subscribe(static fn () => EventLoop::cancel($id)); + $this->methodRecall($request, $this->datacenter, (float) $seconds); return null; } if (str_starts_with($response['error_message'], 'FLOOD_WAIT_')) { diff --git a/src/MTProtoTools/FilesLogic.php b/src/MTProtoTools/FilesLogic.php index 94290bb84..4a99c88c0 100644 --- a/src/MTProtoTools/FilesLogic.php +++ b/src/MTProtoTools/FilesLogic.php @@ -240,7 +240,7 @@ trait FilesLogic if ($result->shouldServe()) { $pipe = new Pipe(1024 * 1024); [$start, $end] = $result->getServeRange(); - EventLoop::queue(function() use($messageMedia, $pipe, $cb, $start, $end, $cancellation) { + EventLoop::queue(function () use ($messageMedia, $pipe, $cb, $start, $end, $cancellation): void { try { $this->downloadToStream($messageMedia, $pipe->getSink(), $cb, $start, $end, $cancellation); } catch (\Throwable $e) { From a2b836832e5811f40f76e6ab829e935cebc2f4cd Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Thu, 27 Jun 2024 19:48:21 +0200 Subject: [PATCH 5/6] More refactoring --- src/APIWrapper.php | 4 +- src/Connection.php | 44 +------ src/DataCenterConnection.php | 3 - src/Loop/Connection/CheckLoop.php | 158 ------------------------- src/Loop/Connection/ReadLoop.php | 9 +- src/Loop/Connection/WriteLoop.php | 127 ++++++++++++++++++-- src/MTProto/Container.php | 2 +- src/MTProto/MTProtoOutgoingMessage.php | 50 ++++++-- src/MTProtoSession/AckHandler.php | 69 ----------- src/MTProtoSession/CallHandler.php | 17 ++- src/MTProtoSession/ResponseHandler.php | 2 +- src/MTProtoSession/Session.php | 24 +++- 12 files changed, 212 insertions(+), 297 deletions(-) delete mode 100644 src/Loop/Connection/CheckLoop.php diff --git a/src/APIWrapper.php b/src/APIWrapper.php index 4b6f68ab2..10e4293a9 100644 --- a/src/APIWrapper.php +++ b/src/APIWrapper.php @@ -76,13 +76,13 @@ final class APIWrapper return $this->API; } - private ?int $drop = null; + private ?float $drop = null; /** * @internal */ public function getRpcDropCancellation(): Cancellation { - return new TimeoutCancellation($this->drop ??= $this->getAPI()->getSettings()->getRpc()->getRpcDropTimeout()); + return new TimeoutCancellation($this->drop ??= (float) $this->getAPI()->getSettings()->getRpc()->getRpcDropTimeout()); } /** diff --git a/src/Connection.php b/src/Connection.php index 93c7db776..6879122b8 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -72,21 +72,11 @@ final class Connection * */ protected ?ReadLoop $reader = null; - /** - * Checker loop. - * - */ - protected ?CheckLoop $checker = null; /** * Waiter loop. * */ protected ?HttpWaitLoop $waiter = null; - /** - * Ping loop. - * - */ - protected ?PingLoop $pinger = null; /** * Cleanup loop. * @@ -301,26 +291,14 @@ final class Connection $this->httpResCount = 0; $this->writer ??= new WriteLoop($this); $this->reader ??= new ReadLoop($this); - $this->checker ??= new CheckLoop($this); $this->cleanup ??= new CleanupLoop($this); - $this->waiter ??= new HttpWaitLoop($this); $this->handler ??= new GenericLoop(fn () => $this->handleMessages($this->new_incoming), "Handler loop"); - if (!isset($this->pinger) && !$ctx->isMedia() && !$ctx->isCDN() && !$this->isHttp()) { - $this->pinger = new PingLoop($this); - } - foreach ($this->new_outgoing as $message_id => $message) { - if ($message->unencrypted) { - $message->reply(static fn () => new Exception('Restart because we were reconnected')); - } + foreach ($this->unencrypted_new_outgoing as $message_id => $message) { + $message->reply(static fn () => new Exception('Restart because we were reconnected')); } Assert::true($this->writer->start(), "Could not start writer stream"); Assert::true($this->reader->start(), "Could not start reader stream"); - Assert::true($this->checker->start(), "Could not start checker stream"); Assert::true($this->cleanup->start(), "Could not start cleanup stream"); - $this->waiter->start(); - if ($this->pinger) { - Assert::true($this->pinger->start(), "Could not start pinger stream"); - } $this->handler->start(); EventLoop::queue($this->shared->initAuthorization(...)); @@ -583,9 +561,7 @@ final class Connection $this->pendingOutgoing[$this->pendingOutgoingKey++] = $message; $this->outgoingCtr?->inc(); $this->pendingOutgoingGauge?->set(\count($this->pendingOutgoing)); - if (isset($this->writer)) { - $this->writer->resume(); - } + $this->flush(); $this->connect(); $promise->await(); } @@ -598,18 +574,6 @@ final class Connection $this->writer->resume(); } } - /** - * Resume HttpWaiter. - */ - public function pingHttpWaiter(): void - { - if (isset($this->waiter)) { - $this->waiter->resume(); - } - if (isset($this->pinger)) { - $this->pinger->resume(); - } - } /** * Connect main instance. * @@ -660,9 +624,7 @@ final class Connection $this->reader?->stop(); $this->writer?->stop(); - $this->checker?->stop(); $this->cleanup?->stop(); - $this->pinger?->stop(); if (!$temporary) { $this->shared->signalDisconnect($this->id); diff --git a/src/DataCenterConnection.php b/src/DataCenterConnection.php index 303f0bf2f..2c3489c15 100644 --- a/src/DataCenterConnection.php +++ b/src/DataCenterConnection.php @@ -204,9 +204,6 @@ final class DataCenterConnection implements JsonSerializable $logger->logger("Done initing auth for DC {$this->datacenter}", Logger::NOTICE); EventLoop::queue($lock->release(...)); } - if ($this->hasTempAuthKey()) { - $connection->pingHttpWaiter(); - } } /** * Bind temporary and permanent auth keys. diff --git a/src/Loop/Connection/CheckLoop.php b/src/Loop/Connection/CheckLoop.php deleted file mode 100644 index 68cd7182a..000000000 --- a/src/Loop/Connection/CheckLoop.php +++ /dev/null @@ -1,158 +0,0 @@ -. - * - * @author Daniil Gentili - * @copyright 2016-2023 Daniil Gentili - * @license https://opensource.org/licenses/AGPL-3.0 AGPLv3 - * @link https://docs.madelineproto.xyz MadelineProto documentation - */ - -namespace danog\MadelineProto\Loop\Connection; - -use Amp\CancelledException; -use Amp\DeferredFuture; -use Amp\TimeoutCancellation; -use Amp\TimeoutException; -use danog\Loop\Loop; -use danog\MadelineProto\Connection; -use danog\MadelineProto\Logger; -use Revolt\EventLoop; - -/** - * RPC call status check loop. - * - * @internal - * - * @author Daniil Gentili - */ -final class CheckLoop extends Loop -{ - use Common { - __construct as initCommon; - } - - private int $resendTimeout; - public function __construct(Connection $connection) - { - $this->initCommon($connection); - $this->resendTimeout = (int) ($this->API->settings->getRpc()->getRpcResendTimeout() * 1_000_000_000.0); - } - /** - * Main loop. - */ - protected function loop(): ?float - { - if (!$this->connection->new_outgoing) { - return self::PAUSE; - } - if (!$this->connection->hasPendingCalls()) { - return $this->timeoutSeconds; - } - if ($this->shared->hasTempAuthKey()) { - $full_message_ids = $this->connection->getPendingCalls(); - foreach (array_chunk($full_message_ids, WriteLoop::MAX_IDS) as $message_ids) { - $deferred = new DeferredFuture(); - $list = ''; - // Don't edit this here pls - foreach ($message_ids as $message_id) { - if (!isset($this->connection->outgoing_messages[$message_id])) { - continue; - } - $list .= $this->connection->outgoing_messages[$message_id]->constructor.', '; - } - $this->API->logger("Still missing {$list} on DC {$this->datacenter}, sending state request", Logger::ERROR); - $this->connection->objectCall('msgs_state_req', ['msg_ids' => $message_ids], $deferred); - EventLoop::queue(function () use ($deferred, $message_ids): void { - try { - $result = $deferred->getFuture()->await(new TimeoutCancellation($this->timeout)); - if (\is_callable($result)) { - throw $result(); - } - $reply = []; - foreach (str_split($result['info']) as $key => $chr) { - $message_id = $message_ids[$key]; - if (!isset($this->connection->outgoing_messages[$message_id])) { - $this->API->logger("Already got response for and forgot about message ID $message_id"); - continue; - } - if (!isset($this->connection->new_outgoing[$message_id])) { - $this->API->logger('Already got response for '.$this->connection->outgoing_messages[$message_id]); - continue; - } - $message = $this->connection->new_outgoing[$message_id]; - $chr = \ord($chr); - switch ($chr & 7) { - case 0: - $this->API->logger("Wrong message status 0 for $message", Logger::FATAL_ERROR); - break; - case 1: - case 2: - case 3: - if ($message->constructor === 'msgs_state_req') { - $message->reply(static fn () => new TimeoutException("Server did not receive message")); - break; - } - $this->API->logger("Message $message not received by server, resending...", Logger::ERROR); - $this->connection->methodRecall($message); - break; - case 4: - if ($chr & 128) { - $this->API->logger("Message $message received by server and was already sent, requesting reply...", Logger::ERROR); - $reply[] = $message_id; - } elseif ($chr & 64) { - $this->API->logger("Message $message received by server and was already processed, requesting reply...", Logger::ERROR); - $reply[] = $message_id; - } elseif ($chr & 32) { - if ($message->getSent() + $this->resendTimeout < hrtime(true)) { - $this->API->logger("Message $message received by server and is being processed for way too long, resending request...", Logger::ERROR); - $this->connection->methodRecall($message); - } else { - $this->API->logger("Message $message received by server and is being processed, waiting...", Logger::ERROR); - } - } else { - $this->API->logger("Message $message received by server, waiting...", Logger::ERROR); - $reply[] = $message_id; - } - } - } - //} catch (CancelledException) { - //$this->API->logger("We did not receive a response for {$this->timeout} seconds: reconnecting and exiting check loop on DC {$this->datacenter}"); - //EventLoop::queue($this->connection->reconnect(...)); - } catch (\Throwable $e) { - $this->API->logger("Got exception in check loop for DC {$this->datacenter}"); - $this->API->logger((string) $e); - } - }); - } - } else { - foreach ($this->connection->new_outgoing as $message_id => $message) { - if ($message->wasSent() - && $message->getSent() + $this->timeout < hrtime(true) - && $message->unencrypted - ) { - $this->API->logger("Still missing $message on DC {$this->datacenter}, resending", Logger::ERROR); - $this->connection->methodRecall($message); - } - } - } - return $this->timeoutSeconds; - } - /** - * Loop name. - */ - public function __toString(): string - { - return "check loop in DC {$this->datacenter}"; - } -} diff --git a/src/Loop/Connection/ReadLoop.php b/src/Loop/Connection/ReadLoop.php index d3321d2c1..f36dd7ede 100644 --- a/src/Loop/Connection/ReadLoop.php +++ b/src/Loop/Connection/ReadLoop.php @@ -63,7 +63,8 @@ final class ReadLoop extends Loop } EventLoop::queue(function () use ($e): void { if ($e instanceof NothingInTheSocketException - && !$this->connection->hasPendingCalls() + && !$this->connection->unencrypted_new_outgoing + && !$this->connection->new_outgoing && $this->connection->isMedia() && !$this->connection->isWriting() && $this->shared->hasTempAuthKey() @@ -93,6 +94,9 @@ final class ReadLoop extends Loop foreach ($this->connection->new_outgoing as $message) { $message->resetSent(); } + foreach ($this->connection->unencrypted_new_outgoing as $message) { + $message->resetSent(); + } $this->shared->reconnect(); } else { $this->connection->reconnect(); @@ -116,9 +120,6 @@ final class ReadLoop extends Loop return self::STOP; } $this->connection->httpReceived(); - if ($this->connection->isHttp()) { - $this->connection->pingHttpWaiter(); - } $this->connection->wakeupHandler(); return self::CONTINUE; } diff --git a/src/Loop/Connection/WriteLoop.php b/src/Loop/Connection/WriteLoop.php index 61b5ffb8f..57ceb29be 100644 --- a/src/Loop/Connection/WriteLoop.php +++ b/src/Loop/Connection/WriteLoop.php @@ -21,7 +21,9 @@ declare(strict_types=1); namespace danog\MadelineProto\Loop\Connection; use Amp\ByteStream\StreamException; +use Amp\DeferredFuture; use danog\Loop\Loop; +use danog\MadelineProto\Connection; use danog\MadelineProto\Logger; use danog\MadelineProto\MTProto\Container; use danog\MadelineProto\MTProto\MTProtoOutgoingMessage; @@ -42,23 +44,46 @@ final class WriteLoop extends Loop { private const MAX_COUNT = 1020; private const MAX_SIZE = 1 << 15; + private const LONG_POLL_TIMEOUT = 30; + private const LONG_POLL_TIMEOUT_MS = self::LONG_POLL_TIMEOUT*1000; public const MAX_IDS = 8192; - use Common; + use Common { + __construct as init; + } + + private int $pingTimeout; + private float $timeout; + /** + * Constructor function. + */ + public function __construct(Connection $connection) + { + $this->init($connection); + $timeout = $this->shared->getSettings()->getPingInterval(); + $this->pingTimeout = $timeout + 15; + + if ($this->connection->isHttp()) { + $this->timeout = (float) max(self::LONG_POLL_TIMEOUT, $timeout); + } else { + $this->timeout = (float) $timeout; + } + } /** * Main loop. */ public function loop(): ?float { $please_wait = false; + $first = true; while (true) { if ($this->connection->shouldReconnect()) { $this->API->logger("Exiting $this because connection is old"); return self::STOP; } - if (!$this->connection->pendingOutgoing) { + if (!$this->connection->pendingOutgoing && !$first) { $this->API->logger("No messages, pausing in $this...", Logger::ULTRA_VERBOSE); - return self::PAUSE; + return $this->timeout; } if ($please_wait) { $this->API->logger("Have to wait for handshake, pausing in $this...", Logger::ULTRA_VERBOSE); @@ -67,7 +92,7 @@ final class WriteLoop extends Loop $this->connection->writing(true); try { $please_wait = $this->shared->hasTempAuthKey() - ? $this->encryptedWriteLoop() + ? $this->encryptedWriteLoop($first) : $this->unencryptedWriteLoop(); } catch (StreamException $e) { if ($this->connection->shouldReconnect()) { @@ -87,10 +112,17 @@ final class WriteLoop extends Loop } finally { $this->connection->writing(false); } + $first = false; } } public function unencryptedWriteLoop(): bool { + if ($queue = $this->connection->unencrypted_check_queue) { + $this->connection->unencrypted_check_queue = []; + foreach ($queue as $msg_id => $_) { + $this->connection->methodRecall($msg_id); + } + } while ($this->connection->pendingOutgoing) { $skipped_all = true; foreach ($this->connection->pendingOutgoing as $k => $message) { @@ -123,7 +155,7 @@ final class WriteLoop extends Loop $this->connection->pendingOutgoingGauge?->set(\count($this->connection->pendingOutgoing)); $message->setMsgId($message_id); $this->connection->outgoing_messages[$message_id] = $message; - $this->connection->new_outgoing[$message_id] = $message; + $this->connection->unencrypted_new_outgoing[$message_id] = $message; $message->sent(); } @@ -133,16 +165,97 @@ final class WriteLoop extends Loop } return false; } - public function encryptedWriteLoop(): bool + public function encryptedWriteLoop(bool $first): bool { do { if (!$this->shared->hasTempAuthKey()) { return false; } - if ($this->connection->isHttp() && empty($this->connection->pendingOutgoing)) { + if (!$first && !$this->connection->pendingOutgoing) { return false; } + foreach ($this->connection->check_queue as $msg_id => $_) { + $deferred = new DeferredFuture(); + $list = ''; + // Don't edit this here pls + foreach ($message_ids as $message_id) { + if (!isset($this->connection->outgoing_messages[$message_id])) { + continue; + } + $list .= $this->connection->outgoing_messages[$message_id]->constructor.', '; + } + $this->API->logger("Still missing {$list} on DC {$this->datacenter}, sending state request", Logger::ERROR); + $this->connection->objectCall('msgs_state_req', ['msg_ids' => $message_ids], $deferred); + EventLoop::queue(function () use ($deferred, $message_ids): void { + try { + $result = $deferred->getFuture()->await(new TimeoutCancellation($this->timeout)); + if (\is_callable($result)) { + throw $result(); + } + $reply = []; + foreach (str_split($result['info']) as $key => $chr) { + $message_id = $message_ids[$key]; + if (!isset($this->connection->outgoing_messages[$message_id])) { + $this->API->logger("Already got response for and forgot about message ID $message_id"); + continue; + } + if (!isset($this->connection->new_outgoing[$message_id])) { + $this->API->logger('Already got response for '.$this->connection->outgoing_messages[$message_id]); + continue; + } + $message = $this->connection->new_outgoing[$message_id]; + $chr = \ord($chr); + switch ($chr & 7) { + case 0: + $this->API->logger("Wrong message status 0 for $message", Logger::FATAL_ERROR); + break; + case 1: + case 2: + case 3: + if ($message->constructor === 'msgs_state_req') { + $this->connection->gotResponseForOutgoingMessage($message); + break; + } + $this->API->logger("Message $message not received by server, resending...", Logger::ERROR); + $this->connection->methodRecall($message_id); + break; + case 4: + if ($chr & 128) { + $this->API->logger("Message $message received by server and was already sent, requesting reply...", Logger::ERROR); + $reply[] = $message_id; + } elseif ($chr & 64) { + $this->API->logger("Message $message received by server and was already processed, requesting reply...", Logger::ERROR); + $reply[] = $message_id; + } elseif ($chr & 32) { + if ($message->getSent() + $this->resendTimeout < hrtime(true)) { + if ($message->cancellation?->isRequested()) { + unset($this->connection->new_outgoing[$message_id], $this->connection->outgoing_messages[$message_id]); + + $this->API->logger("Cancelling $message...", Logger::ERROR); + } else { + $this->API->logger("Message $message received by server and is being processed for way too long, resending request...", Logger::ERROR); + $this->connection->methodRecall($message_id); + } + } else { + $this->API->logger("Message $message received by server and is being processed, waiting...", Logger::ERROR); + } + } else { + $this->API->logger("Message $message received by server, waiting...", Logger::ERROR); + $reply[] = $message_id; + } + } + } + //} catch (CancelledException) { + //$this->API->logger("We did not receive a response for {$this->timeout} seconds: reconnecting and exiting check loop on DC {$this->datacenter}"); + //EventLoop::queue($this->connection->reconnect(...)); + } catch (\Throwable $e) { + $this->API->logger("Got exception in check loop for DC {$this->datacenter}"); + $this->API->logger((string) $e); + } + }); + } + ksort($this->connection->pendingOutgoing); $messages = []; diff --git a/src/MTProto/Container.php b/src/MTProto/Container.php index e0d0bedd5..29c9f824a 100644 --- a/src/MTProto/Container.php +++ b/src/MTProto/Container.php @@ -36,6 +36,6 @@ final class Container extends MTProtoOutgoingMessage */ public function __construct(Connection $connection, public readonly array $msgs) { - parent::__construct($connection, [], 'msg_container', '', false, false); + parent::__construct($connection, [], 'msg_container', '', false, false, null); } } diff --git a/src/MTProto/MTProtoOutgoingMessage.php b/src/MTProto/MTProtoOutgoingMessage.php index 850287ad3..f3e65a3c1 100644 --- a/src/MTProto/MTProtoOutgoingMessage.php +++ b/src/MTProto/MTProtoOutgoingMessage.php @@ -89,6 +89,8 @@ class MTProtoOutgoingMessage extends MTProtoMessage */ private int $tries = 0; + private ?string $checkTimer = null; + /** * Whether this message is related to a user, as in getting a successful reply means we have auth. */ @@ -110,6 +112,7 @@ class MTProtoOutgoingMessage extends MTProtoMessage public readonly string $type, public readonly bool $isMethod, public readonly bool $unencrypted, + public readonly ?Cancellation $cancellation, public readonly ?string $subtype = null, /** * Whether this message is related to a file upload, as in getting a redirect should redirect to a media server. @@ -126,7 +129,6 @@ class MTProtoOutgoingMessage extends MTProtoMessage public readonly ?int $takeoutId = null, public readonly ?string $businessConnectionId = null, private ?DeferredFuture $resultDeferred = null, - public readonly ?Cancellation $cancellation = null ) { $this->userRelated = $constructor === 'users.getUsers' && $body === ['id' => [['_' => 'inputUserSelf']]] || $constructor === 'auth.exportAuthorization' || $constructor === 'updates.getDifference'; @@ -134,14 +136,6 @@ class MTProtoOutgoingMessage extends MTProtoMessage $cancellation?->subscribe(fn (CancelledException $e) => $this->reply(static fn () => throw $e)); } - /** - * Whether cancellation is requested. - */ - public function isCancellationRequested(): bool - { - return $this->cancellation?->isRequested() ?? false; - } - /** * Signal that we're trying to send the message. */ @@ -164,12 +158,46 @@ class MTProtoOutgoingMessage extends MTProtoMessage } $this->state |= self::STATE_SENT; $this->sent = hrtime(true); + $this->checkTimer = EventLoop::delay( + $this->connection->API->getSettings()->getConnection()->getTimeout(), + $this->check(...) + ); if (isset($this->sendDeferred)) { $sendDeferred = $this->sendDeferred; $this->sendDeferred = null; $sendDeferred->complete(); } } + private function check(): void + { + if ($this->state & self::STATE_REPLIED) { + return; + } + $shared = $this->connection->getShared(); + $settings = $shared->getSettings(); + $global = $shared->getGenericSettings(); + $timeout = (float) $settings->getTimeout(); + $pfs = $global->getAuth()->getPfs(); + $unencrypted = !$shared->hasTempAuthKey(); + $notBound = !$shared->isBound(); + $pfsNotBound = $pfs && $notBound; + $this->checkTimer = EventLoop::delay( + $timeout, + $this->check(...) + ); + + if ($this->unencrypted === $unencrypted) { + if (!$unencrypted && $pfsNotBound && $this->constructor !== 'auth.bindTempAuthKey') { + return; + } + if ($unencrypted) { + $this->connection->unencrypted_check_queue[$this->msgId] = true; + } else { + $this->connection->check_queue[$this->msgId] = true; + } + $this->connection->flush(); + } + } /** * Set reply to message. * @@ -185,6 +213,10 @@ class MTProtoOutgoingMessage extends MTProtoMessage if (!($this->state & self::STATE_SENT)) { $this->sent(); } + if ($this->checkTimer !== null) { + EventLoop::cancel($this->checkTimer); + $this->checkTimer = null; + } if ($this->isMethod) { $this->connection->inFlightGauge?->dec([ diff --git a/src/MTProtoSession/AckHandler.php b/src/MTProtoSession/AckHandler.php index a88467d9e..0cd082489 100644 --- a/src/MTProtoSession/AckHandler.php +++ b/src/MTProtoSession/AckHandler.php @@ -20,7 +20,6 @@ declare(strict_types=1); namespace danog\MadelineProto\MTProtoSession; -use Amp\TimeoutException; use danog\MadelineProto\DataCenterConnection; use danog\MadelineProto\Logger; use danog\MadelineProto\MTProto\MTProtoIncomingMessage; @@ -58,72 +57,4 @@ trait AckHandler // I let the server know that I received its message $this->ack_queue[$message_id] = $message_id; } - - /** - * Check if there are some pending calls. - */ - public function hasPendingCalls(): bool - { - $timeout = (int) ($this->shared->getSettings()->getTimeout() * 1_000_000_000.0); - $pfs = $this->shared->getGenericSettings()->getAuth()->getPfs(); - $unencrypted = !$this->shared->hasTempAuthKey(); - $notBound = !$this->shared->isBound(); - $pfsNotBound = $pfs && $notBound; - /** @var MTProtoOutgoingMessage */ - foreach ($this->new_outgoing as $message) { - if ($message->wasSent() - && $message->getSent() + $timeout < hrtime(true) - && $message->unencrypted === $unencrypted - && $message->constructor !== 'msgs_state_req') { - if (!$unencrypted && $pfsNotBound && $message->constructor !== 'auth.bindTempAuthKey') { - continue; - } - return true; - } - } - return false; - } - /** - * Get all pending calls (also clear pending state requests). - */ - public function getPendingCalls(): array - { - $settings = $this->shared->getSettings(); - $global = $this->shared->getGenericSettings(); - $dropTimeout = (int) ($global->getRpc()->getRpcDropTimeout() * 1_000_000_000.0); - $timeout = (int) ($settings->getTimeout() * 1_000_000_000.0); - $pfs = $global->getAuth()->getPfs(); - $unencrypted = !$this->shared->hasTempAuthKey(); - $notBound = !$this->shared->isBound(); - $pfsNotBound = $pfs && $notBound; - if ($this->datacenter < 0) { - $dropTimeout *= 10; - } - $result = []; - /** @var MTProtoOutgoingMessage $message */ - foreach ($this->new_outgoing as $message_id => $message) { - if ($message->wasSent() - && $message->getSent() + $timeout < hrtime(true) - && $message->unencrypted === $unencrypted - ) { - if (!$unencrypted && $pfsNotBound && $message->constructor !== 'auth.bindTempAuthKey') { - continue; - } - if ($message->constructor === 'msgs_state_req' || $message->constructor === 'ping_delay_disconnect' - || ($message->getSent() + $dropTimeout < hrtime(true)) - ) { - Logger::log('No reply for message: ' . $message, Logger::WARNING); - $message->reply(static fn () => new TimeoutException('Request timeout')); - continue; - } - if ($message->getState() & MTProtoOutgoingMessage::STATE_REPLIED) { - $this->API->logger("Already replied to message $message, but still in new_outgoing"); - unset($this->new_outgoing[$message_id]); - continue; - } - $result[] = $message_id; - } - } - return $result; - } } diff --git a/src/MTProtoSession/CallHandler.php b/src/MTProtoSession/CallHandler.php index 577256d76..8a9479480 100644 --- a/src/MTProtoSession/CallHandler.php +++ b/src/MTProtoSession/CallHandler.php @@ -20,11 +20,14 @@ declare(strict_types=1); namespace danog\MadelineProto\MTProtoSession; +use Amp\CompositeCancellation; use Amp\DeferredFuture; use Amp\Future; use Amp\Sync\LocalKeyedMutex; +use Amp\TimeoutCancellation; use danog\MadelineProto\DataCenterConnection; use danog\MadelineProto\Logger; +use danog\MadelineProto\MTProto; use danog\MadelineProto\MTProto\Container; use danog\MadelineProto\MTProto\MTProtoOutgoingMessage; use danog\MadelineProto\TL\Exception; @@ -39,6 +42,7 @@ use function Amp\Future\await; * * * @property DataCenterConnection $shared + * @property MTProto $API * @internal */ trait CallHandler @@ -106,6 +110,7 @@ trait CallHandler return $readFuture->await(); } private LocalKeyedMutex $abstractionQueueMutex; + private ?int $drop = null; /** * Call method and make sure it is asynchronously sent (generator). * @@ -177,6 +182,10 @@ trait CallHandler if (!$encrypted && $this->shared->hasTempAuthKey()) { $encrypted = true; } + $timeout = new TimeoutCancellation($this->drop ??= (float) $this->getAPI()->getSettings()->getRpc()->getRpcDropTimeout()); + $cancellation = $cancellation !== null + ? new CompositeCancellation($cancellation, $timeout) + : $timeout; $message = new MTProtoOutgoingMessage( connection: $this, body: $args, @@ -201,7 +210,6 @@ trait CallHandler $message->setMsgId($args['madelineMsgId']); } $this->sendMessage($message); - $this->checker->resume(); return new WrappedFuture($response->getFuture()); } /** @@ -212,8 +220,15 @@ trait CallHandler */ public function objectCall(string $object, array $args, ?DeferredFuture $promise = null): void { + $cancellation = $args['cancellation'] ?? null; + $cancellation?->throwIfRequested(); + $timeout = new TimeoutCancellation($this->drop ??= (float) $this->getAPI()->getSettings()->getRpc()->getRpcDropTimeout()); + $cancellation = $cancellation !== null + ? new CompositeCancellation($cancellation, $timeout) + : $timeout; $this->sendMessage( new MTProtoOutgoingMessage( + cancellation: $cancellation, connection: $this, body: $args, constructor: $object, diff --git a/src/MTProtoSession/ResponseHandler.php b/src/MTProtoSession/ResponseHandler.php index 964c582ce..c651c4510 100644 --- a/src/MTProtoSession/ResponseHandler.php +++ b/src/MTProtoSession/ResponseHandler.php @@ -117,7 +117,7 @@ trait ResponseHandler } $this->API->logger('Trying to assign a response of type ' . $response_type . ' to its request...', Logger::VERBOSE); - foreach ($this->new_outgoing as $expecting_msg_id => $expecting) { + foreach ($this->unencrypted_new_outgoing as $expecting_msg_id => $expecting) { if (!$expecting->type) { continue; } diff --git a/src/MTProtoSession/Session.php b/src/MTProtoSession/Session.php index 8225f25e5..6d190fef2 100644 --- a/src/MTProtoSession/Session.php +++ b/src/MTProtoSession/Session.php @@ -77,6 +77,12 @@ trait Session * @var array */ public array $new_outgoing = []; + /** + * New unencrypted outgoing message array. + * + * @var array + */ + public array $unencrypted_new_outgoing = []; /** * Pending outgoing messages. * @@ -104,6 +110,16 @@ trait Session * */ public array $ack_queue = []; + /** + * Check queue. + * + */ + public array $check_queue = []; + /** + * Check queue. + * + */ + public array $unencrypted_check_queue = []; /** * Message ID handler. * @@ -178,6 +194,12 @@ trait Session $new_outgoing[$key] = $message; } $this->new_outgoing = $new_outgoing; + + $unencrypted_new_outgoing = []; + foreach ($this->unencrypted_new_outgoing as $key => $message) { + $unencrypted_new_outgoing[$key] = $message; + } + $this->unencrypted_new_outgoing = $unencrypted_new_outgoing; } /** * Create MTProto session if needed. @@ -227,6 +249,6 @@ trait Session public function backupSession(): array { $pending = array_values($this->pendingOutgoing); - return array_merge($pending, $this->new_outgoing); + return array_merge($pending, $this->new_outgoing, $this->unencrypted_new_outgoing); } } From 1efb43cbdefff74ba24fb47e6c723a4342fe98ad Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Mon, 21 Oct 2024 15:50:43 +0000 Subject: [PATCH 6/6] Misc fixes --- src/Loop/Connection/WriteLoop.php | 20 ++++++++++---------- src/MTProto.php | 11 ++--------- src/MTProtoSession/CallHandler.php | 6 +++--- src/MTProtoTools/FilesLogic.php | 9 +-------- 4 files changed, 16 insertions(+), 30 deletions(-) diff --git a/src/Loop/Connection/WriteLoop.php b/src/Loop/Connection/WriteLoop.php index 55114791b..4484d81f8 100644 --- a/src/Loop/Connection/WriteLoop.php +++ b/src/Loop/Connection/WriteLoop.php @@ -49,24 +49,24 @@ final class WriteLoop extends Loop public const MAX_IDS = 8192; use Common { - __construct as init; + __construct as init2; } private int $pingTimeout; - private float $timeout; + private float $pollTimeout; /** * Constructor function. */ public function __construct(Connection $connection) { - $this->init($connection); + $this->init2($connection); $timeout = $this->shared->getSettings()->getPingInterval(); $this->pingTimeout = $timeout + 15; if ($this->connection->isHttp()) { - $this->timeout = (float) max(self::LONG_POLL_TIMEOUT, $timeout); + $this->pollTimeout = (float) max(self::LONG_POLL_TIMEOUT, $timeout); } else { - $this->timeout = (float) $timeout; + $this->pollTimeout = (float) $timeout; } } /** @@ -83,7 +83,7 @@ final class WriteLoop extends Loop } if (!$this->connection->pendingOutgoing && !$first) { $this->API->logger("No messages, pausing in $this...", Logger::ULTRA_VERBOSE); - return $this->timeout; + return $this->pollTimeout; } if ($please_wait) { $this->API->logger("Have to wait for handshake, pausing in $this...", Logger::ULTRA_VERBOSE); @@ -119,8 +119,8 @@ final class WriteLoop extends Loop { if ($queue = $this->connection->unencrypted_check_queue) { $this->connection->unencrypted_check_queue = []; - foreach ($queue as $msg_id => $_) { - $this->connection->methodRecall($msg_id); + foreach ($queue as $msg) { + $this->connection->methodRecall($msg); } } while ($this->connection->pendingOutgoing) { @@ -189,7 +189,7 @@ final class WriteLoop extends Loop $this->connection->objectCall('msgs_state_req', ['msg_ids' => $message_ids], $deferred); EventLoop::queue(function () use ($deferred, $message_ids): void { try { - $result = $deferred->getFuture()->await(new TimeoutCancellation($this->timeout)); + $result = $deferred->getFuture()->await(new TimeoutCancellation($this->pollTimeout)); if (\is_callable($result)) { throw $result(); } @@ -247,7 +247,7 @@ final class WriteLoop extends Loop } } //} catch (CancelledException) { - //$this->API->logger("We did not receive a response for {$this->timeout} seconds: reconnecting and exiting check loop on DC {$this->datacenter}"); + //$this->API->logger("We did not receive a response for {$this->pollTimeout} seconds: reconnecting and exiting check loop on DC {$this->datacenter}"); //EventLoop::queue($this->connection->reconnect(...)); } catch (\Throwable $e) { $this->API->logger("Got exception in check loop for DC {$this->datacenter}"); diff --git a/src/MTProto.php b/src/MTProto.php index 1bad2f58d..d5ee6ecf7 100644 --- a/src/MTProto.php +++ b/src/MTProto.php @@ -408,7 +408,6 @@ final class MTProto implements TLCallback, LoggerGetter, SettingsGetter */ #[OrmMappedArray(KeyType::STRING, ValueType::SCALAR, cacheTtl: 0, optimizeIfWastedMb: 1, tablePostfix: 'session')] public DbArray $sessionDb; - private bool $cleaned = false; /** * Returns an instance of a client by session name. @@ -1004,11 +1003,6 @@ final class MTProto implements TLCallback, LoggerGetter, SettingsGetter $this->updateQueue = $q; } - if ($this->cleaned) { - return; - } - $this->cleaned = true; - if (isset($this->channels_state)) { $this->updateState = new CombinedUpdatesState; foreach ($this->channels_state->get() as $channelId => $state) { @@ -1131,9 +1125,6 @@ final class MTProto implements TLCallback, LoggerGetter, SettingsGetter $this->initPromise = $deferred->getFuture(); try { - // Update settings from constructor - $this->updateSettings($settings); - // Setup logger $this->setupLogger(); @@ -1163,6 +1154,8 @@ final class MTProto implements TLCallback, LoggerGetter, SettingsGetter } // Reset MTProto session (not related to user session) $this->resetMTProtoSession("wakeup"); + // Update settings from constructor + $this->updateSettings($settings); // Update TL callbacks $callbacks = [$this, $this->peerDatabase]; if ($this->settings->getDb()->getEnableFileReferenceDb()) { diff --git a/src/MTProtoSession/CallHandler.php b/src/MTProtoSession/CallHandler.php index 8a9479480..a2e266d20 100644 --- a/src/MTProtoSession/CallHandler.php +++ b/src/MTProtoSession/CallHandler.php @@ -110,7 +110,7 @@ trait CallHandler return $readFuture->await(); } private LocalKeyedMutex $abstractionQueueMutex; - private ?int $drop = null; + private ?float $drop = null; /** * Call method and make sure it is asynchronously sent (generator). * @@ -182,7 +182,7 @@ trait CallHandler if (!$encrypted && $this->shared->hasTempAuthKey()) { $encrypted = true; } - $timeout = new TimeoutCancellation($this->drop ??= (float) $this->getAPI()->getSettings()->getRpc()->getRpcDropTimeout()); + $timeout = new TimeoutCancellation($this->drop ??= (float) $this->API->getSettings()->getRpc()->getRpcDropTimeout()); $cancellation = $cancellation !== null ? new CompositeCancellation($cancellation, $timeout) : $timeout; @@ -222,7 +222,7 @@ trait CallHandler { $cancellation = $args['cancellation'] ?? null; $cancellation?->throwIfRequested(); - $timeout = new TimeoutCancellation($this->drop ??= (float) $this->getAPI()->getSettings()->getRpc()->getRpcDropTimeout()); + $timeout = new TimeoutCancellation($this->drop ??= (float) $this->API->getSettings()->getRpc()->getRpcDropTimeout()); $cancellation = $cancellation !== null ? new CompositeCancellation($cancellation, $timeout) : $timeout; diff --git a/src/MTProtoTools/FilesLogic.php b/src/MTProtoTools/FilesLogic.php index 4a99c88c0..7f863fca1 100644 --- a/src/MTProtoTools/FilesLogic.php +++ b/src/MTProtoTools/FilesLogic.php @@ -240,14 +240,7 @@ trait FilesLogic if ($result->shouldServe()) { $pipe = new Pipe(1024 * 1024); [$start, $end] = $result->getServeRange(); - EventLoop::queue(function () use ($messageMedia, $pipe, $cb, $start, $end, $cancellation): void { - try { - $this->downloadToStream($messageMedia, $pipe->getSink(), $cb, $start, $end, $cancellation); - } catch (\Throwable $e) { - $this->logger->logger($e, Logger::ERROR); - } - $pipe->getSink()->close(); - }); + async($this->downloadToStream(...), $messageMedia, $pipe->getSink(), $cb, $start, $end, $cancellation)->finally($pipe->getSink()->close(...)); $body = $pipe->getSource(); } elseif (!\in_array($result->getCode(), [HttpStatus::OK, HttpStatus::PARTIAL_CONTENT], true)) { $body = $result->getCodeExplanation();