diff --git a/psalm-baseline.xml b/psalm-baseline.xml index 5c8a6cef2..e942195ff 100644 --- a/psalm-baseline.xml +++ b/psalm-baseline.xml @@ -3373,6 +3373,10 @@ state |= self::STATE_REPLIED]]> state |= self::STATE_SENT]]> + + connection->new_outgoing]]> + connection->outgoing_messages]]> + sent]]> sent]]> @@ -3402,9 +3406,6 @@ - - new_outgoing]]> - getSent()]]> getSent()]]> @@ -3518,6 +3519,8 @@ + new_outgoing[$message_id]]]> + new_outgoing[$message_id]]]> outgoing_messages[$message_id]]]> outgoing_messages[$message_id]]]> @@ -3645,6 +3648,12 @@ getMsgId()]]> + + new_outgoing]]> + new_outgoing]]> + new_outgoing]]> + new_outgoing]]> + API->authorized_dc == $this->datacenter && $this->API->authorized === \danog\MadelineProto\API::LOGGED_IN]]> @@ -4177,6 +4186,8 @@ + + @@ -4193,6 +4204,8 @@ + + diff --git a/src/Loop/Connection/CheckLoop.php b/src/Loop/Connection/CheckLoop.php index 36ffb5dd7..94272b110 100644 --- a/src/Loop/Connection/CheckLoop.php +++ b/src/Loop/Connection/CheckLoop.php @@ -23,6 +23,7 @@ namespace danog\MadelineProto\Loop\Connection; use Amp\CancelledException; use Amp\DeferredFuture; use Amp\TimeoutCancellation; +use Amp\TimeoutException; use danog\Loop\Loop; use danog\MadelineProto\Connection; use danog\MadelineProto\Logger; @@ -99,7 +100,7 @@ final class CheckLoop extends Loop case 2: case 3: if ($message->constructor === 'msgs_state_req') { - $this->connection->gotResponseForOutgoingMessage($message); + $message->reply(static fn () => new TimeoutException("Server did not receive message")); break; } $this->API->logger("Message $message not received by server, resending...", Logger::ERROR); @@ -115,8 +116,6 @@ final class CheckLoop extends Loop } elseif ($chr & 32) { if ($message->getSent() + $this->resendTimeout < hrtime(true)) { if ($message->isCancellationRequested()) { - unset($this->connection->new_outgoing[$message_id], $this->connection->outgoing_messages[$message_id]); - $this->API->logger("Cancelling $message...", Logger::ERROR); } else { $this->API->logger("Message $message received by server and is being processed for way too long, resending request...", Logger::ERROR); diff --git a/src/MTProto/MTProtoOutgoingMessage.php b/src/MTProto/MTProtoOutgoingMessage.php index 47ca658a9..6bc3ae8e1 100644 --- a/src/MTProto/MTProtoOutgoingMessage.php +++ b/src/MTProto/MTProtoOutgoingMessage.php @@ -197,6 +197,7 @@ class MTProtoOutgoingMessage extends MTProtoMessage ); } } + unset($this->connection->new_outgoing[$this->msgId], $this->connection->outgoing_messages[$this->msgId]); $this->serializedBody = null; $this->body = null; diff --git a/src/MTProtoSession/AckHandler.php b/src/MTProtoSession/AckHandler.php index fe574bbe7..a88467d9e 100644 --- a/src/MTProtoSession/AckHandler.php +++ b/src/MTProtoSession/AckHandler.php @@ -47,14 +47,6 @@ trait AckHandler } return true; } - /** - * We have gotten a response for an outgoing message. - */ - public function gotResponseForOutgoingMessage(MTProtoOutgoingMessage $outgoingMessage): void - { - // The server acknowledges that it received my message - unset($this->new_outgoing[$outgoingMessage->getMsgId()]); - } /** * Acknowledge incoming message ID. */ @@ -121,7 +113,7 @@ trait AckHandler || ($message->getSent() + $dropTimeout < hrtime(true)) ) { Logger::log('No reply for message: ' . $message, Logger::WARNING); - $this->handleReject($message, static fn () => new TimeoutException('Request timeout')); + $message->reply(static fn () => new TimeoutException('Request timeout')); continue; } if ($message->getState() & MTProtoOutgoingMessage::STATE_REPLIED) { diff --git a/src/MTProtoSession/CallHandler.php b/src/MTProtoSession/CallHandler.php index 7194144e6..ad4e63db6 100644 --- a/src/MTProtoSession/CallHandler.php +++ b/src/MTProtoSession/CallHandler.php @@ -60,7 +60,7 @@ trait CallHandler if ($datacenter) { /** @var MTProtoOutgoingMessage */ $message = $this->outgoing_messages[$message_id]; - $this->gotResponseForOutgoingMessage($message); + unset($this->new_outgoing[$message_id]); $message->setMsgId(null); $message->setSeqNo(null); EventLoop::queue(function () use ($datacenter, $message): void { @@ -71,7 +71,7 @@ trait CallHandler /** @var MTProtoOutgoingMessage */ $message = $this->outgoing_messages[$message_id]; if (!$message->hasSeqNo()) { - $this->gotResponseForOutgoingMessage($message); + unset($this->new_outgoing[$message_id]); } EventLoop::queue($this->sendMessage(...), $message); } diff --git a/src/MTProtoSession/ResponseHandler.php b/src/MTProtoSession/ResponseHandler.php index ea2f14a6a..29b729508 100644 --- a/src/MTProtoSession/ResponseHandler.php +++ b/src/MTProtoSession/ResponseHandler.php @@ -170,14 +170,6 @@ trait ResponseHandler $this->handleMessages([$message]); } } - /** - * @param callable(): \Throwable $data - */ - private function handleReject(MTProtoOutgoingMessage $message, callable $data): void - { - $this->gotResponseForOutgoingMessage($message); - $message->reply($data); - } /** * Handle RPC response. @@ -211,7 +203,7 @@ trait ResponseHandler $exception = static fn (): \Throwable => $e; } if ($exception) { - $this->handleReject($request, $exception); + $request->reply($exception); } return; } @@ -237,7 +229,7 @@ trait ResponseHandler EventLoop::queue($this->methodRecall(...), $requestId); return; } - $this->handleReject($request, static fn () => RPCErrorException::make('Received bad_msg_notification: ' . MTProto::BAD_MSG_ERROR_CODES[$response['error_code']], $response['error_code'], $request->constructor)); + $request->reply(static fn () => RPCErrorException::make('Received bad_msg_notification: ' . MTProto::BAD_MSG_ERROR_CODES[$response['error_code']], $response['error_code'], $request->constructor)); return; } @@ -275,7 +267,6 @@ trait ResponseHandler } } } - $this->gotResponseForOutgoingMessage($request); $this->requestResponse?->inc([ 'method' => $request->constructor, @@ -283,7 +274,7 @@ trait ResponseHandler 'error_code' => '200', ]); - EventLoop::queue($request->reply(...), $response); + $request->reply($response); } /** * @param array{error_message: string, error_code: int} $response @@ -314,8 +305,8 @@ trait ResponseHandler && !$request->shouldRefreshReferences() ) { $this->API->logger("Got {$response['error_message']}, refreshing file reference and repeating method call..."); - $this->gotResponseForOutgoingMessage($request); $msgId = $request->getMsgId(); + unset($this->new_outgoing[$msgId]); $request->setRefreshReferences(true); $request->setMsgId(null); $request->setSeqNo(null); @@ -334,8 +325,8 @@ trait ResponseHandler ) ) { $this->API->logger("Resending $request due to {$response['error_message']}"); - $this->gotResponseForOutgoingMessage($request); $msgId = $request->getMsgId(); + unset($this->new_outgoing[$msgId]); $request->setSent(hrtime(true) + (5*60 * 1_000_000_000)); $request->setMsgId(null); $request->setSeqNo(null); @@ -386,8 +377,8 @@ trait ResponseHandler ) ) { $this->API->logger("Resending $request due to {$response['error_message']}"); - $this->gotResponseForOutgoingMessage($request); $msgId = $request->getMsgId(); + unset($this->new_outgoing[$msgId]); $request->setSent(hrtime(true) + (5*60 * 1_000_000_000)); $request->setMsgId(null); $request->setSeqNo(null); @@ -420,12 +411,7 @@ trait ResponseHandler case 'AUTH_KEY_UNREGISTERED': case 'AUTH_KEY_INVALID': if ($this->API->authorized !== \danog\MadelineProto\API::LOGGED_IN) { - $this->gotResponseForOutgoingMessage($request); - EventLoop::queue( - $this->handleReject(...), - $request, - static fn () => RPCErrorException::make($response['error_message'], $response['error_code'], $request->constructor) - ); + $request->reply(static fn () => RPCErrorException::make($response['error_message'], $response['error_code'], $request->constructor)); return null; } $this->session_id = null; @@ -457,8 +443,8 @@ trait ResponseHandler $limit = $request->floodWaitLimit ?? $this->API->settings->getRPC()->getFloodTimeout(); if ($seconds < $limit) { $this->API->logger("Flood, waiting $seconds seconds before repeating async call of $request...", Logger::NOTICE); - $this->gotResponseForOutgoingMessage($request); $msgId = $request->getMsgId(); + unset($this->new_outgoing[$msgId]); $request->setSent(hrtime(true) + ($seconds * 1_000_000_000)); $request->setMsgId(null); $request->setSeqNo(null);