1
0
mirror of https://github.com/danog/MadelineProto.git synced 2024-12-12 10:39:38 +01:00

Properly sync state

This commit is contained in:
Daniil Gentili 2024-06-27 19:08:00 +02:00 committed by Alexander Pankratov
parent c2eafc22a3
commit 24f90ef57a
6 changed files with 30 additions and 39 deletions

View File

@ -3373,6 +3373,10 @@
<code><![CDATA[$this->state |= self::STATE_REPLIED]]></code> <code><![CDATA[$this->state |= self::STATE_REPLIED]]></code>
<code><![CDATA[$this->state |= self::STATE_SENT]]></code> <code><![CDATA[$this->state |= self::STATE_SENT]]></code>
</PossiblyInvalidPropertyAssignmentValue> </PossiblyInvalidPropertyAssignmentValue>
<PossiblyNullArrayOffset>
<code><![CDATA[$this->connection->new_outgoing]]></code>
<code><![CDATA[$this->connection->outgoing_messages]]></code>
</PossiblyNullArrayOffset>
<PossiblyNullOperand> <PossiblyNullOperand>
<code><![CDATA[$this->sent]]></code> <code><![CDATA[$this->sent]]></code>
<code><![CDATA[$this->sent]]></code> <code><![CDATA[$this->sent]]></code>
@ -3402,9 +3406,6 @@
</UnsupportedReferenceUsage> </UnsupportedReferenceUsage>
</file> </file>
<file src="src/MTProtoSession/AckHandler.php"> <file src="src/MTProtoSession/AckHandler.php">
<PossiblyNullArrayOffset>
<code><![CDATA[$this->new_outgoing]]></code>
</PossiblyNullArrayOffset>
<PossiblyNullOperand> <PossiblyNullOperand>
<code><![CDATA[$message->getSent()]]></code> <code><![CDATA[$message->getSent()]]></code>
<code><![CDATA[$message->getSent()]]></code> <code><![CDATA[$message->getSent()]]></code>
@ -3518,6 +3519,8 @@
<code><![CDATA[$sub['queueId']]]></code> <code><![CDATA[$sub['queueId']]]></code>
</MixedArrayAssignment> </MixedArrayAssignment>
<MixedArrayOffset> <MixedArrayOffset>
<code><![CDATA[$this->new_outgoing[$message_id]]]></code>
<code><![CDATA[$this->new_outgoing[$message_id]]]></code>
<code><![CDATA[$this->outgoing_messages[$message_id]]]></code> <code><![CDATA[$this->outgoing_messages[$message_id]]]></code>
<code><![CDATA[$this->outgoing_messages[$message_id]]]></code> <code><![CDATA[$this->outgoing_messages[$message_id]]]></code>
</MixedArrayOffset> </MixedArrayOffset>
@ -3645,6 +3648,12 @@
<code><![CDATA[$msgId]]></code> <code><![CDATA[$msgId]]></code>
<code><![CDATA[$request->getMsgId()]]></code> <code><![CDATA[$request->getMsgId()]]></code>
</PossiblyNullArgument> </PossiblyNullArgument>
<PossiblyNullArrayOffset>
<code><![CDATA[$this->new_outgoing]]></code>
<code><![CDATA[$this->new_outgoing]]></code>
<code><![CDATA[$this->new_outgoing]]></code>
<code><![CDATA[$this->new_outgoing]]></code>
</PossiblyNullArrayOffset>
<RedundantConditionGivenDocblockType> <RedundantConditionGivenDocblockType>
<code><![CDATA[$this->API->authorized_dc == $this->datacenter && $this->API->authorized === \danog\MadelineProto\API::LOGGED_IN]]></code> <code><![CDATA[$this->API->authorized_dc == $this->datacenter && $this->API->authorized === \danog\MadelineProto\API::LOGGED_IN]]></code>
</RedundantConditionGivenDocblockType> </RedundantConditionGivenDocblockType>
@ -4177,6 +4186,8 @@
<code><![CDATA[static function (string $payload, int $offset) use ($stream, $seekable, $lock) {]]></code> <code><![CDATA[static function (string $payload, int $offset) use ($stream, $seekable, $lock) {]]></code>
</MissingClosureReturnType> </MissingClosureReturnType>
<MixedArgument> <MixedArgument>
<code><![CDATA[$end]]></code>
<code><![CDATA[$end]]></code>
<code><![CDATA[$end]]></code> <code><![CDATA[$end]]></code>
<code><![CDATA[$end]]></code> <code><![CDATA[$end]]></code>
<code><![CDATA[$file]]></code> <code><![CDATA[$file]]></code>
@ -4193,6 +4204,8 @@
<code><![CDATA[$size]]></code> <code><![CDATA[$size]]></code>
<code><![CDATA[$start]]></code> <code><![CDATA[$start]]></code>
<code><![CDATA[$start]]></code> <code><![CDATA[$start]]></code>
<code><![CDATA[$start]]></code>
<code><![CDATA[$start]]></code>
<code><![CDATA[$stream]]></code> <code><![CDATA[$stream]]></code>
<code><![CDATA[$stream]]></code> <code><![CDATA[$stream]]></code>
<code><![CDATA[$url]]></code> <code><![CDATA[$url]]></code>

View File

@ -23,6 +23,7 @@ namespace danog\MadelineProto\Loop\Connection;
use Amp\CancelledException; use Amp\CancelledException;
use Amp\DeferredFuture; use Amp\DeferredFuture;
use Amp\TimeoutCancellation; use Amp\TimeoutCancellation;
use Amp\TimeoutException;
use danog\Loop\Loop; use danog\Loop\Loop;
use danog\MadelineProto\Connection; use danog\MadelineProto\Connection;
use danog\MadelineProto\Logger; use danog\MadelineProto\Logger;
@ -99,7 +100,7 @@ final class CheckLoop extends Loop
case 2: case 2:
case 3: case 3:
if ($message->constructor === 'msgs_state_req') { if ($message->constructor === 'msgs_state_req') {
$this->connection->gotResponseForOutgoingMessage($message); $message->reply(static fn () => new TimeoutException("Server did not receive message"));
break; break;
} }
$this->API->logger("Message $message not received by server, resending...", Logger::ERROR); $this->API->logger("Message $message not received by server, resending...", Logger::ERROR);
@ -115,8 +116,6 @@ final class CheckLoop extends Loop
} elseif ($chr & 32) { } elseif ($chr & 32) {
if ($message->getSent() + $this->resendTimeout < hrtime(true)) { if ($message->getSent() + $this->resendTimeout < hrtime(true)) {
if ($message->isCancellationRequested()) { if ($message->isCancellationRequested()) {
unset($this->connection->new_outgoing[$message_id], $this->connection->outgoing_messages[$message_id]);
$this->API->logger("Cancelling $message...", Logger::ERROR); $this->API->logger("Cancelling $message...", Logger::ERROR);
} else { } else {
$this->API->logger("Message $message received by server and is being processed for way too long, resending request...", Logger::ERROR); $this->API->logger("Message $message received by server and is being processed for way too long, resending request...", Logger::ERROR);

View File

@ -197,6 +197,7 @@ class MTProtoOutgoingMessage extends MTProtoMessage
); );
} }
} }
unset($this->connection->new_outgoing[$this->msgId], $this->connection->outgoing_messages[$this->msgId]);
$this->serializedBody = null; $this->serializedBody = null;
$this->body = null; $this->body = null;

