diff --git a/src/Connection.php b/src/Connection.php index 6879122b8..9849fee8a 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -27,10 +27,8 @@ use Amp\Sync\LocalMutex; use AssertionError; use danog\DialogId\DialogId; use danog\Loop\GenericLoop; -use danog\MadelineProto\Loop\Connection\CheckLoop; use danog\MadelineProto\Loop\Connection\CleanupLoop; use danog\MadelineProto\Loop\Connection\HttpWaitLoop; -use danog\MadelineProto\Loop\Connection\PingLoop; use danog\MadelineProto\Loop\Connection\ReadLoop; use danog\MadelineProto\Loop\Connection\WriteLoop; use danog\MadelineProto\MTProto\MTProtoOutgoingMessage; diff --git a/src/Loop/Connection/WriteLoop.php b/src/Loop/Connection/WriteLoop.php index 4484d81f8..17af53cfc 100644 --- a/src/Loop/Connection/WriteLoop.php +++ b/src/Loop/Connection/WriteLoop.php @@ -51,7 +51,7 @@ final class WriteLoop extends Loop use Common { __construct as init2; } - + private int $pingTimeout; private float $pollTimeout; /** @@ -175,19 +175,26 @@ final class WriteLoop extends Loop return false; } - foreach ($this->connection->check_queue as $msg_id => $_) { + if ($check = $this->connection->check_queue) { + $this->connection->check_queue = []; $deferred = new DeferredFuture(); $list = ''; - // Don't edit this here pls - foreach ($message_ids as $message_id) { - if (!isset($this->connection->outgoing_messages[$message_id])) { + $msgIds = []; + foreach ($check as $msg) { + if ($msg->hasReply()) { continue; } - $list .= $this->connection->outgoing_messages[$message_id]->constructor.', '; + $id = $msg->getMsgId(); + if ($id === null) { + $this->API->logger("$msg has no ID, cannot request status!", Logger::ERROR); + continue; + } + $msgIds[] = $id; + $list .= $msg->constructor.', '; } $this->API->logger("Still missing {$list} on DC {$this->datacenter}, sending state request", Logger::ERROR); - $this->connection->objectCall('msgs_state_req', ['msg_ids' => $message_ids], $deferred); - EventLoop::queue(function () use ($deferred, $message_ids): void { + $this->connection->objectCall('msgs_state_req', ['msg_ids' => $msgIds], $deferred); + EventLoop::queue(function () use ($deferred, $check): void { try { $result = $deferred->getFuture()->await(new TimeoutCancellation($this->pollTimeout)); if (\is_callable($result)) { @@ -195,16 +202,11 @@ final class WriteLoop extends Loop } $reply = []; foreach (str_split($result['info']) as $key => $chr) { - $message_id = $message_ids[$key]; - if (!isset($this->connection->outgoing_messages[$message_id])) { + $message = $check[$key]; + if ($message->hasReply()) { $this->API->logger("Already got response for and forgot about message ID $message_id"); continue; } - if (!isset($this->connection->new_outgoing[$message_id])) { - $this->API->logger('Already got response for '.$this->connection->outgoing_messages[$message_id]); - continue; - } - $message = $this->connection->new_outgoing[$message_id]; $chr = \ord($chr); switch ($chr & 7) { case 0: @@ -214,36 +216,30 @@ final class WriteLoop extends Loop case 2: case 3: if ($message->constructor === 'msgs_state_req') { - $this->connection->gotResponseForOutgoingMessage($message); + $message->reply(null); break; } $this->API->logger("Message $message not received by server, resending...", Logger::ERROR); - $this->connection->methodRecall($message_id); + $this->connection->methodRecall($message); break; case 4: if ($chr & 128) { - $this->API->logger("Message $message received by server and was already sent, requesting reply...", Logger::ERROR); - $reply[] = $message_id; + $this->API->logger("Message $message received by server and was already sent.", Logger::ERROR); } elseif ($chr & 64) { - $this->API->logger("Message $message received by server and was already processed, requesting reply...", Logger::ERROR); - $reply[] = $message_id; + $this->API->logger("Message $message received by server and was already processed.", Logger::ERROR); } elseif ($chr & 32) { if ($message->getSent() + $this->resendTimeout < hrtime(true)) { - if ($message->cancellation?->isRequested()) { - unset($this->connection->new_outgoing[$message_id], $this->connection->outgoing_messages[$message_id]); - - $this->API->logger("Cancelling $message...", Logger::ERROR); - } else { + if (!$message->cancellation?->isRequested()) { $this->API->logger("Message $message received by server and is being processed for way too long, resending request...", Logger::ERROR); - $this->connection->methodRecall($message_id); + $this->connection->methodRecall($message); } } else { $this->API->logger("Message $message received by server and is being processed, waiting...", Logger::ERROR); } } else { $this->API->logger("Message $message received by server, waiting...", Logger::ERROR); - $reply[] = $message_id; } + break; } } //} catch (CancelledException) { @@ -479,6 +475,7 @@ final class WriteLoop extends Loop foreach ($keys as $key => $message) { unset($this->connection->pendingOutgoing[$key]); + $message_id = $message->getMsgId(); $this->connection->outgoing_messages[$message_id] = $message; if ($message->hasPromise()) { $this->connection->new_outgoing[$message_id] = $message; diff --git a/src/MTProto/MTProtoOutgoingMessage.php b/src/MTProto/MTProtoOutgoingMessage.php index d69b9a36c..1db23f786 100644 --- a/src/MTProto/MTProtoOutgoingMessage.php +++ b/src/MTProto/MTProtoOutgoingMessage.php @@ -182,10 +182,12 @@ class MTProtoOutgoingMessage extends MTProtoMessage } $this->state |= self::STATE_SENT; $this->sent = hrtime(true); - $this->checkTimer = EventLoop::delay( - $this->connection->API->getSettings()->getConnection()->getTimeout(), - $this->check(...) - ); + if (!$this instanceof Container) { + $this->checkTimer = EventLoop::delay( + $this->connection->API->getSettings()->getConnection()->getTimeout(), + $this->check(...) + ); + } if (isset($this->sendDeferred)) { $sendDeferred = $this->sendDeferred; $this->sendDeferred = null; @@ -214,10 +216,11 @@ class MTProtoOutgoingMessage extends MTProtoMessage if (!$unencrypted && $pfsNotBound && $this->constructor !== 'auth.bindTempAuthKey') { return; } + \assert($this->msgId !== null); if ($unencrypted) { - $this->connection->unencrypted_check_queue[$this->msgId] = true; + $this->connection->unencrypted_check_queue[] = $this; } else { - $this->connection->check_queue[$this->msgId] = true; + $this->connection->check_queue[] = $this; } $this->connection->flush(); } @@ -254,7 +257,17 @@ class MTProtoOutgoingMessage extends MTProtoMessage } } if ($this->msgId !== null) { - unset($this->connection->new_outgoing[$this->msgId], $this->connection->outgoing_messages[$this->msgId]); + if ($this->unencrypted) { + unset( + $this->connection->unencrypted_new_outgoing[$this->msgId], + $this->connection->outgoing_messages[$this->msgId], + ); + } else { + unset( + $this->connection->new_outgoing[$this->msgId], + $this->connection->outgoing_messages[$this->msgId], + ); + } } $this->serializedBody = null; diff --git a/src/MTProtoSession/AckHandler.php b/src/MTProtoSession/AckHandler.php index 0cd082489..9abe3764d 100644 --- a/src/MTProtoSession/AckHandler.php +++ b/src/MTProtoSession/AckHandler.php @@ -21,9 +21,7 @@ declare(strict_types=1); namespace danog\MadelineProto\MTProtoSession; use danog\MadelineProto\DataCenterConnection; -use danog\MadelineProto\Logger; use danog\MadelineProto\MTProto\MTProtoIncomingMessage; -use danog\MadelineProto\MTProto\MTProtoOutgoingMessage; /** * Manages acknowledgement of messages. @@ -34,18 +32,6 @@ use danog\MadelineProto\MTProto\MTProtoOutgoingMessage; */ trait AckHandler { - /** - * Acknowledge outgoing message ID. - */ - public function ackOutgoingMessageId(int $message_id): bool - { - // The server acknowledges that it received my message - if (!isset($this->outgoing_messages[$message_id])) { - $this->API->logger("WARNING: Couldn't find message id ".$message_id.' in the array of outgoing messages. Maybe try to increase its size?', Logger::WARNING); - return false; - } - return true; - } /** * Acknowledge incoming message ID. */ diff --git a/src/MTProtoSession/ResponseHandler.php b/src/MTProtoSession/ResponseHandler.php index 0aec1b0cc..a912adbab 100644 --- a/src/MTProtoSession/ResponseHandler.php +++ b/src/MTProtoSession/ResponseHandler.php @@ -99,7 +99,9 @@ trait ResponseHandler { foreach ($message->read()['msg_ids'] as $msg_id) { // Acknowledge that the server received my message - $this->ackOutgoingMessageId($msg_id); + if (!isset($this->outgoing_messages[$msg_id])) { + $this->API->logger("WARNING: Couldn't find message id ".$msg_id.' in the array of outgoing messages. Maybe try to increase its size?', Logger::WARNING); + } } } private function handleFallback(MTProtoIncomingMessage $message): void @@ -349,7 +351,7 @@ trait ResponseHandler $this->API->authorized_dc = $this->API->datacenter->currentDatacenter; } $this->API->logger("Resending $request to new DC $datacenter..."); - $this->methodRecall($request->getMsgId(), $datacenter); + $this->methodRecall($request, $datacenter); return null; case 400: if ($request->previousQueuedMessage && diff --git a/src/MTProtoSession/Session.php b/src/MTProtoSession/Session.php index 6d190fef2..e8a476679 100644 --- a/src/MTProtoSession/Session.php +++ b/src/MTProtoSession/Session.php @@ -113,11 +113,13 @@ trait Session /** * Check queue. * + * @var list */ public array $check_queue = []; /** * Check queue. * + * @var list */ public array $unencrypted_check_queue = []; /**