1
0
mirror of https://github.com/danog/MadelineProto.git synced 2024-11-27 10:54:43 +01:00

Improve call queue logic

This commit is contained in:
Daniil Gentili 2023-09-28 17:28:40 +02:00
parent d9668d2650
commit e28b06c846
4 changed files with 51 additions and 28 deletions

View File

@ -28,6 +28,7 @@ use danog\MadelineProto\MTProto\MTProtoOutgoingMessage;
use danog\MadelineProto\MTProtoTools\Crypt;
use danog\MadelineProto\Tools;
use Revolt\EventLoop;
use Webmozart\Assert\Assert;
use function strlen;
@ -237,21 +238,25 @@ final class WriteLoop extends Loop
}
} elseif ($message->queueId !== null) {
$queueId = $message->queueId;
if (isset($this->connection->callQueue[$queueId])) {
$message->setPreviousQueuedMsgId($this->connection->callQueue[$queueId]);
if (isset($this->connection->callQueue[$queueId])
&& !($prev = $this->connection->callQueue[$queueId])->hasReply()
) {
$this->connection->callQueue[$queueId] = $message;
$message->setPreviousQueuedMessage($prev);
$this->API->logger("Adding $message to queue with ID $queueId", Logger::ULTRA_VERBOSE);
$prev = $prev->getMsgId();
Assert::notNull($prev);
$MTmessage['body'] = $this->API->getTL()->serializeMethod(
'invokeAfterMsg',
[
'msg_id' => $this->connection->callQueue[$queueId],
'msg_id' => $prev,
'query' => $MTmessage['body']
]
);
} else {
$message->setPreviousQueuedMsgId(null);
$this->connection->callQueue[$queueId] = $message;
$this->API->logger("$message is the first in the queue with ID $queueId", Logger::ULTRA_VERBOSE);
}
$this->connection->callQueue[$queueId] = $message_id;
}
// TODO
/*

View File

@ -103,9 +103,9 @@ class MTProtoOutgoingMessage extends MTProtoMessage
public readonly bool $userRelated;
/**
* Previous queued message ID.
* Previous queued message.
*/
private ?int $previousQueuedMsgId = null;
private ?self $previousQueuedMessage = null;
/**
* Create outgoing message.
@ -321,6 +321,13 @@ class MTProtoOutgoingMessage extends MTProtoMessage
{
return (bool) ($this->state & self::STATE_SENT);
}
/**
* Check if the message has a reply.
*/
public function hasReply(): bool
{
return (bool) ($this->state & self::STATE_REPLIED);
}
/**
* Check if can garbage collect this message.
*/
@ -407,24 +414,24 @@ class MTProtoOutgoingMessage extends MTProtoMessage
}
/**
* Get previous queued message ID.
* Get previous queued message.
*
* @return ?int
* @return ?self
*/
public function getPreviousQueuedMsgId(): ?int
public function getPreviousQueuedMessage(): ?self
{
return $this->previousQueuedMsgId;
return $this->previousQueuedMessage;
}
/**
* Set previous queued message ID.
* Set previous queued message.
*
* @param ?int $previousQueuedMsgId Previous queued message ID.
* @param ?self $previousQueuedMessage Previous queued message.
*
*/
public function setPreviousQueuedMsgId(?int $previousQueuedMsgId): self
public function setPreviousQueuedMessage(?self $previousQueuedMessage): self
{
$this->previousQueuedMsgId = $previousQueuedMsgId;
$this->previousQueuedMessage = $previousQueuedMessage;
return $this;
}

View File

@ -232,7 +232,10 @@ trait ResponseHandler
return;
}
if ($request->isMethod && $request->getConstructor() !== 'auth.bindTempAuthKey' && $this->shared->hasTempAuthKey() && !$this->shared->getTempAuthKey()->isInited()) {
if ($request->isMethod && $request->getConstructor() !== 'auth.bindTempAuthKey'
&& $this->shared->hasTempAuthKey()
&& !$this->shared->getTempAuthKey()->isInited()
) {
$this->shared->getTempAuthKey()->init(true);
}
if (isset($response['_']) && !$this->isCdn()) {
@ -272,7 +275,11 @@ trait ResponseHandler
*/
private function handleRpcError(MTProtoOutgoingMessage $request, array $response): ?callable
{
if ($request->isMethod && $request->getConstructor() !== 'auth.bindTempAuthKey' && $this->shared->hasTempAuthKey() && !$this->shared->getTempAuthKey()->isInited()) {
if ($request->isMethod
&& $request->getConstructor() !== 'auth.bindTempAuthKey'
&& $this->shared->hasTempAuthKey()
&& !$this->shared->getTempAuthKey()->isInited()
) {
$this->shared->getTempAuthKey()->init(true);
}
if (\in_array($response['error_message'], ['PERSISTENT_TIMESTAMP_EMPTY', 'PERSISTENT_TIMESTAMP_INVALID'], true)) {
@ -305,14 +312,16 @@ trait ResponseHandler
$this->API->logger("Resending $request due to {$response['error_message']}");
$this->gotResponseForOutgoingMessage($request);
$msgId = $request->getMsgId();
unset($this->callQueue[$request->queueId]);
$request->setSent(\time() + 1);
$request->setSent(\time() + 5*60);
$request->setMsgId(null);
$request->setSeqNo(null);
if ($response['error_message'] === 'MSG_WAIT_TIMEOUT') {
EventLoop::delay(1.0, fn () => $this->methodRecall($msgId));
$prev = $request->getPreviousQueuedMessage();
if ($prev->hasReply()) {
$this->methodRecall($msgId);
} else {
EventLoop::queue($this->methodRecall(...), $msgId);
$prev->getResultPromise()->finally(
fn () => $this->methodRecall($msgId)
);
}
return null;
}
@ -347,15 +356,17 @@ trait ResponseHandler
$this->API->logger("Resending $request due to {$response['error_message']}");
$this->gotResponseForOutgoingMessage($request);
$msgId = $request->getMsgId();
unset($this->callQueue[$request->queueId]);
$request->setSent(\time() + 1);
$request->setSent(\time() + 5*60);
$request->setMsgId(null);
$request->setSeqNo(null);
\assert($msgId !== null);
if ($response['error_message'] === 'MSG_WAIT_TIMEOUT') {
EventLoop::delay(1.0, fn () => $this->methodRecall($msgId));
$prev = $request->getPreviousQueuedMessage();
if ($prev->hasReply()) {
$this->methodRecall($msgId);
} else {
EventLoop::queue($this->methodRecall(...), $msgId);
$prev->getResultPromise()->finally(
fn () => $this->methodRecall($msgId)
);
}
return null;
}

View File

@ -81,7 +81,7 @@ trait Session
/**
* Call queue.
*
* @var array<string, int>
* @var array<string, MTProtoOutgoingMessage>
*/
public array $callQueue = [];
/**