1
0
mirror of https://github.com/danog/MadelineProto.git synced 2025-01-10 16:28:16 +01:00

Fix handling of rate limiting during uploads, add getWaitTimeLeft and expires methods to all RateLimitErrors

This commit is contained in:
Daniil Gentili 2024-12-07 13:30:47 +00:00
parent 9f50145564
commit 79fd98e34e
2 changed files with 47 additions and 16 deletions

View File

@ -320,22 +320,26 @@ trait Files
}; };
$resPromises = []; $resPromises = [];
$start = microtime(true); $start = microtime(true);
/** @var ?FloodPremiumWaitError */
$floodWaitError = null;
while ($part_num < $part_total_num || !$size) { while ($part_num < $part_total_num || !$size) {
if ($seekable) { if ($seekable) {
$writeCb = function () use ($method, $callable, $part_num, $cancellation, &$datacenter): WrappedFuture { $writeCb = function () use ($method, $callable, $part_num, $cancellation, &$datacenter, &$floodWaitError): WrappedFuture {
$floodWaitError?->wait($cancellation);
return $this->methodCallAsyncWrite( return $this->methodCallAsyncWrite(
$method, $method,
$callable($part_num) + ['cancellation' => $cancellation], $callable($part_num) + ['cancellation' => $cancellation, 'floodWaitLimit' => 0],
$datacenter $datacenter
); );
}; };
} else { } else {
try { try {
$part = $callable($part_num) + ['cancellation' => $cancellation]; $part = $callable($part_num) + ['cancellation' => $cancellation, 'floodWaitLimit' => 0];
} catch (StreamEof) { } catch (StreamEof) {
break; break;
} }
$writeCb = function () use ($method, $part, &$datacenter): WrappedFuture { $writeCb = function () use ($method, $part, &$datacenter, &$floodWaitError, $cancellation): WrappedFuture {
$floodWaitError?->wait($cancellation);
return $this->methodCallAsyncWrite( return $this->methodCallAsyncWrite(
$method, $method,
$part, $part,
@ -344,11 +348,11 @@ trait Files
}; };
} }
$writePromise = async($writeCb); $writePromise = async($writeCb);
EventLoop::queue(function () use ($writePromise, $cb, $part_num, $size, &$resPromises, $cancellation, $writeCb, &$datacenter): void { EventLoop::queue(function () use ($writePromise, $cb, $part_num, $size, &$resPromises, $cancellation, $writeCb, &$datacenter, &$floodWaitError): void {
$d = new DeferredFuture;
$resPromises[] = $d->getFuture();
do { do {
$readFuture = $writePromise->await($cancellation); $readFuture = $writePromise->await($cancellation);
$d = new DeferredFuture;
$resPromises[] = $d->getFuture();
try { try {
// Wrote chunk! // Wrote chunk!
if (!$readFuture->await($cancellation)) { if (!$readFuture->await($cancellation)) {
@ -361,11 +365,9 @@ trait Files
$d->complete(); $d->complete();
return; return;
} catch (FloodPremiumWaitError $e) { } catch (FloodPremiumWaitError $e) {
$this->logger("Got {$e->rpc} while uploading $part_num: {$datacenter}, retrying..."); $this->logger("Got {$e->rpc} while uploading $part_num: {$datacenter}, waiting and retrying...");
$writePromise = async(static function () use ($cancellation, $e, $writeCb): void { $floodWaitError = $e;
$e->wait($cancellation); $writePromise = async($writeCb);
$writeCb();
});
} catch (FileRedirect $e) { } catch (FileRedirect $e) {
$datacenter = $e->dc; $datacenter = $e->dc;
$this->logger("Got redirect while uploading $part_num: {$datacenter}"); $this->logger("Got redirect while uploading $part_num: {$datacenter}");

View File

@ -29,14 +29,30 @@ use function Amp\delay;
*/ */
class RateLimitError extends RPCErrorException class RateLimitError extends RPCErrorException
{ {
/**
* Indicates the absolute expiration time of the flood wait.
*
* @var positive-int
*/
public readonly int $expires;
/** @internal */ /** @internal */
public function __construct(string $message, public readonly int $waitTime, int $code, string $caller, ?Exception $previous = null) public function __construct(
{ string $message,
/** @var positive-int */
public readonly int $waitTime,
int $code,
string $caller,
?Exception $previous = null
) {
$this->expires = time() + $waitTime;
parent::__construct($message, "A rate limit was encountered, please repeat the method call after $waitTime seconds", $code, $caller, $previous); parent::__construct($message, "A rate limit was encountered, please repeat the method call after $waitTime seconds", $code, $caller, $previous);
} }
/** /**
* Returns the required waiting period in seconds before repeating the RPC call. * Returns the required waiting period in seconds before repeating the RPC call.
*
* @return positive-int
*/ */
public function getWaitTime(): int public function getWaitTime(): int
{ {
@ -44,10 +60,23 @@ class RateLimitError extends RPCErrorException
} }
/** /**
* Waits for the required waiting period. * Returns the remaining waiting period in seconds before repeating the RPC call.
*
* @return int<0, max>
*/
public function getWaitTimeLeft(): int
{
return max(0, $this->expires - time());
}
/**
* Waits for the remaining waiting period.
*/ */
public function wait(?Cancellation $cancellation = null): void public function wait(?Cancellation $cancellation = null): void
{ {
delay($this->waitTime, cancellation: $cancellation); $left = $this->getWaitTimeLeft();
if ($left > 0) {
delay($left, cancellation: $cancellation);
}
} }
} }