diff --git a/src/MTProtoTools/Files.php b/src/MTProtoTools/Files.php index 732cb735e..2299e12d6 100644 --- a/src/MTProtoTools/Files.php +++ b/src/MTProtoTools/Files.php @@ -320,22 +320,26 @@ trait Files }; $resPromises = []; $start = microtime(true); + /** @var ?FloodPremiumWaitError */ + $floodWaitError = null; while ($part_num < $part_total_num || !$size) { if ($seekable) { - $writeCb = function () use ($method, $callable, $part_num, $cancellation, &$datacenter): WrappedFuture { + $writeCb = function () use ($method, $callable, $part_num, $cancellation, &$datacenter, &$floodWaitError): WrappedFuture { + $floodWaitError?->wait($cancellation); return $this->methodCallAsyncWrite( $method, - $callable($part_num) + ['cancellation' => $cancellation], + $callable($part_num) + ['cancellation' => $cancellation, 'floodWaitLimit' => 0], $datacenter ); }; } else { try { - $part = $callable($part_num) + ['cancellation' => $cancellation]; + $part = $callable($part_num) + ['cancellation' => $cancellation, 'floodWaitLimit' => 0]; } catch (StreamEof) { break; } - $writeCb = function () use ($method, $part, &$datacenter): WrappedFuture { + $writeCb = function () use ($method, $part, &$datacenter, &$floodWaitError, $cancellation): WrappedFuture { + $floodWaitError?->wait($cancellation); return $this->methodCallAsyncWrite( $method, $part, @@ -344,11 +348,11 @@ trait Files }; } $writePromise = async($writeCb); - EventLoop::queue(function () use ($writePromise, $cb, $part_num, $size, &$resPromises, $cancellation, $writeCb, &$datacenter): void { + EventLoop::queue(function () use ($writePromise, $cb, $part_num, $size, &$resPromises, $cancellation, $writeCb, &$datacenter, &$floodWaitError): void { + $d = new DeferredFuture; + $resPromises[] = $d->getFuture(); do { $readFuture = $writePromise->await($cancellation); - $d = new DeferredFuture; - $resPromises[] = $d->getFuture(); try { // Wrote chunk! if (!$readFuture->await($cancellation)) { @@ -361,11 +365,9 @@ trait Files $d->complete(); return; } catch (FloodPremiumWaitError $e) { - $this->logger("Got {$e->rpc} while uploading $part_num: {$datacenter}, retrying..."); - $writePromise = async(static function () use ($cancellation, $e, $writeCb): void { - $e->wait($cancellation); - $writeCb(); - }); + $this->logger("Got {$e->rpc} while uploading $part_num: {$datacenter}, waiting and retrying..."); + $floodWaitError = $e; + $writePromise = async($writeCb); } catch (FileRedirect $e) { $datacenter = $e->dc; $this->logger("Got redirect while uploading $part_num: {$datacenter}"); diff --git a/src/RPCError/RateLimitError.php b/src/RPCError/RateLimitError.php index 87f903daf..4ba56d2a6 100644 --- a/src/RPCError/RateLimitError.php +++ b/src/RPCError/RateLimitError.php @@ -29,14 +29,30 @@ use function Amp\delay; */ class RateLimitError extends RPCErrorException { + /** + * Indicates the absolute expiration time of the flood wait. + * + * @var positive-int + */ + public readonly int $expires; + /** @internal */ - public function __construct(string $message, public readonly int $waitTime, int $code, string $caller, ?Exception $previous = null) - { + public function __construct( + string $message, + /** @var positive-int */ + public readonly int $waitTime, + int $code, + string $caller, + ?Exception $previous = null + ) { + $this->expires = time() + $waitTime; parent::__construct($message, "A rate limit was encountered, please repeat the method call after $waitTime seconds", $code, $caller, $previous); } /** * Returns the required waiting period in seconds before repeating the RPC call. + * + * @return positive-int */ public function getWaitTime(): int { @@ -44,10 +60,23 @@ class RateLimitError extends RPCErrorException } /** - * Waits for the required waiting period. + * Returns the remaining waiting period in seconds before repeating the RPC call. + * + * @return int<0, max> + */ + public function getWaitTimeLeft(): int + { + return max(0, $this->expires - time()); + } + + /** + * Waits for the remaining waiting period. */ public function wait(?Cancellation $cancellation = null): void { - delay($this->waitTime, cancellation: $cancellation); + $left = $this->getWaitTimeLeft(); + if ($left > 0) { + delay($left, cancellation: $cancellation); + } } }