1
0
mirror of https://github.com/danog/MadelineProto.git synced 2025-01-10 16:08:24 +01:00
This commit is contained in:
Daniil Gentili 2024-12-10 17:35:44 +00:00
parent 5c3318511a
commit 6d0a34acd0
6 changed files with 51 additions and 53 deletions

View File

@ -27,10 +27,8 @@ use Amp\Sync\LocalMutex;
use AssertionError;
use danog\DialogId\DialogId;
use danog\Loop\GenericLoop;
use danog\MadelineProto\Loop\Connection\CheckLoop;
use danog\MadelineProto\Loop\Connection\CleanupLoop;
use danog\MadelineProto\Loop\Connection\HttpWaitLoop;
use danog\MadelineProto\Loop\Connection\PingLoop;
use danog\MadelineProto\Loop\Connection\ReadLoop;
use danog\MadelineProto\Loop\Connection\WriteLoop;
use danog\MadelineProto\MTProto\MTProtoOutgoingMessage;

View File

@ -175,19 +175,26 @@ final class WriteLoop extends Loop
return false;
}
foreach ($this->connection->check_queue as $msg_id => $_) {
if ($check = $this->connection->check_queue) {
$this->connection->check_queue = [];
$deferred = new DeferredFuture();
$list = '';
// Don't edit this here pls
foreach ($message_ids as $message_id) {
if (!isset($this->connection->outgoing_messages[$message_id])) {
$msgIds = [];
foreach ($check as $msg) {
if ($msg->hasReply()) {
continue;
}
$list .= $this->connection->outgoing_messages[$message_id]->constructor.', ';
$id = $msg->getMsgId();
if ($id === null) {
$this->API->logger("$msg has no ID, cannot request status!", Logger::ERROR);
continue;
}
$msgIds[] = $id;
$list .= $msg->constructor.', ';
}
$this->API->logger("Still missing {$list} on DC {$this->datacenter}, sending state request", Logger::ERROR);
$this->connection->objectCall('msgs_state_req', ['msg_ids' => $message_ids], $deferred);
EventLoop::queue(function () use ($deferred, $message_ids): void {
$this->connection->objectCall('msgs_state_req', ['msg_ids' => $msgIds], $deferred);
EventLoop::queue(function () use ($deferred, $check): void {
try {
$result = $deferred->getFuture()->await(new TimeoutCancellation($this->pollTimeout));
if (\is_callable($result)) {
@ -195,16 +202,11 @@ final class WriteLoop extends Loop
}
$reply = [];
foreach (str_split($result['info']) as $key => $chr) {
$message_id = $message_ids[$key];
if (!isset($this->connection->outgoing_messages[$message_id])) {
$message = $check[$key];
if ($message->hasReply()) {
$this->API->logger("Already got response for and forgot about message ID $message_id");
continue;
}
if (!isset($this->connection->new_outgoing[$message_id])) {
$this->API->logger('Already got response for '.$this->connection->outgoing_messages[$message_id]);
continue;
}
$message = $this->connection->new_outgoing[$message_id];
$chr = \ord($chr);
switch ($chr & 7) {
case 0:
@ -214,36 +216,30 @@ final class WriteLoop extends Loop
case 2:
case 3:
if ($message->constructor === 'msgs_state_req') {
$this->connection->gotResponseForOutgoingMessage($message);
$message->reply(null);
break;
}
$this->API->logger("Message $message not received by server, resending...", Logger::ERROR);
$this->connection->methodRecall($message_id);
$this->connection->methodRecall($message);
break;
case 4:
if ($chr & 128) {
$this->API->logger("Message $message received by server and was already sent, requesting reply...", Logger::ERROR);
$reply[] = $message_id;
$this->API->logger("Message $message received by server and was already sent.", Logger::ERROR);
} elseif ($chr & 64) {
$this->API->logger("Message $message received by server and was already processed, requesting reply...", Logger::ERROR);
$reply[] = $message_id;
$this->API->logger("Message $message received by server and was already processed.", Logger::ERROR);
} elseif ($chr & 32) {
if ($message->getSent() + $this->resendTimeout < hrtime(true)) {
if ($message->cancellation?->isRequested()) {
unset($this->connection->new_outgoing[$message_id], $this->connection->outgoing_messages[$message_id]);
$this->API->logger("Cancelling $message...", Logger::ERROR);
} else {
if (!$message->cancellation?->isRequested()) {
$this->API->logger("Message $message received by server and is being processed for way too long, resending request...", Logger::ERROR);
$this->connection->methodRecall($message_id);
$this->connection->methodRecall($message);
}
} else {
$this->API->logger("Message $message received by server and is being processed, waiting...", Logger::ERROR);
}
} else {
$this->API->logger("Message $message received by server, waiting...", Logger::ERROR);
$reply[] = $message_id;
}
break;
}
}
//} catch (CancelledException) {
@ -479,6 +475,7 @@ final class WriteLoop extends Loop
foreach ($keys as $key => $message) {
unset($this->connection->pendingOutgoing[$key]);
$message_id = $message->getMsgId();
$this->connection->outgoing_messages[$message_id] = $message;
if ($message->hasPromise()) {
$this->connection->new_outgoing[$message_id] = $message;

View File

@ -182,10 +182,12 @@ class MTProtoOutgoingMessage extends MTProtoMessage
}
$this->state |= self::STATE_SENT;
$this->sent = hrtime(true);
if (!$this instanceof Container) {
$this->checkTimer = EventLoop::delay(
$this->connection->API->getSettings()->getConnection()->getTimeout(),
$this->check(...)
);
}
if (isset($this->sendDeferred)) {
$sendDeferred = $this->sendDeferred;
$this->sendDeferred = null;
@ -214,10 +216,11 @@ class MTProtoOutgoingMessage extends MTProtoMessage
if (!$unencrypted && $pfsNotBound && $this->constructor !== 'auth.bindTempAuthKey') {
return;
}
\assert($this->msgId !== null);
if ($unencrypted) {
$this->connection->unencrypted_check_queue[$this->msgId] = true;
$this->connection->unencrypted_check_queue[] = $this;
} else {
$this->connection->check_queue[$this->msgId] = true;
$this->connection->check_queue[] = $this;
}
$this->connection->flush();
}
@ -254,7 +257,17 @@ class MTProtoOutgoingMessage extends MTProtoMessage
}
}
if ($this->msgId !== null) {
unset($this->connection->new_outgoing[$this->msgId], $this->connection->outgoing_messages[$this->msgId]);
if ($this->unencrypted) {
unset(
$this->connection->unencrypted_new_outgoing[$this->msgId],
$this->connection->outgoing_messages[$this->msgId],
);
} else {
unset(
$this->connection->new_outgoing[$this->msgId],
$this->connection->outgoing_messages[$this->msgId],
);
}
}
$this->serializedBody = null;

View File

@ -21,9 +21,7 @@ declare(strict_types=1);
namespace danog\MadelineProto\MTProtoSession;
use danog\MadelineProto\DataCenterConnection;
use danog\MadelineProto\Logger;
use danog\MadelineProto\MTProto\MTProtoIncomingMessage;
use danog\MadelineProto\MTProto\MTProtoOutgoingMessage;
/**
* Manages acknowledgement of messages.
@ -34,18 +32,6 @@ use danog\MadelineProto\MTProto\MTProtoOutgoingMessage;
*/
trait AckHandler
{
/**
* Acknowledge outgoing message ID.
*/
public function ackOutgoingMessageId(int $message_id): bool
{
// The server acknowledges that it received my message
if (!isset($this->outgoing_messages[$message_id])) {
$this->API->logger("WARNING: Couldn't find message id ".$message_id.' in the array of outgoing messages. Maybe try to increase its size?', Logger::WARNING);
return false;
}
return true;
}
/**
* Acknowledge incoming message ID.
*/

View File

@ -99,7 +99,9 @@ trait ResponseHandler
{
foreach ($message->read()['msg_ids'] as $msg_id) {
// Acknowledge that the server received my message
$this->ackOutgoingMessageId($msg_id);
if (!isset($this->outgoing_messages[$msg_id])) {
$this->API->logger("WARNING: Couldn't find message id ".$msg_id.' in the array of outgoing messages. Maybe try to increase its size?', Logger::WARNING);
}
}
}
private function handleFallback(MTProtoIncomingMessage $message): void
@ -349,7 +351,7 @@ trait ResponseHandler
$this->API->authorized_dc = $this->API->datacenter->currentDatacenter;
}
$this->API->logger("Resending $request to new DC $datacenter...");
$this->methodRecall($request->getMsgId(), $datacenter);
$this->methodRecall($request, $datacenter);
return null;
case 400:
if ($request->previousQueuedMessage &&

View File

@ -113,11 +113,13 @@ trait Session
/**
* Check queue.
*
* @var list<MTProtoOutgoingMessage>
*/
public array $check_queue = [];
/**
* Check queue.
*
* @var list<MTProtoOutgoingMessage>
*/
public array $unencrypted_check_queue = [];
/**