. * * @author Daniil Gentili * @copyright 2016-2023 Daniil Gentili * @license https://opensource.org/licenses/AGPL-3.0 AGPLv3 * @link https://docs.madelineproto.xyz MadelineProto documentation */ // Please keep the above notice the next time you copy my code, or I will sue you :) namespace danog\MadelineProto; use Amp\ByteStream\Pipe; use Amp\ByteStream\ReadableStream; use Amp\Cancellation; use Amp\CancelledException; use Amp\DeferredCancellation; use Amp\DeferredFuture; use danog\Loop\GenericLoop; use danog\Loop\Loop; use danog\MadelineProto\Loop\VoIPLoop; use danog\MadelineProto\MTProtoTools\Crypt; use danog\MadelineProto\VoIP\CallState; use danog\MadelineProto\VoIP\DiscardReason; use danog\MadelineProto\VoIP\Endpoint; use danog\MadelineProto\VoIP\MessageHandler; use danog\MadelineProto\VoIP\VoIPState; use phpseclib3\Math\BigInteger; use Revolt\EventLoop; use SplQueue; use Throwable; use Webmozart\Assert\Assert; use function Amp\delay; /** @internal */ final class VoIPController { const NET_TYPE_UNKNOWN = 0; const NET_TYPE_GPRS = 1; const NET_TYPE_EDGE = 2; const NET_TYPE_3G = 3; const NET_TYPE_HSPA = 4; const NET_TYPE_LTE = 5; const NET_TYPE_WIFI = 6; const NET_TYPE_ETHERNET = 7; const NET_TYPE_OTHER_HIGH_SPEED = 8; const NET_TYPE_OTHER_LOW_SPEED = 9; const NET_TYPE_DIALUP = 10; const NET_TYPE_OTHER_MOBILE = 11; const DATA_SAVING_NEVER = 0; const DATA_SAVING_MOBILE = 1; const DATA_SAVING_ALWAYS = 2; const PROXY_NONE = 0; const PROXY_SOCKS5 = 1; const AUDIO_STATE_NONE = -1; const AUDIO_STATE_CREATED = 0; const AUDIO_STATE_CONFIGURED = 1; const AUDIO_STATE_RUNNING = 2; const PKT_INIT = 1; const PKT_INIT_ACK = 2; const PKT_STREAM_STATE = 3; const PKT_STREAM_DATA = 4; const PKT_UPDATE_STREAMS = 5; const PKT_PING = 6; const PKT_PONG = 7; const PKT_STREAM_DATA_X2 = 8; const PKT_STREAM_DATA_X3 = 9; const PKT_LAN_ENDPOINT = 10; const PKT_NETWORK_CHANGED = 11; const PKT_SWITCH_PREF_RELAY = 12; const PKT_SWITCH_TO_P2P = 13; const PKT_NOP = 14; const TLID_DECRYPTED_AUDIO_BLOCK = "\xc1\xdb\xf9\x48"; const TLID_SIMPLE_AUDIO_BLOCK = "\x0d\x0e\x76\xcc"; const TLID_REFLECTOR_SELF_INFO = "\xC7\x72\x15\xc0"; const TLID_REFLECTOR_PEER_INFO = "\x1C\x37\xD9\x27"; const PROTO_ID = 'GrVP'; const PROTOCOL_VERSION = 9; const MIN_PROTOCOL_VERSION = 9; const STREAM_TYPE_AUDIO = 1; const STREAM_TYPE_VIDEO = 2; const CODEC_OPUS = 'SUPO'; private MessageHandler $messageHandler; private VoIPState $voipState = VoIPState::CREATED; private CallState $callState; private array $call; /** @var array */ private array $holdFiles = []; /** @var list */ private array $inputFiles = []; private int $holdIndex = 0; /** * @var array */ private array $sockets = []; private Endpoint $bestEndpoint; private ?string $pendingPing = null; private ?string $timeoutWatcher = null; private float $lastIncomingTimestamp = 0.0; private float $lastOutgoingTimestamp = 0.0; private int $opusTimestamp = 0; private SplQueue $packetQueue; private ?DeferredFuture $packetDeferred = null; private Cancellation $cancellation; private DeferredCancellation $deferred; private Loop $djLoop; /** Auth key */ private readonly string $authKey; public readonly VoIP $public; /** @var ?list{string, string, string, string} */ private ?array $visualization = null; /** * Constructor. * * @internal */ public function __construct( public readonly MTProto $API, array $call ) { $this->public = new VoIP($API, $call); $call['_'] = 'inputPhoneCall'; $this->packetQueue = new SplQueue(); $this->deferred = new DeferredCancellation; $this->cancellation = $this->deferred->getCancellation(); $this->djLoop = new VoIPLoop($this, $this->djLoop(...), "Play loop"); Assert::true($this->djLoop->start()); $this->call = $call; if ($this->public->outgoing) { $this->callState = CallState::REQUESTED; } else { $this->callState = CallState::INCOMING; } } public function __serialize(): array { $data = \get_object_vars($this); unset($data['cancellation'], $data['deferred'], $data['packetDeferred'], $data['djLoop']); $data['holdFiles'] = \array_filter( $data['holdFiles'], fn ($v) => !$v instanceof ReadableStream ); $data['inputFiles'] = \array_filter( $data['inputFiles'], fn ($v) => !$v instanceof ReadableStream ); return $data; } /** * Wakeup function. */ public function __unserialize(array $data): void { foreach ($data as $key => $value) { $this->{$key} = $value; } $this->deferred = new DeferredCancellation; $this->cancellation = $this->deferred->getCancellation(); $this->djLoop = new GenericLoop($this->djLoop(...), "Play loop"); Assert::true($this->djLoop->start()); if ($this->callState === CallState::RUNNING) { if ($this->voipState === VoIPState::CREATED) { // No endpoints yet return; } $this->lastIncomingTimestamp = \microtime(true); $this->startReadLoop(); if ($this->voipState === VoIPState::WAIT_INIT) { EventLoop::queue($this->sendInits(...)); } elseif ($this->voipState === VoIPState::WAIT_INIT_ACK) { EventLoop::queue($this->sendInits(...)); } elseif ($this->voipState === VoIPState::WAIT_PONG) { $this->pendingPing = EventLoop::repeat(0.2, $this->ping(...)); } elseif ($this->voipState === VoIPState::WAIT_STREAM_INIT) { EventLoop::queue($this->initStream(...)); } elseif ($this->voipState === VoIPState::ESTABLISHED) { $diff = (int) ((\microtime(true) - $this->lastOutgoingTimestamp) * 1000); $this->opusTimestamp += $diff - ($diff % 60); EventLoop::queue($this->startWriteLoop(...)); } } } /** * Confirm requested call. * @internal */ public function confirm(array $params): bool { if ($this->callState !== CallState::REQUESTED) { $this->API->logger->logger(\sprintf(Lang::$current_lang['call_error_2'], $this->public->callID)); return false; } $this->API->logger->logger(\sprintf(Lang::$current_lang['call_confirming'], $this->public->otherID), Logger::VERBOSE); $dh_config = $this->API->getDhConfig(); $params['g_b'] = new BigInteger((string) $params['g_b'], 256); Crypt::checkG($params['g_b'], $dh_config['p']); $key = \str_pad($params['g_b']->powMod($this->call['a'], $dh_config['p'])->toBytes(), 256, \chr(0), STR_PAD_LEFT); try { $res = ($this->API->methodCallAsyncRead('phone.confirmCall', ['key_fingerprint' => \substr(\sha1($key, true), -8), 'peer' => ['id' => $params['id'], 'access_hash' => $params['access_hash'], '_' => 'inputPhoneCall'], 'g_a' => $this->call['g_a'], 'protocol' => ['_' => 'phoneCallProtocol', 'udp_reflector' => true, 'min_layer' => 65, 'max_layer' => 92]]))['phone_call']; } catch (RPCErrorException $e) { if ($e->rpc === 'CALL_ALREADY_ACCEPTED') { $this->API->logger->logger(\sprintf(Lang::$current_lang['call_already_accepted'], $params['id'])); return true; } if ($e->rpc === 'CALL_ALREADY_DECLINED') { $this->API->logger->logger(Lang::$current_lang['call_already_declined']); $this->discard(DiscardReason::HANGUP); return false; } throw $e; } $visualization = []; $length = new BigInteger(\count(Magic::$emojis)); foreach (\str_split(\hash('sha256', $key.\str_pad($this->call['g_a'], 256, \chr(0), STR_PAD_LEFT), true), 8) as $number) { $number[0] = \chr(\ord($number[0]) & 0x7f); $visualization[] = Magic::$emojis[(int) (new BigInteger($number, 256))->divide($length)[1]->toString()]; } $this->visualization = $visualization; $this->authKey = $key; $this->callState = CallState::RUNNING; $this->messageHandler = new MessageHandler( $this, \substr(\hash('sha256', $key, true), -16) ); $this->initialize($res['connections']); return true; } /** * Accept incoming call. */ public function accept(): self { if ($this->callState === CallState::RUNNING || $this->callState === CallState::ENDED) { return $this; } Assert::eq($this->callState->name, CallState::INCOMING->name); $this->API->logger->logger(\sprintf(Lang::$current_lang['accepting_call'], $this->public->otherID), Logger::VERBOSE); $dh_config = $this->API->getDhConfig(); $this->API->logger->logger('Generating b...', Logger::VERBOSE); $b = BigInteger::randomRange(Magic::$two, $dh_config['p']->subtract(Magic::$two)); $g_b = $dh_config['g']->powMod($b, $dh_config['p']); Crypt::checkG($g_b, $dh_config['p']); $this->callState = CallState::ACCEPTED; try { $this->API->methodCallAsyncRead('phone.acceptCall', ['peer' => ['id' => $this->call['id'], 'access_hash' => $this->call['access_hash'], '_' => 'inputPhoneCall'], 'g_b' => $g_b->toBytes(), 'protocol' => ['_' => 'phoneCallProtocol', 'udp_reflector' => true, 'udp_p2p' => true, 'min_layer' => 65, 'max_layer' => 92]]); } catch (RPCErrorException $e) { if ($e->rpc === 'CALL_ALREADY_ACCEPTED') { $this->API->logger->logger(\sprintf(Lang::$current_lang['call_already_accepted'], $this->public->callID)); return $this; } if ($e->rpc === 'CALL_ALREADY_DECLINED') { $this->API->logger->logger(Lang::$current_lang['call_already_declined']); $this->discard(DiscardReason::HANGUP); return $this; } throw $e; } $this->call['b'] = $b; return $this; } /** * Complete call handshake. * * @internal */ public function complete(array $params): bool { if ($this->callState !== CallState::ACCEPTED) { $this->API->logger->logger(\sprintf(Lang::$current_lang['call_error_3'], $params['id'])); return false; } $this->API->logger->logger(\sprintf(Lang::$current_lang['call_completing'], $this->public->otherID), Logger::VERBOSE); $dh_config = $this->API->getDhConfig(); if (\hash('sha256', (string) $params['g_a_or_b'], true) !== (string) $this->call['g_a_hash']) { throw new SecurityException('Invalid g_a!'); } $params['g_a_or_b'] = new BigInteger((string) $params['g_a_or_b'], 256); Crypt::checkG($params['g_a_or_b'], $dh_config['p']); $key = \str_pad($params['g_a_or_b']->powMod($this->call['b'], $dh_config['p'])->toBytes(), 256, \chr(0), STR_PAD_LEFT); if (\substr(\sha1($key, true), -8) != $params['key_fingerprint']) { throw new SecurityException(Lang::$current_lang['fingerprint_invalid']); } $visualization = []; $length = new BigInteger(\count(Magic::$emojis)); foreach (\str_split(\hash('sha256', $key.\str_pad($params['g_a_or_b']->toBytes(), 256, \chr(0), STR_PAD_LEFT), true), 8) as $number) { $number[0] = \chr(\ord($number[0]) & 0x7f); $visualization[] = Magic::$emojis[(int) (new BigInteger($number, 256))->divide($length)[1]->toString()]; } $this->visualization = $visualization; $this->authKey = $key; $this->callState = CallState::RUNNING; $this->messageHandler = new MessageHandler( $this, \substr(\hash('sha256', $key, true), -16) ); $this->initialize($params['connections']); return true; } /** * Get call emojis (will return null if the call is not inited yet). * * @return ?list{string, string, string, string} */ public function getVisualization(): ?array { return $this->visualization; } /** * Discard call. * * @param int<1, 5> $rating Call rating in stars * @param string $comment Additional comment on call quality. */ public function discard(DiscardReason $reason = DiscardReason::HANGUP, ?int $rating = null, ?string $comment = null): self { if ($this->callState === CallState::ENDED) { return $this; } $this->API->cleanupCall($this->public->callID); $this->callState = CallState::ENDED; $this->djLoop->stop(); $this->deferred->cancel(); $this->skip(); $this->API->logger->logger("Now closing $this"); if (isset($this->timeoutWatcher)) { EventLoop::cancel($this->timeoutWatcher); } if (isset($this->pendingPing)) { EventLoop::cancel($this->pendingPing); } $this->API->logger->logger("Closing all sockets in $this"); foreach ($this->sockets as $socket) { $socket->disconnect(); } $this->packetQueue = new SplQueue; $this->packetDeferred?->complete(false); $this->API->logger->logger("Closed all sockets, discarding $this"); $this->API->logger->logger(\sprintf(Lang::$current_lang['call_discarding'], $this->public->callID), Logger::VERBOSE); try { $this->API->methodCallAsyncRead('phone.discardCall', ['peer' => $this->call, 'duration' => \time() - $this->public->date, 'connection_id' => 0, 'reason' => ['_' => match ($reason) { DiscardReason::BUSY => 'phoneCallDiscardReasonBusy', DiscardReason::HANGUP => 'phoneCallDiscardReasonHangup', DiscardReason::DISCONNECTED => 'phoneCallDiscardReasonDisconnect', DiscardReason::MISSED => 'phoneCallDiscardReasonMissed' }]]); } catch (RPCErrorException $e) { if (!\in_array($e->rpc, ['CALL_ALREADY_DECLINED', 'CALL_ALREADY_ACCEPTED'], true)) { throw $e; } } if ($rating !== null) { $this->API->logger->logger(\sprintf('Setting rating for call %s...', $this->call), Logger::VERBOSE); $this->API->methodCallAsyncRead('phone.setCallRating', ['peer' => $this->call, 'rating' => $rating, 'comment' => $comment]); } return $this; } private function setVoipState(VoIPState $state): bool { if ($this->voipState->value >= $state->value) { return false; } $old = $this->voipState; $this->voipState = $state; EventLoop::queue($this->API->logger->logger(...), "Changing state from {$old->name} to {$state->name} in $this"); return true; } /** * Connect to the specified endpoints. */ private function initialize(array $endpoints): void { foreach ($endpoints as $endpoint) { try { $this->sockets['v6 '.$endpoint['id']] = new Endpoint( '['.$endpoint['ipv6'].']', $endpoint['port'], $endpoint['peer_tag'], true, $this->public->outgoing, $this->authKey, $this->messageHandler ); } catch (Throwable) { } try { $this->sockets['v4 '.$endpoint['id']] = new Endpoint( $endpoint['ip'], $endpoint['port'], $endpoint['peer_tag'], true, $this->public->outgoing, $this->authKey, $this->messageHandler ); } catch (Throwable) { } } $this->setVoipState(VoIPState::WAIT_INIT); $this->startReadLoop(); $this->sendInits(); } private function sendInits(): void { foreach ($this->sockets as $socket) { $socket->udpPing(); $socket->write($this->messageHandler->encryptPacket([ '_' => self::PKT_INIT, 'protocol' => self::PROTOCOL_VERSION, 'min_protocol' => self::MIN_PROTOCOL_VERSION, 'audio_streams' => [self::CODEC_OPUS], 'video_streams' => [] ], true)); } } /** * Handle incoming packet. */ private function handlePacket(Endpoint $socket, array $packet): void { switch ($packet['_']) { case self::PKT_INIT: $this->setVoipState(VoIPState::WAIT_INIT_ACK); $socket->write($this->messageHandler->encryptPacket([ '_' => self::PKT_INIT_ACK, 'protocol' => self::PROTOCOL_VERSION, 'min_protocol' => self::MIN_PROTOCOL_VERSION, 'all_streams' => [ ['id' => 0, 'type' => self::STREAM_TYPE_AUDIO, 'codec' => self::CODEC_OPUS, 'frame_duration' => 60, 'enabled' => 1] ] ])); $socket->write($this->messageHandler->encryptPacket([ '_' => self::PKT_INIT, 'protocol' => self::PROTOCOL_VERSION, 'min_protocol' => self::MIN_PROTOCOL_VERSION, 'audio_streams' => [self::CODEC_OPUS], 'video_streams' => [] ])); break; case self::PKT_INIT_ACK: if ($this->setVoipState(VoIPState::WAIT_PONG)) { $this->pendingPing = EventLoop::repeat(0.2, $this->ping(...)); } break; case self::PKT_STREAM_DATA: $cnt = 1; break; case self::PKT_STREAM_DATA_X2: $cnt = 2; break; case self::PKT_STREAM_DATA_X3: $cnt = 3; break; case self::PKT_PING: $socket->write($this->messageHandler->encryptPacket(['_' => self::PKT_PONG, 'out_seq_no' => $packet['out_seq_no']])); break; case self::PKT_PONG: if ($this->setVoipState(VoIPState::WAIT_STREAM_INIT)) { EventLoop::cancel($this->pendingPing); $this->pendingPing = null; $this->bestEndpoint ??= $socket; $this->initStream(); } break; } } private function initStream(): void { $this->bestEndpoint->writeReliably([ '_' => self::PKT_STREAM_STATE, 'id' => 0, 'enabled' => false ]); $this->startWriteLoop(); } private function ping(): void { foreach ($this->sockets as $socket) { EventLoop::queue(fn () => $socket->write($this->messageHandler->encryptPacket(['_' => self::PKT_PING]))); } } private function startReadLoop(): void { foreach ($this->sockets as $socket) { EventLoop::queue(function () use ($socket): void { while (true) { try { $payload = $socket->read(); } catch (Throwable $e) { $this->API->logger->logger("Got $e in this!"); continue; } if (!$payload) { break; } $this->lastIncomingTimestamp = \microtime(true); EventLoop::queue($this->handlePacket(...), $socket, $payload); } $this->API->logger->logger("Exiting VoIP read loop in $this!"); }); } } private function startPlaying(LocalFile|RemoteUrl|ReadableStream $f): void { $it = null; if ($f instanceof LocalFile) { try { $it = new Ogg($f, $this->cancellation); if (!\in_array('MADELINE_ENCODER_V=1', $it->comments, true)) { $it = null; } } catch (CancelledException $e) { throw $e; } catch (Throwable) { $it = null; } } if (!$it) { EventLoop::queue($this->API->logger->logger(...), "Starting conversion fiber..."); $pipe = new Pipe(4096); EventLoop::queue(function () use ($f, $pipe): void { try { Ogg::convert($f, $pipe->getSink(), $this->cancellation); } catch (CancelledException) { } finally { $pipe->getSink()->close(); } }); $it = new Ogg($pipe->getSource()); } foreach ($it->opusPackets as $packet) { $this->packetQueue->enqueue($packet); if ($this->packetDeferred) { $deferred = $this->packetDeferred; $this->packetDeferred = null; $deferred->complete(true); } } } private bool $muted = true; private bool $playingHold = false; private function djLoop(): ?float { if ($this->callState === CallState::ENDED) { $this->API->logger->logger("Exiting DJ loop in $this because the call ended!"); return GenericLoop::STOP; } $this->API->logger->logger("Starting DJ loop in $this!"); $file = \array_shift($this->inputFiles); if (!$file) { $this->API->logger->logger("Pausing DJ loop in $this!"); return GenericLoop::PAUSE; } try { $this->startPlaying($file); } catch (CancelledException) { $this->packetQueue = new SplQueue; if ($this->packetDeferred) { $deferred = $this->packetDeferred; $this->packetDeferred = null; $deferred?->complete(false); } } return GenericLoop::CONTINUE; } private function pullPacket(): ?string { if ($this->packetQueue->isEmpty()) { if ($this->callState === CallState::ENDED) { return null; } if ($this->djLoop->isPaused()) { if (!$this->holdFiles || $this->inputFiles) { return null; } $this->playingHold = true; $this->inputFiles []= $this->holdFiles[($this->holdIndex++) % \count($this->holdFiles)]; Assert::true($this->djLoop->resume()); return null; } $this->packetDeferred ??= new DeferredFuture; if (!$this->packetDeferred->getFuture()->await()) { return null; } } return $this->packetQueue->dequeue(); } /** * Start write loop. */ private function startWriteLoop(): void { $this->setVoipState(VoIPState::ESTABLISHED); $this->timeoutWatcher = EventLoop::repeat(10, function (): void { if (\microtime(true) - $this->lastIncomingTimestamp > 10) { $this->discard(DiscardReason::DISCONNECTED); } }); $delay = $this->muted ? 0.2 : 0.06; $t = \microtime(true) + $delay; while (true) { if ($packet = $this->pullPacket()) { if ($this->muted) { if (!$this->bestEndpoint->writeReliably([ '_' => self::PKT_STREAM_STATE, 'id' => 0, 'enabled' => true ])) { $this->API->logger->logger("Exiting write loop in $this because we could not write stream state!"); return; } $this->muted = false; $delay = 0.06; $this->opusTimestamp = 0; } $packet = $this->messageHandler->encryptPacket([ '_' => self::PKT_STREAM_DATA, 'stream_id' => 0, 'data' => $packet, 'timestamp' => $this->opusTimestamp ]); $this->opusTimestamp += 60; } else { if (!$this->muted) { if (!$this->bestEndpoint->writeReliably([ '_' => self::PKT_STREAM_STATE, 'id' => 0, 'enabled' => false ])) { $this->API->logger->logger("Exiting write loop in $this because we could not write stream state!"); return; } $this->muted = true; $delay = 0.2; } $packet = $this->messageHandler->encryptPacket([ '_' => self::PKT_NOP ]); } //$this->API->logger->logger("Writing {$this->opusTimestamp} in $this!"); $cur = \microtime(true); $diff = $t - $cur; if ($diff > 0) { delay($diff); } if (!$this->bestEndpoint->write($packet)) { $this->API->logger->logger("Exiting write loop in $this!"); return; } if ($diff > 0) { $cur += $diff; } $this->lastOutgoingTimestamp = $cur; $t += $delay; } } /** * Play file. */ public function play(LocalFile|RemoteUrl|ReadableStream $file): self { $this->inputFiles[] = $file; if ($this->playingHold) { $this->playingHold = false; $this->skip(); } $this->djLoop->resume(); return $this; } /** * When called, skips to the next file in the playlist. */ public function skip(): void { $this->packetQueue = new SplQueue; $deferred = $this->deferred; $this->deferred = new DeferredCancellation; $this->cancellation = $this->deferred->getCancellation(); $deferred->cancel(); } /** * Stops playing all files, clears the main and the hold playlist. */ public function stop(): void { $this->inputFiles = []; $this->holdFiles = []; $this->skip(); } /** * Files to play on hold. */ public function playOnHold(LocalFile|RemoteUrl|ReadableStream ...$files): self { $this->holdFiles = $files; return $this; } /** * Get call state. */ public function getCallState(): CallState { return $this->callState; } /** * Get VoIP state. */ public function getVoIPState(): VoIPState { return $this->voipState; } /** * Get call representation. */ public function __toString(): string { return $this->public->__toString(); } }