1
0
mirror of https://github.com/danog/MadelineProto.git synced 2025-01-22 21:31:28 +01:00

Restore v7 resume semantics for update actors

This commit is contained in:
Daniil Gentili 2023-12-13 21:20:16 +01:00
parent 9bbb72f12b
commit 86401b19e8
5 changed files with 55 additions and 11 deletions

View File

@ -18,11 +18,9 @@
*/
use danog\MadelineProto\EventHandler\Attributes\Handler;
use danog\MadelineProto\EventHandler\Message\ChannelMessage;
use danog\MadelineProto\EventHandler\Message\SecretMessage;
use danog\MadelineProto\EventHandler\SimpleFilter\Incoming;
use danog\MadelineProto\Logger;
use danog\MadelineProto\ParseMode;
use danog\MadelineProto\Settings;
use danog\MadelineProto\SimpleEventHandler;

View File

@ -59,7 +59,9 @@ final class ChannelMessage extends Message
)['messages'];
$r = end($r);
return $this->getClient()->wrapMessage($r);
$v = $this->getClient()->wrapMessage($r);
\assert($v instanceof GroupMessage);
return $v;
}
/**

View File

@ -99,6 +99,9 @@ final class FeedLoop extends Loop
}
return self::PAUSE;
}
/**
* @param array<int, array> $updates
*/
public function parse(array $updates): void
{
reset($updates);
@ -127,9 +130,12 @@ final class FeedLoop extends Loop
if ($result > 0) {
$logger('PTS hole');
$this->updater->setLimit($this->state->pts() + $result);
$this->updater->resume();
// Drop current update, it will be recovered anyway while filling the gap
continue;
$this->updater->resumeAndWait();
$this->incomingUpdates []= $update;
foreach ($updates as $update) {
$this->incomingUpdates []= $update;
}
return;
}
if (isset($update['message']['id'], $update['message']['peer_id']) && !\in_array($update['_'], ['updateEditMessage', 'updateEditChannelMessage', 'updateMessageID'], true)) {
if (!$this->API->checkMsgId($update['message'])) {

View File

@ -25,8 +25,6 @@ use danog\MadelineProto\Logger;
use danog\MadelineProto\Loop\InternalLoop;
use danog\MadelineProto\MTProtoTools\UpdatesState;
use function Amp\delay;
/**
* update feed loop.
*
@ -80,6 +78,9 @@ final class SeqLoop extends Loop
}
return self::PAUSE;
}
/**
* @param array<int, array> $updates
*/
public function parse(array $updates): void
{
reset($updates);
@ -94,9 +95,12 @@ final class SeqLoop extends Loop
$result = $this->state->checkSeq($seq_start);
if ($result > 0) {
$this->API->logger('Seq hole. seq_start: '.$seq_start.' != cur seq: '.($this->state->seq() + 1), Logger::ERROR);
$this->API->updaters[UpdateLoop::GENERIC]->resume();
// Drop current update, it will be recovered anyway while filling the gap
continue;
$this->API->updaters[UpdateLoop::GENERIC]->resumeAndWait();
$this->incomingUpdates []= $update;
foreach ($updates as $update) {
$this->incomingUpdates []= $update;
}
return;
}
if ($result < 0) {
$this->API->logger('Seq too old. seq_start: '.$seq_start.' != cur seq: '.($this->state->seq() + 1), Logger::ERROR);

View File

@ -20,6 +20,8 @@ declare(strict_types=1);
namespace danog\MadelineProto\Loop\Update;
use Amp\DeferredFuture;
use AssertionError;
use danog\Loop\Loop;
use danog\MadelineProto\Exception;
use danog\MadelineProto\Logger;
@ -29,6 +31,7 @@ use danog\MadelineProto\MTProtoTools\DialogId;
use danog\MadelineProto\PeerNotInDbException;
use danog\MadelineProto\PTSException;
use danog\MadelineProto\RPCErrorException;
use Revolt\EventLoop;
use function Amp\delay;
@ -68,10 +71,41 @@ final class UpdateLoop extends Loop
$this->init($API);
$this->channelId = $channelId;
}
private ?DeferredFuture $done = null;
public function resumeAndWait(): void
{
$i = 0;
do {
++$i;
$this->API->logger("Queued resume of $this (try $i) to recover gap...", Logger::NOTICE);
if ($this->done !== null) {
throw new AssertionError("Already waiting in $this!");
}
if ($this->isRunning()) {
throw new AssertionError("$this is not running!");
}
$this->done = new DeferredFuture;
// Can be false if we're currently running, so wait for it to finish and re-queue.
$resumed = $this->resume();
$this->done->getFuture()->await();
} while (!$resumed);
$this->API->logger("Resumed and re-paused $this (try $i) to recover gap!", Logger::NOTICE);
}
/**
* Main loop.
*/
public function loop(): ?float
{
try {
return $this->loopInner();
} finally {
if ($this->done !== null) {
EventLoop::queue($this->done->complete(...));
$this->done = null;
}
}
}
private function loopInner(): ?float
{
if (!$this->isLoggedIn()) {
return self::PAUSE;