View File

@ -47,14 +47,6 @@ trait AckHandler
} }
return true; return true;
} }
/**
* We have gotten a response for an outgoing message.
*/
public function gotResponseForOutgoingMessage(MTProtoOutgoingMessage $outgoingMessage): void
{
// The server acknowledges that it received my message
unset($this->new_outgoing[$outgoingMessage->getMsgId()]);
}
/** /**
* Acknowledge incoming message ID. * Acknowledge incoming message ID.
*/ */
@ -121,7 +113,7 @@ trait AckHandler
|| ($message->getSent() + $dropTimeout < hrtime(true)) || ($message->getSent() + $dropTimeout < hrtime(true))
) { ) {
Logger::log('No reply for message: ' . $message, Logger::WARNING); Logger::log('No reply for message: ' . $message, Logger::WARNING);
$this->handleReject($message, static fn () => new TimeoutException('Request timeout')); $message->reply(static fn () => new TimeoutException('Request timeout'));
continue; continue;
} }
if ($message->getState() & MTProtoOutgoingMessage::STATE_REPLIED) { if ($message->getState() & MTProtoOutgoingMessage::STATE_REPLIED) {

View File

@ -60,7 +60,7 @@ trait CallHandler
if ($datacenter) { if ($datacenter) {
/** @var MTProtoOutgoingMessage */ /** @var MTProtoOutgoingMessage */
$message = $this->outgoing_messages[$message_id]; $message = $this->outgoing_messages[$message_id];
$this->gotResponseForOutgoingMessage($message); unset($this->new_outgoing[$message_id]);
$message->setMsgId(null); $message->setMsgId(null);
$message->setSeqNo(null); $message->setSeqNo(null);
EventLoop::queue(function () use ($datacenter, $message): void { EventLoop::queue(function () use ($datacenter, $message): void {
@ -71,7 +71,7 @@ trait CallHandler
/** @var MTProtoOutgoingMessage */ /** @var MTProtoOutgoingMessage */
$message = $this->outgoing_messages[$message_id]; $message = $this->outgoing_messages[$message_id];
if (!$message->hasSeqNo()) { if (!$message->hasSeqNo()) {
$this->gotResponseForOutgoingMessage($message); unset($this->new_outgoing[$message_id]);
} }
EventLoop::queue($this->sendMessage(...), $message); EventLoop::queue($this->sendMessage(...), $message);
} }

View File

@ -170,14 +170,6 @@ trait ResponseHandler
$this->handleMessages([$message]); $this->handleMessages([$message]);
} }
} }
/**
* @param callable(): \Throwable $data
*/
private function handleReject(MTProtoOutgoingMessage $message, callable $data): void
{
$this->gotResponseForOutgoingMessage($message);
$message->reply($data);
}
/** /**
* Handle RPC response. * Handle RPC response.
@ -211,7 +203,7 @@ trait ResponseHandler
$exception = static fn (): \Throwable => $e; $exception = static fn (): \Throwable => $e;
} }
if ($exception) { if ($exception) {
$this->handleReject($request, $exception); $request->reply($exception);
} }
return; return;
} }
@ -237,7 +229,7 @@ trait ResponseHandler
EventLoop::queue($this->methodRecall(...), $requestId); EventLoop::queue($this->methodRecall(...), $requestId);
return; return;
} }
$this->handleReject($request, static fn () => RPCErrorException::make('Received bad_msg_notification: ' . MTProto::BAD_MSG_ERROR_CODES[$response['error_code']], $response['error_code'], $request->constructor)); $request->reply(static fn () => RPCErrorException::make('Received bad_msg_notification: ' . MTProto::BAD_MSG_ERROR_CODES[$response['error_code']], $response['error_code'], $request->constructor));
return; return;
} }
@ -275,7 +267,6 @@ trait ResponseHandler
} }
} }
} }
$this->gotResponseForOutgoingMessage($request);
$this->requestResponse?->inc([ $this->requestResponse?->inc([
'method' => $request->constructor, 'method' => $request->constructor,
@ -283,7 +274,7 @@ trait ResponseHandler
'error_code' => '200', 'error_code' => '200',
]); ]);
EventLoop::queue($request->reply(...), $response); $request->reply($response);
} }
/** /**
* @param array{error_message: string, error_code: int} $response * @param array{error_message: string, error_code: int} $response
@ -314,8 +305,8 @@ trait ResponseHandler
&& !$request->shouldRefreshReferences() && !$request->shouldRefreshReferences()
) { ) {
$this->API->logger("Got {$response['error_message']}, refreshing file reference and repeating method call..."); $this->API->logger("Got {$response['error_message']}, refreshing file reference and repeating method call...");
$this->gotResponseForOutgoingMessage($request);
$msgId = $request->getMsgId(); $msgId = $request->getMsgId();
unset($this->new_outgoing[$msgId]);
$request->setRefreshReferences(true); $request->setRefreshReferences(true);
$request->setMsgId(null); $request->setMsgId(null);
$request->setSeqNo(null); $request->setSeqNo(null);
@ -334,8 +325,8 @@ trait ResponseHandler
) )
) { ) {
$this->API->logger("Resending $request due to {$response['error_message']}"); $this->API->logger("Resending $request due to {$response['error_message']}");
$this->gotResponseForOutgoingMessage($request);
$msgId = $request->getMsgId(); $msgId = $request->getMsgId();
unset($this->new_outgoing[$msgId]);
$request->setSent(hrtime(true) + (5*60 * 1_000_000_000)); $request->setSent(hrtime(true) + (5*60 * 1_000_000_000));
$request->setMsgId(null); $request->setMsgId(null);
$request->setSeqNo(null); $request->setSeqNo(null);
@ -386,8 +377,8 @@ trait ResponseHandler
) )
) { ) {
$this->API->logger("Resending $request due to {$response['error_message']}"); $this->API->logger("Resending $request due to {$response['error_message']}");
$this->gotResponseForOutgoingMessage($request);
$msgId = $request->getMsgId(); $msgId = $request->getMsgId();
unset($this->new_outgoing[$msgId]);
$request->setSent(hrtime(true) + (5*60 * 1_000_000_000)); $request->setSent(hrtime(true) + (5*60 * 1_000_000_000));
$request->setMsgId(null); $request->setMsgId(null);
$request->setSeqNo(null); $request->setSeqNo(null);
@ -420,12 +411,7 @@ trait ResponseHandler
case 'AUTH_KEY_UNREGISTERED': case 'AUTH_KEY_UNREGISTERED':
case 'AUTH_KEY_INVALID': case 'AUTH_KEY_INVALID':
if ($this->API->authorized !== \danog\MadelineProto\API::LOGGED_IN) { if ($this->API->authorized !== \danog\MadelineProto\API::LOGGED_IN) {
$this->gotResponseForOutgoingMessage($request); $request->reply(static fn () => RPCErrorException::make($response['error_message'], $response['error_code'], $request->constructor));
EventLoop::queue(
$this->handleReject(...),
$request,
static fn () => RPCErrorException::make($response['error_message'], $response['error_code'], $request->constructor)
);
return null; return null;
} }
$this->session_id = null; $this->session_id = null;
@ -457,8 +443,8 @@ trait ResponseHandler
$limit = $request->floodWaitLimit ?? $this->API->settings->getRPC()->getFloodTimeout(); $limit = $request->floodWaitLimit ?? $this->API->settings->getRPC()->getFloodTimeout();
if ($seconds < $limit) { if ($seconds < $limit) {
$this->API->logger("Flood, waiting $seconds seconds before repeating async call of $request...", Logger::NOTICE); $this->API->logger("Flood, waiting $seconds seconds before repeating async call of $request...", Logger::NOTICE);
$this->gotResponseForOutgoingMessage($request);
$msgId = $request->getMsgId(); $msgId = $request->getMsgId();
unset($this->new_outgoing[$msgId]);
$request->setSent(hrtime(true) + ($seconds * 1_000_000_000)); $request->setSent(hrtime(true) + ($seconds * 1_000_000_000));
$request->setMsgId(null); $request->setMsgId(null);
$request->setSeqNo(null); $request->setSeqNo(null);