1
0
mirror of https://github.com/danog/MadelineProto.git synced 2024-11-30 08:18:59 +01:00

Properly sync state

This commit is contained in:
Daniil Gentili 2024-06-27 19:08:00 +02:00 committed by Alexander Pankratov
parent 413113031c
commit 1600d46082
6 changed files with 30 additions and 39 deletions

View File

@ -3372,6 +3372,10 @@
<code><![CDATA[$this->state |= self::STATE_REPLIED]]></code>
<code><![CDATA[$this->state |= self::STATE_SENT]]></code>
</PossiblyInvalidPropertyAssignmentValue>
<PossiblyNullArrayOffset>
<code><![CDATA[$this->connection->new_outgoing]]></code>
<code><![CDATA[$this->connection->outgoing_messages]]></code>
</PossiblyNullArrayOffset>
<PossiblyNullOperand>
<code><![CDATA[$this->sent]]></code>
<code><![CDATA[$this->sent]]></code>
@ -3401,9 +3405,6 @@
</UnsupportedReferenceUsage>
</file>
<file src="src/MTProtoSession/AckHandler.php">
<PossiblyNullArrayOffset>
<code><![CDATA[$this->new_outgoing]]></code>
</PossiblyNullArrayOffset>
<PossiblyNullOperand>
<code><![CDATA[$message->getSent()]]></code>
<code><![CDATA[$message->getSent()]]></code>
@ -3517,6 +3518,8 @@
<code><![CDATA[$sub['queueId']]]></code>
</MixedArrayAssignment>
<MixedArrayOffset>
<code><![CDATA[$this->new_outgoing[$message_id]]]></code>
<code><![CDATA[$this->new_outgoing[$message_id]]]></code>
<code><![CDATA[$this->outgoing_messages[$message_id]]]></code>
<code><![CDATA[$this->outgoing_messages[$message_id]]]></code>
</MixedArrayOffset>
@ -3644,6 +3647,12 @@
<code><![CDATA[$msgId]]></code>
<code><![CDATA[$request->getMsgId()]]></code>
</PossiblyNullArgument>
<PossiblyNullArrayOffset>
<code><![CDATA[$this->new_outgoing]]></code>
<code><![CDATA[$this->new_outgoing]]></code>
<code><![CDATA[$this->new_outgoing]]></code>
<code><![CDATA[$this->new_outgoing]]></code>
</PossiblyNullArrayOffset>
<RedundantConditionGivenDocblockType>
<code><![CDATA[$this->API->authorized_dc == $this->datacenter && $this->API->authorized === \danog\MadelineProto\API::LOGGED_IN]]></code>
</RedundantConditionGivenDocblockType>
@ -4176,6 +4185,8 @@
<code><![CDATA[static function (string $payload, int $offset) use ($stream, $seekable, $lock) {]]></code>
</MissingClosureReturnType>
<MixedArgument>
<code><![CDATA[$end]]></code>
<code><![CDATA[$end]]></code>
<code><![CDATA[$end]]></code>
<code><![CDATA[$end]]></code>
<code><![CDATA[$file]]></code>
@ -4192,6 +4203,8 @@
<code><![CDATA[$size]]></code>
<code><![CDATA[$start]]></code>
<code><![CDATA[$start]]></code>
<code><![CDATA[$start]]></code>
<code><![CDATA[$start]]></code>
<code><![CDATA[$stream]]></code>
<code><![CDATA[$stream]]></code>
<code><![CDATA[$url]]></code>

View File

@ -23,6 +23,7 @@ namespace danog\MadelineProto\Loop\Connection;
use Amp\CancelledException;
use Amp\DeferredFuture;
use Amp\TimeoutCancellation;
use Amp\TimeoutException;
use danog\Loop\Loop;
use danog\MadelineProto\Connection;
use danog\MadelineProto\Logger;
@ -99,7 +100,7 @@ final class CheckLoop extends Loop
case 2:
case 3:
if ($message->constructor === 'msgs_state_req') {
$this->connection->gotResponseForOutgoingMessage($message);
$message->reply(static fn () => new TimeoutException("Server did not receive message"));
break;
}
$this->API->logger("Message $message not received by server, resending...", Logger::ERROR);
@ -115,8 +116,6 @@ final class CheckLoop extends Loop
} elseif ($chr & 32) {
if ($message->getSent() + $this->resendTimeout < hrtime(true)) {
if ($message->isCancellationRequested()) {
unset($this->connection->new_outgoing[$message_id], $this->connection->outgoing_messages[$message_id]);
$this->API->logger("Cancelling $message...", Logger::ERROR);
} else {
$this->API->logger("Message $message received by server and is being processed for way too long, resending request...", Logger::ERROR);

View File

@ -197,6 +197,7 @@ class MTProtoOutgoingMessage extends MTProtoMessage
);
}
}
unset($this->connection->new_outgoing[$this->msgId], $this->connection->outgoing_messages[$this->msgId]);
$this->serializedBody = null;
$this->body = null;

View File

