1
0
mirror of https://github.com/danog/MadelineProto.git synced 2024-11-30 04:08:59 +01:00

Misc fixes

This commit is contained in:
Daniil Gentili 2024-10-21 15:50:43 +00:00
parent 8120ca7bce
commit 1efb43cbde
4 changed files with 16 additions and 30 deletions

View File

@ -49,24 +49,24 @@ final class WriteLoop extends Loop
public const MAX_IDS = 8192;
use Common {
__construct as init;
__construct as init2;
}
private int $pingTimeout;
private float $timeout;
private float $pollTimeout;
/**
* Constructor function.
*/
public function __construct(Connection $connection)
{
$this->init($connection);
$this->init2($connection);
$timeout = $this->shared->getSettings()->getPingInterval();
$this->pingTimeout = $timeout + 15;
if ($this->connection->isHttp()) {
$this->timeout = (float) max(self::LONG_POLL_TIMEOUT, $timeout);
$this->pollTimeout = (float) max(self::LONG_POLL_TIMEOUT, $timeout);
} else {
$this->timeout = (float) $timeout;
$this->pollTimeout = (float) $timeout;
}
}
/**
@ -83,7 +83,7 @@ final class WriteLoop extends Loop
}
if (!$this->connection->pendingOutgoing && !$first) {
$this->API->logger("No messages, pausing in $this...", Logger::ULTRA_VERBOSE);
return $this->timeout;
return $this->pollTimeout;
}
if ($please_wait) {
$this->API->logger("Have to wait for handshake, pausing in $this...", Logger::ULTRA_VERBOSE);
@ -119,8 +119,8 @@ final class WriteLoop extends Loop
{
if ($queue = $this->connection->unencrypted_check_queue) {
$this->connection->unencrypted_check_queue = [];
foreach ($queue as $msg_id => $_) {
$this->connection->methodRecall($msg_id);
foreach ($queue as $msg) {
$this->connection->methodRecall($msg);
}
}
while ($this->connection->pendingOutgoing) {
@ -189,7 +189,7 @@ final class WriteLoop extends Loop
$this->connection->objectCall('msgs_state_req', ['msg_ids' => $message_ids], $deferred);
EventLoop::queue(function () use ($deferred, $message_ids): void {
try {
$result = $deferred->getFuture()->await(new TimeoutCancellation($this->timeout));
$result = $deferred->getFuture()->await(new TimeoutCancellation($this->pollTimeout));
if (\is_callable($result)) {
throw $result();
}
@ -247,7 +247,7 @@ final class WriteLoop extends Loop
}
}
//} catch (CancelledException) {
//$this->API->logger("We did not receive a response for {$this->timeout} seconds: reconnecting and exiting check loop on DC {$this->datacenter}");
//$this->API->logger("We did not receive a response for {$this->pollTimeout} seconds: reconnecting and exiting check loop on DC {$this->datacenter}");
//EventLoop::queue($this->connection->reconnect(...));
} catch (\Throwable $e) {
$this->API->logger("Got exception in check loop for DC {$this->datacenter}");

View File

@ -408,7 +408,6 @@ final class MTProto implements TLCallback, LoggerGetter, SettingsGetter
*/
#[OrmMappedArray(KeyType::STRING, ValueType::SCALAR, cacheTtl: 0, optimizeIfWastedMb: 1, tablePostfix: 'session')]
public DbArray $sessionDb;
private bool $cleaned = false;
/**
* Returns an instance of a client by session name.
@ -1004,11 +1003,6 @@ final class MTProto implements TLCallback, LoggerGetter, SettingsGetter
$this->updateQueue = $q;
}
if ($this->cleaned) {
return;
}
$this->cleaned = true;
if (isset($this->channels_state)) {
$this->updateState = new CombinedUpdatesState;
foreach ($this->channels_state->get() as $channelId => $state) {
@ -1131,9 +1125,6 @@ final class MTProto implements TLCallback, LoggerGetter, SettingsGetter
$this->initPromise = $deferred->getFuture();
try {
// Update settings from constructor
$this->updateSettings($settings);
// Setup logger
$this->setupLogger();
@ -1163,6 +1154,8 @@ final class MTProto implements TLCallback, LoggerGetter, SettingsGetter
}
// Reset MTProto session (not related to user session)
$this->resetMTProtoSession("wakeup");
// Update settings from constructor
$this->updateSettings($settings);
// Update TL callbacks
$callbacks = [$this, $this->peerDatabase];
if ($this->settings->getDb()->getEnableFileReferenceDb()) {

View File

@ -110,7 +110,7 @@ trait CallHandler
return $readFuture->await();
}
private LocalKeyedMutex $abstractionQueueMutex;
private ?int $drop = null;
private ?float $drop = null;
/**
* Call method and make sure it is asynchronously sent (generator).
*
@ -182,7 +182,7 @@ trait CallHandler
if (!$encrypted && $this->shared->hasTempAuthKey()) {
$encrypted = true;
}
$timeout = new TimeoutCancellation($this->drop ??= (float) $this->getAPI()->getSettings()->getRpc()->getRpcDropTimeout());
$timeout = new TimeoutCancellation($this->drop ??= (float) $this->API->getSettings()->getRpc()->getRpcDropTimeout());
$cancellation = $cancellation !== null
? new CompositeCancellation($cancellation, $timeout)
: $timeout;
@ -222,7 +222,7 @@ trait CallHandler
{
$cancellation = $args['cancellation'] ?? null;
$cancellation?->throwIfRequested();
$timeout = new TimeoutCancellation($this->drop ??= (float) $this->getAPI()->getSettings()->getRpc()->getRpcDropTimeout());
$timeout = new TimeoutCancellation($this->drop ??= (float) $this->API->getSettings()->getRpc()->getRpcDropTimeout());
$cancellation = $cancellation !== null
? new CompositeCancellation($cancellation, $timeout)
: $timeout;

View File

@ -240,14 +240,7 @@ trait FilesLogic
if ($result->shouldServe()) {
$pipe = new Pipe(1024 * 1024);
[$start, $end] = $result->getServeRange();
EventLoop::queue(function () use ($messageMedia, $pipe, $cb, $start, $end, $cancellation): void {
try {
$this->downloadToStream($messageMedia, $pipe->getSink(), $cb, $start, $end, $cancellation);
} catch (\Throwable $e) {
$this->logger->logger($e, Logger::ERROR);
}
$pipe->getSink()->close();
});
async($this->downloadToStream(...), $messageMedia, $pipe->getSink(), $cb, $start, $end, $cancellation)->finally($pipe->getSink()->close(...));
$body = $pipe->getSource();
} elseif (!\in_array($result->getCode(), [HttpStatus::OK, HttpStatus::PARTIAL_CONTENT], true)) {
$body = $result->getCodeExplanation();