1
0
mirror of https://github.com/danog/MadelineProto.git synced 2024-11-30 04:08:59 +01:00
This commit is contained in:
Daniil Gentili 2024-11-23 02:55:37 -05:00 committed by GitHub
commit 2a128af981
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 316 additions and 449 deletions

View File

@ -3415,9 +3415,6 @@
</UnsupportedReferenceUsage>
</file>
<file src="src/MTProtoSession/AckHandler.php">
<PossiblyNullArrayOffset>
<code><![CDATA[$this->new_outgoing]]></code>
</PossiblyNullArrayOffset>
<PossiblyNullOperand>
<code><![CDATA[$message->getSent()]]></code>
<code><![CDATA[$message->getSent()]]></code>
@ -3531,6 +3528,8 @@
<code><![CDATA[$sub['queueId']]]></code>
</MixedArrayAssignment>
<MixedArrayOffset>
<code><![CDATA[$this->new_outgoing[$message_id]]]></code>
<code><![CDATA[$this->new_outgoing[$message_id]]]></code>
<code><![CDATA[$this->outgoing_messages[$message_id]]]></code>
<code><![CDATA[$this->outgoing_messages[$message_id]]]></code>
</MixedArrayOffset>
@ -3658,6 +3657,12 @@
<code><![CDATA[$msgId]]></code>
<code><![CDATA[$request->getMsgId()]]></code>
</PossiblyNullArgument>
<PossiblyNullArrayOffset>
<code><![CDATA[$this->new_outgoing]]></code>
<code><![CDATA[$this->new_outgoing]]></code>
<code><![CDATA[$this->new_outgoing]]></code>
<code><![CDATA[$this->new_outgoing]]></code>
</PossiblyNullArrayOffset>
<RedundantConditionGivenDocblockType>
<code><![CDATA[$this->API->authorized_dc == $this->datacenter && $this->API->authorized === \danog\MadelineProto\API::LOGGED_IN]]></code>
</RedundantConditionGivenDocblockType>
@ -4190,6 +4195,8 @@
<code><![CDATA[static function (string $payload, int $offset) use ($stream, $seekable, $lock) {]]></code>
</MissingClosureReturnType>
<MixedArgument>
<code><![CDATA[$end]]></code>
<code><![CDATA[$end]]></code>
<code><![CDATA[$end]]></code>
<code><![CDATA[$end]]></code>
<code><![CDATA[$file]]></code>
@ -4206,6 +4213,8 @@
<code><![CDATA[$size]]></code>
<code><![CDATA[$start]]></code>
<code><![CDATA[$start]]></code>
<code><![CDATA[$start]]></code>
<code><![CDATA[$start]]></code>
<code><![CDATA[$stream]]></code>
<code><![CDATA[$stream]]></code>
<code><![CDATA[$url]]></code>

View File

@ -76,13 +76,13 @@ final class APIWrapper
return $this->API;
}
private ?int $drop = null;
private ?float $drop = null;
/**
* @internal
*/
public function getRpcDropCancellation(): Cancellation
{
return new TimeoutCancellation($this->drop ??= $this->getAPI()->getSettings()->getRpc()->getRpcDropTimeout());
return new TimeoutCancellation($this->drop ??= (float) $this->getAPI()->getSettings()->getRpc()->getRpcDropTimeout());
}
/**

View File

@ -72,21 +72,11 @@ final class Connection
*
*/
protected ?ReadLoop $reader = null;
/**
* Checker loop.
*
*/
protected ?CheckLoop $checker = null;
/**
* Waiter loop.
*
*/
protected ?HttpWaitLoop $waiter = null;
/**
* Ping loop.
*
*/
protected ?PingLoop $pinger = null;
/**
* Cleanup loop.
*
@ -301,29 +291,14 @@ final class Connection
$this->httpResCount = 0;
$this->writer ??= new WriteLoop($this);
$this->reader ??= new ReadLoop($this);
$this->checker ??= new CheckLoop($this);
$this->cleanup ??= new CleanupLoop($this);
$this->waiter ??= new HttpWaitLoop($this);
$this->handler ??= new GenericLoop(fn () => $this->handleMessages($this->new_incoming), "Handler loop");
if (!isset($this->pinger) && !$ctx->isMedia() && !$ctx->isCDN() && !$this->isHttp()) {
$this->pinger = new PingLoop($this);
}
foreach ($this->new_outgoing as $message_id => $message) {
if ($message->unencrypted) {
if (!($message->getState() & MTProtoOutgoingMessage::STATE_REPLIED)) {
$message->reply(static fn () => new Exception('Restart because we were reconnected'));
}
unset($this->new_outgoing[$message_id], $this->outgoing_messages[$message_id]);
}
foreach ($this->unencrypted_new_outgoing as $message_id => $message) {
$message->reply(static fn () => new Exception('Restart because we were reconnected'));
}
Assert::true($this->writer->start(), "Could not start writer stream");
Assert::true($this->reader->start(), "Could not start reader stream");
Assert::true($this->checker->start(), "Could not start checker stream");
Assert::true($this->cleanup->start(), "Could not start cleanup stream");
$this->waiter->start();
if ($this->pinger) {
Assert::true($this->pinger->start(), "Could not start pinger stream");
}
$this->handler->start();
EventLoop::queue($this->shared->initAuthorization(...));
@ -586,9 +561,7 @@ final class Connection
$this->pendingOutgoing[$this->pendingOutgoingKey++] = $message;
$this->outgoingCtr?->inc();
$this->pendingOutgoingGauge?->set(\count($this->pendingOutgoing));
if (isset($this->writer)) {
$this->writer->resume();
}
$this->flush();
$this->connect();
$promise->await();
}
@ -601,18 +574,6 @@ final class Connection
$this->writer->resume();
}
}
/**
* Resume HttpWaiter.
*/
public function pingHttpWaiter(): void
{
if (isset($this->waiter)) {
$this->waiter->resume();
}
if (isset($this->pinger)) {
$this->pinger->resume();
}
}
/**
* Connect main instance.
*
@ -663,9 +624,7 @@ final class Connection
$this->reader?->stop();
$this->writer?->stop();
$this->checker?->stop();
$this->cleanup?->stop();
$this->pinger?->stop();
if (!$temporary) {
$this->shared->signalDisconnect($this->id);

View File

@ -204,9 +204,6 @@ final class DataCenterConnection implements JsonSerializable
$logger->logger("Done initing auth for DC {$this->datacenter}", Logger::NOTICE);
EventLoop::queue($lock->release(...));
}
if ($this->hasTempAuthKey()) {
$connection->pingHttpWaiter();
}
}
/**
* Bind temporary and permanent auth keys.

View File

@ -1,163 +0,0 @@
<?php
declare(strict_types=1);
/**
* RPC call status check loop.
*
* This file is part of MadelineProto.
* MadelineProto is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
* MadelineProto is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Affero General Public License for more details.
* You should have received a copy of the GNU General Public License along with MadelineProto.
* If not, see <http://www.gnu.org/licenses/>.
*
* @author Daniil Gentili <daniil@daniil.it>
* @copyright 2016-2023 Daniil Gentili <daniil@daniil.it>
* @license https://opensource.org/licenses/AGPL-3.0 AGPLv3
* @link https://docs.madelineproto.xyz MadelineProto documentation
*/
namespace danog\MadelineProto\Loop\Connection;
use Amp\CancelledException;
use Amp\DeferredFuture;
use Amp\TimeoutCancellation;
use danog\Loop\Loop;
use danog\MadelineProto\Connection;
use danog\MadelineProto\Logger;
use Revolt\EventLoop;
/**
* RPC call status check loop.
*
* @internal
*
* @author Daniil Gentili <daniil@daniil.it>
*/
final class CheckLoop extends Loop
{
use Common {
__construct as initCommon;
}
private int $resendTimeout;
public function __construct(Connection $connection)
{
$this->initCommon($connection);
$this->resendTimeout = (int) ($this->API->settings->getRpc()->getRpcResendTimeout() * 1_000_000_000.0);
}
/**
* Main loop.
*/
protected function loop(): ?float
{
if (!$this->connection->new_outgoing) {
return self::PAUSE;
}
if (!$this->connection->hasPendingCalls()) {
return $this->timeoutSeconds;
}
if ($this->shared->hasTempAuthKey()) {
$full_message_ids = $this->connection->getPendingCalls();
foreach (array_chunk($full_message_ids, WriteLoop::MAX_IDS) as $message_ids) {
$deferred = new DeferredFuture();
$list = '';
// Don't edit this here pls
foreach ($message_ids as $message_id) {
if (!isset($this->connection->outgoing_messages[$message_id])) {
continue;
}
$list .= $this->connection->outgoing_messages[$message_id]->constructor.', ';
}
$this->API->logger("Still missing {$list} on DC {$this->datacenter}, sending state request", Logger::ERROR);
$this->connection->objectCall('msgs_state_req', ['msg_ids' => $message_ids], $deferred);
EventLoop::queue(function () use ($deferred, $message_ids): void {
try {
$result = $deferred->getFuture()->await(new TimeoutCancellation($this->timeout));
if (\is_callable($result)) {
throw $result();
}
$reply = [];
foreach (str_split($result['info']) as $key => $chr) {
$message_id = $message_ids[$key];
if (!isset($this->connection->outgoing_messages[$message_id])) {
$this->API->logger("Already got response for and forgot about message ID $message_id");
continue;
}
if (!isset($this->connection->new_outgoing[$message_id])) {
$this->API->logger('Already got response for '.$this->connection->outgoing_messages[$message_id]);
continue;
}
$message = $this->connection->new_outgoing[$message_id];
$chr = \ord($chr);
switch ($chr & 7) {
case 0:
$this->API->logger("Wrong message status 0 for $message", Logger::FATAL_ERROR);
break;
case 1:
case 2:
case 3:
if ($message->constructor === 'msgs_state_req') {
$this->connection->gotResponseForOutgoingMessage($message);
break;
}
$this->API->logger("Message $message not received by server, resending...", Logger::ERROR);
$this->connection->methodRecall($message_id);
break;
case 4:
if ($chr & 128) {
$this->API->logger("Message $message received by server and was already sent, requesting reply...", Logger::ERROR);
$reply[] = $message_id;
} elseif ($chr & 64) {
$this->API->logger("Message $message received by server and was already processed, requesting reply...", Logger::ERROR);
$reply[] = $message_id;
} elseif ($chr & 32) {
if ($message->getSent() + $this->resendTimeout < hrtime(true)) {
if ($message->isCancellationRequested()) {
unset($this->connection->new_outgoing[$message_id], $this->connection->outgoing_messages[$message_id]);
$this->API->logger("Cancelling $message...", Logger::ERROR);
} else {
$this->API->logger("Message $message received by server and is being processed for way too long, resending request...", Logger::ERROR);
$this->connection->methodRecall($message_id);
}
} else {
$this->API->logger("Message $message received by server and is being processed, waiting...", Logger::ERROR);
}
} else {
$this->API->logger("Message $message received by server, waiting...", Logger::ERROR);
$reply[] = $message_id;
}
}
}
//} catch (CancelledException) {
//$this->API->logger("We did not receive a response for {$this->timeout} seconds: reconnecting and exiting check loop on DC {$this->datacenter}");
//EventLoop::queue($this->connection->reconnect(...));
} catch (\Throwable $e) {
$this->API->logger("Got exception in check loop for DC {$this->datacenter}");
$this->API->logger((string) $e);
}
});
}
} else {
foreach ($this->connection->new_outgoing as $message_id => $message) {
if ($message->wasSent()
&& $message->getSent() + $this->timeout < hrtime(true)
&& $message->unencrypted
) {
$this->API->logger("Still missing $message on DC {$this->datacenter}, resending", Logger::ERROR);
$this->connection->methodRecall($message->getMsgId());
}
}
}
return $this->timeoutSeconds;
}
/**
* Loop name.
*/
public function __toString(): string
{
return "check loop in DC {$this->datacenter}";
}
}

