mirror of
https://github.com/danog/MadelineProto.git
synced 2024-11-26 17:04:39 +01:00
Merge 1efb43cbde
into 314db4ee1f
This commit is contained in:
commit
baf7880c47
@ -3415,9 +3415,6 @@
|
||||
</UnsupportedReferenceUsage>
|
||||
</file>
|
||||
<file src="src/MTProtoSession/AckHandler.php">
|
||||
<PossiblyNullArrayOffset>
|
||||
<code><![CDATA[$this->new_outgoing]]></code>
|
||||
</PossiblyNullArrayOffset>
|
||||
<PossiblyNullOperand>
|
||||
<code><![CDATA[$message->getSent()]]></code>
|
||||
<code><![CDATA[$message->getSent()]]></code>
|
||||
@ -3531,6 +3528,8 @@
|
||||
<code><![CDATA[$sub['queueId']]]></code>
|
||||
</MixedArrayAssignment>
|
||||
<MixedArrayOffset>
|
||||
<code><![CDATA[$this->new_outgoing[$message_id]]]></code>
|
||||
<code><![CDATA[$this->new_outgoing[$message_id]]]></code>
|
||||
<code><![CDATA[$this->outgoing_messages[$message_id]]]></code>
|
||||
<code><![CDATA[$this->outgoing_messages[$message_id]]]></code>
|
||||
</MixedArrayOffset>
|
||||
@ -3658,6 +3657,12 @@
|
||||
<code><![CDATA[$msgId]]></code>
|
||||
<code><![CDATA[$request->getMsgId()]]></code>
|
||||
</PossiblyNullArgument>
|
||||
<PossiblyNullArrayOffset>
|
||||
<code><![CDATA[$this->new_outgoing]]></code>
|
||||
<code><![CDATA[$this->new_outgoing]]></code>
|
||||
<code><![CDATA[$this->new_outgoing]]></code>
|
||||
<code><![CDATA[$this->new_outgoing]]></code>
|
||||
</PossiblyNullArrayOffset>
|
||||
<RedundantConditionGivenDocblockType>
|
||||
<code><![CDATA[$this->API->authorized_dc == $this->datacenter && $this->API->authorized === \danog\MadelineProto\API::LOGGED_IN]]></code>
|
||||
</RedundantConditionGivenDocblockType>
|
||||
@ -4190,6 +4195,8 @@
|
||||
<code><![CDATA[static function (string $payload, int $offset) use ($stream, $seekable, $lock) {]]></code>
|
||||
</MissingClosureReturnType>
|
||||
<MixedArgument>
|
||||
<code><![CDATA[$end]]></code>
|
||||
<code><![CDATA[$end]]></code>
|
||||
<code><![CDATA[$end]]></code>
|
||||
<code><![CDATA[$end]]></code>
|
||||
<code><![CDATA[$file]]></code>
|
||||
@ -4206,6 +4213,8 @@
|
||||
<code><![CDATA[$size]]></code>
|
||||
<code><![CDATA[$start]]></code>
|
||||
<code><![CDATA[$start]]></code>
|
||||
<code><![CDATA[$start]]></code>
|
||||
<code><![CDATA[$start]]></code>
|
||||
<code><![CDATA[$stream]]></code>
|
||||
<code><![CDATA[$stream]]></code>
|
||||
<code><![CDATA[$url]]></code>
|
||||
|
@ -76,13 +76,13 @@ final class APIWrapper
|
||||
return $this->API;
|
||||
}
|
||||
|
||||
private ?int $drop = null;
|
||||
private ?float $drop = null;
|
||||
/**
|
||||
* @internal
|
||||
*/
|
||||
public function getRpcDropCancellation(): Cancellation
|
||||
{
|
||||
return new TimeoutCancellation($this->drop ??= $this->getAPI()->getSettings()->getRpc()->getRpcDropTimeout());
|
||||
return new TimeoutCancellation($this->drop ??= (float) $this->getAPI()->getSettings()->getRpc()->getRpcDropTimeout());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -72,21 +72,11 @@ final class Connection
|
||||
*
|
||||
*/
|
||||
protected ?ReadLoop $reader = null;
|
||||
/**
|
||||
* Checker loop.
|
||||
*
|
||||
*/
|
||||
protected ?CheckLoop $checker = null;
|
||||
/**
|
||||
* Waiter loop.
|
||||
*
|
||||
*/
|
||||
protected ?HttpWaitLoop $waiter = null;
|
||||
/**
|
||||
* Ping loop.
|
||||
*
|
||||
*/
|
||||
protected ?PingLoop $pinger = null;
|
||||
/**
|
||||
* Cleanup loop.
|
||||
*
|
||||
@ -301,29 +291,14 @@ final class Connection
|
||||
$this->httpResCount = 0;
|
||||
$this->writer ??= new WriteLoop($this);
|
||||
$this->reader ??= new ReadLoop($this);
|
||||
$this->checker ??= new CheckLoop($this);
|
||||
$this->cleanup ??= new CleanupLoop($this);
|
||||
$this->waiter ??= new HttpWaitLoop($this);
|
||||
$this->handler ??= new GenericLoop(fn () => $this->handleMessages($this->new_incoming), "Handler loop");
|
||||
if (!isset($this->pinger) && !$ctx->isMedia() && !$ctx->isCDN() && !$this->isHttp()) {
|
||||
$this->pinger = new PingLoop($this);
|
||||
}
|
||||
foreach ($this->new_outgoing as $message_id => $message) {
|
||||
if ($message->unencrypted) {
|
||||
if (!($message->getState() & MTProtoOutgoingMessage::STATE_REPLIED)) {
|
||||
$message->reply(static fn () => new Exception('Restart because we were reconnected'));
|
||||
}
|
||||
unset($this->new_outgoing[$message_id], $this->outgoing_messages[$message_id]);
|
||||
}
|
||||
foreach ($this->unencrypted_new_outgoing as $message_id => $message) {
|
||||
$message->reply(static fn () => new Exception('Restart because we were reconnected'));
|
||||
}
|
||||
Assert::true($this->writer->start(), "Could not start writer stream");
|
||||
Assert::true($this->reader->start(), "Could not start reader stream");
|
||||
Assert::true($this->checker->start(), "Could not start checker stream");
|
||||
Assert::true($this->cleanup->start(), "Could not start cleanup stream");
|
||||
$this->waiter->start();
|
||||
if ($this->pinger) {
|
||||
Assert::true($this->pinger->start(), "Could not start pinger stream");
|
||||
}
|
||||
$this->handler->start();
|
||||
|
||||
EventLoop::queue($this->shared->initAuthorization(...));
|
||||
@ -586,9 +561,7 @@ final class Connection
|
||||
$this->pendingOutgoing[$this->pendingOutgoingKey++] = $message;
|
||||
$this->outgoingCtr?->inc();
|
||||
$this->pendingOutgoingGauge?->set(\count($this->pendingOutgoing));
|
||||
if (isset($this->writer)) {
|
||||
$this->writer->resume();
|
||||
}
|
||||
$this->flush();
|
||||
$this->connect();
|
||||
$promise->await();
|
||||
}
|
||||
@ -601,18 +574,6 @@ final class Connection
|
||||
$this->writer->resume();
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Resume HttpWaiter.
|
||||
*/
|
||||
public function pingHttpWaiter(): void
|
||||
{
|
||||
if (isset($this->waiter)) {
|
||||
$this->waiter->resume();
|
||||
}
|
||||
if (isset($this->pinger)) {
|
||||
$this->pinger->resume();
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Connect main instance.
|
||||
*
|
||||
@ -663,9 +624,7 @@ final class Connection
|
||||
|
||||
$this->reader?->stop();
|
||||
$this->writer?->stop();
|
||||
$this->checker?->stop();
|
||||
$this->cleanup?->stop();
|
||||
$this->pinger?->stop();
|
||||
|
||||
if (!$temporary) {
|
||||
$this->shared->signalDisconnect($this->id);
|
||||
|
@ -204,9 +204,6 @@ final class DataCenterConnection implements JsonSerializable
|
||||
$logger->logger("Done initing auth for DC {$this->datacenter}", Logger::NOTICE);
|
||||
EventLoop::queue($lock->release(...));
|
||||
}
|
||||
if ($this->hasTempAuthKey()) {
|
||||
$connection->pingHttpWaiter();
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Bind temporary and permanent auth keys.
|
||||
|
@ -1,163 +0,0 @@
|
||||
<?php
|
||||
|
||||
declare(strict_types=1);
|
||||
|
||||
/**
|
||||
* RPC call status check loop.
|
||||
*
|
||||
* This file is part of MadelineProto.
|
||||
* MadelineProto is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
|
||||
* MadelineProto is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
|
||||
* See the GNU Affero General Public License for more details.
|
||||
* You should have received a copy of the GNU General Public License along with MadelineProto.
|
||||
* If not, see <http://www.gnu.org/licenses/>.
|
||||
*
|
||||
* @author Daniil Gentili <daniil@daniil.it>
|
||||
* @copyright 2016-2023 Daniil Gentili <daniil@daniil.it>
|
||||
* @license https://opensource.org/licenses/AGPL-3.0 AGPLv3
|
||||
* @link https://docs.madelineproto.xyz MadelineProto documentation
|
||||
*/
|
||||
|
||||
namespace danog\MadelineProto\Loop\Connection;
|
||||
|
||||
use Amp\CancelledException;
|
||||
use Amp\DeferredFuture;
|
||||
use Amp\TimeoutCancellation;
|
||||
use danog\Loop\Loop;
|
||||
use danog\MadelineProto\Connection;
|
||||
use danog\MadelineProto\Logger;
|
||||
use Revolt\EventLoop;
|
||||
|
||||
/**
|
||||
* RPC call status check loop.
|
||||
*
|
||||
* @internal
|
||||
*
|
||||
* @author Daniil Gentili <daniil@daniil.it>
|
||||
*/
|
||||
final class CheckLoop extends Loop
|
||||
{
|
||||
use Common {
|
||||
__construct as initCommon;
|
||||
}
|
||||
|
||||
private int $resendTimeout;
|
||||
public function __construct(Connection $connection)
|
||||
{
|
||||
$this->initCommon($connection);
|
||||
$this->resendTimeout = (int) ($this->API->settings->getRpc()->getRpcResendTimeout() * 1_000_000_000.0);
|
||||
}
|
||||
/**
|
||||
* Main loop.
|
||||
*/
|
||||
protected function loop(): ?float
|
||||
{
|
||||
if (!$this->connection->new_outgoing) {
|
||||
return self::PAUSE;
|
||||
}
|
||||
if (!$this->connection->hasPendingCalls()) {
|
||||
return $this->timeoutSeconds;
|
||||
}
|
||||
if ($this->shared->hasTempAuthKey()) {
|
||||
$full_message_ids = $this->connection->getPendingCalls();
|
||||
foreach (array_chunk($full_message_ids, WriteLoop::MAX_IDS) as $message_ids) {
|
||||
$deferred = new DeferredFuture();
|
||||
$list = '';
|
||||
// Don't edit this here pls
|
||||
foreach ($message_ids as $message_id) {
|
||||
if (!isset($this->connection->outgoing_messages[$message_id])) {
|
||||
continue;
|
||||
}
|
||||
$list .= $this->connection->outgoing_messages[$message_id]->constructor.', ';
|
||||
}
|
||||
$this->API->logger("Still missing {$list} on DC {$this->datacenter}, sending state request", Logger::ERROR);
|
||||
$this->connection->objectCall('msgs_state_req', ['msg_ids' => $message_ids], $deferred);
|
||||
EventLoop::queue(function () use ($deferred, $message_ids): void {
|
||||
try {
|
||||
$result = $deferred->getFuture()->await(new TimeoutCancellation($this->timeout));
|
||||
if (\is_callable($result)) {
|
||||
throw $result();
|
||||
}
|
||||
$reply = [];
|
||||
foreach (str_split($result['info']) as $key => $chr) {
|
||||
$message_id = $message_ids[$key];
|
||||
if (!isset($this->connection->outgoing_messages[$message_id])) {
|
||||
$this->API->logger("Already got response for and forgot about message ID $message_id");
|
||||
continue;
|
||||
}
|
||||
if (!isset($this->connection->new_outgoing[$message_id])) {
|
||||
$this->API->logger('Already got response for '.$this->connection->outgoing_messages[$message_id]);
|
||||
continue;
|
||||
}
|
||||
$message = $this->connection->new_outgoing[$message_id];
|
||||
$chr = \ord($chr);
|
||||
switch ($chr & 7) {
|
||||
case 0:
|
||||
$this->API->logger("Wrong message status 0 for $message", Logger::FATAL_ERROR);
|
||||
break;
|
||||
case 1:
|
||||
case 2:
|
||||
case 3:
|
||||
if ($message->constructor === 'msgs_state_req') {
|
||||
$this->connection->gotResponseForOutgoingMessage($message);
|
||||
break;
|
||||
}
|
||||
$this->API->logger("Message $message not received by server, resending...", Logger::ERROR);
|
||||
$this->connection->methodRecall($message_id);
|
||||
break;
|
||||
case 4:
|
||||
if ($chr & 128) {
|
||||
$this->API->logger("Message $message received by server and was already sent, requesting reply...", Logger::ERROR);
|
||||
$reply[] = $message_id;
|
||||
} elseif ($chr & 64) {
|
||||
$this->API->logger("Message $message received by server and was already processed, requesting reply...", Logger::ERROR);
|
||||
$reply[] = $message_id;
|
||||
} elseif ($chr & 32) {
|
||||
if ($message->getSent() + $this->resendTimeout < hrtime(true)) {
|
||||
if ($message->isCancellationRequested()) {
|
||||
unset($this->connection->new_outgoing[$message_id], $this->connection->outgoing_messages[$message_id]);
|
||||
|
||||
$this->API->logger("Cancelling $message...", Logger::ERROR);
|
||||
} else {
|
||||
$this->API->logger("Message $message received by server and is being processed for way too long, resending request...", Logger::ERROR);
|
||||
$this->connection->methodRecall($message_id);
|
||||
}
|
||||
} else {
|
||||
$this->API->logger("Message $message received by server and is being processed, waiting...", Logger::ERROR);
|
||||
}
|
||||
} else {
|
||||
$this->API->logger("Message $message received by server, waiting...", Logger::ERROR);
|
||||
$reply[] = $message_id;
|
||||
}
|
||||
}
|
||||
}
|
||||
//} catch (CancelledException) {
|
||||
//$this->API->logger("We did not receive a response for {$this->timeout} seconds: reconnecting and exiting check loop on DC {$this->datacenter}");
|
||||
//EventLoop::queue($this->connection->reconnect(...));
|
||||
} catch (\Throwable $e) {
|
||||
$this->API->logger("Got exception in check loop for DC {$this->datacenter}");
|
||||
$this->API->logger((string) $e);
|
||||
}
|
||||
});
|
||||
}
|
||||
} else {
|
||||
foreach ($this->connection->new_outgoing as $message_id => $message) {
|
||||
if ($message->wasSent()
|
||||
&& $message->getSent() + $this->timeout < hrtime(true)
|
||||
&& $message->unencrypted
|
||||
) {
|
||||
$this->API->logger("Still missing $message on DC {$this->datacenter}, resending", Logger::ERROR);
|
||||
$this->connection->methodRecall($message->getMsgId());
|
||||
}
|
||||
}
|
||||
}
|
||||
return $this->timeoutSeconds;
|
||||
}
|
||||
/**
|
||||
* Loop name.
|
||||
*/
|
||||
public function __toString(): string
|
||||
{
|
||||
return "check loop in DC {$this->datacenter}";
|
||||
}
|
||||
}
|
@ -63,7 +63,8 @@ final class ReadLoop extends Loop
|
||||
}
|
||||
EventLoop::queue(function () use ($e): void {
|
||||
if ($e instanceof NothingInTheSocketException
|
||||
&& !$this->connection->hasPendingCalls()
|
||||
&& !$this->connection->unencrypted_new_outgoing
|
||||
&& !$this->connection->new_outgoing
|
||||
&& $this->connection->isMedia()
|
||||
&& !$this->connection->isWriting()
|
||||
&& $this->shared->hasTempAuthKey()
|
||||
@ -71,7 +72,7 @@ final class ReadLoop extends Loop
|
||||
$this->API->logger("Got NothingInTheSocketException in DC {$this->datacenter}, disconnecting because we have nothing to do...", Logger::ERROR);
|
||||
$this->connection->disconnect(true);
|
||||
} else {
|
||||
$this->API->logger($e);
|
||||
$this->API->logger($e, Logger::ERROR);
|
||||
$this->API->logger("Got exception in DC {$this->datacenter}, reconnecting...", Logger::ERROR);
|
||||
$this->connection->reconnect();
|
||||
}
|
||||
@ -93,6 +94,9 @@ final class ReadLoop extends Loop
|
||||
foreach ($this->connection->new_outgoing as $message) {
|
||||
$message->resetSent();
|
||||
}
|
||||
foreach ($this->connection->unencrypted_new_outgoing as $message) {
|
||||
$message->resetSent();
|
||||
}
|
||||
$this->shared->reconnect();
|
||||
} else {
|
||||
$this->connection->reconnect();
|
||||
@ -116,9 +120,6 @@ final class ReadLoop extends Loop
|
||||
return self::STOP;
|
||||
}
|
||||
$this->connection->httpReceived();
|
||||
if ($this->connection->isHttp()) {
|
||||
$this->connection->pingHttpWaiter();
|
||||
}
|
||||
$this->connection->wakeupHandler();
|
||||
return self::CONTINUE;
|
||||
}
|
||||
|
@ -21,7 +21,9 @@ declare(strict_types=1);
|
||||
namespace danog\MadelineProto\Loop\Connection;
|
||||
|
||||
use Amp\ByteStream\StreamException;
|
||||
use Amp\DeferredFuture;
|
||||
use danog\Loop\Loop;
|
||||
use danog\MadelineProto\Connection;
|
||||
use danog\MadelineProto\Logger;
|
||||
use danog\MadelineProto\MTProto\Container;
|
||||
use danog\MadelineProto\MTProto\MTProtoOutgoingMessage;
|
||||
@ -42,23 +44,46 @@ final class WriteLoop extends Loop
|
||||
{
|
||||
private const MAX_COUNT = 1020;
|
||||
private const MAX_SIZE = 1 << 15;
|
||||
private const LONG_POLL_TIMEOUT = 30;
|
||||
private const LONG_POLL_TIMEOUT_MS = self::LONG_POLL_TIMEOUT*1000;
|
||||
public const MAX_IDS = 8192;
|
||||
|
||||
use Common;
|
||||
use Common {
|
||||
__construct as init2;
|
||||
}
|
||||
|
||||
private int $pingTimeout;
|
||||
private float $pollTimeout;
|
||||
/**
|
||||
* Constructor function.
|
||||
*/
|
||||
public function __construct(Connection $connection)
|
||||
{
|
||||
$this->init2($connection);
|
||||
$timeout = $this->shared->getSettings()->getPingInterval();
|
||||
$this->pingTimeout = $timeout + 15;
|
||||
|
||||
if ($this->connection->isHttp()) {
|
||||
$this->pollTimeout = (float) max(self::LONG_POLL_TIMEOUT, $timeout);
|
||||
} else {
|
||||
$this->pollTimeout = (float) $timeout;
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Main loop.
|
||||
*/
|
||||
public function loop(): ?float
|
||||
{
|
||||
$please_wait = false;
|
||||
$first = true;
|
||||
while (true) {
|
||||
if ($this->connection->shouldReconnect()) {
|
||||
$this->API->logger("Exiting $this because connection is old");
|
||||
return self::STOP;
|
||||
}
|
||||
if (!$this->connection->pendingOutgoing) {
|
||||
if (!$this->connection->pendingOutgoing && !$first) {
|
||||
$this->API->logger("No messages, pausing in $this...", Logger::ULTRA_VERBOSE);
|
||||
return self::PAUSE;
|
||||
return $this->pollTimeout;
|
||||
}
|
||||
if ($please_wait) {
|
||||
$this->API->logger("Have to wait for handshake, pausing in $this...", Logger::ULTRA_VERBOSE);
|
||||
@ -67,7 +92,7 @@ final class WriteLoop extends Loop
|
||||
$this->connection->writing(true);
|
||||
try {
|
||||
$please_wait = $this->shared->hasTempAuthKey()
|
||||
? $this->encryptedWriteLoop()
|
||||
? $this->encryptedWriteLoop($first)
|
||||
: $this->unencryptedWriteLoop();
|
||||
} catch (StreamException $e) {
|
||||
if ($this->connection->shouldReconnect()) {
|
||||
@ -87,10 +112,17 @@ final class WriteLoop extends Loop
|
||||
} finally {
|
||||
$this->connection->writing(false);
|
||||
}
|
||||
$first = false;
|
||||
}
|
||||
}
|
||||
public function unencryptedWriteLoop(): bool
|
||||
{
|
||||
if ($queue = $this->connection->unencrypted_check_queue) {
|
||||
$this->connection->unencrypted_check_queue = [];
|
||||
foreach ($queue as $msg) {
|
||||
$this->connection->methodRecall($msg);
|
||||
}
|
||||
}
|
||||
while ($this->connection->pendingOutgoing) {
|
||||
$skipped_all = true;
|
||||
foreach ($this->connection->pendingOutgoing as $k => $message) {
|
||||
@ -123,7 +155,7 @@ final class WriteLoop extends Loop
|
||||
$this->connection->pendingOutgoingGauge?->set(\count($this->connection->pendingOutgoing));
|
||||
$message->setMsgId($message_id);
|
||||
$this->connection->outgoing_messages[$message_id] = $message;
|
||||
$this->connection->new_outgoing[$message_id] = $message;
|
||||
$this->connection->unencrypted_new_outgoing[$message_id] = $message;
|
||||
|
||||
$message->sent();
|
||||
}
|
||||
@ -133,16 +165,97 @@ final class WriteLoop extends Loop
|
||||
}
|
||||
return false;
|
||||
}
|
||||
public function encryptedWriteLoop(): bool
|
||||
public function encryptedWriteLoop(bool $first): bool
|
||||
{
|
||||
do {
|
||||
if (!$this->shared->hasTempAuthKey()) {
|
||||
return false;
|
||||
}
|
||||
if ($this->connection->isHttp() && empty($this->connection->pendingOutgoing)) {
|
||||
if (!$first && !$this->connection->pendingOutgoing) {
|
||||
return false;
|
||||
}
|
||||
|
||||
foreach ($this->connection->check_queue as $msg_id => $_) {
|
||||
$deferred = new DeferredFuture();
|
||||
$list = '';
|
||||
// Don't edit this here pls
|
||||
foreach ($message_ids as $message_id) {
|
||||
if (!isset($this->connection->outgoing_messages[$message_id])) {
|
||||
continue;
|
||||
}
|
||||
$list .= $this->connection->outgoing_messages[$message_id]->constructor.', ';
|
||||
}
|
||||
$this->API->logger("Still missing {$list} on DC {$this->datacenter}, sending state request", Logger::ERROR);
|
||||
$this->connection->objectCall('msgs_state_req', ['msg_ids' => $message_ids], $deferred);
|
||||
EventLoop::queue(function () use ($deferred, $message_ids): void {
|
||||
try {
|
||||
$result = $deferred->getFuture()->await(new TimeoutCancellation($this->pollTimeout));
|
||||
if (\is_callable($result)) {
|
||||
throw $result();
|
||||
}
|
||||
$reply = [];
|
||||
foreach (str_split($result['info']) as $key => $chr) {
|
||||
$message_id = $message_ids[$key];
|
||||
if (!isset($this->connection->outgoing_messages[$message_id])) {
|
||||
$this->API->logger("Already got response for and forgot about message ID $message_id");
|
||||
continue;
|
||||
}
|
||||
if (!isset($this->connection->new_outgoing[$message_id])) {
|
||||
$this->API->logger('Already got response for '.$this->connection->outgoing_messages[$message_id]);
|
||||
continue;
|
||||
}
|
||||
$message = $this->connection->new_outgoing[$message_id];
|
||||
$chr = \ord($chr);
|
||||
switch ($chr & 7) {
|
||||
case 0:
|
||||
$this->API->logger("Wrong message status 0 for $message", Logger::FATAL_ERROR);
|
||||
break;
|
||||
case 1:
|
||||
case 2:
|
||||
case 3:
|
||||
if ($message->constructor === 'msgs_state_req') {
|
||||
$this->connection->gotResponseForOutgoingMessage($message);
|
||||
break;
|
||||
}
|
||||
$this->API->logger("Message $message not received by server, resending...", Logger::ERROR);
|
||||
$this->connection->methodRecall($message_id);
|
||||
break;
|
||||
case 4:
|
||||
if ($chr & 128) {
|
||||
$this->API->logger("Message $message received by server and was already sent, requesting reply...", Logger::ERROR);
|
||||
$reply[] = $message_id;
|
||||
} elseif ($chr & 64) {
|
||||
$this->API->logger("Message $message received by server and was already processed, requesting reply...", Logger::ERROR);
|
||||
$reply[] = $message_id;
|
||||
} elseif ($chr & 32) {
|
||||
if ($message->getSent() + $this->resendTimeout < hrtime(true)) {
|
||||
if ($message->cancellation?->isRequested()) {
|
||||
unset($this->connection->new_outgoing[$message_id], $this->connection->outgoing_messages[$message_id]);
|
||||
|
||||
$this->API->logger("Cancelling $message...", Logger::ERROR);
|
||||
} else {
|
||||
$this->API->logger("Message $message received by server and is being processed for way too long, resending request...", Logger::ERROR);
|
||||
$this->connection->methodRecall($message_id);
|
||||
}
|
||||
} else {
|
||||
$this->API->logger("Message $message received by server and is being processed, waiting...", Logger::ERROR);
|
||||
}
|
||||
} else {
|
||||
$this->API->logger("Message $message received by server, waiting...", Logger::ERROR);
|
||||
$reply[] = $message_id;
|
||||
}
|
||||
}
|
||||
}
|
||||
//} catch (CancelledException) {
|
||||
//$this->API->logger("We did not receive a response for {$this->pollTimeout} seconds: reconnecting and exiting check loop on DC {$this->datacenter}");
|
||||
//EventLoop::queue($this->connection->reconnect(...));
|
||||
} catch (\Throwable $e) {
|
||||
$this->API->logger("Got exception in check loop for DC {$this->datacenter}");
|
||||
$this->API->logger((string) $e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
ksort($this->connection->pendingOutgoing);
|
||||
|
||||
$messages = [];
|
||||
@ -286,7 +399,7 @@ final class WriteLoop extends Loop
|
||||
$total_length += $actual_length;
|
||||
$MTmessage['bytes'] = $body_length;
|
||||
$messages[] = $MTmessage;
|
||||
$keys[$k] = $message_id;
|
||||
$keys[$k] = $message;
|
||||
|
||||
$message->setSeqNo($MTmessage['seqno'])
|
||||
->setMsgId($MTmessage['msg_id']);
|
||||
@ -325,10 +438,10 @@ final class WriteLoop extends Loop
|
||||
if ($count > 1 || $has_seq) {
|
||||
$this->API->logger("Wrapping in msg_container ({$count} messages of total size {$total_length}) as encrypted message for DC {$this->datacenter}", Logger::ULTRA_VERBOSE);
|
||||
$message_id = $this->connection->msgIdHandler->generateMessageId();
|
||||
$this->connection->pendingOutgoing[$this->connection->pendingOutgoingKey] = new Container($this->connection, array_values($keys));
|
||||
$this->connection->pendingOutgoing[$this->connection->pendingOutgoingKey] = $ct = new Container($this->connection, $keys);
|
||||
$this->connection->outgoingCtr?->inc();
|
||||
$this->connection->pendingOutgoingGauge?->set(\count($this->connection->pendingOutgoing));
|
||||
$keys[$this->connection->pendingOutgoingKey++] = $message_id;
|
||||
$keys[$this->connection->pendingOutgoingKey++] = $ct;
|
||||
$message_data = $this->API->getTL()->serializeObject(['type' => ''], ['_' => 'msg_container', 'messages' => $messages], 'container');
|
||||
$message_data_length = \strlen($message_data);
|
||||
$seq_no = $this->connection->generateOutSeqNo(false);
|
||||
@ -364,8 +477,7 @@ final class WriteLoop extends Loop
|
||||
$this->connection->ack_queue = \array_slice($this->connection->ack_queue, $ackCount);
|
||||
}
|
||||
|
||||
foreach ($keys as $key => $message_id) {
|
||||
$message = $this->connection->pendingOutgoing[$key];
|
||||
foreach ($keys as $key => $message) {
|
||||
unset($this->connection->pendingOutgoing[$key]);
|
||||
$this->connection->outgoing_messages[$message_id] = $message;
|
||||
if ($message->hasPromise()) {
|
||||
|
@ -29,28 +29,13 @@ use danog\MadelineProto\Connection;
|
||||
*/
|
||||
final class Container extends MTProtoOutgoingMessage
|
||||
{
|
||||
/**
|
||||
* Message IDs.
|
||||
*
|
||||
* @var list<int>
|
||||
*/
|
||||
private array $ids = [];
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
*
|
||||
* @param list<int> $ids
|
||||
* @param list<MTProtoOutgoingMessage> $msgs
|
||||
*/
|
||||
public function __construct(Connection $connection, array $ids)
|
||||
public function __construct(Connection $connection, public readonly array $msgs)
|
||||
{
|
||||
$this->ids = $ids;
|
||||
parent::__construct($connection, [], 'msg_container', '', false, false);
|
||||
}
|
||||
/**
|
||||
* Get message IDs.
|
||||
*/
|
||||
public function getIds(): array
|
||||
{
|
||||
return $this->ids;
|
||||
parent::__construct($connection, [], 'msg_container', '', false, false, null);
|
||||
}
|
||||
}
|
||||
|
@ -89,6 +89,8 @@ class MTProtoOutgoingMessage extends MTProtoMessage
|
||||
*/
|
||||
private int $tries = 0;
|
||||
|
||||
private ?string $checkTimer = null;
|
||||
|
||||
/**
|
||||
* Whether this message is related to a user, as in getting a successful reply means we have auth.
|
||||
*/
|
||||
@ -110,6 +112,7 @@ class MTProtoOutgoingMessage extends MTProtoMessage
|
||||
public readonly string $type,
|
||||
public readonly bool $isMethod,
|
||||
public readonly bool $unencrypted,
|
||||
public readonly ?Cancellation $cancellation,
|
||||
public readonly ?string $subtype = null,
|
||||
/**
|
||||
* Whether this message is related to a file upload, as in getting a redirect should redirect to a media server.
|
||||
@ -126,7 +129,6 @@ class MTProtoOutgoingMessage extends MTProtoMessage
|
||||
public readonly ?int $takeoutId = null,
|
||||
public readonly ?string $businessConnectionId = null,
|
||||
private ?DeferredFuture $resultDeferred = null,
|
||||
public readonly ?Cancellation $cancellation = null
|
||||
) {
|
||||
$this->userRelated = $constructor === 'users.getUsers' && $body === ['id' => [['_' => 'inputUserSelf']]] || $constructor === 'auth.exportAuthorization' || $constructor === 'updates.getDifference';
|
||||
|
||||
@ -158,14 +160,6 @@ class MTProtoOutgoingMessage extends MTProtoMessage
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Whether cancellation is requested.
|
||||
*/
|
||||
public function isCancellationRequested(): bool
|
||||
{
|
||||
return $this->cancellation?->isRequested() ?? false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Signal that we're trying to send the message.
|
||||
*/
|
||||
@ -188,12 +182,46 @@ class MTProtoOutgoingMessage extends MTProtoMessage
|
||||
}
|
||||
$this->state |= self::STATE_SENT;
|
||||
$this->sent = hrtime(true);
|
||||
$this->checkTimer = EventLoop::delay(
|
||||
$this->connection->API->getSettings()->getConnection()->getTimeout(),
|
||||
$this->check(...)
|
||||
);
|
||||
if (isset($this->sendDeferred)) {
|
||||
$sendDeferred = $this->sendDeferred;
|
||||
$this->sendDeferred = null;
|
||||
$sendDeferred->complete();
|
||||
}
|
||||
}
|
||||
private function check(): void
|
||||
{
|
||||
if ($this->state & self::STATE_REPLIED) {
|
||||
return;
|
||||
}
|
||||
$shared = $this->connection->getShared();
|
||||
$settings = $shared->getSettings();
|
||||
$global = $shared->getGenericSettings();
|
||||
$timeout = (float) $settings->getTimeout();
|
||||
$pfs = $global->getAuth()->getPfs();
|
||||
$unencrypted = !$shared->hasTempAuthKey();
|
||||
$notBound = !$shared->isBound();
|
||||
$pfsNotBound = $pfs && $notBound;
|
||||
$this->checkTimer = EventLoop::delay(
|
||||
$timeout,
|
||||
$this->check(...)
|
||||
);
|
||||
|
||||
if ($this->unencrypted === $unencrypted) {
|
||||
if (!$unencrypted && $pfsNotBound && $this->constructor !== 'auth.bindTempAuthKey') {
|
||||
return;
|
||||
}
|
||||
if ($unencrypted) {
|
||||
$this->connection->unencrypted_check_queue[$this->msgId] = true;
|
||||
} else {
|
||||
$this->connection->check_queue[$this->msgId] = true;
|
||||
}
|
||||
$this->connection->flush();
|
||||
}
|
||||
}
|
||||
/**
|
||||
* Set reply to message.
|
||||
*
|
||||
@ -209,6 +237,10 @@ class MTProtoOutgoingMessage extends MTProtoMessage
|
||||
if (!($this->state & self::STATE_SENT)) {
|
||||
$this->sent();
|
||||
}
|
||||
if ($this->checkTimer !== null) {
|
||||
EventLoop::cancel($this->checkTimer);
|
||||
$this->checkTimer = null;
|
||||
}
|
||||
|
||||
if ($this->isMethod) {
|
||||
$this->connection->inFlightGauge?->dec([
|
||||
@ -221,6 +253,9 @@ class MTProtoOutgoingMessage extends MTProtoMessage
|
||||
);
|
||||
}
|
||||
}
|
||||
if ($this->msgId !== null) {
|
||||
unset($this->connection->new_outgoing[$this->msgId], $this->connection->outgoing_messages[$this->msgId]);
|
||||
}
|
||||
|
||||
$this->serializedBody = null;
|
||||
$this->body = null;
|
||||
|
@ -20,7 +20,6 @@ declare(strict_types=1);
|
||||
|
||||
namespace danog\MadelineProto\MTProtoSession;
|
||||
|
||||
use Amp\TimeoutException;
|
||||
use danog\MadelineProto\DataCenterConnection;
|
||||
use danog\MadelineProto\Logger;
|
||||
use danog\MadelineProto\MTProto\MTProtoIncomingMessage;
|
||||
@ -47,14 +46,6 @@ trait AckHandler
|
||||
}
|
||||
return true;
|
||||
}
|
||||
/**
|
||||
* We have gotten a response for an outgoing message.
|
||||
*/
|
||||
public function gotResponseForOutgoingMessage(MTProtoOutgoingMessage $outgoingMessage): void
|
||||
{
|
||||
// The server acknowledges that it received my message
|
||||
unset($this->new_outgoing[$outgoingMessage->getMsgId()]);
|
||||
}
|
||||
/**
|
||||
* Acknowledge incoming message ID.
|
||||
*/
|
||||
@ -66,72 +57,4 @@ trait AckHandler
|
||||
// I let the server know that I received its message
|
||||
$this->ack_queue[$message_id] = $message_id;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if there are some pending calls.
|
||||
*/
|
||||
public function hasPendingCalls(): bool
|
||||
{
|
||||
$timeout = (int) ($this->shared->getSettings()->getTimeout() * 1_000_000_000.0);
|
||||
$pfs = $this->shared->getGenericSettings()->getAuth()->getPfs();
|
||||
$unencrypted = !$this->shared->hasTempAuthKey();
|
||||
$notBound = !$this->shared->isBound();
|
||||
$pfsNotBound = $pfs && $notBound;
|
||||
/** @var MTProtoOutgoingMessage */
|
||||
foreach ($this->new_outgoing as $message) {
|
||||
if ($message->wasSent()
|
||||
&& $message->getSent() + $timeout < hrtime(true)
|
||||
&& $message->unencrypted === $unencrypted
|
||||
&& $message->constructor !== 'msgs_state_req') {
|
||||
if (!$unencrypted && $pfsNotBound && $message->constructor !== 'auth.bindTempAuthKey') {
|
||||
continue;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
/**
|
||||
* Get all pending calls (also clear pending state requests).
|
||||
*/
|
||||
public function getPendingCalls(): array
|
||||
{
|
||||
$settings = $this->shared->getSettings();
|
||||
$global = $this->shared->getGenericSettings();
|
||||
$dropTimeout = (int) ($global->getRpc()->getRpcDropTimeout() * 1_000_000_000.0);
|
||||
$timeout = (int) ($settings->getTimeout() * 1_000_000_000.0);
|
||||
$pfs = $global->getAuth()->getPfs();
|
||||
$unencrypted = !$this->shared->hasTempAuthKey();
|
||||
$notBound = !$this->shared->isBound();
|
||||
$pfsNotBound = $pfs && $notBound;
|
||||
if ($this->datacenter < 0) {
|
||||
$dropTimeout *= 10;
|
||||
}
|
||||
$result = [];
|
||||
/** @var MTProtoOutgoingMessage $message */
|
||||
foreach ($this->new_outgoing as $message_id => $message) {
|
||||
if ($message->wasSent()
|
||||
&& $message->getSent() + $timeout < hrtime(true)
|
||||
&& $message->unencrypted === $unencrypted
|
||||
) {
|
||||
if (!$unencrypted && $pfsNotBound && $message->constructor !== 'auth.bindTempAuthKey') {
|
||||
continue;
|
||||
}
|
||||
if ($message->constructor === 'msgs_state_req' || $message->constructor === 'ping_delay_disconnect'
|
||||
|| ($message->getSent() + $dropTimeout < hrtime(true))
|
||||
) {
|
||||
Logger::log('No reply for message: ' . $message, Logger::WARNING);
|
||||
$this->handleReject($message, static fn () => new TimeoutException('Request timeout'));
|
||||
continue;
|
||||
}
|
||||
if ($message->getState() & MTProtoOutgoingMessage::STATE_REPLIED) {
|
||||
$this->API->logger("Already replied to message $message, but still in new_outgoing");
|
||||
unset($this->new_outgoing[$message_id]);
|
||||
continue;
|
||||
}
|
||||
$result[] = $message_id;
|
||||
}
|
||||
}
|
||||
return $result;
|
||||
}
|
||||
}
|
||||
|
@ -20,10 +20,14 @@ declare(strict_types=1);
|
||||
|
||||
namespace danog\MadelineProto\MTProtoSession;
|
||||
|
||||
use Amp\CompositeCancellation;
|
||||
use Amp\DeferredFuture;
|
||||
use Amp\Future;
|
||||
use Amp\Sync\LocalKeyedMutex;
|
||||
use Amp\TimeoutCancellation;
|
||||
use danog\MadelineProto\DataCenterConnection;
|
||||
use danog\MadelineProto\Logger;
|
||||
use danog\MadelineProto\MTProto;
|
||||
use danog\MadelineProto\MTProto\Container;
|
||||
use danog\MadelineProto\MTProto\MTProtoOutgoingMessage;
|
||||
use danog\MadelineProto\TL\Exception;
|
||||
@ -38,6 +42,7 @@ use function Amp\Future\await;
|
||||
*
|
||||
*
|
||||
* @property DataCenterConnection $shared
|
||||
* @property MTProto $API
|
||||
* @internal
|
||||
*/
|
||||
trait CallHandler
|
||||
@ -45,39 +50,52 @@ trait CallHandler
|
||||
/**
|
||||
* Recall method.
|
||||
*/
|
||||
public function methodRecall(int $message_id, ?int $datacenter = null): void
|
||||
public function methodRecall(MTProtoOutgoingMessage $request, ?int $forceDatacenter = null, float|Future|null $defer = null): void
|
||||
{
|
||||
if ($datacenter === $this->datacenter) {
|
||||
$datacenter = null;
|
||||
}
|
||||
$message = $this->outgoing_messages[$message_id] ?? null;
|
||||
$message_ids = $message instanceof Container
|
||||
? $message->getIds()
|
||||
: [$message_id];
|
||||
foreach ($message_ids as $message_id) {
|
||||
if (isset($this->outgoing_messages[$message_id])
|
||||
&& !$this->outgoing_messages[$message_id]->canGarbageCollect()) {
|
||||
if ($datacenter) {
|
||||
/** @var MTProtoOutgoingMessage */
|
||||
$message = $this->outgoing_messages[$message_id];
|
||||
$this->gotResponseForOutgoingMessage($message);
|
||||
$message->setMsgId(null);
|
||||
$message->setSeqNo(null);
|
||||
EventLoop::queue(function () use ($datacenter, $message): void {
|
||||
$this->API->datacenter->waitGetConnection($datacenter)
|
||||
->sendMessage($message);
|
||||
});
|
||||
} else {
|
||||
/** @var MTProtoOutgoingMessage */
|
||||
$message = $this->outgoing_messages[$message_id];
|
||||
if (!$message->hasSeqNo()) {
|
||||
$this->gotResponseForOutgoingMessage($message);
|
||||
}
|
||||
EventLoop::queue($this->sendMessage(...), $message);
|
||||
}
|
||||
} else {
|
||||
$this->API->logger('Could not resend '.($this->outgoing_messages[$message_id] ?? $message_id));
|
||||
$id = $request->getMsgId();
|
||||
unset($this->outgoing_messages[$id], $this->new_outgoing[$id]);
|
||||
if ($request instanceof Container) {
|
||||
foreach ($request->msgs as $msg) {
|
||||
$this->methodRecall($msg, $forceDatacenter, $defer);
|
||||
}
|
||||
return;
|
||||
}
|
||||
if ($request->isCancellationRequested()) {
|
||||
return;
|
||||
}
|
||||
if (\is_float($defer)) {
|
||||
$d = new DeferredFuture;
|
||||
$id = EventLoop::delay($defer, $d->complete(...));
|
||||
$request->cancellation?->subscribe(static fn () => EventLoop::cancel($id));
|
||||
$defer = $d;
|
||||
return;
|
||||
}
|
||||
$prev = $request->previousQueuedMessage;
|
||||
if (!$prev->hasReply()) {
|
||||
$prev->getResultPromise()->finally(
|
||||
fn () => $this->methodRecall($request, $this->datacenter, $defer)
|
||||
);
|
||||
return;
|
||||
}
|
||||
if ($defer) {
|
||||
$defer->finally(
|
||||
fn () => $this->methodRecall($request, $this->datacenter)
|
||||
);
|
||||
return;
|
||||
}
|
||||
$datacenter = $forceDatacenter ?? $this->datacenter;
|
||||
if ($forceDatacenter !== null) {
|
||||
/** @var MTProtoOutgoingMessage */
|
||||
$request->setMsgId(null);
|
||||
$request->setSeqNo(null);
|
||||
}
|
||||
if ($datacenter === $this->datacenter) {
|
||||
EventLoop::queue($this->sendMessage(...), $request);
|
||||
} else {
|
||||
EventLoop::queue(function () use ($datacenter, $request): void {
|
||||
$this->API->datacenter->waitGetConnection($datacenter)
|
||||
->sendMessage($request);
|
||||
});
|
||||
}
|
||||
}
|
||||
/**
|
||||
@ -92,6 +110,7 @@ trait CallHandler
|
||||
return $readFuture->await();
|
||||
}
|
||||
private LocalKeyedMutex $abstractionQueueMutex;
|
||||
private ?float $drop = null;
|
||||
/**
|
||||
* Call method and make sure it is asynchronously sent (generator).
|
||||
*
|
||||
@ -163,6 +182,10 @@ trait CallHandler
|
||||
if (!$encrypted && $this->shared->hasTempAuthKey()) {
|
||||
$encrypted = true;
|
||||
}
|
||||
$timeout = new TimeoutCancellation($this->drop ??= (float) $this->API->getSettings()->getRpc()->getRpcDropTimeout());
|
||||
$cancellation = $cancellation !== null
|
||||
? new CompositeCancellation($cancellation, $timeout)
|
||||
: $timeout;
|
||||
$message = new MTProtoOutgoingMessage(
|
||||
connection: $this,
|
||||
body: $args,
|
||||
@ -187,7 +210,6 @@ trait CallHandler
|
||||
$message->setMsgId($args['madelineMsgId']);
|
||||
}
|
||||
$this->sendMessage($message);
|
||||
$this->checker->resume();
|
||||
return new WrappedFuture($response->getFuture());
|
||||
}
|
||||
/**
|
||||
@ -198,8 +220,15 @@ trait CallHandler
|
||||
*/
|
||||
public function objectCall(string $object, array $args, ?DeferredFuture $promise = null): void
|
||||
{
|
||||
$cancellation = $args['cancellation'] ?? null;
|
||||
$cancellation?->throwIfRequested();
|
||||
$timeout = new TimeoutCancellation($this->drop ??= (float) $this->API->getSettings()->getRpc()->getRpcDropTimeout());
|
||||
$cancellation = $cancellation !== null
|
||||
? new CompositeCancellation($cancellation, $timeout)
|
||||
: $timeout;
|
||||
$this->sendMessage(
|
||||
new MTProtoOutgoingMessage(
|
||||
cancellation: $cancellation,
|
||||
connection: $this,
|
||||
body: $args,
|
||||
constructor: $object,
|
||||
|
@ -64,7 +64,9 @@ trait Reliable
|
||||
}
|
||||
if ($ok) {
|
||||
foreach ($content['msg_ids'] as $msg_id) {
|
||||
$this->methodRecall($msg_id);
|
||||
if (isset($this->outgoing_messages[$msg_id])) {
|
||||
$this->methodRecall($this->outgoing_messages[$msg_id]);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
$this->sendMsgsStateInfo($content['msg_ids'], $current_msg_id);
|
||||
|
@ -20,6 +20,7 @@ declare(strict_types=1);
|
||||
|
||||
namespace danog\MadelineProto\MTProtoSession;
|
||||
|
||||
use Amp\DeferredFuture;
|
||||
use Amp\SignalException;
|
||||
use danog\BetterPrometheus\BetterHistogram;
|
||||
use danog\Loop\Loop;
|
||||
@ -43,6 +44,8 @@ use Throwable;
|
||||
|
||||
use const PHP_EOL;
|
||||
|
||||
use function Amp\async;
|
||||
|
||||
/**
|
||||
* Manages responses.
|
||||
*
|
||||
@ -114,7 +117,7 @@ trait ResponseHandler
|
||||
}
|
||||
|
||||
$this->API->logger('Trying to assign a response of type ' . $response_type . ' to its request...', Logger::VERBOSE);
|
||||
foreach ($this->new_outgoing as $expecting_msg_id => $expecting) {
|
||||
foreach ($this->unencrypted_new_outgoing as $expecting_msg_id => $expecting) {
|
||||
if (!$expecting->type) {
|
||||
continue;
|
||||
}
|
||||
@ -170,14 +173,6 @@ trait ResponseHandler
|
||||
$this->handleMessages([$message]);
|
||||
}
|
||||
}
|
||||
/**
|
||||
* @param callable(): \Throwable $data
|
||||
*/
|
||||
private function handleReject(MTProtoOutgoingMessage $message, callable $data): void
|
||||
{
|
||||
$this->gotResponseForOutgoingMessage($message);
|
||||
$message->reply($data);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle RPC response.
|
||||
@ -211,7 +206,7 @@ trait ResponseHandler
|
||||
$exception = static fn (): \Throwable => $e;
|
||||
}
|
||||
if ($exception) {
|
||||
$this->handleReject($request, $exception);
|
||||
$request->reply($exception);
|
||||
}
|
||||
return;
|
||||
}
|
||||
@ -220,12 +215,10 @@ trait ResponseHandler
|
||||
switch ($response['error_code']) {
|
||||
case 48:
|
||||
$this->shared->getTempAuthKey()->setServerSalt($response['new_server_salt']);
|
||||
$this->methodRecall($requestId);
|
||||
$this->methodRecall($request);
|
||||
return;
|
||||
case 20:
|
||||
$request->setMsgId(null);
|
||||
$request->setSeqNo(null);
|
||||
$this->methodRecall($requestId);
|
||||
$this->methodRecall($request, $this->datacenter);
|
||||
return;
|
||||
case 16:
|
||||
case 17:
|
||||
@ -233,11 +226,12 @@ trait ResponseHandler
|
||||
$this->API->logger('Set time delta to ' . $this->time_delta, Logger::WARNING);
|
||||
$this->API->resetMTProtoSession("time delta update");
|
||||
$this->shared->setTempAuthKey(null);
|
||||
EventLoop::queue($this->shared->initAuthorization(...));
|
||||
EventLoop::queue($this->methodRecall(...), $requestId);
|
||||
$d = new DeferredFuture;
|
||||
async($this->shared->initAuthorization(...))->finally($d->complete(...));
|
||||
$this->methodRecall($request, $this->datacenter, $d->getFuture());
|
||||
return;
|
||||
}
|
||||
$this->handleReject($request, static fn () => RPCErrorException::make('Received bad_msg_notification: ' . MTProto::BAD_MSG_ERROR_CODES[$response['error_code']], $response['error_code'], $request->constructor));
|
||||
$request->reply(static fn () => RPCErrorException::make('Received bad_msg_notification: ' . MTProto::BAD_MSG_ERROR_CODES[$response['error_code']], $response['error_code'], $request->constructor));
|
||||
return;
|
||||
}
|
||||
|
||||
@ -275,7 +269,6 @@ trait ResponseHandler
|
||||
}
|
||||
}
|
||||
}
|
||||
$this->gotResponseForOutgoingMessage($request);
|
||||
|
||||
$this->requestResponse?->inc([
|
||||
'method' => $request->constructor,
|
||||
@ -283,7 +276,7 @@ trait ResponseHandler
|
||||
'error_code' => '200',
|
||||
]);
|
||||
|
||||
EventLoop::queue($request->reply(...), $response);
|
||||
$request->reply($response);
|
||||
}
|
||||
/**
|
||||
* @param array{error_message: string, error_code: int} $response
|
||||
@ -314,12 +307,7 @@ trait ResponseHandler
|
||||
&& !$request->shouldRefreshReferences()
|
||||
) {
|
||||
$this->API->logger("Got {$response['error_message']}, refreshing file reference and repeating method call...");
|
||||
$this->gotResponseForOutgoingMessage($request);
|
||||
$msgId = $request->getMsgId();
|
||||
$request->setRefreshReferences(true);
|
||||
$request->setMsgId(null);
|
||||
$request->setSeqNo(null);
|
||||
$this->methodRecall($msgId);
|
||||
$this->methodRecall($request, $this->datacenter);
|
||||
return null;
|
||||
}
|
||||
|
||||
@ -334,28 +322,13 @@ trait ResponseHandler
|
||||
)
|
||||
) {
|
||||
$this->API->logger("Resending $request due to {$response['error_message']}");
|
||||
$this->gotResponseForOutgoingMessage($request);
|
||||
$msgId = $request->getMsgId();
|
||||
$request->setSent(hrtime(true) + (5*60 * 1_000_000_000));
|
||||
$request->setMsgId(null);
|
||||
$request->setSeqNo(null);
|
||||
$prev = $request->previousQueuedMessage;
|
||||
if ($prev->hasReply()) {
|
||||
$this->methodRecall($msgId);
|
||||
} else {
|
||||
$prev->getResultPromise()->finally(
|
||||
fn () => $this->methodRecall($msgId)
|
||||
);
|
||||
}
|
||||
$this->methodRecall($request, $this->datacenter);
|
||||
return null;
|
||||
}
|
||||
if ((($response['error_code'] === -503 || $response['error_message'] === '-503') && !\in_array($request->constructor, ['messages.getBotCallbackAnswer', 'messages.getInlineBotResults'], true))
|
||||
|| (\in_array($response['error_message'], ['MSGID_DECREASE_RETRY', 'HISTORY_GET_FAILED', 'RPC_CONNECT_FAILED', 'RPC_CALL_FAIL', 'RPC_MCGET_FAIL', 'PERSISTENT_TIMESTAMP_OUTDATED', 'RPC_MCGET_FAIL', 'no workers running', 'No workers running'], true))) {
|
||||
$this->API->logger("Resending $request in 1 second due to {$response['error_message']}");
|
||||
$msgId = $request->getMsgId();
|
||||
$request->setMsgId(null);
|
||||
$request->setSeqNo(null);
|
||||
EventLoop::delay(1.0, fn () => $this->methodRecall($msgId));
|
||||
$this->methodRecall($request, $this->datacenter, 1.0);
|
||||
return null;
|
||||
}
|
||||
return static fn () => RPCErrorException::make($response['error_message'], $response['error_code'], $request->constructor);
|
||||
@ -386,20 +359,7 @@ trait ResponseHandler
|
||||
)
|
||||
) {
|
||||
$this->API->logger("Resending $request due to {$response['error_message']}");
|
||||
$this->gotResponseForOutgoingMessage($request);
|
||||
$msgId = $request->getMsgId();
|
||||
$request->setSent(hrtime(true) + (5*60 * 1_000_000_000));
|
||||
$request->setMsgId(null);
|
||||
$request->setSeqNo(null);
|
||||
\assert($msgId !== null);
|
||||
$prev = $request->previousQueuedMessage;
|
||||
if ($prev->hasReply()) {
|
||||
$this->methodRecall($msgId);
|
||||
} else {
|
||||
$prev->getResultPromise()->finally(
|
||||
fn () => $this->methodRecall($msgId)
|
||||
);
|
||||
}
|
||||
$this->methodRecall($request, $this->datacenter);
|
||||
return null;
|
||||
}
|
||||
return static fn () => RPCErrorException::make($response['error_message'], $response['error_code'], $request->constructor);
|
||||
@ -420,12 +380,7 @@ trait ResponseHandler
|
||||
case 'AUTH_KEY_UNREGISTERED':
|
||||
case 'AUTH_KEY_INVALID':
|
||||
if ($this->API->authorized !== \danog\MadelineProto\API::LOGGED_IN) {
|
||||
$this->gotResponseForOutgoingMessage($request);
|
||||
EventLoop::queue(
|
||||
$this->handleReject(...),
|
||||
$request,
|
||||
static fn () => RPCErrorException::make($response['error_message'], $response['error_code'], $request->constructor)
|
||||
);
|
||||
$request->reply(static fn () => RPCErrorException::make($response['error_message'], $response['error_code'], $request->constructor));
|
||||
return null;
|
||||
}
|
||||
$this->session_id = null;
|
||||
@ -441,14 +396,16 @@ trait ResponseHandler
|
||||
$this->API->logout();
|
||||
return static fn () => new SignalException(sprintf(Lang::$current_lang['account_banned'], $phone));
|
||||
}
|
||||
EventLoop::queue($this->shared->initAuthorization(...));
|
||||
EventLoop::queue($this->methodRecall(...), $request->getMsgId());
|
||||
$deferred = new DeferredFuture;
|
||||
async($this->shared->initAuthorization(...))->finally($deferred->complete(...));
|
||||
$this->methodRecall($request, $this->datacenter, $deferred->getFuture());
|
||||
return null;
|
||||
case 'AUTH_KEY_PERM_EMPTY':
|
||||
$this->API->logger('Temporary auth key not bound, resetting temporary auth key...', Logger::ERROR);
|
||||
$this->shared->setTempAuthKey(null);
|
||||
EventLoop::queue($this->shared->initAuthorization(...));
|
||||
EventLoop::queue($this->methodRecall(...), $request->getMsgId());
|
||||
$deferred = new DeferredFuture;
|
||||
async($this->shared->initAuthorization(...))->finally($deferred->complete(...));
|
||||
$this->methodRecall($request, $this->datacenter, $deferred->getFuture());
|
||||
return null;
|
||||
}
|
||||
return static fn () => RPCErrorException::make($response['error_message'], $response['error_code'], $request->constructor);
|
||||
@ -457,14 +414,7 @@ trait ResponseHandler
|
||||
$limit = $request->floodWaitLimit ?? $this->API->settings->getRPC()->getFloodTimeout();
|
||||
if ($seconds < $limit) {
|
||||
$this->API->logger("Flood, waiting $seconds seconds before repeating async call of $request...", Logger::NOTICE);
|
||||
$this->gotResponseForOutgoingMessage($request);
|
||||
$msgId = $request->getMsgId();
|
||||
$request->setSent(hrtime(true) + ($seconds * 1_000_000_000));
|
||||
$request->setMsgId(null);
|
||||
$request->setSeqNo(null);
|
||||
\assert($msgId !== null);
|
||||
$id = EventLoop::delay((float) $seconds, fn () => $this->methodRecall($msgId));
|
||||
$request->cancellation?->subscribe(static fn () => EventLoop::cancel($id));
|
||||
$this->methodRecall($request, $this->datacenter, (float) $seconds);
|
||||
return null;
|
||||
}
|
||||
if (str_starts_with($response['error_message'], 'FLOOD_WAIT_')) {
|
||||
|
@ -77,6 +77,12 @@ trait Session
|
||||
* @var array<MTProtoOutgoingMessage>
|
||||
*/
|
||||
public array $new_outgoing = [];
|
||||
/**
|
||||
* New unencrypted outgoing message array.
|
||||
*
|
||||
* @var array<MTProtoOutgoingMessage>
|
||||
*/
|
||||
public array $unencrypted_new_outgoing = [];
|
||||
/**
|
||||
* Pending outgoing messages.
|
||||
*
|
||||
@ -104,6 +110,16 @@ trait Session
|
||||
*
|
||||
*/
|
||||
public array $ack_queue = [];
|
||||
/**
|
||||
* Check queue.
|
||||
*
|
||||
*/
|
||||
public array $check_queue = [];
|
||||
/**
|
||||
* Check queue.
|
||||
*
|
||||
*/
|
||||
public array $unencrypted_check_queue = [];
|
||||
/**
|
||||
* Message ID handler.
|
||||
*
|
||||
@ -178,6 +194,12 @@ trait Session
|
||||
$new_outgoing[$key] = $message;
|
||||
}
|
||||
$this->new_outgoing = $new_outgoing;
|
||||
|
||||
$unencrypted_new_outgoing = [];
|
||||
foreach ($this->unencrypted_new_outgoing as $key => $message) {
|
||||
$unencrypted_new_outgoing[$key] = $message;
|
||||
}
|
||||
$this->unencrypted_new_outgoing = $unencrypted_new_outgoing;
|
||||
}
|
||||
/**
|
||||
* Create MTProto session if needed.
|
||||
@ -227,6 +249,6 @@ trait Session
|
||||
public function backupSession(): array
|
||||
{
|
||||
$pending = array_values($this->pendingOutgoing);
|
||||
return array_merge($pending, $this->new_outgoing);
|
||||
return array_merge($pending, $this->new_outgoing, $this->unencrypted_new_outgoing);
|
||||
}
|
||||
}
|
||||
|
@ -240,7 +240,7 @@ trait FilesLogic
|
||||
if ($result->shouldServe()) {
|
||||
$pipe = new Pipe(1024 * 1024);
|
||||
[$start, $end] = $result->getServeRange();
|
||||
EventLoop::queue($this->downloadToStream(...), $messageMedia, $pipe->getSink(), $cb, $start, $end, $cancellation);
|
||||
async($this->downloadToStream(...), $messageMedia, $pipe->getSink(), $cb, $start, $end, $cancellation)->finally($pipe->getSink()->close(...));
|
||||
$body = $pipe->getSource();
|
||||
} elseif (!\in_array($result->getCode(), [HttpStatus::OK, HttpStatus::PARTIAL_CONTENT], true)) {
|
||||
$body = $result->getCodeExplanation();
|
||||
|
@ -267,11 +267,13 @@ final class PeerDatabase implements TLCallback
|
||||
}
|
||||
$new = $new ? self::getUsernames($new) : [];
|
||||
$old = $old ? self::getUsernames($old) : [];
|
||||
$diffToRemove = array_diff($old, $new);
|
||||
$diffToAdd = array_diff($new, $old);
|
||||
if (!$diffToAdd && !$diffToRemove) {
|
||||
return;
|
||||
foreach ($old as $key => $username) {
|
||||
if (!isset($this->usernames[$username])) {
|
||||
unset($old[$key]);
|
||||
}
|
||||
}
|
||||
$diffToRemove = array_diff($old, $new);
|
||||
$diffToAdd = array_diff($new, $diffToRemove);
|
||||
$lock = $this->decacheMutex->acquire();
|
||||
try {
|
||||
foreach ($diffToRemove as $username) {
|
||||
@ -417,6 +419,9 @@ final class PeerDatabase implements TLCallback
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
$this->recacheChatUsername($user['id'], $existingChat, $user);
|
||||
|
||||
if ($existingChat != $user) {
|
||||
$this->API->logger("Updated user {$user['id']}", Logger::ULTRA_VERBOSE);
|
||||
if (($user['min'] ?? false) && !($existingChat['min'] ?? false)) {
|
||||
@ -552,6 +557,7 @@ final class PeerDatabase implements TLCallback
|
||||
return;
|
||||
}
|
||||
}
|
||||
$this->recacheChatUsername($bot_api_id, $existingChat, $chat);
|
||||
if ($existingChat != $chat) {
|
||||
$this->API->logger("Updated chat {$bot_api_id}", Logger::ULTRA_VERBOSE);
|
||||
if (($chat['min'] ?? false) && $existingChat && !($existingChat['min'] ?? false)) {
|
||||
|
Loading…
Reference in New Issue
Block a user