diff --git a/src/Loop/Connection/ReadLoop.php b/src/Loop/Connection/ReadLoop.php index f36dd7ede..daa444f0b 100644 --- a/src/Loop/Connection/ReadLoop.php +++ b/src/Loop/Connection/ReadLoop.php @@ -246,7 +246,7 @@ final class ReadLoop extends Loop $this->API->referenceDatabase->reset(); } - $message = new MTProtoIncomingMessage($deserialized, $message_id, $unencrypted); + $message = new MTProtoIncomingMessage($this->connection, $deserialized, $message_id, $unencrypted); if (isset($seq_no)) { $message->setSeqNo($seq_no); } diff --git a/src/MTProto/MTProtoIncomingMessage.php b/src/MTProto/MTProtoIncomingMessage.php index 0bcec69b9..015559344 100644 --- a/src/MTProto/MTProtoIncomingMessage.php +++ b/src/MTProto/MTProtoIncomingMessage.php @@ -20,6 +20,8 @@ declare(strict_types=1); namespace danog\MadelineProto\MTProto; +use danog\MadelineProto\Connection; + /** * Incoming message. * @@ -70,7 +72,7 @@ final class MTProtoIncomingMessage extends MTProtoMessage * @param array $content Content * @param boolean $fromContainer Whether this message was in a container */ - public function __construct(array $content, int $msgId, public readonly bool $unencrypted, public readonly bool $fromContainer = false) + public function __construct(private readonly Connection $connection, array $content, int $msgId, public readonly bool $unencrypted, public readonly bool $fromContainer = false) { $this->content = $content; $this->msgId = $msgId; @@ -135,6 +137,10 @@ final class MTProtoIncomingMessage extends MTProtoMessage public function ack(): void { $this->state |= self::STATE_ACKED; + if ($this->contentRelated) { + // I let the server know that I received its message + $this->connection->ack_queue[$this->msgId] = $this->msgId; + } } /** * Read this message, clearing its contents. diff --git a/src/MTProtoSession/AckHandler.php b/src/MTProtoSession/AckHandler.php deleted file mode 100644 index 9abe3764d..000000000 --- a/src/MTProtoSession/AckHandler.php +++ /dev/null @@ -1,46 +0,0 @@ -. - * - * @author Daniil Gentili - * @copyright 2016-2023 Daniil Gentili - * @license https://opensource.org/licenses/AGPL-3.0 AGPLv3 - * @link https://docs.madelineproto.xyz MadelineProto documentation - */ - -namespace danog\MadelineProto\MTProtoSession; - -use danog\MadelineProto\DataCenterConnection; -use danog\MadelineProto\MTProto\MTProtoIncomingMessage; - -/** - * Manages acknowledgement of messages. - * - * @property DataCenterConnection $shared - * - * @internal - */ -trait AckHandler -{ - /** - * Acknowledge incoming message ID. - */ - public function ackIncomingMessage(MTProtoIncomingMessage $message): void - { - // Not exactly true, but we don't care - $message->ack(); - $message_id = $message->getMsgId(); - // I let the server know that I received its message - $this->ack_queue[$message_id] = $message_id; - } -} diff --git a/src/MTProtoSession/CallHandler.php b/src/MTProtoSession/CallHandler.php index a2e266d20..9321ee75b 100644 --- a/src/MTProtoSession/CallHandler.php +++ b/src/MTProtoSession/CallHandler.php @@ -53,7 +53,7 @@ trait CallHandler public function methodRecall(MTProtoOutgoingMessage $request, ?int $forceDatacenter = null, float|Future|null $defer = null): void { $id = $request->getMsgId(); - unset($this->outgoing_messages[$id], $this->new_outgoing[$id]); + unset($this->outgoing_messages[$id], $this->new_outgoing[$id], $this->unencrypted_new_outgoing[$id]); if ($request instanceof Container) { foreach ($request->msgs as $msg) { $this->methodRecall($msg, $forceDatacenter, $defer); diff --git a/src/MTProtoSession/Reliable.php b/src/MTProtoSession/Reliable.php index 8b34fd071..3536d66a8 100644 --- a/src/MTProtoSession/Reliable.php +++ b/src/MTProtoSession/Reliable.php @@ -37,7 +37,7 @@ trait Reliable public function onNewMsgDetailedInfo(array $content): void { if (isset($this->incoming_messages[$content['answer_msg_id']])) { - $this->ackIncomingMessage($this->incoming_messages[$content['answer_msg_id']]); + $this->incoming_messages[$content['answer_msg_id']]->ack(); } else { EventLoop::queue($this->objectCall(...), 'msg_resend_req', ['msg_ids' => [$content['answer_msg_id']]]); } diff --git a/src/MTProtoSession/ResponseHandler.php b/src/MTProtoSession/ResponseHandler.php index a912adbab..a89344cfb 100644 --- a/src/MTProtoSession/ResponseHandler.php +++ b/src/MTProtoSession/ResponseHandler.php @@ -106,7 +106,7 @@ trait ResponseHandler } private function handleFallback(MTProtoIncomingMessage $message): void { - $this->ackIncomingMessage($message); + $message->ack(); $response_type = $this->API->getTL()->getConstructors()->findByPredicate($message->getContent()['_'])['type']; if ($response_type == 'Updates') { if ($message->unencrypted) { @@ -135,7 +135,7 @@ trait ResponseHandler } private function handleNewSession(MTProtoIncomingMessage $message): void { - $this->ackIncomingMessage($message); + $message->ack(); $this->shared->getTempAuthKey()->setServerSalt($message->read()['server_salt']); if ($this->API->authorized === \danog\MadelineProto\API::LOGGED_IN && isset($this->API->updaters[UpdateLoop::GENERIC]) @@ -149,7 +149,7 @@ trait ResponseHandler $tmp->setIteratorMode(SplQueue::IT_MODE_DELETE); foreach ($message->read()['messages'] as $msg) { $this->msgIdHandler->checkIncomingMessageId($msg['msg_id'], true); - $newMessage = new MTProtoIncomingMessage($msg['body'], $msg['msg_id'], $message->unencrypted, true); + $newMessage = new MTProtoIncomingMessage($this->connection, $msg['body'], $msg['msg_id'], $message->unencrypted, true); $newMessage->setSeqNo($msg['seqno']); $this->checkInSeqNo($newMessage); $newMessage->setSeqNo(null); @@ -162,14 +162,14 @@ trait ResponseHandler } private function handleMsgCopy(MTProtoIncomingMessage $message): void { - $this->ackIncomingMessage($message); + $message->ack(); $content = $message->read(); $referencedMsgId = $content['msg_id']; if (isset($this->incoming_messages[$referencedMsgId])) { - $this->ackIncomingMessage($this->incoming_messages[$referencedMsgId]); + $this->incoming_messages[$referencedMsgId]->ack(); } else { $this->msgIdHandler->checkIncomingMessageId($referencedMsgId, true); - $message = new MTProtoIncomingMessage($content['orig_message'], $referencedMsgId, $message->unencrypted); + $message = new MTProtoIncomingMessage($this->connection, $content['orig_message'], $referencedMsgId, $message->unencrypted); $this->incomingCtr?->inc(); $this->incoming_messages[$referencedMsgId] = $message; $this->handleMessages([$message]); @@ -187,7 +187,7 @@ trait ResponseHandler if ($message->unencrypted) { throw new SecurityException("Can't accept unencrypted result!"); } - $this->ackIncomingMessage($message); + $message->ack(); $response = $response['result']; } if (!isset($this->outgoing_messages[$requestId])) { diff --git a/src/MTProtoSession/Session.php b/src/MTProtoSession/Session.php index e8a476679..473c74563 100644 --- a/src/MTProtoSession/Session.php +++ b/src/MTProtoSession/Session.php @@ -38,7 +38,6 @@ use SplQueue; trait Session { use AuthKeyHandler; - use AckHandler; use ResponseHandler; use SeqNoHandler; use CallHandler; @@ -160,6 +159,8 @@ trait Session $incoming = []; foreach ($this->incoming_messages as $key => $message) { if ($message->canGarbageCollect()) { + $this->API->logger("Collecting incoming $message in DC {$this->datacenter}", Logger::VERBOSE); + $count++; } else { $this->API->logger("Can't garbage collect $message in DC {$this->datacenter}, not handled yet!", Logger::VERBOSE); @@ -176,6 +177,8 @@ trait Session $outgoing = []; foreach ($this->outgoing_messages as $key => $message) { if ($message->canGarbageCollect()) { + $this->API->logger("Collecting outgiong $message in DC {$this->datacenter}", Logger::VERBOSE); + $count++; } else { $ago = (hrtime(true) - $message->getSent()) / 1_000_000_000;