View File

@ -63,7 +63,8 @@ final class ReadLoop extends Loop
}
EventLoop::queue(function () use ($e): void {
if ($e instanceof NothingInTheSocketException
&& !$this->connection->hasPendingCalls()
&& !$this->connection->unencrypted_new_outgoing
&& !$this->connection->new_outgoing
&& $this->connection->isMedia()
&& !$this->connection->isWriting()
&& $this->shared->hasTempAuthKey()
@ -71,7 +72,7 @@ final class ReadLoop extends Loop
$this->API->logger("Got NothingInTheSocketException in DC {$this->datacenter}, disconnecting because we have nothing to do...", Logger::ERROR);
$this->connection->disconnect(true);
} else {
$this->API->logger($e);
$this->API->logger($e, Logger::ERROR);
$this->API->logger("Got exception in DC {$this->datacenter}, reconnecting...", Logger::ERROR);
$this->connection->reconnect();
}
@ -93,6 +94,9 @@ final class ReadLoop extends Loop
foreach ($this->connection->new_outgoing as $message) {
$message->resetSent();
}
foreach ($this->connection->unencrypted_new_outgoing as $message) {
$message->resetSent();
}
$this->shared->reconnect();
} else {
$this->connection->reconnect();
@ -116,9 +120,6 @@ final class ReadLoop extends Loop
return self::STOP;
}
$this->connection->httpReceived();
if ($this->connection->isHttp()) {
$this->connection->pingHttpWaiter();
}
$this->connection->wakeupHandler();
return self::CONTINUE;
}

