1
0
mirror of https://github.com/danog/MadelineProto.git synced 2025-01-10 13:48:15 +01:00

More cleaning up

This commit is contained in:
Daniil Gentili 2024-12-10 17:46:08 +00:00
parent 6d0a34acd0
commit 6f9a616510
7 changed files with 21 additions and 58 deletions

View File

@ -246,7 +246,7 @@ final class ReadLoop extends Loop
$this->API->referenceDatabase->reset();
}
$message = new MTProtoIncomingMessage($deserialized, $message_id, $unencrypted);
$message = new MTProtoIncomingMessage($this->connection, $deserialized, $message_id, $unencrypted);
if (isset($seq_no)) {
$message->setSeqNo($seq_no);
}

View File

@ -20,6 +20,8 @@ declare(strict_types=1);
namespace danog\MadelineProto\MTProto;
use danog\MadelineProto\Connection;
/**
* Incoming message.
*
@ -70,7 +72,7 @@ final class MTProtoIncomingMessage extends MTProtoMessage
* @param array $content Content
* @param boolean $fromContainer Whether this message was in a container
*/
public function __construct(array $content, int $msgId, public readonly bool $unencrypted, public readonly bool $fromContainer = false)
public function __construct(private readonly Connection $connection, array $content, int $msgId, public readonly bool $unencrypted, public readonly bool $fromContainer = false)
{
$this->content = $content;
$this->msgId = $msgId;
@ -135,6 +137,10 @@ final class MTProtoIncomingMessage extends MTProtoMessage
public function ack(): void
{
$this->state |= self::STATE_ACKED;
if ($this->contentRelated) {
// I let the server know that I received its message
$this->connection->ack_queue[$this->msgId] = $this->msgId;
}
}
/**
* Read this message, clearing its contents.

View File

@ -1,46 +0,0 @@
<?php
declare(strict_types=1);
/**
* AckHandler module.
*
* This file is part of MadelineProto.
* MadelineProto is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
* MadelineProto is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Affero General Public License for more details.
* You should have received a copy of the GNU General Public License along with MadelineProto.
* If not, see <http://www.gnu.org/licenses/>.
*
* @author Daniil Gentili <daniil@daniil.it>
* @copyright 2016-2023 Daniil Gentili <daniil@daniil.it>
* @license https://opensource.org/licenses/AGPL-3.0 AGPLv3
* @link https://docs.madelineproto.xyz MadelineProto documentation
*/
namespace danog\MadelineProto\MTProtoSession;
use danog\MadelineProto\DataCenterConnection;
use danog\MadelineProto\MTProto\MTProtoIncomingMessage;
/**
* Manages acknowledgement of messages.
*
* @property DataCenterConnection $shared
*
* @internal
*/
trait AckHandler
{
/**
* Acknowledge incoming message ID.
*/
public function ackIncomingMessage(MTProtoIncomingMessage $message): void
{
// Not exactly true, but we don't care
$message->ack();
$message_id = $message->getMsgId();
// I let the server know that I received its message
$this->ack_queue[$message_id] = $message_id;
}
}

View File

@ -53,7 +53,7 @@ trait CallHandler
public function methodRecall(MTProtoOutgoingMessage $request, ?int $forceDatacenter = null, float|Future|null $defer = null): void
{
$id = $request->getMsgId();
unset($this->outgoing_messages[$id], $this->new_outgoing[$id]);
unset($this->outgoing_messages[$id], $this->new_outgoing[$id], $this->unencrypted_new_outgoing[$id]);
if ($request instanceof Container) {
foreach ($request->msgs as $msg) {
$this->methodRecall($msg, $forceDatacenter, $defer);

View File

@ -37,7 +37,7 @@ trait Reliable
public function onNewMsgDetailedInfo(array $content): void
{
if (isset($this->incoming_messages[$content['answer_msg_id']])) {
$this->ackIncomingMessage($this->incoming_messages[$content['answer_msg_id']]);
$this->incoming_messages[$content['answer_msg_id']]->ack();
} else {
EventLoop::queue($this->objectCall(...), 'msg_resend_req', ['msg_ids' => [$content['answer_msg_id']]]);
}

View File

@ -106,7 +106,7 @@ trait ResponseHandler
}
private function handleFallback(MTProtoIncomingMessage $message): void
{
$this->ackIncomingMessage($message);
$message->ack();
$response_type = $this->API->getTL()->getConstructors()->findByPredicate($message->getContent()['_'])['type'];
if ($response_type == 'Updates') {
if ($message->unencrypted) {
@ -135,7 +135,7 @@ trait ResponseHandler
}
private function handleNewSession(MTProtoIncomingMessage $message): void
{
$this->ackIncomingMessage($message);
$message->ack();
$this->shared->getTempAuthKey()->setServerSalt($message->read()['server_salt']);
if ($this->API->authorized === \danog\MadelineProto\API::LOGGED_IN
&& isset($this->API->updaters[UpdateLoop::GENERIC])
@ -149,7 +149,7 @@ trait ResponseHandler
$tmp->setIteratorMode(SplQueue::IT_MODE_DELETE);
foreach ($message->read()['messages'] as $msg) {
$this->msgIdHandler->checkIncomingMessageId($msg['msg_id'], true);
$newMessage = new MTProtoIncomingMessage($msg['body'], $msg['msg_id'], $message->unencrypted, true);
$newMessage = new MTProtoIncomingMessage($this->connection, $msg['body'], $msg['msg_id'], $message->unencrypted, true);
$newMessage->setSeqNo($msg['seqno']);
$this->checkInSeqNo($newMessage);
$newMessage->setSeqNo(null);
@ -162,14 +162,14 @@ trait ResponseHandler
}
private function handleMsgCopy(MTProtoIncomingMessage $message): void
{
$this->ackIncomingMessage($message);
$message->ack();
$content = $message->read();
$referencedMsgId = $content['msg_id'];
if (isset($this->incoming_messages[$referencedMsgId])) {
$this->ackIncomingMessage($this->incoming_messages[$referencedMsgId]);
$this->incoming_messages[$referencedMsgId]->ack();
} else {
$this->msgIdHandler->checkIncomingMessageId($referencedMsgId, true);
$message = new MTProtoIncomingMessage($content['orig_message'], $referencedMsgId, $message->unencrypted);
$message = new MTProtoIncomingMessage($this->connection, $content['orig_message'], $referencedMsgId, $message->unencrypted);
$this->incomingCtr?->inc();
$this->incoming_messages[$referencedMsgId] = $message;
$this->handleMessages([$message]);
@ -187,7 +187,7 @@ trait ResponseHandler
if ($message->unencrypted) {
throw new SecurityException("Can't accept unencrypted result!");
}
$this->ackIncomingMessage($message);
$message->ack();
$response = $response['result'];
}
if (!isset($this->outgoing_messages[$requestId])) {

View File

@ -38,7 +38,6 @@ use SplQueue;
trait Session
{
use AuthKeyHandler;
use AckHandler;
use ResponseHandler;
use SeqNoHandler;
use CallHandler;
@ -160,6 +159,8 @@ trait Session
$incoming = [];
foreach ($this->incoming_messages as $key => $message) {
if ($message->canGarbageCollect()) {
$this->API->logger("Collecting incoming $message in DC {$this->datacenter}", Logger::VERBOSE);
$count++;
} else {
$this->API->logger("Can't garbage collect $message in DC {$this->datacenter}, not handled yet!", Logger::VERBOSE);
@ -176,6 +177,8 @@ trait Session
$outgoing = [];
foreach ($this->outgoing_messages as $key => $message) {
if ($message->canGarbageCollect()) {
$this->API->logger("Collecting outgiong $message in DC {$this->datacenter}", Logger::VERBOSE);
$count++;
} else {
$ago = (hrtime(true) - $message->getSent()) / 1_000_000_000;