1
0
mirror of https://github.com/danog/MadelineProto.git synced 2025-01-12 00:58:17 +01:00
MadelineProto/src/DataCenterConnection.php

742 lines
25 KiB
PHP
Raw Normal View History

2022-12-30 21:54:44 +01:00
<?php
declare(strict_types=1);
2020-01-31 19:29:43 +01:00
2019-08-31 22:43:58 +02:00
/**
* Connection module handling all connections to a datacenter.
*
* This file is part of MadelineProto.
* MadelineProto is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version.
* MadelineProto is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
* See the GNU Affero General Public License for more details.
* You should have received a copy of the GNU General Public License along with MadelineProto.
* If not, see <http://www.gnu.org/licenses/>.
*
* @author Daniil Gentili <daniil@daniil.it>
2023-01-04 12:43:01 +01:00
* @copyright 2016-2023 Daniil Gentili <daniil@daniil.it>
2019-08-31 22:43:58 +02:00
* @license https://opensource.org/licenses/AGPL-3.0 AGPLv3
2019-10-31 15:07:35 +01:00
* @link https://docs.madelineproto.xyz MadelineProto documentation
2019-08-31 22:43:58 +02:00
*/
namespace danog\MadelineProto;
2022-12-30 20:24:13 +01:00
use Amp\DeferredFuture;
2023-01-08 16:06:50 +01:00
use Amp\Future;
2022-08-13 16:36:51 +02:00
use Amp\Sync\LocalMutex;
2020-07-28 20:39:32 +02:00
use danog\MadelineProto\Loop\Generic\PeriodicLoopInternal;
2023-07-05 21:28:17 +02:00
use danog\MadelineProto\MTProto\MTProtoOutgoingMessage;
2019-09-02 15:30:29 +02:00
use danog\MadelineProto\MTProto\PermAuthKey;
use danog\MadelineProto\MTProto\TempAuthKey;
2022-08-13 16:36:51 +02:00
use danog\MadelineProto\MTProtoTools\Crypt;
use danog\MadelineProto\Settings\Connection as ConnectionSettings;
2019-08-31 22:43:58 +02:00
use danog\MadelineProto\Stream\ConnectionContext;
2019-09-02 14:37:30 +02:00
use danog\MadelineProto\Stream\MTProtoTransport\HttpsStream;
use danog\MadelineProto\Stream\MTProtoTransport\HttpStream;
use danog\MadelineProto\Stream\Transport\WssStream;
2019-09-01 14:07:04 +02:00
use JsonSerializable;
use Revolt\EventLoop;
2019-08-31 22:43:58 +02:00
2022-12-30 19:21:36 +01:00
use function count;
2020-10-01 20:48:22 +02:00
/**
2020-10-02 16:13:19 +02:00
* Datacenter connection.
2020-10-01 20:48:22 +02:00
*/
2023-01-15 12:05:38 +01:00
final class DataCenterConnection implements JsonSerializable
2019-08-31 22:43:58 +02:00
{
const READ_WEIGHT = 1;
const READ_WEIGHT_MEDIA = 5;
const WRITE_WEIGHT = 10;
2019-12-29 14:04:02 +01:00
/**
* Promise for connection.
*
*/
2023-01-08 16:06:50 +01:00
private Future $connectionsPromise;
2019-12-29 14:04:02 +01:00
/**
* Deferred for connection.
*
*/
2023-01-08 18:12:58 +01:00
private ?DeferredFuture $connectionsDeferred = null;
2019-08-31 22:43:58 +02:00
/**
* Temporary auth key.
*
*/
2023-01-04 15:13:55 +01:00
private ?TempAuthKey $tempAuthKey = null;
2019-08-31 22:43:58 +02:00
/**
* Permanent auth key.
*
*/
2023-01-04 15:13:55 +01:00
private ?PermAuthKey $permAuthKey = null;
2019-08-31 22:43:58 +02:00
/**
2019-09-01 01:52:28 +02:00
* Connections open to a certain DC.
2019-08-31 22:43:58 +02:00
*
2020-10-01 20:48:22 +02:00
* @var array<int, Connection>
2019-08-31 22:43:58 +02:00
*/
2023-01-04 15:13:55 +01:00
private array $connections = [];
2019-09-01 01:52:28 +02:00
/**
2019-09-01 14:07:04 +02:00
* Connection weights.
2019-09-01 01:52:28 +02:00
*
2020-10-01 20:48:22 +02:00
* @var array<int, int>
2019-09-01 01:52:28 +02:00
*/
2023-01-04 15:13:55 +01:00
private array $availableConnections = [];
2019-08-31 22:43:58 +02:00
/**
2019-09-01 01:52:28 +02:00
* Main API instance.
2019-08-31 22:43:58 +02:00
*
*/
2023-01-04 15:13:55 +01:00
private MTProto $API;
2019-08-31 22:43:58 +02:00
/**
2019-09-01 01:52:28 +02:00
* Connection context.
2019-08-31 22:43:58 +02:00
*
*/
2023-01-04 15:13:55 +01:00
private ConnectionContext $ctx;
2019-08-31 22:43:58 +02:00
/**
2019-09-01 01:52:28 +02:00
* DC ID.
2019-08-31 22:43:58 +02:00
*/
2023-01-08 16:23:18 +01:00
private int $datacenter;
2019-09-01 01:52:28 +02:00
/**
2019-09-01 23:39:29 +02:00
* Linked DC ID.
2019-09-01 01:52:28 +02:00
*
*/
2023-01-15 19:39:01 +01:00
private ?int $linkedDc = null;
2019-09-01 01:52:28 +02:00
/**
2019-09-01 14:07:04 +02:00
* Loop to keep weights at sane value.
2019-09-01 01:52:28 +02:00
*/
2020-07-28 20:39:32 +02:00
private ?PeriodicLoopInternal $robinLoop = null;
2019-09-02 14:37:30 +02:00
/**
* Decrement roundrobin weight by this value if busy reading.
*
*/
2023-01-04 15:13:55 +01:00
private int $decRead = 1;
2019-09-02 14:37:30 +02:00
/**
* Decrement roundrobin weight by this value if busy writing.
*
*/
2023-01-04 15:13:55 +01:00
private int $decWrite = 10;
2019-09-04 17:48:07 +02:00
/**
2019-09-12 18:56:26 +02:00
* Backed up messages.
2019-09-04 17:48:07 +02:00
*
*/
2023-01-04 15:13:55 +01:00
private array $backup = [];
/**
* Whether this socket has to be reconnected.
*
*/
2023-01-04 15:13:55 +01:00
private bool $needsReconnect = false;
/**
* Indicate if this socket needs to be reconnected.
*
* @param boolean $needsReconnect Whether the socket has to be reconnected
*/
2022-12-08 20:16:40 +01:00
public function needReconnect(bool $needsReconnect): void
{
$this->needsReconnect = $needsReconnect;
}
/**
* Whether this sockets needs to be reconnected.
*/
public function shouldReconnect(): bool
{
return $this->needsReconnect;
}
2022-08-13 16:36:51 +02:00
private ?LocalMutex $initingAuth = null;
/**
* Init auth keys for single DC.
*
* @internal
*/
2023-01-03 22:07:58 +01:00
public function initAuthorization(): void
2022-08-13 16:36:51 +02:00
{
$logger = $this->API->logger;
$this->initingAuth ??= new LocalMutex;
$lock = $this->initingAuth->acquire();
2022-08-13 16:36:51 +02:00
try {
$logger->logger("Initing auth for DC {$this->datacenter}", Logger::NOTICE);
$this->waitGetConnection();
2022-08-13 16:36:51 +02:00
$connection = $this->getAuthConnection();
$this->createSession();
$cdn = $this->isCDN();
$media = $this->isMedia();
$pfs = $this->API->settings->getAuth()->getPfs();
2022-08-13 16:36:51 +02:00
if (!$this->hasTempAuthKey() || !$this->hasPermAuthKey() || !$this->isBound()) {
if (!$this->hasPermAuthKey() && !$cdn && !$media) {
2023-06-24 18:00:19 +02:00
$logger->logger(\sprintf('Generating permanent authorization key for DC %s...', $this->datacenter), Logger::NOTICE);
$this->setPermAuthKey($connection->createAuthKey(false));
2022-08-13 16:36:51 +02:00
}
if ($media) {
2023-01-11 18:47:27 +01:00
$this->link(-$this->datacenter);
2022-08-13 16:36:51 +02:00
if ($this->hasTempAuthKey()) {
return;
}
}
if ($pfs) {
2022-08-13 16:36:51 +02:00
if (!$cdn) {
2023-06-24 18:00:19 +02:00
$logger->logger(\sprintf('Generating temporary authorization key for DC %s...', $this->datacenter), Logger::NOTICE);
2022-08-13 16:36:51 +02:00
$this->setTempAuthKey(null);
$this->setTempAuthKey($connection->createAuthKey(true));
$this->bindTempAuthKey();
$connection->methodCallAsyncRead('help.getConfig', []);
$this->syncAuthorization();
2022-08-13 16:36:51 +02:00
} elseif (!$this->hasTempAuthKey()) {
2023-06-24 18:00:19 +02:00
$logger->logger(\sprintf('Generating temporary authorization key for DC %s...', $this->datacenter), Logger::NOTICE);
$this->setTempAuthKey($connection->createAuthKey(true));
2022-08-13 16:36:51 +02:00
}
} else {
if (!$cdn) {
$this->bind(false);
$connection->methodCallAsyncRead('help.getConfig', []);
$this->syncAuthorization();
2022-08-13 16:36:51 +02:00
} elseif (!$this->hasTempAuthKey()) {
2023-06-24 18:00:19 +02:00
$logger->logger(\sprintf('Generating temporary authorization key for DC %s...', $this->datacenter), Logger::NOTICE);
$this->setTempAuthKey($connection->createAuthKey(true));
2022-08-13 16:36:51 +02:00
}
}
$this->flush();
2022-08-13 16:36:51 +02:00
} elseif (!$cdn) {
$this->syncAuthorization();
2022-08-13 16:36:51 +02:00
}
} finally {
$lock->release();
}
2022-08-15 17:43:32 +02:00
if ($this->hasTempAuthKey()) {
$connection->pingHttpWaiter();
}
2022-08-13 16:36:51 +02:00
}
/**
* Bind temporary and permanent auth keys.
*
* @internal
*/
2023-01-04 12:37:12 +01:00
public function bindTempAuthKey(): bool
2022-08-13 16:36:51 +02:00
{
$connection = $this->getAuthConnection();
$logger = $this->API->logger;
$expires_in = $this->API->settings->getAuth()->getDefaultTempAuthKeyExpiresIn();
for ($retry_id_total = 1; $retry_id_total <= $this->API->settings->getAuth()->getMaxAuthTries(); $retry_id_total++) {
try {
2022-12-30 19:21:36 +01:00
$logger->logger('Binding authorization keys...', Logger::VERBOSE);
$nonce = Tools::random(8);
2022-08-13 16:36:51 +02:00
$expires_at = \time() + $expires_in;
$temp_auth_key_id = $this->getTempAuthKey()->getID();
$perm_auth_key_id = $this->getPermAuthKey()->getID();
$temp_session_id = $connection->session_id;
$message_data = ($this->API->getTL()->serializeObject(['type' => ''], ['_' => 'bind_auth_key_inner', 'nonce' => $nonce, 'temp_auth_key_id' => $temp_auth_key_id, 'perm_auth_key_id' => $perm_auth_key_id, 'temp_session_id' => $temp_session_id, 'expires_at' => $expires_at], 'bindTempAuthKey_inner'));
2022-08-13 16:36:51 +02:00
$message_id = $connection->msgIdHandler->generateMessageId();
$seq_no = 0;
$encrypted_data = Tools::random(16).Tools::packSignedLong($message_id).\pack('VV', $seq_no, \strlen($message_data)).$message_data;
2022-08-13 16:36:51 +02:00
$message_key = \substr(\sha1($encrypted_data, true), -16);
2022-12-30 19:21:36 +01:00
$padding = Tools::random(Tools::posmod(-\strlen($encrypted_data), 16));
[$aes_key, $aes_iv] = Crypt::oldAesCalculate($message_key, $this->getPermAuthKey()->getAuthKey());
2022-08-13 16:36:51 +02:00
$encrypted_message = $this->getPermAuthKey()->getID().$message_key.Crypt::igeEncrypt($encrypted_data.$padding, $aes_key, $aes_iv);
$res = $connection->methodCallAsyncRead('auth.bindTempAuthKey', ['perm_auth_key_id' => $perm_auth_key_id, 'nonce' => $nonce, 'expires_at' => $expires_at, 'encrypted_message' => $encrypted_message], ['msg_id' => $message_id]);
2022-08-13 16:36:51 +02:00
if ($res === true) {
2022-12-30 19:21:36 +01:00
$logger->logger("Bound temporary and permanent authorization keys, DC {$this->datacenter}", Logger::NOTICE);
2022-08-13 16:36:51 +02:00
$this->bind();
return true;
}
2022-12-30 19:21:36 +01:00
} catch (SecurityException $e) {
$logger->logger('An exception occurred while generating the authorization key: '.$e->getMessage().' Retrying (try number '.$retry_id_total.')...', Logger::WARNING);
} catch (Exception $e) {
$logger->logger('An exception occurred while generating the authorization key: '.$e->getMessage().' Retrying (try number '.$retry_id_total.')...', Logger::WARNING);
} catch (RPCErrorException $e) {
$logger->logger('An RPCErrorException occurred while generating the authorization key: '.$e->getMessage().' Retrying (try number '.$retry_id_total.')...', Logger::WARNING);
2022-08-13 16:36:51 +02:00
}
}
2022-12-30 19:21:36 +01:00
throw new SecurityException('An error occurred while binding temporary and permanent authorization keys.');
2022-08-13 16:36:51 +02:00
}
/**
* Sync authorization data between DCs.
*/
2023-01-03 22:07:58 +01:00
private function syncAuthorization(): void
2022-08-13 16:36:51 +02:00
{
$socket = $this->getAuthConnection();
$logger = $this->API->logger;
2023-07-04 18:19:06 +02:00
if ($this->API->authorized === \danog\MadelineProto\API::LOGGED_IN && !$this->isAuthorized()) {
2022-08-13 16:36:51 +02:00
foreach ($this->API->datacenter->getDataCenterConnections() as $authorized_dc_id => $authorized_socket) {
if ($this->API->authorized_dc !== -1 && $authorized_dc_id !== $this->API->authorized_dc) {
continue;
}
2022-08-14 14:40:52 +02:00
if ($authorized_socket->hasTempAuthKey()
&& $authorized_socket->hasPermAuthKey()
&& $authorized_socket->isAuthorized()
2023-07-04 18:19:06 +02:00
&& $this->API->authorized === \danog\MadelineProto\API::LOGGED_IN
2022-08-14 14:40:52 +02:00
&& !$this->isAuthorized()
2022-08-13 16:36:51 +02:00
&& !$authorized_socket->isCDN()
) {
try {
$logger->logger('Trying to copy authorization from DC '.$authorized_dc_id.' to DC '.$this->datacenter);
2023-01-08 16:23:18 +01:00
$exported_authorization = $this->API->methodCallAsyncRead('auth.exportAuthorization', ['dc_id' => $this->datacenter], ['datacenter' => $authorized_dc_id]);
$socket->methodCallAsyncRead('auth.importAuthorization', $exported_authorization);
2022-08-13 16:36:51 +02:00
$this->authorized(true);
break;
2022-12-30 19:21:36 +01:00
} catch (Exception $e) {
$logger->logger('Failure while syncing authorization from DC '.$authorized_dc_id.' to DC '.$this->datacenter.': '.$e->getMessage(), Logger::ERROR);
} catch (RPCErrorException $e) {
$logger->logger('Failure while syncing authorization from DC '.$authorized_dc_id.' to DC '.$this->datacenter.': '.$e->getMessage(), Logger::ERROR);
2022-08-13 16:36:51 +02:00
if ($e->rpc === 'DC_ID_INVALID') {
break;
}
}
// Turns out this DC isn't authorized after all
}
}
}
}
2019-09-01 23:39:29 +02:00
/**
* Get temporary authorization key.
*/
public function getTempAuthKey(): TempAuthKey
{
2023-01-11 18:47:27 +01:00
if (!$this->tempAuthKey) {
throw new NothingInTheSocketException();
}
return $this->tempAuthKey;
2019-09-01 23:39:29 +02:00
}
/**
* Get permanent authorization key.
*/
public function getPermAuthKey(): PermAuthKey
{
2023-01-11 18:47:27 +01:00
if (!$this->permAuthKey) {
throw new NothingInTheSocketException();
}
return $this->permAuthKey;
2019-09-01 23:39:29 +02:00
}
/**
* Check if has temporary authorization key.
*/
public function hasTempAuthKey(): bool
{
2023-01-11 18:47:27 +01:00
return $this->tempAuthKey !== null && $this->tempAuthKey->hasAuthKey();
2019-09-01 23:39:29 +02:00
}
/**
* Check if has permanent authorization key.
*/
public function hasPermAuthKey(): bool
{
2023-01-11 18:47:27 +01:00
return $this->permAuthKey !== null && $this->permAuthKey->hasAuthKey();
2019-09-01 23:39:29 +02:00
}
/**
* Set temporary authorization key.
*
* @param TempAuthKey|null $key Auth key
*/
2020-10-03 12:36:08 +02:00
public function setTempAuthKey(?TempAuthKey $key): void
2019-09-01 23:39:29 +02:00
{
2023-01-11 18:47:27 +01:00
$this->tempAuthKey = $key;
2019-09-01 23:39:29 +02:00
}
/**
* Set permanent authorization key.
*
* @param PermAuthKey|null $key Auth key
*/
2020-10-03 12:36:08 +02:00
public function setPermAuthKey(?PermAuthKey $key): void
2019-09-01 23:39:29 +02:00
{
2023-01-11 18:47:27 +01:00
$this->permAuthKey = $key;
2019-09-01 23:39:29 +02:00
}
/**
* Bind temporary and permanent auth keys.
*
* @param bool $pfs Whether to bind using PFS
*/
2022-12-08 20:16:40 +01:00
public function bind(bool $pfs = true): void
2019-09-01 23:39:29 +02:00
{
2019-09-03 14:40:50 +02:00
if (!$pfs && !$this->tempAuthKey) {
$this->tempAuthKey = new TempAuthKey();
}
$this->tempAuthKey->bind($this->permAuthKey, $pfs);
2019-09-01 23:39:29 +02:00
}
2019-09-04 17:48:07 +02:00
/**
* Check if auth keys are bound.
*/
public function isBound(): bool
{
return $this->tempAuthKey ? $this->tempAuthKey->isBound() : false;
}
2019-08-31 22:43:58 +02:00
/**
* Check if we are logged in.
*/
public function isAuthorized(): bool
{
2019-09-01 23:39:29 +02:00
return $this->hasTempAuthKey() ? $this->getTempAuthKey()->isAuthorized() : false;
2019-08-31 22:43:58 +02:00
}
/**
* Set the authorized boolean.
*
* @param boolean $authorized Whether we are authorized
*/
2022-12-08 20:16:40 +01:00
public function authorized(bool $authorized): void
2019-08-31 22:43:58 +02:00
{
2019-09-02 15:30:29 +02:00
if ($authorized) {
$this->getTempAuthKey()->authorized($authorized);
} elseif ($this->hasTempAuthKey()) {
2019-09-02 15:30:29 +02:00
$this->getTempAuthKey()->authorized($authorized);
}
2019-09-01 23:39:29 +02:00
}
/**
* Link permanent authorization info of main DC to media DC.
*
2023-01-08 16:23:18 +01:00
* @param int $dc Main DC ID
2019-09-01 23:39:29 +02:00
*/
2023-01-08 16:23:18 +01:00
public function link(int $dc): void
2019-09-01 23:39:29 +02:00
{
2023-01-15 19:39:01 +01:00
$this->linkedDc = $dc;
2020-01-31 19:29:43 +01:00
$this->permAuthKey =& $this->API->datacenter->getDataCenterConnection($dc)->permAuthKey;
2019-08-31 22:43:58 +02:00
}
2019-09-01 14:07:04 +02:00
/**
* Reset MTProto sessions.
*/
2022-12-08 20:16:40 +01:00
public function resetSession(): void
2019-09-01 14:07:04 +02:00
{
foreach ($this->connections as $socket) {
$socket->resetSession();
}
}
/**
2019-09-02 17:08:36 +02:00
* Create MTProto sessions if needed.
*/
2022-12-08 20:16:40 +01:00
public function createSession(): void
{
foreach ($this->connections as $socket) {
$socket->createSession();
}
}
2019-09-01 14:07:04 +02:00
/**
* Flush all pending packets.
*/
public function flush(): void
2019-09-01 14:07:04 +02:00
{
2023-06-19 10:15:39 +02:00
if (!isset($this->datacenter)) {
return;
}
2022-12-30 19:21:36 +01:00
$this->API->logger->logger("Flushing pending messages, DC {$this->datacenter}", Logger::NOTICE);
2019-09-01 14:07:04 +02:00
foreach ($this->connections as $socket) {
$socket->flush();
}
}
2019-08-31 22:43:58 +02:00
/**
2019-09-01 01:52:28 +02:00
* Get connection context.
2019-08-31 22:43:58 +02:00
*/
public function getCtx(): ConnectionContext
{
return $this->ctx;
}
2019-12-31 13:12:58 +01:00
/**
* Has connection context?
*/
public function hasCtx(): bool
{
return isset($this->ctx);
}
2019-08-31 22:43:58 +02:00
/**
* Connect function.
*
* @param ConnectionContext $ctx Connection context
* @param int $id Optional connection ID to reconnect
2019-08-31 22:43:58 +02:00
*/
2023-01-03 22:07:58 +01:00
public function connect(ConnectionContext $ctx, int $id = -1): void
2019-08-31 22:43:58 +02:00
{
2020-01-31 19:29:43 +01:00
$this->API->logger->logger("Trying shared connection via {$ctx} ({$id})");
2019-08-31 22:43:58 +02:00
$this->datacenter = $ctx->getDc();
$media = $ctx->isMedia() || $ctx->isCDN();
2023-01-26 15:44:42 +01:00
if ($media) {
2019-09-01 01:52:28 +02:00
if (!$this->robinLoop) {
2023-01-25 16:32:48 +01:00
$this->robinLoop = new PeriodicLoopInternal(
$this->API,
$this->even(...),
"robin loop DC {$this->datacenter}",
$this->API->getSettings()->getConnection()->getRobinPeriod()
);
2019-09-01 01:52:28 +02:00
}
$this->robinLoop->start();
}
$this->decRead = $media ? self::READ_WEIGHT_MEDIA : self::READ_WEIGHT;
$this->decWrite = self::WRITE_WEIGHT;
2019-12-29 13:20:18 +01:00
if ($id === -1 || !isset($this->connections[$id])) {
2019-09-03 19:03:39 +02:00
if ($this->connections) {
2023-01-04 15:13:55 +01:00
$this->API->logger->logger('Already connected!', Logger::WARNING);
2019-09-04 17:48:07 +02:00
return;
2019-09-03 19:03:39 +02:00
}
2023-07-26 18:16:19 +02:00
$f = new DeferredFuture;
$this->connectionsPromise = $f->getFuture();
2023-01-15 18:47:29 +01:00
$this->ctx = $ctx->getCtx();
2023-01-26 15:44:42 +01:00
$this->connectMore(1);
$this->restoreBackup();
2023-01-03 21:51:49 +01:00
$f->complete();
2023-01-08 16:58:44 +01:00
if (isset($this->connectionsDeferred)) {
2019-12-29 14:04:02 +01:00
$connectionsDeferred = $this->connectionsDeferred;
$this->connectionsDeferred = null;
2022-12-30 21:43:58 +01:00
$connectionsDeferred->complete();
2019-12-29 14:04:02 +01:00
}
} else {
2023-01-15 18:47:29 +01:00
$this->ctx = $ctx->getCtx();
2019-09-03 14:40:50 +02:00
$this->availableConnections[$id] = 0;
$this->connections[$id]->connect($ctx);
}
}
/**
2019-09-02 17:08:36 +02:00
* Connect to the DC using count more sockets.
*
* @param integer $count Number of sockets to open
*/
2023-01-03 22:07:58 +01:00
private function connectMore(int $count): void
{
$ctx = $this->ctx->getCtx();
2019-09-02 17:08:36 +02:00
$count += $previousCount = \count($this->connections);
for ($x = $previousCount; $x < $count; $x++) {
2019-10-28 17:08:04 +01:00
$connection = new Connection();
$connection->setExtra($this, $x);
$connection->connect($ctx);
2019-10-28 17:08:04 +01:00
$this->connections[$x] = $connection;
2019-09-01 01:52:28 +02:00
$this->availableConnections[$x] = 0;
2019-08-31 22:43:58 +02:00
$ctx = $this->ctx->getCtx();
}
}
2019-10-28 17:08:04 +01:00
/**
* Signal that a connection ID disconnected.
*
* @param integer $id Connection ID
*/
2022-12-08 20:16:40 +01:00
public function signalDisconnect(int $id): void
2019-10-28 17:08:04 +01:00
{
$backup = $this->connections[$id]->backupSession();
$list = '';
2020-01-03 16:47:57 +01:00
foreach ($backup as $k => $message) {
2022-08-13 16:36:51 +02:00
if ($message->getConstructor() === 'msgs_state_req'
|| $message->getConstructor() === 'ping_delay_disconnect'
|| $message->isUnencrypted()) {
2020-01-03 16:47:57 +01:00
unset($backup[$k]);
continue;
}
$list .= $message->getConstructor();
2019-10-28 17:08:04 +01:00
$list .= ', ';
}
2020-01-31 19:29:43 +01:00
$this->API->logger->logger("Backed up {$list} from DC {$this->datacenter}.{$id}");
2019-10-28 17:08:04 +01:00
$this->backup = \array_merge($this->backup, $backup);
unset($this->connections[$id], $this->availableConnections[$id]);
}
2019-09-01 01:52:28 +02:00
/**
2019-09-01 14:07:04 +02:00
* Close all connections to DC.
2019-09-01 01:52:28 +02:00
*/
2020-02-28 14:14:02 +01:00
public function disconnect(): void
2019-08-31 22:43:58 +02:00
{
2023-01-08 19:02:49 +01:00
$this->connectionsDeferred = new DeferredFuture();
$this->connectionsPromise = $this->connectionsDeferred->getFuture();
2023-01-08 16:41:42 +01:00
if (!isset($this->ctx)) {
return;
}
2019-09-01 01:52:28 +02:00
$this->API->logger->logger("Disconnecting from shared DC {$this->datacenter}");
if ($this->robinLoop) {
2023-01-24 14:28:49 +01:00
$this->robinLoop->stop();
2019-09-01 01:52:28 +02:00
$this->robinLoop = null;
}
2019-09-12 18:56:26 +02:00
$before = \count($this->backup);
2019-09-01 01:52:28 +02:00
foreach ($this->connections as $connection) {
$connection->disconnect();
}
2019-09-12 18:56:26 +02:00
$count = \count($this->backup) - $before;
2020-01-31 19:29:43 +01:00
$this->API->logger->logger("Backed up {$count}, added to {$before} existing messages) from DC {$this->datacenter}");
2019-09-01 01:52:28 +02:00
$this->connections = [];
$this->availableConnections = [];
2019-08-31 22:43:58 +02:00
}
2019-09-01 01:52:28 +02:00
/**
2019-09-01 14:07:04 +02:00
* Reconnect to DC.
2019-09-01 01:52:28 +02:00
*/
2023-01-03 22:07:58 +01:00
public function reconnect(): void
2019-08-31 22:43:58 +02:00
{
2019-09-01 01:52:28 +02:00
$this->API->logger->logger("Reconnecting shared DC {$this->datacenter}");
$this->disconnect();
$this->connect($this->ctx);
2019-08-31 22:43:58 +02:00
}
2019-09-04 17:48:07 +02:00
/**
2019-09-12 18:56:26 +02:00
* Restore backed up messages.
2019-09-04 17:48:07 +02:00
*/
2022-12-08 20:16:40 +01:00
public function restoreBackup(): void
2019-09-04 17:48:07 +02:00
{
$backup = $this->backup;
$this->backup = [];
2019-09-12 18:56:26 +02:00
$count = \count($backup);
2020-01-31 19:29:43 +01:00
$this->API->logger->logger("Restoring {$count} messages to DC {$this->datacenter}");
2023-07-05 21:28:17 +02:00
/** @var MTProtoOutgoingMessage */
2019-09-04 17:48:07 +02:00
foreach ($backup as $message) {
if ($message->hasSeqno()) {
$message->setSeqno(null);
2020-07-12 01:27:26 +02:00
}
if ($message->hasMsgId()) {
$message->setMsgId(null);
2020-07-12 01:27:26 +02:00
}
2023-07-05 21:28:17 +02:00
if (!($message->getState() & MTProtoOutgoingMessage::STATE_REPLIED)) {
EventLoop::queue($this->getConnection()->sendMessage(...), $message, false);
2020-02-07 21:13:49 +01:00
}
2019-09-04 17:48:07 +02:00
}
$this->flush();
}
2019-09-01 23:39:29 +02:00
/**
* Get connection for authorization.
*/
public function getAuthConnection(): Connection
{
return $this->connections[0];
}
/**
* Check if any connection is available.
*
* @param integer $id Connection ID
*/
2023-01-04 15:13:55 +01:00
public function hasConnection(int $id = -1): bool|int
{
return $id < 0 ? \count($this->connections) : isset($this->connections[$id]);
}
2019-12-29 14:04:02 +01:00
/**
* Get best socket in round robin, asynchronously.
*/
2023-01-04 12:37:12 +01:00
public function waitGetConnection(): Connection
2019-12-29 14:04:02 +01:00
{
if (empty($this->availableConnections)) {
2023-01-03 21:51:49 +01:00
$this->connectionsPromise->await();
2019-12-29 14:04:02 +01:00
}
2020-02-05 17:29:48 +01:00
return $this->getConnection();
2019-12-29 14:04:02 +01:00
}
2019-09-01 01:52:28 +02:00
/**
* Get best socket in round robin.
*
* @param integer $id Connection ID, for manual fetching
2019-09-01 01:52:28 +02:00
*/
2019-09-13 18:03:18 +02:00
public function getConnection(int $id = -1): Connection
2019-08-31 22:43:58 +02:00
{
2019-09-13 18:03:18 +02:00
if ($id >= 0) {
return $this->connections[$id];
}
2019-09-02 15:30:29 +02:00
if (\count($this->availableConnections) <= 1) {
2019-09-01 01:52:28 +02:00
return $this->connections[0];
2019-08-31 22:43:58 +02:00
}
$max = \max($this->availableConnections);
2019-09-02 17:08:36 +02:00
$key = \array_search($max, $this->availableConnections);
2019-09-01 01:52:28 +02:00
// Decrease to implement round robin
$this->availableConnections[$key]--;
return $this->connections[$key];
2019-08-31 22:43:58 +02:00
}
2019-09-01 01:52:28 +02:00
/**
2019-09-01 14:07:04 +02:00
* Even out round robin values.
2019-09-01 01:52:28 +02:00
*/
2022-12-08 20:16:40 +01:00
public function even(): void
2019-08-31 22:43:58 +02:00
{
2019-10-28 19:48:59 +01:00
if (!$this->availableConnections) {
return;
}
2019-09-03 19:03:39 +02:00
$min = \min($this->availableConnections);
if ($min < 50) {
foreach ($this->availableConnections as &$count) {
$count += 50;
}
2019-09-04 17:48:07 +02:00
} elseif ($min < 100) {
$max = $this->isMedia() || $this->isCDN() ? $this->API->getSettings()->getConnection()->getMaxMediaSocketCount() : 1;
if (\count($this->availableConnections) < $max) {
$this->connectMore(2);
} else {
foreach ($this->availableConnections as &$value) {
$value += 1000;
}
2019-09-01 01:52:28 +02:00
}
2019-08-31 22:43:58 +02:00
}
}
2019-09-02 14:37:30 +02:00
/**
* Indicate that one of the sockets is busy reading.
*
* @param boolean $reading Whether we're busy reading
* @param int $x Connection ID
*/
2022-12-08 20:16:40 +01:00
public function reading(bool $reading, int $x): void
2019-09-02 14:37:30 +02:00
{
2021-12-14 00:00:12 +01:00
if (!isset($this->availableConnections[$x])) {
return;
}
2019-09-02 14:37:30 +02:00
$this->availableConnections[$x] += $reading ? -$this->decRead : $this->decRead;
}
/**
* Indicate that one of the sockets is busy writing.
*
* @param boolean $writing Whether we're busy writing
* @param int $x Connection ID
*/
2022-12-08 20:16:40 +01:00
public function writing(bool $writing, int $x): void
2019-09-02 14:37:30 +02:00
{
2021-12-14 00:00:12 +01:00
if (!isset($this->availableConnections[$x])) {
return;
}
2019-09-02 14:37:30 +02:00
$this->availableConnections[$x] += $writing ? -$this->decWrite : $this->decWrite;
}
2019-09-01 01:52:28 +02:00
/**
2019-09-01 14:07:04 +02:00
* Set main instance.
2019-09-01 01:52:28 +02:00
*
* @param MTProto $API Main instance
*/
2022-12-30 19:21:36 +01:00
public function setExtra(MTProto $API): void
2019-09-01 01:52:28 +02:00
{
$this->API = $API;
}
/**
2019-09-01 14:07:04 +02:00
* Get main instance.
2019-09-01 01:52:28 +02:00
*/
2022-12-30 19:21:36 +01:00
public function getExtra(): MTProto
2019-09-01 01:52:28 +02:00
{
return $this->API;
}
2019-09-02 14:37:30 +02:00
/**
* Check if is an HTTP connection.
*/
2019-09-02 15:30:29 +02:00
public function isHttp(): bool
2019-09-02 14:37:30 +02:00
{
2023-06-28 15:50:38 +02:00
return \in_array($this->ctx->getStreamName(), [HttpStream::class, HttpsStream::class], true);
2019-09-02 14:37:30 +02:00
}
/**
* Check if is connected directly by IP address.
*/
public function byIPAddress(): bool
{
return !$this->ctx->hasStreamName(WssStream::class) && !$this->ctx->hasStreamName(HttpsStream::class);
}
2019-09-02 15:30:29 +02:00
/**
* Check if is a media connection.
2019-09-02 15:30:29 +02:00
*/
public function isMedia(): bool
{
return $this->ctx->isMedia();
}
/**
* Check if is a CDN connection.
2019-09-02 15:30:29 +02:00
*/
public function isCDN(): bool
{
return $this->ctx->isCDN();
}
2019-09-02 14:37:30 +02:00
/**
* Get DC-specific settings.
*/
public function getSettings(): ConnectionSettings
{
return $this->API->getSettings()->getConnection();
}
/**
* Get global settings.
2019-09-02 14:37:30 +02:00
*/
public function getGenericSettings(): Settings
2019-09-02 14:37:30 +02:00
{
return $this->API->getSettings();
2019-09-02 14:37:30 +02:00
}
2019-09-01 14:07:04 +02:00
/**
* JSON serialize function.
*/
public function jsonSerialize(): array
{
2023-01-15 19:39:01 +01:00
return $this->linkedDc ? ['linkedDc' => $this->linkedDc, 'tempAuthKey' => $this->tempAuthKey] : ['permAuthKey' => $this->permAuthKey, 'tempAuthKey' => $this->tempAuthKey];
2019-09-01 14:07:04 +02:00
}
2019-08-31 22:43:58 +02:00
/**
* Sleep function.
*
* @internal
*/
2022-12-30 19:21:36 +01:00
public function __sleep(): array
2019-08-31 22:43:58 +02:00
{
2023-01-15 19:39:01 +01:00
return $this->linkedDc ? ['linkedDc', 'tempAuthKey'] : ['permAuthKey', 'tempAuthKey'];
}
2019-08-31 22:43:58 +02:00
}