View File

@ -21,7 +21,9 @@ declare(strict_types=1);
namespace danog\MadelineProto\Loop\Connection;
use Amp\ByteStream\StreamException;
use Amp\DeferredFuture;
use danog\Loop\Loop;
use danog\MadelineProto\Connection;
use danog\MadelineProto\Logger;
use danog\MadelineProto\MTProto\Container;
use danog\MadelineProto\MTProto\MTProtoOutgoingMessage;
@ -42,23 +44,46 @@ final class WriteLoop extends Loop
{
private const MAX_COUNT = 1020;
private const MAX_SIZE = 1 << 15;
private const LONG_POLL_TIMEOUT = 30;
private const LONG_POLL_TIMEOUT_MS = self::LONG_POLL_TIMEOUT*1000;
public const MAX_IDS = 8192;
use Common;
use Common {
__construct as init2;
}
private int $pingTimeout;
private float $pollTimeout;
/**
* Constructor function.
*/
public function __construct(Connection $connection)
{
$this->init2($connection);
$timeout = $this->shared->getSettings()->getPingInterval();
$this->pingTimeout = $timeout + 15;
if ($this->connection->isHttp()) {
$this->pollTimeout = (float) max(self::LONG_POLL_TIMEOUT, $timeout);
} else {
$this->pollTimeout = (float) $timeout;
}
}
/**
* Main loop.
*/
public function loop(): ?float
{
$please_wait = false;
$first = true;
while (true) {
if ($this->connection->shouldReconnect()) {
$this->API->logger("Exiting $this because connection is old");
return self::STOP;
}
if (!$this->connection->pendingOutgoing) {
if (!$this->connection->pendingOutgoing && !$first) {
$this->API->logger("No messages, pausing in $this...", Logger::ULTRA_VERBOSE);
return self::PAUSE;
return $this->pollTimeout;
}
if ($please_wait) {
$this->API->logger("Have to wait for handshake, pausing in $this...", Logger::ULTRA_VERBOSE);
@ -67,7 +92,7 @@ final class WriteLoop extends Loop
$this->connection->writing(true);
try {
$please_wait = $this->shared->hasTempAuthKey()
? $this->encryptedWriteLoop()
? $this->encryptedWriteLoop($first)
: $this->unencryptedWriteLoop();
} catch (StreamException $e) {
if ($this->connection->shouldReconnect()) {
@ -87,10 +112,17 @@ final class WriteLoop extends Loop
} finally {
$this->connection->writing(false);
}
$first = false;
}
}
public function unencryptedWriteLoop(): bool
{
if ($queue = $this->connection->unencrypted_check_queue) {
$this->connection->unencrypted_check_queue = [];
foreach ($queue as $msg) {
$this->connection->methodRecall($msg);
}
}
while ($this->connection->pendingOutgoing) {
$skipped_all = true;
foreach ($this->connection->pendingOutgoing as $k => $message) {
@ -123,7 +155,7 @@ final class WriteLoop extends Loop
$this->connection->pendingOutgoingGauge?->set(\count($this->connection->pendingOutgoing));
$message->setMsgId($message_id);
$this->connection->outgoing_messages[$message_id] = $message;
$this->connection->new_outgoing[$message_id] = $message;
$this->connection->unencrypted_new_outgoing[$message_id] = $message;
$message->sent();
}
@ -133,16 +165,97 @@ final class WriteLoop extends Loop
}
return false;
}
public function encryptedWriteLoop(): bool
public function encryptedWriteLoop(bool $first): bool
{
do {
if (!$this->shared->hasTempAuthKey()) {
return false;
}
if ($this->connection->isHttp() && empty($this->connection->pendingOutgoing)) {
if (!$first && !$this->connection->pendingOutgoing) {
return false;
}
foreach ($this->connection->check_queue as $msg_id => $_) {
$deferred = new DeferredFuture();
$list = '';
// Don't edit this here pls
foreach ($message_ids as $message_id) {
if (!isset($this->connection->outgoing_messages[$message_id])) {
continue;
}
$list .= $this->connection->outgoing_messages[$message_id]->constructor.', ';
}
$this->API->logger("Still missing {$list} on DC {$this->datacenter}, sending state request", Logger::ERROR);
$this->connection->objectCall('msgs_state_req', ['msg_ids' => $message_ids], $deferred);
EventLoop::queue(function () use ($deferred, $message_ids): void {
try {
$result = $deferred->getFuture()->await(new TimeoutCancellation($this->pollTimeout));
if (\is_callable($result)) {
throw $result();
}
$reply = [];
foreach (str_split($result['info']) as $key => $chr) {
$message_id = $message_ids[$key];
if (!isset($this->connection->outgoing_messages[$message_id])) {
$this->API->logger("Already got response for and forgot about message ID $message_id");
continue;
}
if (!isset($this->connection->new_outgoing[$message_id])) {
$this->API->logger('Already got response for '.$this->connection->outgoing_messages[$message_id]);
continue;
}
$message = $this->connection->new_outgoing[$message_id];
$chr = \ord($chr);
switch ($chr & 7) {
case 0:
$this->API->logger("Wrong message status 0 for $message", Logger::FATAL_ERROR);
break;
case 1:
case 2:
case 3:
if ($message->constructor === 'msgs_state_req') {
$this->connection->gotResponseForOutgoingMessage($message);
break;
}
$this->API->logger("Message $message not received by server, resending...", Logger::ERROR);
$this->connection->methodRecall($message_id);
break;
case 4:
if ($chr & 128) {
$this->API->logger("Message $message received by server and was already sent, requesting reply...", Logger::ERROR);
$reply[] = $message_id;
} elseif ($chr & 64) {
$this->API->logger("Message $message received by server and was already processed, requesting reply...", Logger::ERROR);
$reply[] = $message_id;
} elseif ($chr & 32) {
if ($message->getSent() + $this->resendTimeout < hrtime(true)) {
if ($message->cancellation?->isRequested()) {
unset($this->connection->new_outgoing[$message_id], $this->connection->outgoing_messages[$message_id]);
$this->API->logger("Cancelling $message...", Logger::ERROR);
} else {
$this->API->logger("Message $message received by server and is being processed for way too long, resending request...", Logger::ERROR);
$this->connection->methodRecall($message_id);
}
} else {
$this->API->logger("Message $message received by server and is being processed, waiting...", Logger::ERROR);
}
} else {
$this->API->logger("Message $message received by server, waiting...", Logger::ERROR);
$reply[] = $message_id;
}
}
}
//} catch (CancelledException) {
//$this->API->logger("We did not receive a response for {$this->pollTimeout} seconds: reconnecting and exiting check loop on DC {$this->datacenter}");
//EventLoop::queue($this->connection->reconnect(...));
} catch (\Throwable $e) {
$this->API->logger("Got exception in check loop for DC {$this->datacenter}");
$this->API->logger((string) $e);
}
});
}
ksort($this->connection->pendingOutgoing);
$messages = [];
@ -286,7 +399,7 @@ final class WriteLoop extends Loop
$total_length += $actual_length;
$MTmessage['bytes'] = $body_length;
$messages[] = $MTmessage;
$keys[$k] = $message_id;
$keys[$k] = $message;
$message->setSeqNo($MTmessage['seqno'])
->setMsgId($MTmessage['msg_id']);
@ -325,10 +438,10 @@ final class WriteLoop extends Loop
if ($count > 1 || $has_seq) {
$this->API->logger("Wrapping in msg_container ({$count} messages of total size {$total_length}) as encrypted message for DC {$this->datacenter}", Logger::ULTRA_VERBOSE);
$message_id = $this->connection->msgIdHandler->generateMessageId();
$this->connection->pendingOutgoing[$this->connection->pendingOutgoingKey] = new Container($this->connection, array_values($keys));
$this->connection->pendingOutgoing[$this->connection->pendingOutgoingKey] = $ct = new Container($this->connection, $keys);
$this->connection->outgoingCtr?->inc();
$this->connection->pendingOutgoingGauge?->set(\count($this->connection->pendingOutgoing));
$keys[$this->connection->pendingOutgoingKey++] = $message_id;
$keys[$this->connection->pendingOutgoingKey++] = $ct;
$message_data = $this->API->getTL()->serializeObject(['type' => ''], ['_' => 'msg_container', 'messages' => $messages], 'container');
$message_data_length = \strlen($message_data);
$seq_no = $this->connection->generateOutSeqNo(false);
@ -364,8 +477,7 @@ final class WriteLoop extends Loop
$this->connection->ack_queue = \array_slice($this->connection->ack_queue, $ackCount);
}
foreach ($keys as $key => $message_id) {
$message = $this->connection->pendingOutgoing[$key];
foreach ($keys as $key => $message) {
unset($this->connection->pendingOutgoing[$key]);
$this->connection->outgoing_messages[$message_id] = $message;
if ($message->hasPromise()) {

View File

@ -29,28 +29,13 @@ use danog\MadelineProto\Connection;
*/
final class Container extends MTProtoOutgoingMessage
{
/**
* Message IDs.
*
* @var list<int>
*/
private array $ids = [];
/**
* Constructor.
*
* @param list<int> $ids
* @param list<MTProtoOutgoingMessage> $msgs
*/
public function __construct(Connection $connection, array $ids)
public function __construct(Connection $connection, public readonly array $msgs)
{
$this->ids = $ids;
parent::__construct($connection, [], 'msg_container', '', false, false);
}
/**
* Get message IDs.
*/
public function getIds(): array
{
return $this->ids;
parent::__construct($connection, [], 'msg_container', '', false, false, null);
}
}

View File

@ -89,6 +89,8 @@ class MTProtoOutgoingMessage extends MTProtoMessage
*/
private int $tries = 0;
private ?string $checkTimer = null;
/**
* Whether this message is related to a user, as in getting a successful reply means we have auth.
*/
@ -110,6 +112,7 @@ class MTProtoOutgoingMessage extends MTProtoMessage
public readonly string $type,
public readonly bool $isMethod,
public readonly bool $unencrypted,
public readonly ?Cancellation $cancellation,
public readonly ?string $subtype = null,
/**
* Whether this message is related to a file upload, as in getting a redirect should redirect to a media server.
@ -126,7 +129,6 @@ class MTProtoOutgoingMessage extends MTProtoMessage
public readonly ?int $takeoutId = null,
public readonly ?string $businessConnectionId = null,
private ?DeferredFuture $resultDeferred = null,
public readonly ?Cancellation $cancellation = null
) {
$this->userRelated = $constructor === 'users.getUsers' && $body === ['id' => [['_' => 'inputUserSelf']]] || $constructor === 'auth.exportAuthorization' || $constructor === 'updates.getDifference';
@ -158,14 +160,6 @@ class MTProtoOutgoingMessage extends MTProtoMessage
});
}
/**
* Whether cancellation is requested.
*/
public function isCancellationRequested(): bool
{
return $this->cancellation?->isRequested() ?? false;
}
/**
* Signal that we're trying to send the message.
*/
@ -188,12 +182,46 @@ class MTProtoOutgoingMessage extends MTProtoMessage
}
$this->state |= self::STATE_SENT;
$this->sent = hrtime(true);
$this->checkTimer = EventLoop::delay(
$this->connection->API->getSettings()->getConnection()->getTimeout(),
$this->check(...)
);
if (isset($this->sendDeferred)) {
$sendDeferred = $this->sendDeferred;
$this->sendDeferred = null;
$sendDeferred->complete();
}
}
private function check(): void
{
if ($this->state & self::STATE_REPLIED) {
return;
}
$shared = $this->connection->getShared();
$settings = $shared->getSettings();
$global = $shared->getGenericSettings();
$timeout = (float) $settings->getTimeout();
$pfs = $global->getAuth()->getPfs();
$unencrypted = !$shared->hasTempAuthKey();
$notBound = !$shared->isBound();
$pfsNotBound = $pfs && $notBound;
$this->checkTimer = EventLoop::delay(
$timeout,
$this->check(...)
);
if ($this->unencrypted === $unencrypted) {
if (!$unencrypted && $pfsNotBound && $this->constructor !== 'auth.bindTempAuthKey') {
return;
}
if ($unencrypted) {
$this->connection->unencrypted_check_queue[$this->msgId] = true;
} else {
$this->connection->check_queue[$this->msgId] = true;
}
$this->connection->flush();
}
}
/**
* Set reply to message.
*
@ -209,6 +237,10 @@ class MTProtoOutgoingMessage extends MTProtoMessage
if (!($this->state & self::STATE_SENT)) {
$this->sent();
}
if ($this->checkTimer !== null) {
EventLoop::cancel($this->checkTimer);
$this->checkTimer = null;
}
if ($this->isMethod) {
$this->connection->inFlightGauge?->dec([
@ -221,6 +253,9 @@ class MTProtoOutgoingMessage extends MTProtoMessage
);
}
}
if ($this->msgId !== null) {
unset($this->connection->new_outgoing[$this->msgId], $this->connection->outgoing_messages[$this->msgId]);
}
$this->serializedBody = null;
$this->body = null;

View File

@ -20,7 +20,6 @@ declare(strict_types=1);
namespace danog\MadelineProto\MTProtoSession;
use Amp\TimeoutException;
use danog\MadelineProto\DataCenterConnection;
use danog\MadelineProto\Logger;
use danog\MadelineProto\MTProto\MTProtoIncomingMessage;
@ -47,14 +46,6 @@ trait AckHandler
}
return true;
}
/**
* We have gotten a response for an outgoing message.
*/
public function gotResponseForOutgoingMessage(MTProtoOutgoingMessage $outgoingMessage): void
{
// The server acknowledges that it received my message
unset($this->new_outgoing[$outgoingMessage->getMsgId()]);
}
/**
* Acknowledge incoming message ID.
*/
@ -66,72 +57,4 @@ trait AckHandler
// I let the server know that I received its message
$this->ack_queue[$message_id] = $message_id;
}
/**
* Check if there are some pending calls.
*/
public function hasPendingCalls(): bool
{
$timeout = (int) ($this->shared->getSettings()->getTimeout() * 1_000_000_000.0);
$pfs = $this->shared->getGenericSettings()->getAuth()->getPfs();
$unencrypted = !$this->shared->hasTempAuthKey();
$notBound = !$this->shared->isBound();
$pfsNotBound = $pfs && $notBound;
/** @var MTProtoOutgoingMessage */
foreach ($this->new_outgoing as $message) {
if ($message->wasSent()
&& $message->getSent() + $timeout < hrtime(true)
&& $message->unencrypted === $unencrypted
&& $message->constructor !== 'msgs_state_req') {
if (!$unencrypted && $pfsNotBound && $message->constructor !== 'auth.bindTempAuthKey') {
continue;
}
return true;
}
}
return false;
}
/**
* Get all pending calls (also clear pending state requests).
*/
public function getPendingCalls(): array
{
$settings = $this->shared->getSettings();
$global = $this->shared->getGenericSettings();
$dropTimeout = (int) ($global->getRpc()->getRpcDropTimeout() * 1_000_000_000.0);
$timeout = (int) ($settings->getTimeout() * 1_000_000_000.0);
$pfs = $global->getAuth()->getPfs();
$unencrypted = !$this->shared->hasTempAuthKey();
$notBound = !$this->shared->isBound();
$pfsNotBound = $pfs && $notBound;
if ($this->datacenter < 0) {
$dropTimeout *= 10;
}
$result = [];
/** @var MTProtoOutgoingMessage $message */
foreach ($this->new_outgoing as $message_id => $message) {
if ($message->wasSent()
&& $message->getSent() + $timeout < hrtime(true)
&& $message->unencrypted === $unencrypted
) {
if (!$unencrypted && $pfsNotBound && $message->constructor !== 'auth.bindTempAuthKey') {
continue;
}
if ($message->constructor === 'msgs_state_req' || $message->constructor === 'ping_delay_disconnect'
|| ($message->getSent() + $dropTimeout < hrtime(true))
) {
Logger::log('No reply for message: ' . $message, Logger::WARNING);
$this->handleReject($message, static fn () => new TimeoutException('Request timeout'));
continue;
}
if ($message->getState() & MTProtoOutgoingMessage::STATE_REPLIED) {
$this->API->logger("Already replied to message $message, but still in new_outgoing");
unset($this->new_outgoing[$message_id]);
continue;
}
$result[] = $message_id;
}
}
return $result;
}
}

View File

@ -20,10 +20,14 @@ declare(strict_types=1);
namespace danog\MadelineProto\MTProtoSession;
use Amp\CompositeCancellation;
use Amp\DeferredFuture;
use Amp\Future;
use Amp\Sync\LocalKeyedMutex;
use Amp\TimeoutCancellation;
use danog\MadelineProto\DataCenterConnection;
use danog\MadelineProto\Logger;
use danog\MadelineProto\MTProto;
use danog\MadelineProto\MTProto\Container;
use danog\MadelineProto\MTProto\MTProtoOutgoingMessage;
use danog\MadelineProto\TL\Exception;
@ -38,6 +42,7 @@ use function Amp\Future\await;
*
*
* @property DataCenterConnection $shared
* @property MTProto $API
* @internal
*/
trait CallHandler
@ -45,39 +50,52 @@ trait CallHandler
/**
* Recall method.
*/
public function methodRecall(int $message_id, ?int $datacenter = null): void
public function methodRecall(MTProtoOutgoingMessage $request, ?int $forceDatacenter = null, float|Future|null $defer = null): void
{
if ($datacenter === $this->datacenter) {
$datacenter = null;
}
$message = $this->outgoing_messages[$message_id] ?? null;
$message_ids = $message instanceof Container
? $message->getIds()
: [$message_id];
foreach ($message_ids as $message_id) {
if (isset($this->outgoing_messages[$message_id])
&& !$this->outgoing_messages[$message_id]->canGarbageCollect()) {
if ($datacenter) {
/** @var MTProtoOutgoingMessage */
$message = $this->outgoing_messages[$message_id];
$this->gotResponseForOutgoingMessage($message);
$message->setMsgId(null);
$message->setSeqNo(null);
EventLoop::queue(function () use ($datacenter, $message): void {
$this->API->datacenter->waitGetConnection($datacenter)
->sendMessage($message);
});
} else {
/** @var MTProtoOutgoingMessage */
$message = $this->outgoing_messages[$message_id];
if (!$message->hasSeqNo()) {
$this->gotResponseForOutgoingMessage($message);
}
EventLoop::queue($this->sendMessage(...), $message);
}
} else {
$this->API->logger('Could not resend '.($this->outgoing_messages[$message_id] ?? $message_id));
$id = $request->getMsgId();
unset($this->outgoing_messages[$id], $this->new_outgoing[$id]);
if ($request instanceof Container) {
foreach ($request->msgs as $msg) {
$this->methodRecall($msg, $forceDatacenter, $defer);
}
return;
}
if ($request->isCancellationRequested()) {
return;
}
if (\is_float($defer)) {
$d = new DeferredFuture;
$id = EventLoop::delay($defer, $d->complete(...));
$request->cancellation?->subscribe(static fn () => EventLoop::cancel($id));
$defer = $d;
return;
}
$prev = $request->previousQueuedMessage;
if (!$prev->hasReply()) {
$prev->getResultPromise()->finally(
fn () => $this->methodRecall($request, $this->datacenter, $defer)
);
return;
}
if ($defer) {
$defer->finally(
fn () => $this->methodRecall($request, $this->datacenter)
);
return;
}
$datacenter = $forceDatacenter ?? $this->datacenter;
if ($forceDatacenter !== null) {
/** @var MTProtoOutgoingMessage */
$request->setMsgId(null);
$request->setSeqNo(null);
}
if ($datacenter === $this->datacenter) {
EventLoop::queue($this->sendMessage(...), $request);
} else {
EventLoop::queue(function () use ($datacenter, $request): void {
$this->API->datacenter->waitGetConnection($datacenter)
->sendMessage($request);
});
}
}
/**
@ -92,6 +110,7 @@ trait CallHandler
return $readFuture->await();
}
private LocalKeyedMutex $abstractionQueueMutex;
private ?float $drop = null;
/**
* Call method and make sure it is asynchronously sent (generator).
*
@ -163,6 +182,10 @@ trait CallHandler
if (!$encrypted && $this->shared->hasTempAuthKey()) {
$encrypted = true;
}
$timeout = new TimeoutCancellation($this->drop ??= (float) $this->API->getSettings()->getRpc()->getRpcDropTimeout());
$cancellation = $cancellation !== null
? new CompositeCancellation($cancellation, $timeout)
: $timeout;
$message = new MTProtoOutgoingMessage(
connection: $this,
body: $args,
@ -187,7 +210,6 @@ trait CallHandler
$message->setMsgId($args['madelineMsgId']);
}
$this->sendMessage($message);
$this->checker->resume();
return new WrappedFuture($response->getFuture());
}
/**
@ -198,8 +220,15 @@ trait CallHandler
*/
public function objectCall(string $object, array $args, ?DeferredFuture $promise = null): void
{
$cancellation = $args['cancellation'] ?? null;
$cancellation?->throwIfRequested();
$timeout = new TimeoutCancellation($this->drop ??= (float) $this->API->getSettings()->getRpc()->getRpcDropTimeout());
$cancellation = $cancellation !== null
? new CompositeCancellation($cancellation, $timeout)
: $timeout;
$this->sendMessage(
new MTProtoOutgoingMessage(
cancellation: $cancellation,
connection: $this,
body: $args,
constructor: $object,

View File

@ -64,7 +64,9 @@ trait Reliable
}
if ($ok) {
foreach ($content['msg_ids'] as $msg_id) {
$this->methodRecall($msg_id);
if (isset($this->outgoing_messages[$msg_id])) {
$this->methodRecall($this->outgoing_messages[$msg_id]);
}
}
} else {
$this->sendMsgsStateInfo($content['msg_ids'], $current_msg_id);

View File

@ -20,6 +20,7 @@ declare(strict_types=1);
namespace danog\MadelineProto\MTProtoSession;
use Amp\DeferredFuture;
use Amp\SignalException;
use danog\BetterPrometheus\BetterHistogram;
use danog\Loop\Loop;
@ -43,6 +44,8 @@ use Throwable;
use const PHP_EOL;
use function Amp\async;
/**
* Manages responses.
*
@ -114,7 +117,7 @@ trait ResponseHandler
}
$this->API->logger('Trying to assign a response of type ' . $response_type . ' to its request...', Logger::VERBOSE);
foreach ($this->new_outgoing as $expecting_msg_id => $expecting) {
foreach ($this->unencrypted_new_outgoing as $expecting_msg_id => $expecting) {
if (!$expecting->type) {
continue;
}
@ -170,14 +173,6 @@ trait ResponseHandler
$this->handleMessages([$message]);
}
}
/**
* @param callable(): \Throwable $data
*/
private function handleReject(MTProtoOutgoingMessage $message, callable $data): void
{
$this->gotResponseForOutgoingMessage($message);
$message->reply($data);
}
/**
* Handle RPC response.
@ -211,7 +206,7 @@ trait ResponseHandler
$exception = static fn (): \Throwable => $e;
}
if ($exception) {
$this->handleReject($request, $exception);
$request->reply($exception);
}
return;
}
@ -220,12 +215,10 @@ trait ResponseHandler
switch ($response['error_code']) {
case 48:
$this->shared->getTempAuthKey()->setServerSalt($response['new_server_salt']);
$this->methodRecall($requestId);
$this->methodRecall($request);
return;
case 20:
$request->setMsgId(null);
$request->setSeqNo(null);
$this->methodRecall($requestId);
$this->methodRecall($request, $this->datacenter);
return;
case 16:
case 17:
@ -233,11 +226,12 @@ trait ResponseHandler
$this->API->logger('Set time delta to ' . $this->time_delta, Logger::WARNING);
$this->API->resetMTProtoSession("time delta update");
$this->shared->setTempAuthKey(null);
EventLoop::queue($this->shared->initAuthorization(...));
EventLoop::queue($this->methodRecall(...), $requestId);
$d = new DeferredFuture;
async($this->shared->initAuthorization(...))->finally($d->complete(...));
$this->methodRecall($request, $this->datacenter, $d->getFuture());
return;
}
$this->handleReject($request, static fn () => RPCErrorException::make('Received bad_msg_notification: ' . MTProto::BAD_MSG_ERROR_CODES[$response['error_code']], $response['error_code'], $request->constructor));
$request->reply(static fn () => RPCErrorException::make('Received bad_msg_notification: ' . MTProto::BAD_MSG_ERROR_CODES[$response['error_code']], $response['error_code'], $request->constructor));
return;
}
@ -275,7 +269,6 @@ trait ResponseHandler
}
}
}
$this->gotResponseForOutgoingMessage($request);
$this->requestResponse?->inc([
'method' => $request->constructor,
@ -283,7 +276,7 @@ trait ResponseHandler
'error_code' => '200',
]);
EventLoop::queue($request->reply(...), $response);
$request->reply($response);
}
/**
* @param array{error_message: string, error_code: int} $response
@ -314,12 +307,7 @@ trait ResponseHandler
&& !$request->shouldRefreshReferences()
) {
$this->API->logger("Got {$response['error_message']}, refreshing file reference and repeating method call...");
$this->gotResponseForOutgoingMessage($request);
$msgId = $request->getMsgId();
$request->setRefreshReferences(true);
$request->setMsgId(null);
$request->setSeqNo(null);
$this->methodRecall($msgId);
$this->methodRecall($request, $this->datacenter);
return null;
}
@ -334,28 +322,13 @@ trait ResponseHandler
)
) {
$this->API->logger("Resending $request due to {$response['error_message']}");
$this->gotResponseForOutgoingMessage($request);
$msgId = $request->getMsgId();
$request->setSent(hrtime(true) + (5*60 * 1_000_000_000));
$request->setMsgId(null);
$request->setSeqNo(null);
$prev = $request->previousQueuedMessage;
if ($prev->hasReply()) {
$this->methodRecall($msgId);
} else {
$prev->getResultPromise()->finally(
fn () => $this->methodRecall($msgId)
);
}
$this->methodRecall($request, $this->datacenter);
return null;
}
if ((($response['error_code'] === -503 || $response['error_message'] === '-503') && !\in_array($request->constructor, ['messages.getBotCallbackAnswer', 'messages.getInlineBotResults'], true))
|| (\in_array($response['error_message'], ['MSGID_DECREASE_RETRY', 'HISTORY_GET_FAILED', 'RPC_CONNECT_FAILED', 'RPC_CALL_FAIL', 'RPC_MCGET_FAIL', 'PERSISTENT_TIMESTAMP_OUTDATED', 'RPC_MCGET_FAIL', 'no workers running', 'No workers running'], true))) {
$this->API->logger("Resending $request in 1 second due to {$response['error_message']}");
$msgId = $request->getMsgId();
$request->setMsgId(null);
$request->setSeqNo(null);
EventLoop::delay(1.0, fn () => $this->methodRecall($msgId));
$this->methodRecall($request, $this->datacenter, 1.0);
return null;
}
return static fn () => RPCErrorException::make($response['error_message'], $response['error_code'], $request->constructor);
@ -386,20 +359,7 @@ trait ResponseHandler
)
) {
$this->API->logger("Resending $request due to {$response['error_message']}");
$this->gotResponseForOutgoingMessage($request);
$msgId = $request->getMsgId();
$request->setSent(hrtime(true) + (5*60 * 1_000_000_000));
$request->setMsgId(null);
$request->setSeqNo(null);
\assert($msgId !== null);
$prev = $request->previousQueuedMessage;
if ($prev->hasReply()) {
$this->methodRecall($msgId);
} else {
$prev->getResultPromise()->finally(
fn () => $this->methodRecall($msgId)
);
}
$this->methodRecall($request, $this->datacenter);
return null;
}
return static fn () => RPCErrorException::make($response['error_message'], $response['error_code'], $request->constructor);
@ -420,12 +380,7 @@ trait ResponseHandler
case 'AUTH_KEY_UNREGISTERED':
case 'AUTH_KEY_INVALID':
if ($this->API->authorized !== \danog\MadelineProto\API::LOGGED_IN) {
$this->gotResponseForOutgoingMessage($request);
EventLoop::queue(
$this->handleReject(...),
$request,
static fn () => RPCErrorException::make($response['error_message'], $response['error_code'], $request->constructor)
);
$request->reply(static fn () => RPCErrorException::make($response['error_message'], $response['error_code'], $request->constructor));
return null;
}
$this->session_id = null;
@ -441,14 +396,16 @@ trait ResponseHandler
$this->API->logout();
return static fn () => new SignalException(sprintf(Lang::$current_lang['account_banned'], $phone));
}
EventLoop::queue($this->shared->initAuthorization(...));
EventLoop::queue($this->methodRecall(...), $request->getMsgId());
$deferred = new DeferredFuture;
async($this->shared->initAuthorization(...))->finally($deferred->complete(...));
$this->methodRecall($request, $this->datacenter, $deferred->getFuture());
return null;
case 'AUTH_KEY_PERM_EMPTY':
$this->API->logger('Temporary auth key not bound, resetting temporary auth key...', Logger::ERROR);
$this->shared->setTempAuthKey(null);
EventLoop::queue($this->shared->initAuthorization(...));
EventLoop::queue($this->methodRecall(...), $request->getMsgId());
$deferred = new DeferredFuture;
async($this->shared->initAuthorization(...))->finally($deferred->complete(...));
$this->methodRecall($request, $this->datacenter, $deferred->getFuture());
return null;
}
return static fn () => RPCErrorException::make($response['error_message'], $response['error_code'], $request->constructor);
@ -457,14 +414,7 @@ trait ResponseHandler
$limit = $request->floodWaitLimit ?? $this->API->settings->getRPC()->getFloodTimeout();
if ($seconds < $limit) {
$this->API->logger("Flood, waiting $seconds seconds before repeating async call of $request...", Logger::NOTICE);
$this->gotResponseForOutgoingMessage($request);
$msgId = $request->getMsgId();
$request->setSent(hrtime(true) + ($seconds * 1_000_000_000));
$request->setMsgId(null);
$request->setSeqNo(null);
\assert($msgId !== null);
$id = EventLoop::delay((float) $seconds, fn () => $this->methodRecall($msgId));
$request->cancellation?->subscribe(static fn () => EventLoop::cancel($id));
$this->methodRecall($request, $this->datacenter, (float) $seconds);
return null;
}
if (str_starts_with($response['error_message'], 'FLOOD_WAIT_')) {

View File

@ -77,6 +77,12 @@ trait Session
* @var array<MTProtoOutgoingMessage>
*/
public array $new_outgoing = [];
/**
* New unencrypted outgoing message array.
*
* @var array<MTProtoOutgoingMessage>
*/
public array $unencrypted_new_outgoing = [];
/**
* Pending outgoing messages.
*
@ -104,6 +110,16 @@ trait Session
*
*/
public array $ack_queue = [];
/**
* Check queue.
*
*/
public array $check_queue = [];
/**
* Check queue.
*
*/
public array $unencrypted_check_queue = [];
/**
* Message ID handler.
*
@ -178,6 +194,12 @@ trait Session
$new_outgoing[$key] = $message;
}
$this->new_outgoing = $new_outgoing;
$unencrypted_new_outgoing = [];
foreach ($this->unencrypted_new_outgoing as $key => $message) {
$unencrypted_new_outgoing[$key] = $message;
}
$this->unencrypted_new_outgoing = $unencrypted_new_outgoing;
}
/**
* Create MTProto session if needed.
@ -227,6 +249,6 @@ trait Session
public function backupSession(): array
{
$pending = array_values($this->pendingOutgoing);
return array_merge($pending, $this->new_outgoing);
return array_merge($pending, $this->new_outgoing, $this->unencrypted_new_outgoing);
}
}

View File

@ -240,7 +240,7 @@ trait FilesLogic
if ($result->shouldServe()) {
$pipe = new Pipe(1024 * 1024);
[$start, $end] = $result->getServeRange();
EventLoop::queue($this->downloadToStream(...), $messageMedia, $pipe->getSink(), $cb, $start, $end, $cancellation);
async($this->downloadToStream(...), $messageMedia, $pipe->getSink(), $cb, $start, $end, $cancellation)->finally($pipe->getSink()->close(...));
$body = $pipe->getSource();
} elseif (!\in_array($result->getCode(), [HttpStatus::OK, HttpStatus::PARTIAL_CONTENT], true)) {
$body = $result->getCodeExplanation();

View File

@ -267,11 +267,13 @@ final class PeerDatabase implements TLCallback
}
$new = $new ? self::getUsernames($new) : [];
$old = $old ? self::getUsernames($old) : [];
$diffToRemove = array_diff($old, $new);
$diffToAdd = array_diff($new, $old);
if (!$diffToAdd && !$diffToRemove) {
return;
foreach ($old as $key => $username) {
if (!isset($this->usernames[$username])) {
unset($old[$key]);
}
}
$diffToRemove = array_diff($old, $new);
$diffToAdd = array_diff($new, $diffToRemove);
$lock = $this->decacheMutex->acquire();
try {
foreach ($diffToRemove as $username) {
@ -417,6 +419,9 @@ final class PeerDatabase implements TLCallback
return;
}
}
$this->recacheChatUsername($user['id'], $existingChat, $user);
if ($existingChat != $user) {
$this->API->logger("Updated user {$user['id']}", Logger::ULTRA_VERBOSE);
if (($user['min'] ?? false) && !($existingChat['min'] ?? false)) {
@ -552,6 +557,7 @@ final class PeerDatabase implements TLCallback
return;
}
}
$this->recacheChatUsername($bot_api_id, $existingChat, $chat);
if ($existingChat != $chat) {
$this->API->logger("Updated chat {$bot_api_id}", Logger::ULTRA_VERBOSE);
if (($chat['min'] ?? false) && $existingChat && !($existingChat['min'] ?? false)) {