diff --git a/psalm-baseline.xml b/psalm-baseline.xml index 43d08ca08..370257444 100644 --- a/psalm-baseline.xml +++ b/psalm-baseline.xml @@ -3415,9 +3415,6 @@ - - new_outgoing]]> - getSent()]]> getSent()]]> @@ -3531,6 +3528,8 @@ + new_outgoing[$message_id]]]> + new_outgoing[$message_id]]]> outgoing_messages[$message_id]]]> outgoing_messages[$message_id]]]> @@ -3658,6 +3657,12 @@ getMsgId()]]> + + new_outgoing]]> + new_outgoing]]> + new_outgoing]]> + new_outgoing]]> + API->authorized_dc == $this->datacenter && $this->API->authorized === \danog\MadelineProto\API::LOGGED_IN]]> @@ -4190,6 +4195,8 @@ + + @@ -4206,6 +4213,8 @@ + + diff --git a/src/APIWrapper.php b/src/APIWrapper.php index 4b6f68ab2..10e4293a9 100644 --- a/src/APIWrapper.php +++ b/src/APIWrapper.php @@ -76,13 +76,13 @@ final class APIWrapper return $this->API; } - private ?int $drop = null; + private ?float $drop = null; /** * @internal */ public function getRpcDropCancellation(): Cancellation { - return new TimeoutCancellation($this->drop ??= $this->getAPI()->getSettings()->getRpc()->getRpcDropTimeout()); + return new TimeoutCancellation($this->drop ??= (float) $this->getAPI()->getSettings()->getRpc()->getRpcDropTimeout()); } /** diff --git a/src/Connection.php b/src/Connection.php index 0a00ec07e..6879122b8 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -72,21 +72,11 @@ final class Connection * */ protected ?ReadLoop $reader = null; - /** - * Checker loop. - * - */ - protected ?CheckLoop $checker = null; /** * Waiter loop. * */ protected ?HttpWaitLoop $waiter = null; - /** - * Ping loop. - * - */ - protected ?PingLoop $pinger = null; /** * Cleanup loop. * @@ -301,29 +291,14 @@ final class Connection $this->httpResCount = 0; $this->writer ??= new WriteLoop($this); $this->reader ??= new ReadLoop($this); - $this->checker ??= new CheckLoop($this); $this->cleanup ??= new CleanupLoop($this); - $this->waiter ??= new HttpWaitLoop($this); $this->handler ??= new GenericLoop(fn () => $this->handleMessages($this->new_incoming), "Handler loop"); - if (!isset($this->pinger) && !$ctx->isMedia() && !$ctx->isCDN() && !$this->isHttp()) { - $this->pinger = new PingLoop($this); - } - foreach ($this->new_outgoing as $message_id => $message) { - if ($message->unencrypted) { - if (!($message->getState() & MTProtoOutgoingMessage::STATE_REPLIED)) { - $message->reply(static fn () => new Exception('Restart because we were reconnected')); - } - unset($this->new_outgoing[$message_id], $this->outgoing_messages[$message_id]); - } + foreach ($this->unencrypted_new_outgoing as $message_id => $message) { + $message->reply(static fn () => new Exception('Restart because we were reconnected')); } Assert::true($this->writer->start(), "Could not start writer stream"); Assert::true($this->reader->start(), "Could not start reader stream"); - Assert::true($this->checker->start(), "Could not start checker stream"); Assert::true($this->cleanup->start(), "Could not start cleanup stream"); - $this->waiter->start(); - if ($this->pinger) { - Assert::true($this->pinger->start(), "Could not start pinger stream"); - } $this->handler->start(); EventLoop::queue($this->shared->initAuthorization(...)); @@ -586,9 +561,7 @@ final class Connection $this->pendingOutgoing[$this->pendingOutgoingKey++] = $message; $this->outgoingCtr?->inc(); $this->pendingOutgoingGauge?->set(\count($this->pendingOutgoing)); - if (isset($this->writer)) { - $this->writer->resume(); - } + $this->flush(); $this->connect(); $promise->await(); } @@ -601,18 +574,6 @@ final class Connection $this->writer->resume(); } } - /** - * Resume HttpWaiter. - */ - public function pingHttpWaiter(): void - { - if (isset($this->waiter)) { - $this->waiter->resume(); - } - if (isset($this->pinger)) { - $this->pinger->resume(); - } - } /** * Connect main instance. * @@ -663,9 +624,7 @@ final class Connection $this->reader?->stop(); $this->writer?->stop(); - $this->checker?->stop(); $this->cleanup?->stop(); - $this->pinger?->stop(); if (!$temporary) { $this->shared->signalDisconnect($this->id); diff --git a/src/DataCenterConnection.php b/src/DataCenterConnection.php index 303f0bf2f..2c3489c15 100644 --- a/src/DataCenterConnection.php +++ b/src/DataCenterConnection.php @@ -204,9 +204,6 @@ final class DataCenterConnection implements JsonSerializable $logger->logger("Done initing auth for DC {$this->datacenter}", Logger::NOTICE); EventLoop::queue($lock->release(...)); } - if ($this->hasTempAuthKey()) { - $connection->pingHttpWaiter(); - } } /** * Bind temporary and permanent auth keys. diff --git a/src/Loop/Connection/CheckLoop.php b/src/Loop/Connection/CheckLoop.php deleted file mode 100644 index 36ffb5dd7..000000000 --- a/src/Loop/Connection/CheckLoop.php +++ /dev/null @@ -1,163 +0,0 @@ -. - * - * @author Daniil Gentili - * @copyright 2016-2023 Daniil Gentili - * @license https://opensource.org/licenses/AGPL-3.0 AGPLv3 - * @link https://docs.madelineproto.xyz MadelineProto documentation - */ - -namespace danog\MadelineProto\Loop\Connection; - -use Amp\CancelledException; -use Amp\DeferredFuture; -use Amp\TimeoutCancellation; -use danog\Loop\Loop; -use danog\MadelineProto\Connection; -use danog\MadelineProto\Logger; -use Revolt\EventLoop; - -/** - * RPC call status check loop. - * - * @internal - * - * @author Daniil Gentili - */ -final class CheckLoop extends Loop -{ - use Common { - __construct as initCommon; - } - - private int $resendTimeout; - public function __construct(Connection $connection) - { - $this->initCommon($connection); - $this->resendTimeout = (int) ($this->API->settings->getRpc()->getRpcResendTimeout() * 1_000_000_000.0); - } - /** - * Main loop. - */ - protected function loop(): ?float - { - if (!$this->connection->new_outgoing) { - return self::PAUSE; - } - if (!$this->connection->hasPendingCalls()) { - return $this->timeoutSeconds; - } - if ($this->shared->hasTempAuthKey()) { - $full_message_ids = $this->connection->getPendingCalls(); - foreach (array_chunk($full_message_ids, WriteLoop::MAX_IDS) as $message_ids) { - $deferred = new DeferredFuture(); - $list = ''; - // Don't edit this here pls - foreach ($message_ids as $message_id) { - if (!isset($this->connection->outgoing_messages[$message_id])) { - continue; - } - $list .= $this->connection->outgoing_messages[$message_id]->constructor.', '; - } - $this->API->logger("Still missing {$list} on DC {$this->datacenter}, sending state request", Logger::ERROR); - $this->connection->objectCall('msgs_state_req', ['msg_ids' => $message_ids], $deferred); - EventLoop::queue(function () use ($deferred, $message_ids): void { - try { - $result = $deferred->getFuture()->await(new TimeoutCancellation($this->timeout)); - if (\is_callable($result)) { - throw $result(); - } - $reply = []; - foreach (str_split($result['info']) as $key => $chr) { - $message_id = $message_ids[$key]; - if (!isset($this->connection->outgoing_messages[$message_id])) { - $this->API->logger("Already got response for and forgot about message ID $message_id"); - continue; - } - if (!isset($this->connection->new_outgoing[$message_id])) { - $this->API->logger('Already got response for '.$this->connection->outgoing_messages[$message_id]); - continue; - } - $message = $this->connection->new_outgoing[$message_id]; - $chr = \ord($chr); - switch ($chr & 7) { - case 0: - $this->API->logger("Wrong message status 0 for $message", Logger::FATAL_ERROR); - break; - case 1: - case 2: - case 3: - if ($message->constructor === 'msgs_state_req') { - $this->connection->gotResponseForOutgoingMessage($message); - break; - } - $this->API->logger("Message $message not received by server, resending...", Logger::ERROR); - $this->connection->methodRecall($message_id); - break; - case 4: - if ($chr & 128) { - $this->API->logger("Message $message received by server and was already sent, requesting reply...", Logger::ERROR); - $reply[] = $message_id; - } elseif ($chr & 64) { - $this->API->logger("Message $message received by server and was already processed, requesting reply...", Logger::ERROR); - $reply[] = $message_id; - } elseif ($chr & 32) { - if ($message->getSent() + $this->resendTimeout < hrtime(true)) { - if ($message->isCancellationRequested()) { - unset($this->connection->new_outgoing[$message_id], $this->connection->outgoing_messages[$message_id]); - - $this->API->logger("Cancelling $message...", Logger::ERROR); - } else { - $this->API->logger("Message $message received by server and is being processed for way too long, resending request...", Logger::ERROR); - $this->connection->methodRecall($message_id); - } - } else { - $this->API->logger("Message $message received by server and is being processed, waiting...", Logger::ERROR); - } - } else { - $this->API->logger("Message $message received by server, waiting...", Logger::ERROR); - $reply[] = $message_id; - } - } - } - //} catch (CancelledException) { - //$this->API->logger("We did not receive a response for {$this->timeout} seconds: reconnecting and exiting check loop on DC {$this->datacenter}"); - //EventLoop::queue($this->connection->reconnect(...)); - } catch (\Throwable $e) { - $this->API->logger("Got exception in check loop for DC {$this->datacenter}"); - $this->API->logger((string) $e); - } - }); - } - } else { - foreach ($this->connection->new_outgoing as $message_id => $message) { - if ($message->wasSent() - && $message->getSent() + $this->timeout < hrtime(true) - && $message->unencrypted - ) { - $this->API->logger("Still missing $message on DC {$this->datacenter}, resending", Logger::ERROR); - $this->connection->methodRecall($message->getMsgId()); - } - } - } - return $this->timeoutSeconds; - } - /** - * Loop name. - */ - public function __toString(): string - { - return "check loop in DC {$this->datacenter}"; - } -} diff --git a/src/Loop/Connection/ReadLoop.php b/src/Loop/Connection/ReadLoop.php index d43c83b94..f36dd7ede 100644 --- a/src/Loop/Connection/ReadLoop.php +++ b/src/Loop/Connection/ReadLoop.php @@ -63,7 +63,8 @@ final class ReadLoop extends Loop } EventLoop::queue(function () use ($e): void { if ($e instanceof NothingInTheSocketException - && !$this->connection->hasPendingCalls() + && !$this->connection->unencrypted_new_outgoing + && !$this->connection->new_outgoing && $this->connection->isMedia() && !$this->connection->isWriting() && $this->shared->hasTempAuthKey() @@ -71,7 +72,7 @@ final class ReadLoop extends Loop $this->API->logger("Got NothingInTheSocketException in DC {$this->datacenter}, disconnecting because we have nothing to do...", Logger::ERROR); $this->connection->disconnect(true); } else { - $this->API->logger($e); + $this->API->logger($e, Logger::ERROR); $this->API->logger("Got exception in DC {$this->datacenter}, reconnecting...", Logger::ERROR); $this->connection->reconnect(); } @@ -93,6 +94,9 @@ final class ReadLoop extends Loop foreach ($this->connection->new_outgoing as $message) { $message->resetSent(); } + foreach ($this->connection->unencrypted_new_outgoing as $message) { + $message->resetSent(); + } $this->shared->reconnect(); } else { $this->connection->reconnect(); @@ -116,9 +120,6 @@ final class ReadLoop extends Loop return self::STOP; } $this->connection->httpReceived(); - if ($this->connection->isHttp()) { - $this->connection->pingHttpWaiter(); - } $this->connection->wakeupHandler(); return self::CONTINUE; } diff --git a/src/Loop/Connection/WriteLoop.php b/src/Loop/Connection/WriteLoop.php index 7bdcb69f7..4484d81f8 100644 --- a/src/Loop/Connection/WriteLoop.php +++ b/src/Loop/Connection/WriteLoop.php @@ -21,7 +21,9 @@ declare(strict_types=1); namespace danog\MadelineProto\Loop\Connection; use Amp\ByteStream\StreamException; +use Amp\DeferredFuture; use danog\Loop\Loop; +use danog\MadelineProto\Connection; use danog\MadelineProto\Logger; use danog\MadelineProto\MTProto\Container; use danog\MadelineProto\MTProto\MTProtoOutgoingMessage; @@ -42,23 +44,46 @@ final class WriteLoop extends Loop { private const MAX_COUNT = 1020; private const MAX_SIZE = 1 << 15; + private const LONG_POLL_TIMEOUT = 30; + private const LONG_POLL_TIMEOUT_MS = self::LONG_POLL_TIMEOUT*1000; public const MAX_IDS = 8192; - use Common; + use Common { + __construct as init2; + } + + private int $pingTimeout; + private float $pollTimeout; + /** + * Constructor function. + */ + public function __construct(Connection $connection) + { + $this->init2($connection); + $timeout = $this->shared->getSettings()->getPingInterval(); + $this->pingTimeout = $timeout + 15; + + if ($this->connection->isHttp()) { + $this->pollTimeout = (float) max(self::LONG_POLL_TIMEOUT, $timeout); + } else { + $this->pollTimeout = (float) $timeout; + } + } /** * Main loop. */ public function loop(): ?float { $please_wait = false; + $first = true; while (true) { if ($this->connection->shouldReconnect()) { $this->API->logger("Exiting $this because connection is old"); return self::STOP; } - if (!$this->connection->pendingOutgoing) { + if (!$this->connection->pendingOutgoing && !$first) { $this->API->logger("No messages, pausing in $this...", Logger::ULTRA_VERBOSE); - return self::PAUSE; + return $this->pollTimeout; } if ($please_wait) { $this->API->logger("Have to wait for handshake, pausing in $this...", Logger::ULTRA_VERBOSE); @@ -67,7 +92,7 @@ final class WriteLoop extends Loop $this->connection->writing(true); try { $please_wait = $this->shared->hasTempAuthKey() - ? $this->encryptedWriteLoop() + ? $this->encryptedWriteLoop($first) : $this->unencryptedWriteLoop(); } catch (StreamException $e) { if ($this->connection->shouldReconnect()) { @@ -87,10 +112,17 @@ final class WriteLoop extends Loop } finally { $this->connection->writing(false); } + $first = false; } } public function unencryptedWriteLoop(): bool { + if ($queue = $this->connection->unencrypted_check_queue) { + $this->connection->unencrypted_check_queue = []; + foreach ($queue as $msg) { + $this->connection->methodRecall($msg); + } + } while ($this->connection->pendingOutgoing) { $skipped_all = true; foreach ($this->connection->pendingOutgoing as $k => $message) { @@ -123,7 +155,7 @@ final class WriteLoop extends Loop $this->connection->pendingOutgoingGauge?->set(\count($this->connection->pendingOutgoing)); $message->setMsgId($message_id); $this->connection->outgoing_messages[$message_id] = $message; - $this->connection->new_outgoing[$message_id] = $message; + $this->connection->unencrypted_new_outgoing[$message_id] = $message; $message->sent(); } @@ -133,16 +165,97 @@ final class WriteLoop extends Loop } return false; } - public function encryptedWriteLoop(): bool + public function encryptedWriteLoop(bool $first): bool { do { if (!$this->shared->hasTempAuthKey()) { return false; } - if ($this->connection->isHttp() && empty($this->connection->pendingOutgoing)) { + if (!$first && !$this->connection->pendingOutgoing) { return false; } + foreach ($this->connection->check_queue as $msg_id => $_) { + $deferred = new DeferredFuture(); + $list = ''; + // Don't edit this here pls + foreach ($message_ids as $message_id) { + if (!isset($this->connection->outgoing_messages[$message_id])) { + continue; + } + $list .= $this->connection->outgoing_messages[$message_id]->constructor.', '; + } + $this->API->logger("Still missing {$list} on DC {$this->datacenter}, sending state request", Logger::ERROR); + $this->connection->objectCall('msgs_state_req', ['msg_ids' => $message_ids], $deferred); + EventLoop::queue(function () use ($deferred, $message_ids): void { + try { + $result = $deferred->getFuture()->await(new TimeoutCancellation($this->pollTimeout)); + if (\is_callable($result)) { + throw $result(); + } + $reply = []; + foreach (str_split($result['info']) as $key => $chr) { + $message_id = $message_ids[$key]; + if (!isset($this->connection->outgoing_messages[$message_id])) { + $this->API->logger("Already got response for and forgot about message ID $message_id"); + continue; + } + if (!isset($this->connection->new_outgoing[$message_id])) { + $this->API->logger('Already got response for '.$this->connection->outgoing_messages[$message_id]); + continue; + } + $message = $this->connection->new_outgoing[$message_id]; + $chr = \ord($chr); + switch ($chr & 7) { + case 0: + $this->API->logger("Wrong message status 0 for $message", Logger::FATAL_ERROR); + break; + case 1: + case 2: + case 3: + if ($message->constructor === 'msgs_state_req') { + $this->connection->gotResponseForOutgoingMessage($message); + break; + } + $this->API->logger("Message $message not received by server, resending...", Logger::ERROR); + $this->connection->methodRecall($message_id); + break; + case 4: + if ($chr & 128) { + $this->API->logger("Message $message received by server and was already sent, requesting reply...", Logger::ERROR); + $reply[] = $message_id; + } elseif ($chr & 64) { + $this->API->logger("Message $message received by server and was already processed, requesting reply...", Logger::ERROR); + $reply[] = $message_id; + } elseif ($chr & 32) { + if ($message->getSent() + $this->resendTimeout < hrtime(true)) { + if ($message->cancellation?->isRequested()) { + unset($this->connection->new_outgoing[$message_id], $this->connection->outgoing_messages[$message_id]); + + $this->API->logger("Cancelling $message...", Logger::ERROR); + } else { + $this->API->logger("Message $message received by server and is being processed for way too long, resending request...", Logger::ERROR); + $this->connection->methodRecall($message_id); + } + } else { + $this->API->logger("Message $message received by server and is being processed, waiting...", Logger::ERROR); + } + } else { + $this->API->logger("Message $message received by server, waiting...", Logger::ERROR); + $reply[] = $message_id; + } + } + } + //} catch (CancelledException) { + //$this->API->logger("We did not receive a response for {$this->pollTimeout} seconds: reconnecting and exiting check loop on DC {$this->datacenter}"); + //EventLoop::queue($this->connection->reconnect(...)); + } catch (\Throwable $e) { + $this->API->logger("Got exception in check loop for DC {$this->datacenter}"); + $this->API->logger((string) $e); + } + }); + } + ksort($this->connection->pendingOutgoing); $messages = []; @@ -286,7 +399,7 @@ final class WriteLoop extends Loop $total_length += $actual_length; $MTmessage['bytes'] = $body_length; $messages[] = $MTmessage; - $keys[$k] = $message_id; + $keys[$k] = $message; $message->setSeqNo($MTmessage['seqno']) ->setMsgId($MTmessage['msg_id']); @@ -325,10 +438,10 @@ final class WriteLoop extends Loop if ($count > 1 || $has_seq) { $this->API->logger("Wrapping in msg_container ({$count} messages of total size {$total_length}) as encrypted message for DC {$this->datacenter}", Logger::ULTRA_VERBOSE); $message_id = $this->connection->msgIdHandler->generateMessageId(); - $this->connection->pendingOutgoing[$this->connection->pendingOutgoingKey] = new Container($this->connection, array_values($keys)); + $this->connection->pendingOutgoing[$this->connection->pendingOutgoingKey] = $ct = new Container($this->connection, $keys); $this->connection->outgoingCtr?->inc(); $this->connection->pendingOutgoingGauge?->set(\count($this->connection->pendingOutgoing)); - $keys[$this->connection->pendingOutgoingKey++] = $message_id; + $keys[$this->connection->pendingOutgoingKey++] = $ct; $message_data = $this->API->getTL()->serializeObject(['type' => ''], ['_' => 'msg_container', 'messages' => $messages], 'container'); $message_data_length = \strlen($message_data); $seq_no = $this->connection->generateOutSeqNo(false); @@ -364,8 +477,7 @@ final class WriteLoop extends Loop $this->connection->ack_queue = \array_slice($this->connection->ack_queue, $ackCount); } - foreach ($keys as $key => $message_id) { - $message = $this->connection->pendingOutgoing[$key]; + foreach ($keys as $key => $message) { unset($this->connection->pendingOutgoing[$key]); $this->connection->outgoing_messages[$message_id] = $message; if ($message->hasPromise()) { diff --git a/src/MTProto/Container.php b/src/MTProto/Container.php index ef799fb52..29c9f824a 100644 --- a/src/MTProto/Container.php +++ b/src/MTProto/Container.php @@ -29,28 +29,13 @@ use danog\MadelineProto\Connection; */ final class Container extends MTProtoOutgoingMessage { - /** - * Message IDs. - * - * @var list - */ - private array $ids = []; - /** * Constructor. * - * @param list $ids + * @param list $msgs */ - public function __construct(Connection $connection, array $ids) + public function __construct(Connection $connection, public readonly array $msgs) { - $this->ids = $ids; - parent::__construct($connection, [], 'msg_container', '', false, false); - } - /** - * Get message IDs. - */ - public function getIds(): array - { - return $this->ids; + parent::__construct($connection, [], 'msg_container', '', false, false, null); } } diff --git a/src/MTProto/MTProtoOutgoingMessage.php b/src/MTProto/MTProtoOutgoingMessage.php index 8617a1731..d69b9a36c 100644 --- a/src/MTProto/MTProtoOutgoingMessage.php +++ b/src/MTProto/MTProtoOutgoingMessage.php @@ -89,6 +89,8 @@ class MTProtoOutgoingMessage extends MTProtoMessage */ private int $tries = 0; + private ?string $checkTimer = null; + /** * Whether this message is related to a user, as in getting a successful reply means we have auth. */ @@ -110,6 +112,7 @@ class MTProtoOutgoingMessage extends MTProtoMessage public readonly string $type, public readonly bool $isMethod, public readonly bool $unencrypted, + public readonly ?Cancellation $cancellation, public readonly ?string $subtype = null, /** * Whether this message is related to a file upload, as in getting a redirect should redirect to a media server. @@ -126,7 +129,6 @@ class MTProtoOutgoingMessage extends MTProtoMessage public readonly ?int $takeoutId = null, public readonly ?string $businessConnectionId = null, private ?DeferredFuture $resultDeferred = null, - public readonly ?Cancellation $cancellation = null ) { $this->userRelated = $constructor === 'users.getUsers' && $body === ['id' => [['_' => 'inputUserSelf']]] || $constructor === 'auth.exportAuthorization' || $constructor === 'updates.getDifference'; @@ -158,14 +160,6 @@ class MTProtoOutgoingMessage extends MTProtoMessage }); } - /** - * Whether cancellation is requested. - */ - public function isCancellationRequested(): bool - { - return $this->cancellation?->isRequested() ?? false; - } - /** * Signal that we're trying to send the message. */ @@ -188,12 +182,46 @@ class MTProtoOutgoingMessage extends MTProtoMessage } $this->state |= self::STATE_SENT; $this->sent = hrtime(true); + $this->checkTimer = EventLoop::delay( + $this->connection->API->getSettings()->getConnection()->getTimeout(), + $this->check(...) + ); if (isset($this->sendDeferred)) { $sendDeferred = $this->sendDeferred; $this->sendDeferred = null; $sendDeferred->complete(); } } + private function check(): void + { + if ($this->state & self::STATE_REPLIED) { + return; + } + $shared = $this->connection->getShared(); + $settings = $shared->getSettings(); + $global = $shared->getGenericSettings(); + $timeout = (float) $settings->getTimeout(); + $pfs = $global->getAuth()->getPfs(); + $unencrypted = !$shared->hasTempAuthKey(); + $notBound = !$shared->isBound(); + $pfsNotBound = $pfs && $notBound; + $this->checkTimer = EventLoop::delay( + $timeout, + $this->check(...) + ); + + if ($this->unencrypted === $unencrypted) { + if (!$unencrypted && $pfsNotBound && $this->constructor !== 'auth.bindTempAuthKey') { + return; + } + if ($unencrypted) { + $this->connection->unencrypted_check_queue[$this->msgId] = true; + } else { + $this->connection->check_queue[$this->msgId] = true; + } + $this->connection->flush(); + } + } /** * Set reply to message. * @@ -209,6 +237,10 @@ class MTProtoOutgoingMessage extends MTProtoMessage if (!($this->state & self::STATE_SENT)) { $this->sent(); } + if ($this->checkTimer !== null) { + EventLoop::cancel($this->checkTimer); + $this->checkTimer = null; + } if ($this->isMethod) { $this->connection->inFlightGauge?->dec([ @@ -221,6 +253,9 @@ class MTProtoOutgoingMessage extends MTProtoMessage ); } } + if ($this->msgId !== null) { + unset($this->connection->new_outgoing[$this->msgId], $this->connection->outgoing_messages[$this->msgId]); + } $this->serializedBody = null; $this->body = null; diff --git a/src/MTProtoSession/AckHandler.php b/src/MTProtoSession/AckHandler.php index fe574bbe7..0cd082489 100644 --- a/src/MTProtoSession/AckHandler.php +++ b/src/MTProtoSession/AckHandler.php @@ -20,7 +20,6 @@ declare(strict_types=1); namespace danog\MadelineProto\MTProtoSession; -use Amp\TimeoutException; use danog\MadelineProto\DataCenterConnection; use danog\MadelineProto\Logger; use danog\MadelineProto\MTProto\MTProtoIncomingMessage; @@ -47,14 +46,6 @@ trait AckHandler } return true; } - /** - * We have gotten a response for an outgoing message. - */ - public function gotResponseForOutgoingMessage(MTProtoOutgoingMessage $outgoingMessage): void - { - // The server acknowledges that it received my message - unset($this->new_outgoing[$outgoingMessage->getMsgId()]); - } /** * Acknowledge incoming message ID. */ @@ -66,72 +57,4 @@ trait AckHandler // I let the server know that I received its message $this->ack_queue[$message_id] = $message_id; } - - /** - * Check if there are some pending calls. - */ - public function hasPendingCalls(): bool - { - $timeout = (int) ($this->shared->getSettings()->getTimeout() * 1_000_000_000.0); - $pfs = $this->shared->getGenericSettings()->getAuth()->getPfs(); - $unencrypted = !$this->shared->hasTempAuthKey(); - $notBound = !$this->shared->isBound(); - $pfsNotBound = $pfs && $notBound; - /** @var MTProtoOutgoingMessage */ - foreach ($this->new_outgoing as $message) { - if ($message->wasSent() - && $message->getSent() + $timeout < hrtime(true) - && $message->unencrypted === $unencrypted - && $message->constructor !== 'msgs_state_req') { - if (!$unencrypted && $pfsNotBound && $message->constructor !== 'auth.bindTempAuthKey') { - continue; - } - return true; - } - } - return false; - } - /** - * Get all pending calls (also clear pending state requests). - */ - public function getPendingCalls(): array - { - $settings = $this->shared->getSettings(); - $global = $this->shared->getGenericSettings(); - $dropTimeout = (int) ($global->getRpc()->getRpcDropTimeout() * 1_000_000_000.0); - $timeout = (int) ($settings->getTimeout() * 1_000_000_000.0); - $pfs = $global->getAuth()->getPfs(); - $unencrypted = !$this->shared->hasTempAuthKey(); - $notBound = !$this->shared->isBound(); - $pfsNotBound = $pfs && $notBound; - if ($this->datacenter < 0) { - $dropTimeout *= 10; - } - $result = []; - /** @var MTProtoOutgoingMessage $message */ - foreach ($this->new_outgoing as $message_id => $message) { - if ($message->wasSent() - && $message->getSent() + $timeout < hrtime(true) - && $message->unencrypted === $unencrypted - ) { - if (!$unencrypted && $pfsNotBound && $message->constructor !== 'auth.bindTempAuthKey') { - continue; - } - if ($message->constructor === 'msgs_state_req' || $message->constructor === 'ping_delay_disconnect' - || ($message->getSent() + $dropTimeout < hrtime(true)) - ) { - Logger::log('No reply for message: ' . $message, Logger::WARNING); - $this->handleReject($message, static fn () => new TimeoutException('Request timeout')); - continue; - } - if ($message->getState() & MTProtoOutgoingMessage::STATE_REPLIED) { - $this->API->logger("Already replied to message $message, but still in new_outgoing"); - unset($this->new_outgoing[$message_id]); - continue; - } - $result[] = $message_id; - } - } - return $result; - } } diff --git a/src/MTProtoSession/CallHandler.php b/src/MTProtoSession/CallHandler.php index 7194144e6..a2e266d20 100644 --- a/src/MTProtoSession/CallHandler.php +++ b/src/MTProtoSession/CallHandler.php @@ -20,10 +20,14 @@ declare(strict_types=1); namespace danog\MadelineProto\MTProtoSession; +use Amp\CompositeCancellation; use Amp\DeferredFuture; +use Amp\Future; use Amp\Sync\LocalKeyedMutex; +use Amp\TimeoutCancellation; use danog\MadelineProto\DataCenterConnection; use danog\MadelineProto\Logger; +use danog\MadelineProto\MTProto; use danog\MadelineProto\MTProto\Container; use danog\MadelineProto\MTProto\MTProtoOutgoingMessage; use danog\MadelineProto\TL\Exception; @@ -38,6 +42,7 @@ use function Amp\Future\await; * * * @property DataCenterConnection $shared + * @property MTProto $API * @internal */ trait CallHandler @@ -45,39 +50,52 @@ trait CallHandler /** * Recall method. */ - public function methodRecall(int $message_id, ?int $datacenter = null): void + public function methodRecall(MTProtoOutgoingMessage $request, ?int $forceDatacenter = null, float|Future|null $defer = null): void { - if ($datacenter === $this->datacenter) { - $datacenter = null; - } - $message = $this->outgoing_messages[$message_id] ?? null; - $message_ids = $message instanceof Container - ? $message->getIds() - : [$message_id]; - foreach ($message_ids as $message_id) { - if (isset($this->outgoing_messages[$message_id]) - && !$this->outgoing_messages[$message_id]->canGarbageCollect()) { - if ($datacenter) { - /** @var MTProtoOutgoingMessage */ - $message = $this->outgoing_messages[$message_id]; - $this->gotResponseForOutgoingMessage($message); - $message->setMsgId(null); - $message->setSeqNo(null); - EventLoop::queue(function () use ($datacenter, $message): void { - $this->API->datacenter->waitGetConnection($datacenter) - ->sendMessage($message); - }); - } else { - /** @var MTProtoOutgoingMessage */ - $message = $this->outgoing_messages[$message_id]; - if (!$message->hasSeqNo()) { - $this->gotResponseForOutgoingMessage($message); - } - EventLoop::queue($this->sendMessage(...), $message); - } - } else { - $this->API->logger('Could not resend '.($this->outgoing_messages[$message_id] ?? $message_id)); + $id = $request->getMsgId(); + unset($this->outgoing_messages[$id], $this->new_outgoing[$id]); + if ($request instanceof Container) { + foreach ($request->msgs as $msg) { + $this->methodRecall($msg, $forceDatacenter, $defer); } + return; + } + if ($request->isCancellationRequested()) { + return; + } + if (\is_float($defer)) { + $d = new DeferredFuture; + $id = EventLoop::delay($defer, $d->complete(...)); + $request->cancellation?->subscribe(static fn () => EventLoop::cancel($id)); + $defer = $d; + return; + } + $prev = $request->previousQueuedMessage; + if (!$prev->hasReply()) { + $prev->getResultPromise()->finally( + fn () => $this->methodRecall($request, $this->datacenter, $defer) + ); + return; + } + if ($defer) { + $defer->finally( + fn () => $this->methodRecall($request, $this->datacenter) + ); + return; + } + $datacenter = $forceDatacenter ?? $this->datacenter; + if ($forceDatacenter !== null) { + /** @var MTProtoOutgoingMessage */ + $request->setMsgId(null); + $request->setSeqNo(null); + } + if ($datacenter === $this->datacenter) { + EventLoop::queue($this->sendMessage(...), $request); + } else { + EventLoop::queue(function () use ($datacenter, $request): void { + $this->API->datacenter->waitGetConnection($datacenter) + ->sendMessage($request); + }); } } /** @@ -92,6 +110,7 @@ trait CallHandler return $readFuture->await(); } private LocalKeyedMutex $abstractionQueueMutex; + private ?float $drop = null; /** * Call method and make sure it is asynchronously sent (generator). * @@ -163,6 +182,10 @@ trait CallHandler if (!$encrypted && $this->shared->hasTempAuthKey()) { $encrypted = true; } + $timeout = new TimeoutCancellation($this->drop ??= (float) $this->API->getSettings()->getRpc()->getRpcDropTimeout()); + $cancellation = $cancellation !== null + ? new CompositeCancellation($cancellation, $timeout) + : $timeout; $message = new MTProtoOutgoingMessage( connection: $this, body: $args, @@ -187,7 +210,6 @@ trait CallHandler $message->setMsgId($args['madelineMsgId']); } $this->sendMessage($message); - $this->checker->resume(); return new WrappedFuture($response->getFuture()); } /** @@ -198,8 +220,15 @@ trait CallHandler */ public function objectCall(string $object, array $args, ?DeferredFuture $promise = null): void { + $cancellation = $args['cancellation'] ?? null; + $cancellation?->throwIfRequested(); + $timeout = new TimeoutCancellation($this->drop ??= (float) $this->API->getSettings()->getRpc()->getRpcDropTimeout()); + $cancellation = $cancellation !== null + ? new CompositeCancellation($cancellation, $timeout) + : $timeout; $this->sendMessage( new MTProtoOutgoingMessage( + cancellation: $cancellation, connection: $this, body: $args, constructor: $object, diff --git a/src/MTProtoSession/Reliable.php b/src/MTProtoSession/Reliable.php index 07ff16461..8b34fd071 100644 --- a/src/MTProtoSession/Reliable.php +++ b/src/MTProtoSession/Reliable.php @@ -64,7 +64,9 @@ trait Reliable } if ($ok) { foreach ($content['msg_ids'] as $msg_id) { - $this->methodRecall($msg_id); + if (isset($this->outgoing_messages[$msg_id])) { + $this->methodRecall($this->outgoing_messages[$msg_id]); + } } } else { $this->sendMsgsStateInfo($content['msg_ids'], $current_msg_id); diff --git a/src/MTProtoSession/ResponseHandler.php b/src/MTProtoSession/ResponseHandler.php index ea2f14a6a..c651c4510 100644 --- a/src/MTProtoSession/ResponseHandler.php +++ b/src/MTProtoSession/ResponseHandler.php @@ -20,6 +20,7 @@ declare(strict_types=1); namespace danog\MadelineProto\MTProtoSession; +use Amp\DeferredFuture; use Amp\SignalException; use danog\BetterPrometheus\BetterHistogram; use danog\Loop\Loop; @@ -43,6 +44,8 @@ use Throwable; use const PHP_EOL; +use function Amp\async; + /** * Manages responses. * @@ -114,7 +117,7 @@ trait ResponseHandler } $this->API->logger('Trying to assign a response of type ' . $response_type . ' to its request...', Logger::VERBOSE); - foreach ($this->new_outgoing as $expecting_msg_id => $expecting) { + foreach ($this->unencrypted_new_outgoing as $expecting_msg_id => $expecting) { if (!$expecting->type) { continue; } @@ -170,14 +173,6 @@ trait ResponseHandler $this->handleMessages([$message]); } } - /** - * @param callable(): \Throwable $data - */ - private function handleReject(MTProtoOutgoingMessage $message, callable $data): void - { - $this->gotResponseForOutgoingMessage($message); - $message->reply($data); - } /** * Handle RPC response. @@ -211,7 +206,7 @@ trait ResponseHandler $exception = static fn (): \Throwable => $e; } if ($exception) { - $this->handleReject($request, $exception); + $request->reply($exception); } return; } @@ -220,12 +215,10 @@ trait ResponseHandler switch ($response['error_code']) { case 48: $this->shared->getTempAuthKey()->setServerSalt($response['new_server_salt']); - $this->methodRecall($requestId); + $this->methodRecall($request); return; case 20: - $request->setMsgId(null); - $request->setSeqNo(null); - $this->methodRecall($requestId); + $this->methodRecall($request, $this->datacenter); return; case 16: case 17: @@ -233,11 +226,12 @@ trait ResponseHandler $this->API->logger('Set time delta to ' . $this->time_delta, Logger::WARNING); $this->API->resetMTProtoSession("time delta update"); $this->shared->setTempAuthKey(null); - EventLoop::queue($this->shared->initAuthorization(...)); - EventLoop::queue($this->methodRecall(...), $requestId); + $d = new DeferredFuture; + async($this->shared->initAuthorization(...))->finally($d->complete(...)); + $this->methodRecall($request, $this->datacenter, $d->getFuture()); return; } - $this->handleReject($request, static fn () => RPCErrorException::make('Received bad_msg_notification: ' . MTProto::BAD_MSG_ERROR_CODES[$response['error_code']], $response['error_code'], $request->constructor)); + $request->reply(static fn () => RPCErrorException::make('Received bad_msg_notification: ' . MTProto::BAD_MSG_ERROR_CODES[$response['error_code']], $response['error_code'], $request->constructor)); return; } @@ -275,7 +269,6 @@ trait ResponseHandler } } } - $this->gotResponseForOutgoingMessage($request); $this->requestResponse?->inc([ 'method' => $request->constructor, @@ -283,7 +276,7 @@ trait ResponseHandler 'error_code' => '200', ]); - EventLoop::queue($request->reply(...), $response); + $request->reply($response); } /** * @param array{error_message: string, error_code: int} $response @@ -314,12 +307,7 @@ trait ResponseHandler && !$request->shouldRefreshReferences() ) { $this->API->logger("Got {$response['error_message']}, refreshing file reference and repeating method call..."); - $this->gotResponseForOutgoingMessage($request); - $msgId = $request->getMsgId(); - $request->setRefreshReferences(true); - $request->setMsgId(null); - $request->setSeqNo(null); - $this->methodRecall($msgId); + $this->methodRecall($request, $this->datacenter); return null; } @@ -334,28 +322,13 @@ trait ResponseHandler ) ) { $this->API->logger("Resending $request due to {$response['error_message']}"); - $this->gotResponseForOutgoingMessage($request); - $msgId = $request->getMsgId(); - $request->setSent(hrtime(true) + (5*60 * 1_000_000_000)); - $request->setMsgId(null); - $request->setSeqNo(null); - $prev = $request->previousQueuedMessage; - if ($prev->hasReply()) { - $this->methodRecall($msgId); - } else { - $prev->getResultPromise()->finally( - fn () => $this->methodRecall($msgId) - ); - } + $this->methodRecall($request, $this->datacenter); return null; } if ((($response['error_code'] === -503 || $response['error_message'] === '-503') && !\in_array($request->constructor, ['messages.getBotCallbackAnswer', 'messages.getInlineBotResults'], true)) || (\in_array($response['error_message'], ['MSGID_DECREASE_RETRY', 'HISTORY_GET_FAILED', 'RPC_CONNECT_FAILED', 'RPC_CALL_FAIL', 'RPC_MCGET_FAIL', 'PERSISTENT_TIMESTAMP_OUTDATED', 'RPC_MCGET_FAIL', 'no workers running', 'No workers running'], true))) { $this->API->logger("Resending $request in 1 second due to {$response['error_message']}"); - $msgId = $request->getMsgId(); - $request->setMsgId(null); - $request->setSeqNo(null); - EventLoop::delay(1.0, fn () => $this->methodRecall($msgId)); + $this->methodRecall($request, $this->datacenter, 1.0); return null; } return static fn () => RPCErrorException::make($response['error_message'], $response['error_code'], $request->constructor); @@ -386,20 +359,7 @@ trait ResponseHandler ) ) { $this->API->logger("Resending $request due to {$response['error_message']}"); - $this->gotResponseForOutgoingMessage($request); - $msgId = $request->getMsgId(); - $request->setSent(hrtime(true) + (5*60 * 1_000_000_000)); - $request->setMsgId(null); - $request->setSeqNo(null); - \assert($msgId !== null); - $prev = $request->previousQueuedMessage; - if ($prev->hasReply()) { - $this->methodRecall($msgId); - } else { - $prev->getResultPromise()->finally( - fn () => $this->methodRecall($msgId) - ); - } + $this->methodRecall($request, $this->datacenter); return null; } return static fn () => RPCErrorException::make($response['error_message'], $response['error_code'], $request->constructor); @@ -420,12 +380,7 @@ trait ResponseHandler case 'AUTH_KEY_UNREGISTERED': case 'AUTH_KEY_INVALID': if ($this->API->authorized !== \danog\MadelineProto\API::LOGGED_IN) { - $this->gotResponseForOutgoingMessage($request); - EventLoop::queue( - $this->handleReject(...), - $request, - static fn () => RPCErrorException::make($response['error_message'], $response['error_code'], $request->constructor) - ); + $request->reply(static fn () => RPCErrorException::make($response['error_message'], $response['error_code'], $request->constructor)); return null; } $this->session_id = null; @@ -441,14 +396,16 @@ trait ResponseHandler $this->API->logout(); return static fn () => new SignalException(sprintf(Lang::$current_lang['account_banned'], $phone)); } - EventLoop::queue($this->shared->initAuthorization(...)); - EventLoop::queue($this->methodRecall(...), $request->getMsgId()); + $deferred = new DeferredFuture; + async($this->shared->initAuthorization(...))->finally($deferred->complete(...)); + $this->methodRecall($request, $this->datacenter, $deferred->getFuture()); return null; case 'AUTH_KEY_PERM_EMPTY': $this->API->logger('Temporary auth key not bound, resetting temporary auth key...', Logger::ERROR); $this->shared->setTempAuthKey(null); - EventLoop::queue($this->shared->initAuthorization(...)); - EventLoop::queue($this->methodRecall(...), $request->getMsgId()); + $deferred = new DeferredFuture; + async($this->shared->initAuthorization(...))->finally($deferred->complete(...)); + $this->methodRecall($request, $this->datacenter, $deferred->getFuture()); return null; } return static fn () => RPCErrorException::make($response['error_message'], $response['error_code'], $request->constructor); @@ -457,14 +414,7 @@ trait ResponseHandler $limit = $request->floodWaitLimit ?? $this->API->settings->getRPC()->getFloodTimeout(); if ($seconds < $limit) { $this->API->logger("Flood, waiting $seconds seconds before repeating async call of $request...", Logger::NOTICE); - $this->gotResponseForOutgoingMessage($request); - $msgId = $request->getMsgId(); - $request->setSent(hrtime(true) + ($seconds * 1_000_000_000)); - $request->setMsgId(null); - $request->setSeqNo(null); - \assert($msgId !== null); - $id = EventLoop::delay((float) $seconds, fn () => $this->methodRecall($msgId)); - $request->cancellation?->subscribe(static fn () => EventLoop::cancel($id)); + $this->methodRecall($request, $this->datacenter, (float) $seconds); return null; } if (str_starts_with($response['error_message'], 'FLOOD_WAIT_')) { diff --git a/src/MTProtoSession/Session.php b/src/MTProtoSession/Session.php index 8225f25e5..6d190fef2 100644 --- a/src/MTProtoSession/Session.php +++ b/src/MTProtoSession/Session.php @@ -77,6 +77,12 @@ trait Session * @var array */ public array $new_outgoing = []; + /** + * New unencrypted outgoing message array. + * + * @var array + */ + public array $unencrypted_new_outgoing = []; /** * Pending outgoing messages. * @@ -104,6 +110,16 @@ trait Session * */ public array $ack_queue = []; + /** + * Check queue. + * + */ + public array $check_queue = []; + /** + * Check queue. + * + */ + public array $unencrypted_check_queue = []; /** * Message ID handler. * @@ -178,6 +194,12 @@ trait Session $new_outgoing[$key] = $message; } $this->new_outgoing = $new_outgoing; + + $unencrypted_new_outgoing = []; + foreach ($this->unencrypted_new_outgoing as $key => $message) { + $unencrypted_new_outgoing[$key] = $message; + } + $this->unencrypted_new_outgoing = $unencrypted_new_outgoing; } /** * Create MTProto session if needed. @@ -227,6 +249,6 @@ trait Session public function backupSession(): array { $pending = array_values($this->pendingOutgoing); - return array_merge($pending, $this->new_outgoing); + return array_merge($pending, $this->new_outgoing, $this->unencrypted_new_outgoing); } } diff --git a/src/MTProtoTools/FilesLogic.php b/src/MTProtoTools/FilesLogic.php index 746373b71..7f863fca1 100644 --- a/src/MTProtoTools/FilesLogic.php +++ b/src/MTProtoTools/FilesLogic.php @@ -240,7 +240,7 @@ trait FilesLogic if ($result->shouldServe()) { $pipe = new Pipe(1024 * 1024); [$start, $end] = $result->getServeRange(); - EventLoop::queue($this->downloadToStream(...), $messageMedia, $pipe->getSink(), $cb, $start, $end, $cancellation); + async($this->downloadToStream(...), $messageMedia, $pipe->getSink(), $cb, $start, $end, $cancellation)->finally($pipe->getSink()->close(...)); $body = $pipe->getSource(); } elseif (!\in_array($result->getCode(), [HttpStatus::OK, HttpStatus::PARTIAL_CONTENT], true)) { $body = $result->getCodeExplanation(); diff --git a/src/MTProtoTools/PeerDatabase.php b/src/MTProtoTools/PeerDatabase.php index 80fc167be..159529342 100644 --- a/src/MTProtoTools/PeerDatabase.php +++ b/src/MTProtoTools/PeerDatabase.php @@ -267,11 +267,13 @@ final class PeerDatabase implements TLCallback } $new = $new ? self::getUsernames($new) : []; $old = $old ? self::getUsernames($old) : []; - $diffToRemove = array_diff($old, $new); - $diffToAdd = array_diff($new, $old); - if (!$diffToAdd && !$diffToRemove) { - return; + foreach ($old as $key => $username) { + if (!isset($this->usernames[$username])) { + unset($old[$key]); + } } + $diffToRemove = array_diff($old, $new); + $diffToAdd = array_diff($new, $diffToRemove); $lock = $this->decacheMutex->acquire(); try { foreach ($diffToRemove as $username) { @@ -417,6 +419,9 @@ final class PeerDatabase implements TLCallback return; } } + + $this->recacheChatUsername($user['id'], $existingChat, $user); + if ($existingChat != $user) { $this->API->logger("Updated user {$user['id']}", Logger::ULTRA_VERBOSE); if (($user['min'] ?? false) && !($existingChat['min'] ?? false)) { @@ -552,6 +557,7 @@ final class PeerDatabase implements TLCallback return; } } + $this->recacheChatUsername($bot_api_id, $existingChat, $chat); if ($existingChat != $chat) { $this->API->logger("Updated chat {$bot_api_id}", Logger::ULTRA_VERBOSE); if (($chat['min'] ?? false) && $existingChat && !($existingChat['min'] ?? false)) {