@ -47,14 +47,6 @@ trait AckHandler
}
return true;
}
/**
* We have gotten a response for an outgoing message.
*/
public function gotResponseForOutgoingMessage(MTProtoOutgoingMessage $outgoingMessage): void
{
// The server acknowledges that it received my message
unset($this->new_outgoing[$outgoingMessage->getMsgId()]);
}
/**
* Acknowledge incoming message ID.
*/
@ -121,7 +113,7 @@ trait AckHandler
|| ($message->getSent() + $dropTimeout < hrtime(true))
) {
Logger::log('No reply for message: ' . $message, Logger::WARNING);
$this->handleReject($message, static fn () => new TimeoutException('Request timeout'));
$message->reply(static fn () => new TimeoutException('Request timeout'));
continue;
}
if ($message->getState() & MTProtoOutgoingMessage::STATE_REPLIED) {

View File

@ -60,7 +60,7 @@ trait CallHandler
if ($datacenter) {
/** @var MTProtoOutgoingMessage */
$message = $this->outgoing_messages[$message_id];
$this->gotResponseForOutgoingMessage($message);
unset($this->new_outgoing[$message_id]);
$message->setMsgId(null);
$message->setSeqNo(null);
EventLoop::queue(function () use ($datacenter, $message): void {
@ -71,7 +71,7 @@ trait CallHandler
/** @var MTProtoOutgoingMessage */
$message = $this->outgoing_messages[$message_id];
if (!$message->hasSeqNo()) {
$this->gotResponseForOutgoingMessage($message);
unset($this->new_outgoing[$message_id]);
}
EventLoop::queue($this->sendMessage(...), $message);
}

View File

@ -170,14 +170,6 @@ trait ResponseHandler
$this->handleMessages([$message]);
}
}
/**
* @param callable(): \Throwable $data
*/
private function handleReject(MTProtoOutgoingMessage $message, callable $data): void
{
$this->gotResponseForOutgoingMessage($message);
$message->reply($data);
}
/**
* Handle RPC response.
@ -211,7 +203,7 @@ trait ResponseHandler
$exception = static fn (): \Throwable => $e;
}
if ($exception) {
$this->handleReject($request, $exception);
$request->reply($exception);
}
return;
}
@ -237,7 +229,7 @@ trait ResponseHandler
EventLoop::queue($this->methodRecall(...), $requestId);
return;
}
$this->handleReject($request, static fn () => RPCErrorException::make('Received bad_msg_notification: ' . MTProto::BAD_MSG_ERROR_CODES[$response['error_code']], $response['error_code'], $request->constructor));
$request->reply(static fn () => RPCErrorException::make('Received bad_msg_notification: ' . MTProto::BAD_MSG_ERROR_CODES[$response['error_code']], $response['error_code'], $request->constructor));
return;
}
@ -275,7 +267,6 @@ trait ResponseHandler
}
}
}
$this->gotResponseForOutgoingMessage($request);
$this->requestResponse?->inc([
'method' => $request->constructor,
@ -283,7 +274,7 @@ trait ResponseHandler
'error_code' => '200',
]);
EventLoop::queue($request->reply(...), $response);
$request->reply($response);
}
/**
* @param array{error_message: string, error_code: int} $response
@ -314,8 +305,8 @@ trait ResponseHandler
&& !$request->shouldRefreshReferences()
) {
$this->API->logger("Got {$response['error_message']}, refreshing file reference and repeating method call...");
$this->gotResponseForOutgoingMessage($request);
$msgId = $request->getMsgId();
unset($this->new_outgoing[$msgId]);
$request->setRefreshReferences(true);
$request->setMsgId(null);
$request->setSeqNo(null);
@ -334,8 +325,8 @@ trait ResponseHandler
)
) {
$this->API->logger("Resending $request due to {$response['error_message']}");
$this->gotResponseForOutgoingMessage($request);
$msgId = $request->getMsgId();
unset($this->new_outgoing[$msgId]);
$request->setSent(hrtime(true) + (5*60 * 1_000_000_000));
$request->setMsgId(null);
$request->setSeqNo(null);
@ -386,8 +377,8 @@ trait ResponseHandler
)
) {
$this->API->logger("Resending $request due to {$response['error_message']}");
$this->gotResponseForOutgoingMessage($request);
$msgId = $request->getMsgId();
unset($this->new_outgoing[$msgId]);
$request->setSent(hrtime(true) + (5*60 * 1_000_000_000));
$request->setMsgId(null);
$request->setSeqNo(null);
@ -420,12 +411,7 @@ trait ResponseHandler
case 'AUTH_KEY_UNREGISTERED':
case 'AUTH_KEY_INVALID':
if ($this->API->authorized !== \danog\MadelineProto\API::LOGGED_IN) {
$this->gotResponseForOutgoingMessage($request);
EventLoop::queue(
$this->handleReject(...),
$request,
static fn () => RPCErrorException::make($response['error_message'], $response['error_code'], $request->constructor)
);
$request->reply(static fn () => RPCErrorException::make($response['error_message'], $response['error_code'], $request->constructor));
return null;
}
$this->session_id = null;
@ -457,8 +443,8 @@ trait ResponseHandler
$limit = $request->floodWaitLimit ?? $this->API->settings->getRPC()->getFloodTimeout();
if ($seconds < $limit) {
$this->API->logger("Flood, waiting $seconds seconds before repeating async call of $request...", Logger::NOTICE);
$this->gotResponseForOutgoingMessage($request);
$msgId = $request->getMsgId();
unset($this->new_outgoing[$msgId]);
$request->setSent(hrtime(true) + ($seconds * 1_000_000_000));
$request->setMsgId(null);
$request->setSeqNo(null);