2022-12-30 21:54:44 +01:00
< ? php
declare ( strict_types = 1 );
2020-01-31 19:29:43 +01:00
2019-08-31 22:43:58 +02:00
/**
* Connection module handling all connections to a datacenter .
*
* This file is part of MadelineProto .
* MadelineProto is free software : you can redistribute it and / or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation , either version 3 of the License , or ( at your option ) any later version .
* MadelineProto is distributed in the hope that it will be useful , but WITHOUT ANY WARRANTY ; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE .
* See the GNU Affero General Public License for more details .
* You should have received a copy of the GNU General Public License along with MadelineProto .
* If not , see < http :// www . gnu . org / licenses />.
*
* @ author Daniil Gentili < daniil @ daniil . it >
2023-01-04 12:43:01 +01:00
* @ copyright 2016 - 2023 Daniil Gentili < daniil @ daniil . it >
2019-08-31 22:43:58 +02:00
* @ license https :// opensource . org / licenses / AGPL - 3.0 AGPLv3
2019-10-31 15:07:35 +01:00
* @ link https :// docs . madelineproto . xyz MadelineProto documentation
2019-08-31 22:43:58 +02:00
*/
namespace danog\MadelineProto ;
2022-12-30 20:24:13 +01:00
use Amp\DeferredFuture ;
2023-01-08 16:06:50 +01:00
use Amp\Future ;
2022-08-13 16:36:51 +02:00
use Amp\Sync\LocalMutex ;
2020-07-28 20:39:32 +02:00
use danog\MadelineProto\Loop\Generic\PeriodicLoopInternal ;
2020-10-18 14:46:34 +02:00
use danog\MadelineProto\MTProto\OutgoingMessage ;
2019-09-02 15:30:29 +02:00
use danog\MadelineProto\MTProto\PermAuthKey ;
use danog\MadelineProto\MTProto\TempAuthKey ;
2022-08-13 16:36:51 +02:00
use danog\MadelineProto\MTProtoTools\Crypt ;
2020-09-22 11:48:12 +02:00
use danog\MadelineProto\Settings\Connection as ConnectionSettings ;
2019-08-31 22:43:58 +02:00
use danog\MadelineProto\Stream\ConnectionContext ;
2019-09-02 14:37:30 +02:00
use danog\MadelineProto\Stream\MTProtoTransport\HttpsStream ;
use danog\MadelineProto\Stream\MTProtoTransport\HttpStream ;
2019-09-17 21:35:53 +02:00
use danog\MadelineProto\Stream\Transport\WssStream ;
2019-09-01 14:07:04 +02:00
use JsonSerializable ;
2023-01-21 21:21:35 +01:00
use Revolt\EventLoop ;
2019-08-31 22:43:58 +02:00
2022-12-30 19:21:36 +01:00
use function count ;
2020-10-01 20:48:22 +02:00
/**
2020-10-02 16:13:19 +02:00
* Datacenter connection .
2020-10-01 20:48:22 +02:00
*/
2023-01-15 12:05:38 +01:00
final class DataCenterConnection implements JsonSerializable
2019-08-31 22:43:58 +02:00
{
2019-09-02 16:54:36 +02:00
/** Weight taken from a non-media connection while it is busy reading. */
public const READ_WEIGHT = 1;
/** Weight taken from a media or CDN connection while it is busy reading. */
public const READ_WEIGHT_MEDIA = 5;
/** Weight taken from a connection while it is busy writing. */
public const WRITE_WEIGHT = 10;
/**
 * Future awaited by waitGetConnection() when no connection is available.
 *
 * connect() replaces it with an already completed future,
 * disconnect() with a pending one.
 */
private Future $connectionsPromise;
/**
 * Deferred behind the pending future created by disconnect().
 *
 * Completed and reset to null by the next successful connect().
 */
private ?DeferredFuture $connectionsDeferred = null;
/**
 * Temporary auth key.
 */
private ?TempAuthKey $tempAuthKey = null;
/**
 * Permanent auth key.
 */
private ?PermAuthKey $permAuthKey = null;
/**
 * Connections open to a certain DC.
 *
 * @var array<int, Connection>
 */
private array $connections = [];
/**
 * Connection weights, indexed by connection ID.
 *
 * @var array<int, int>
 */
private array $availableConnections = [];
/**
 * Main API instance.
 */
private MTProto $API;
/**
 * Connection context.
 */
private ConnectionContext $ctx;
/**
 * DC ID.
 */
private int $datacenter;
/**
 * Linked DC ID.
 */
private ?int $linkedDc = null;
/**
 * Loop to keep weights at sane value.
 */
private ?PeriodicLoopInternal $robinLoop = null;
/**
 * Decrement roundrobin weight by this value if busy reading.
 */
private int $decRead = 1;
/**
 * Decrement roundrobin weight by this value if busy writing.
 */
private int $decWrite = 10;
/**
 * Backed up messages.
 */
private array $backup = [];
/**
 * Whether this socket has to be reconnected.
 */
private bool $needsReconnect = false;
2019-09-17 21:35:53 +02:00
/**
 * Flag (or unflag) this socket as requiring a reconnection.
 *
 * @param bool $needsReconnect Whether the socket has to be reconnected
 */
public function needReconnect(bool $needsReconnect): void
{
    $this->needsReconnect = $needsReconnect;
}
/**
 * Tell whether this socket was flagged for reconnection via needReconnect().
 */
public function shouldReconnect(): bool
{
    return $this->needsReconnect;
}
2022-08-13 16:36:51 +02:00
/**
 * Mutex held by initAuthorization() while it runs; created there on first use.
 */
private ? LocalMutex $initingAuth = null ;
/**
 * Init auth keys for single DC.
 *
 * The key setup runs while holding the $initingAuth mutex, which is always
 * released in the finally block. Once the lock is released, the HTTP waiter
 * of the auth connection is pinged if a temporary key is available.
 *
 * @internal
 */
public function initAuthorization(): void
{
    $logger = $this->API->logger;
    $this->initingAuth ??= new LocalMutex;
    $lock = $this->initingAuth->acquire();
    try {
        $logger->logger("Initing auth for DC {$this->datacenter}", Logger::NOTICE);
        $this->waitGetConnection();
        $connection = $this->getAuthConnection();
        $this->createSession();
        $cdn = $this->isCDN();
        $media = $this->isMedia();
        $pfs = $this->API->settings->getAuth()->getPfs();

        if (!$this->hasTempAuthKey() || !$this->hasPermAuthKey() || !$this->isBound()) {
            // Only main (non-CDN, non-media) DCs generate their own permanent key.
            if (!$this->hasPermAuthKey() && !$cdn && !$media) {
                $logger->logger(\sprintf(Lang::$current_lang['gen_perm_auth_key'], $this->datacenter), Logger::NOTICE);
                $this->setPermAuthKey($connection->createAuthKey(false));
            }
            if ($media) {
                // Media DCs share the permanent key of the DC with the negated ID.
                $this->link(-$this->datacenter);
                if ($this->hasTempAuthKey()) {
                    return;
                }
            }
            if ($cdn) {
                // CDN DCs only ever need a temporary key, with or without PFS.
                if (!$this->hasTempAuthKey()) {
                    $logger->logger(\sprintf(Lang::$current_lang['gen_temp_auth_key'], $this->datacenter), Logger::NOTICE);
                    $this->setTempAuthKey($connection->createAuthKey(true));
                }
            } else {
                if ($pfs) {
                    $logger->logger(\sprintf(Lang::$current_lang['gen_temp_auth_key'], $this->datacenter), Logger::NOTICE);
                    // Drop the old temporary key before generating the new one.
                    $this->setTempAuthKey(null);
                    $this->setTempAuthKey($connection->createAuthKey(true));
                    $this->bindTempAuthKey();
                } else {
                    $this->bind(false);
                }
                $connection->methodCallAsyncRead('help.getConfig', []);
                $this->syncAuthorization();
            }
            $this->flush();
        } elseif (!$cdn) {
            $this->syncAuthorization();
        }
    } finally {
        $lock->release();
    }
    if ($this->hasTempAuthKey()) {
        $connection->pingHttpWaiter();
    }
}
/**
 * Bind temporary and permanent auth keys.
 *
 * Builds a `bind_auth_key_inner` payload, encrypts it with the permanent key
 * and sends `auth.bindTempAuthKey` over the auth connection. The attempt is
 * repeated up to the configured maximum number of auth tries.
 *
 * @internal
 *
 * @return bool Always true; failure is reported by exception
 *
 * @throws SecurityException If no attempt succeeded
 */
public function bindTempAuthKey(): bool
{
    $connection = $this->getAuthConnection();
    $logger = $this->API->logger;
    $expires_in = $this->API->settings->getAuth()->getDefaultTempAuthKeyExpiresIn();
    for ($retry_id_total = 1; $retry_id_total <= $this->API->settings->getAuth()->getMaxAuthTries(); $retry_id_total++) {
        try {
            $logger->logger('Binding authorization keys...', Logger::VERBOSE);
            $nonce = Tools::random(8);
            $expires_at = \time() + $expires_in;
            $temp_auth_key_id = $this->getTempAuthKey()->getID();
            $perm_auth_key_id = $this->getPermAuthKey()->getID();
            $temp_session_id = $connection->session_id;
            $message_data = $this->API->getTL()->serializeObject(
                ['type' => ''],
                [
                    '_' => 'bind_auth_key_inner',
                    'nonce' => $nonce,
                    'temp_auth_key_id' => $temp_auth_key_id,
                    'perm_auth_key_id' => $perm_auth_key_id,
                    'temp_session_id' => $temp_session_id,
                    'expires_at' => $expires_at,
                ],
                'bindTempAuthKey_inner',
            );
            $message_id = $connection->msgIdHandler->generateMessageId();
            $seq_no = 0;
            // Plaintext layout: 16 random bytes, message ID, seqno + length, payload.
            $encrypted_data = Tools::random(16) . $message_id . \pack('VV', $seq_no, \strlen($message_data)) . $message_data;
            $message_key = \substr(\sha1($encrypted_data, true), -16);
            // Pad to a multiple of 16 bytes for IGE encryption.
            $padding = Tools::random(Tools::posmod(-\strlen($encrypted_data), 16));
            [$aes_key, $aes_iv] = Crypt::oldAesCalculate($message_key, $this->getPermAuthKey()->getAuthKey());
            $encrypted_message = $this->getPermAuthKey()->getID() . $message_key . Crypt::igeEncrypt($encrypted_data . $padding, $aes_key, $aes_iv);
            $res = $connection->methodCallAsyncRead(
                'auth.bindTempAuthKey',
                [
                    'perm_auth_key_id' => $perm_auth_key_id,
                    'nonce' => $nonce,
                    'expires_at' => $expires_at,
                    'encrypted_message' => $encrypted_message,
                ],
                ['msg_id' => $message_id],
            );
            if ($res === true) {
                $logger->logger("Bound temporary and permanent authorization keys, DC {$this->datacenter}", Logger::NOTICE);
                $this->bind();
                return true;
            }
        } catch (SecurityException|Exception $e) {
            $logger->logger('An exception occurred while generating the authorization key: ' . $e->getMessage() . ' Retrying (try number ' . $retry_id_total . ')...', Logger::WARNING);
        } catch (RPCErrorException $e) {
            $logger->logger('An RPCErrorException occurred while generating the authorization key: ' . $e->getMessage() . ' Retrying (try number ' . $retry_id_total . ')...', Logger::WARNING);
        }
    }
    throw new SecurityException('An error occurred while binding temporary and permanent authorization keys.');
}
/**
 * Sync authorization data between DCs.
 *
 * When the API is logged in but this DC is not authorized, exports the
 * authorization from an authorized DC and imports it on this one.
 */
private function syncAuthorization(): void
{
    $socket = $this->getAuthConnection();
    $logger = $this->API->logger;
    if ($this->API->authorized !== MTProto::LOGGED_IN || $this->isAuthorized()) {
        return;
    }
    foreach ($this->API->datacenter->getDataCenterConnections() as $sourceDcId => $sourceDc) {
        // When a specific authorized DC is known, only that one is used as source.
        if ($this->API->authorized_dc !== -1 && $sourceDcId !== $this->API->authorized_dc) {
            continue;
        }
        $usable = $sourceDc->hasTempAuthKey()
            && $sourceDc->hasPermAuthKey()
            && $sourceDc->isAuthorized()
            && $this->API->authorized === MTProto::LOGGED_IN
            && !$this->isAuthorized()
            && !$sourceDc->isCDN();
        if (!$usable) {
            continue;
        }
        try {
            $logger->logger("Trying to copy authorization from DC {$sourceDcId} to DC {$this->datacenter}");
            $exported = $this->API->methodCallAsyncRead(
                'auth.exportAuthorization',
                ['dc_id' => $this->datacenter],
                ['datacenter' => $sourceDcId],
            );
            $socket->methodCallAsyncRead('auth.importAuthorization', $exported);
            $this->authorized(true);
            break;
        } catch (Exception $e) {
            $logger->logger("Failure while syncing authorization from DC {$sourceDcId} to DC {$this->datacenter}: {$e->getMessage()}", Logger::ERROR);
        } catch (RPCErrorException $e) {
            $logger->logger("Failure while syncing authorization from DC {$sourceDcId} to DC {$this->datacenter}: {$e->getMessage()}", Logger::ERROR);
            if ($e->rpc === 'DC_ID_INVALID') {
                break;
            }
        }
        // Turns out this DC isn't authorized after all
    }
}
2019-09-01 23:39:29 +02:00
/**
 * Get temporary authorization key.
 *
 * @throws NothingInTheSocketException If no temporary key is set
 */
public function getTempAuthKey(): TempAuthKey
{
    return $this->tempAuthKey ?? throw new NothingInTheSocketException();
}
/**
 * Get permanent authorization key.
 *
 * @throws NothingInTheSocketException If no permanent key is set
 */
public function getPermAuthKey(): PermAuthKey
{
    return $this->permAuthKey ?? throw new NothingInTheSocketException();
}
/**
 * Check if a temporary authorization key object is set and reports having a key.
 */
public function hasTempAuthKey(): bool
{
    $key = $this->tempAuthKey;
    return $key !== null && $key->hasAuthKey();
}
/**
 * Check if a permanent authorization key object is set and reports having a key.
 */
public function hasPermAuthKey(): bool
{
    $key = $this->permAuthKey;
    return $key !== null && $key->hasAuthKey();
}
/**
 * Replace the temporary authorization key.
 *
 * @param TempAuthKey|null $key New key, or null to clear it
 */
public function setTempAuthKey(?TempAuthKey $key): void
{
    $this->tempAuthKey = $key;
}
/**
 * Replace the permanent authorization key.
 *
 * @param PermAuthKey|null $key New key, or null to clear it
 */
public function setPermAuthKey(?PermAuthKey $key): void
{
    $this->permAuthKey = $key;
}
/**
 * Bind temporary and permanent auth keys.
 *
 * Without PFS a temporary key object is created on demand before binding.
 *
 * @param bool $pfs Whether to bind using PFS
 */
public function bind(bool $pfs = true): void
{
    if (!$pfs) {
        $this->tempAuthKey ??= new TempAuthKey();
    }
    $this->tempAuthKey->bind($this->permAuthKey, $pfs);
}
2019-09-04 17:48:07 +02:00
/**
 * Check if auth keys are bound; false when there is no temporary key.
 */
public function isBound(): bool
{
    if (!$this->tempAuthKey) {
        return false;
    }
    return $this->tempAuthKey->isBound();
}
2019-08-31 22:43:58 +02:00
/**
 * Check if we are logged in; false when there is no usable temporary key.
 */
public function isAuthorized(): bool
{
    if (!$this->hasTempAuthKey()) {
        return false;
    }
    return $this->getTempAuthKey()->isAuthorized();
}
/**
 * Set the authorized boolean.
 *
 * Setting it to true requires a temporary key (getTempAuthKey() throws
 * otherwise); setting it to false is silently skipped when no key is present.
 *
 * @param bool $authorized Whether we are authorized
 */
public function authorized(bool $authorized): void
{
    if ($authorized || $this->hasTempAuthKey()) {
        $this->getTempAuthKey()->authorized($authorized);
    }
}
/**
 * Link permanent authorization info of main DC to media DC.
 *
 * @param int $dc Main DC ID
 */
public function link(int $dc): void
{
    $this->linkedDc = $dc;
    $main = $this->API->datacenter->getDataCenterConnection($dc);
    // Reference assignment: this property aliases the main DC's permanent key slot.
    $this->permAuthKey = &$main->permAuthKey;
}
2019-09-01 14:07:04 +02:00
/**
 * Reset the MTProto session of every open connection.
 */
public function resetSession(): void
{
    foreach ($this->connections as $connection) {
        $connection->resetSession();
    }
}
2019-09-02 16:54:36 +02:00
/**
 * Ask every open connection to create its MTProto session if needed.
 */
public function createSession(): void
{
    foreach ($this->connections as $connection) {
        $connection->createSession();
    }
}
2019-09-01 14:07:04 +02:00
/**
 * Flush all pending packets on every open connection.
 */
public function flush(): void
{
    $this->API->logger->logger("Flushing pending messages, DC {$this->datacenter}", Logger::NOTICE);
    foreach ($this->connections as $socket) {
        $socket->flush();
    }
}
2019-08-31 22:43:58 +02:00
/**
 * Return the connection context stored by connect().
 */
public function getCtx(): ConnectionContext
{
    return $this->ctx;
}
2019-12-31 13:12:58 +01:00
/**
 * Tell whether a connection context has been stored yet.
 */
public function hasCtx(): bool
{
    return isset($this->ctx);
}
2019-08-31 22:43:58 +02:00
/**
 * Connect function.
 *
 * With the default ID (or an ID that is not currently open) a first
 * connection is opened, backed up messages are restored and waiters on the
 * connections future are released; nothing happens if connections already
 * exist. With the ID of an open connection, only that connection is
 * reconnected.
 *
 * @param ConnectionContext $ctx Connection context
 * @param int               $id  Optional connection ID to reconnect
 */
public function connect(ConnectionContext $ctx, int $id = -1): void
{
    $this->API->logger->logger("Trying shared connection via {$ctx} ({$id})");
    $this->datacenter = $ctx->getDc();
    $media = $ctx->isMedia() || $ctx->isCDN();
    if ($media) {
        // Media and CDN DCs get a periodic loop that rebalances connection weights.
        $this->robinLoop ??= new PeriodicLoopInternal(
            $this->API,
            $this->even(...),
            "robin loop DC {$this->datacenter}",
            $this->API->getSettings()->getConnection()->getRobinPeriod(),
        );
        $this->robinLoop->start();
    }
    $this->decRead = $media ? self::READ_WEIGHT_MEDIA : self::READ_WEIGHT;
    $this->decWrite = self::WRITE_WEIGHT;

    if ($id !== -1 && isset($this->connections[$id])) {
        // Reconnect a single, already known connection.
        $this->ctx = $ctx->getCtx();
        $this->availableConnections[$id] = 0;
        $this->connections[$id]->connect($ctx);
        return;
    }

    if ($this->connections) {
        $this->API->logger->logger('Already connected!', Logger::WARNING);
        return;
    }
    $this->ctx = $ctx->getCtx();
    $this->connectMore(1);
    $this->restoreBackup();

    // From now on waitGetConnection() must not block: expose a completed future.
    $f = new DeferredFuture;
    $f->complete();
    $this->connectionsPromise = $f->getFuture();

    // Release anybody still awaiting the future created by disconnect().
    if (isset($this->connectionsDeferred)) {
        $connectionsDeferred = $this->connectionsDeferred;
        $this->connectionsDeferred = null;
        $connectionsDeferred->complete();
    }
}
/**
 * Connect to the DC using count more sockets.
 *
 * New sockets get the next free sequential IDs and start with weight 0.
 *
 * @param int $count Number of sockets to open
 */
private function connectMore(int $count): void
{
    $ctx = $this->ctx->getCtx();
    $first = \count($this->connections);
    $end = $first + $count;
    for ($index = $first; $index < $end; $index++) {
        $socket = new Connection();
        $socket->setExtra($this, $index);
        $socket->connect($ctx);
        $this->connections[$index] = $socket;
        $this->availableConnections[$index] = 0;
        // Ask the stored context again for the value handed to the next socket.
        $ctx = $this->ctx->getCtx();
    }
}
2019-10-28 17:08:04 +01:00
/**
 * Signal that a connection ID disconnected.
 *
 * Backs up the session of that connection (dropping state requests,
 * ping_delay_disconnect and unencrypted messages), appends the rest to the
 * DC-wide backup and forgets the connection.
 *
 * @param int $id Connection ID
 */
public function signalDisconnect(int $id): void
{
    $backup = $this->connections[$id]->backupSession();
    $kept = [];
    foreach ($backup as $k => $message) {
        $constructor = $message->getConstructor();
        if ($constructor === 'msgs_state_req'
            || $constructor === 'ping_delay_disconnect'
            || $message->isUnencrypted()
        ) {
            unset($backup[$k]);
            continue;
        }
        $kept[] = $constructor;
    }
    // Joined with implode so the logged list has no trailing separator.
    $list = \implode(', ', $kept);
    $this->API->logger->logger("Backed up {$list} from DC {$this->datacenter}.{$id}");
    $this->backup = \array_merge($this->backup, $backup);
    unset($this->connections[$id], $this->availableConnections[$id]);
}
2019-09-01 01:52:28 +02:00
/**
 * Close all connections to DC.
 *
 * Installs a pending connections future so that waitGetConnection() blocks
 * until the next connect(), stops the weight loop, disconnects every socket
 * and clears the connection tables.
 */
public function disconnect(): void
{
    // Reuse a still-pending deferred: replacing it would leave anyone awaiting
    // the previous future without a way to ever be resumed.
    $this->connectionsDeferred ??= new DeferredFuture();
    $this->connectionsPromise = $this->connectionsDeferred->getFuture();
    if (!isset($this->ctx)) {
        // Never connected: nothing else to tear down.
        return;
    }
    $this->API->logger->logger("Disconnecting from shared DC {$this->datacenter}");
    if ($this->robinLoop) {
        $this->robinLoop->stop();
        $this->robinLoop = null;
    }
    $before = \count($this->backup);
    foreach ($this->connections as $connection) {
        $connection->disconnect();
    }
    // The backup size is compared before and after to report what this call added.
    $count = \count($this->backup) - $before;
    $this->API->logger->logger("Backed up {$count} (added to {$before} existing messages) from DC {$this->datacenter}");
    $this->connections = [];
    $this->availableConnections = [];
}
2019-09-01 01:52:28 +02:00
/**
 * Reconnect to DC: disconnect everything, then connect again with the stored context.
 */
public function reconnect(): void
{
    $this->API->logger->logger("Reconnecting shared DC {$this->datacenter}");
    $this->disconnect();
    $this->connect($this->ctx);
}
2019-09-04 17:48:07 +02:00
/**
 * Restore backed up messages.
 *
 * Empties the backup, clears the sequence number and message ID of every
 * message, queues the ones that were not replied to for sending, and flushes.
 */
public function restoreBackup(): void
{
    $backup = $this->backup;
    $this->backup = [];
    $count = \count($backup);
    $this->API->logger->logger("Restoring {$count} messages to DC {$this->datacenter}");
    /** @var OutgoingMessage $message */
    foreach ($backup as $message) {
        if ($message->hasSeqno()) {
            $message->setSeqno(null);
        }
        if ($message->hasMsgId()) {
            $message->setMsgId(null);
        }
        if (!($message->getState() & OutgoingMessage::STATE_REPLIED)) {
            EventLoop::queue($this->getConnection()->sendMessage(...), $message, false);
        }
    }
    $this->flush();
}
2019-09-01 23:39:29 +02:00
/**
 * Get connection for authorization: always the connection with ID 0.
 */
public function getAuthConnection(): Connection
{
    return $this->connections[0];
}
2019-09-17 21:35:53 +02:00
/**
 * Check if any connection is available.
 *
 * @param int $id Connection ID; negative to query all connections
 *
 * @return bool|int Number of open connections for a negative ID,
 *                  otherwise whether the given connection exists
 */
public function hasConnection(int $id = -1): bool|int
{
    if ($id < 0) {
        return \count($this->connections);
    }
    return isset($this->connections[$id]);
}
2019-12-29 14:04:02 +01:00
/**
 * Get best socket in round robin, asynchronously.
 *
 * Awaits the connections future first when no connection is available.
 */
public function waitGetConnection(): Connection
{
    if (!$this->availableConnections) {
        $this->connectionsPromise->await();
    }
    return $this->getConnection();
}
2019-09-01 01:52:28 +02:00
/**
 * Get best socket in round robin.
 *
 * With a non-negative ID that exact connection is returned. Otherwise, if at
 * most one connection is weighted, connection 0 is returned; else the
 * connection with the highest weight is picked and its weight decremented.
 *
 * @param int $id Connection ID, for manual fetching
 */
public function getConnection(int $id = -1): Connection
{
    if ($id >= 0) {
        return $this->connections[$id];
    }
    if (\count($this->availableConnections) <= 1) {
        return $this->connections[0];
    }
    $max = \max($this->availableConnections);
    // Strict search: weights are ints, so no type juggling is wanted here.
    $key = \array_search($max, $this->availableConnections, true);
    // Decrease to implement round robin
    $this->availableConnections[$key]--;
    return $this->connections[$key];
}
2019-09-01 01:52:28 +02:00
/**
 * Even out round robin values.
 *
 * Based on the lowest weight: below 50 every weight is raised by 50; between
 * 50 and 99 either two more sockets are opened (if below the socket limit)
 * or every weight is raised by 1000; from 100 upwards nothing is done.
 */
public function even(): void
{
    if (!$this->availableConnections) {
        return;
    }
    $lowest = \min($this->availableConnections);
    if ($lowest >= 100) {
        return;
    }
    if ($lowest < 50) {
        foreach (\array_keys($this->availableConnections) as $id) {
            $this->availableConnections[$id] += 50;
        }
        return;
    }
    // Only media and CDN DCs may grow beyond a single socket.
    $limit = ($this->isMedia() || $this->isCDN())
        ? $this->API->getSettings()->getConnection()->getMaxMediaSocketCount()
        : 1;
    if (\count($this->availableConnections) < $limit) {
        $this->connectMore(2);
        return;
    }
    foreach (\array_keys($this->availableConnections) as $id) {
        $this->availableConnections[$id] += 1000;
    }
}
2019-09-02 14:37:30 +02:00
/**
 * Indicate that one of the sockets is busy reading.
 *
 * Lowers the socket's weight when reading starts and restores it when
 * reading ends; unknown connection IDs are ignored.
 *
 * @param bool $reading Whether we're busy reading
 * @param int  $x       Connection ID
 */
public function reading(bool $reading, int $x): void
{
    if (isset($this->availableConnections[$x])) {
        $delta = $reading ? -$this->decRead : $this->decRead;
        $this->availableConnections[$x] += $delta;
    }
}
/**
 * Indicate that one of the sockets is busy writing.
 *
 * Lowers the socket's weight when writing starts and restores it when
 * writing ends; unknown connection IDs are ignored.
 *
 * @param bool $writing Whether we're busy writing
 * @param int  $x       Connection ID
 */
public function writing(bool $writing, int $x): void
{
    if (isset($this->availableConnections[$x])) {
        $delta = $writing ? -$this->decWrite : $this->decWrite;
        $this->availableConnections[$x] += $delta;
    }
}
2019-09-01 01:52:28 +02:00
/**
 * Store the main API instance used by this DC connection.
 *
 * @param MTProto $API Main instance
 */
public function setExtra(MTProto $API): void
{
    $this->API = $API;
}
/**
 * Return the main API instance stored by setExtra().
 */
public function getExtra(): MTProto
{
    return $this->API;
}
2019-09-02 14:37:30 +02:00
/**
 * Check if is an HTTP connection, i.e. the context's stream is HttpStream or HttpsStream.
 */
public function isHttp(): bool
{
    // Strict comparison: stream names are class-name strings.
    return \in_array($this->ctx->getStreamName(), [HttpStream::class, HttpsStream::class], true);
}
2019-09-17 21:35:53 +02:00
/**
 * Check if is connected directly by IP address.
 *
 * True when the context uses neither the WSS nor the HTTPS stream.
 */
public function byIPAddress(): bool
{
    return !(
        $this->ctx->hasStreamName(WssStream::class)
        || $this->ctx->hasStreamName(HttpsStream::class)
    );
}
2019-09-02 15:30:29 +02:00
/**
 * Tell whether the connection context marks this as a media connection.
 */
public function isMedia(): bool
{
    return $this->ctx->isMedia();
}
/**
 * Tell whether the connection context marks this as a CDN connection.
 */
public function isCDN(): bool
{
    return $this->ctx->isCDN();
}
2019-09-02 14:37:30 +02:00
/**
 * Get the connection settings of the main API instance.
 */
public function getSettings(): ConnectionSettings
{
    return $this->API->getSettings()->getConnection();
}
/**
 * Get the global settings of the main API instance.
 */
public function getGenericSettings(): Settings
{
    return $this->API->getSettings();
}
2019-09-01 14:07:04 +02:00
/**
 * JSON serialize function.
 *
 * A linked DC exposes the linked DC ID in place of its permanent key.
 */
public function jsonSerialize(): array
{
    if ($this->linkedDc) {
        return ['linkedDc' => $this->linkedDc, 'tempAuthKey' => $this->tempAuthKey];
    }
    return ['permAuthKey' => $this->permAuthKey, 'tempAuthKey' => $this->tempAuthKey];
}
2019-08-31 22:43:58 +02:00
/**
 * Sleep function.
 *
 * A linked DC serializes the linked DC ID in place of its permanent key.
 *
 * @internal
 */
public function __sleep(): array
{
    if ($this->linkedDc) {
        return ['linkedDc', 'tempAuthKey'];
    }
    return ['permAuthKey', 'tempAuthKey'];
}
/**
 * Wakeup function.
 *
 * Copies an integer `linked` property into `linkedDc`. `linked` is not
 * declared on this class; presumably it comes from an older serialized
 * layout (to be confirmed).
 */
public function __wakeup(): void
{
    $legacy = $this->linked ?? null;
    if (\is_int($legacy)) {
        $this->linkedDc = $legacy;
    }
}
}