1
0
mirror of https://github.com/danog/MadelineProto.git synced 2024-12-02 14:17:45 +01:00

Refactoring

This commit is contained in:
Daniil Gentili 2023-01-04 12:12:44 +01:00
parent a4739410b3
commit 288ac4c93a
21 changed files with 91 additions and 226 deletions

View File

@ -39,7 +39,7 @@ use function Amp\delay;
use function Amp\File\exists;
use function Amp\File\touch as touchAsync;
use function Amp\Future\await;
use function Amp\Future\awaitAll;
use function Amp\Future\await;
use function Amp\Future\awaitAny;
use function Amp\Future\awaitFirst;
@ -103,7 +103,7 @@ abstract class AsyncTools extends StrTools
foreach ($promises as &$promise) {
$promise = self::call($promise);
}
return awaitAll($promises);
return await($promises);
}
/**
* Returns a promise that succeeds when the first promise succeeds, and fails only if all promises fail.

View File

@ -5,9 +5,11 @@ declare(strict_types=1);
namespace danog\MadelineProto\Db;
use danog\MadelineProto\MTProto;
use danog\MadelineProto\Tools;
use LogicException;
use function Amp\async;
use function Amp\Future\await;
/**
* Include this trait and call DbPropertiesTrait::initDb to use MadelineProto's database backend for properties.
*
@ -39,10 +41,10 @@ trait DbPropertiesTrait
unset($this->{$property});
} else {
$table = "{$prefix}_{$property}";
$promises[$property] = DbPropertiesFactory::get($dbSettings, $table, $type, $this->{$property});
$promises[$property] = async(DbPropertiesFactory::get(...), $dbSettings, $table, $type, $this->{$property});
}
}
$promises = Tools::all($promises);
$promises = await($promises);
foreach ($promises as $key => $data) {
$this->{$key} = $data;
}

View File

@ -108,13 +108,13 @@ use danog\MadelineProto\Tools;
$API->initSelfRestart();
while (true) {
try {
Tools::wait($session->storeIpcState(new IpcState($runnerId)));
Tools::wait(Server::waitShutdown());
$session->storeIpcState(new IpcState($runnerId));
Server::waitShutdown();
Shutdown::removeCallback('restarter');
return;
} catch (Throwable $e) {
Logger::log((string) $e, Logger::FATAL_ERROR);
Tools::wait($API->report("Surfaced: $e"));
$API->report("Surfaced: $e");
}
}
} catch (Throwable $e) {

View File

@ -24,7 +24,6 @@ use Amp\DeferredFuture;
use Amp\Future;
use Amp\Ipc\IpcServer;
use Amp\Ipc\Sync\ChannelledSocket;
use Amp\Success;
use danog\Loop\SignalLoop;
use danog\MadelineProto\Exception;
use danog\MadelineProto\Ipc\Runner\ProcessRunner;
@ -160,13 +159,13 @@ class Server extends SignalLoop
/**
* Wait for shutdown.
*/
public static function waitShutdown(): Future
public static function waitShutdown(): void
{
if (self::$shutdownNow) {
return new Success;
return;
}
self::$shutdownDeferred ??= new DeferredFuture;
return self::$shutdownDeferred->getFuture();
self::$shutdownDeferred->getFuture()->await();
}
/**
* Shutdown.

View File

@ -1,24 +0,0 @@
<?php
declare(strict_types=1);
namespace danog\MadelineProto\Ipc\Wrapper;
use Amp\ByteStream\InputStream as AmpInputStream;
use Amp\ByteStream\PendingReadError;
use danog\MadelineProto\Tools;
class InputStream extends Obj implements AmpInputStream
{
/**
* Reads data from the stream.
*
* @return Promise Resolves with a string when new data is available or `null` if the stream has closed.
* @psalm-return Promise<string|null>
* @throws PendingReadError Thrown if another read operation is still pending.
*/
public function read(): ?string
{
return Tools::call($this->__call('read'));
}
}

View File

@ -1,42 +0,0 @@
<?php
declare(strict_types=1);
namespace danog\MadelineProto\Ipc\Wrapper;
use Amp\ByteStream\ClosedException;
use Amp\ByteStream\OutputStream as AmpOutputStream;
use Amp\ByteStream\StreamException;
use Amp\Future;
use danog\MadelineProto\Tools;
class OutputStream extends Obj implements AmpOutputStream
{
/**
* Writes data to the stream.
*
* @param string $data Bytes to write.
* @return Promise Succeeds once the data has been successfully written to the stream.
* @throws ClosedException If the stream has already been closed.
* @throws StreamException If writing to the stream fails.
*/
public function write(string $data): Future
{
return Tools::call($this->__call('write', [$data]));
}
/**
* Marks the stream as no longer writable. Optionally writes a final data chunk before. Note that this is not the
* same as forcefully closing the stream. This method waits for all pending writes to complete before closing the
* stream. Socket streams implementing this interface should only close the writable side of the stream.
*
* @param string $finalData Bytes to write.
* @return Promise Succeeds once the data has been successfully written to the stream.
* @throws ClosedException If the stream has already been closed.
* @throws StreamException If writing to the stream fails.
*/
public function end(string $finalData = ""): Future
{
return Tools::call($this->__call('write', [$finalData]));
}
}

View File

@ -1,28 +0,0 @@
<?php
declare(strict_types=1);
namespace danog\MadelineProto\Ipc\Wrapper;
use danog\MadelineProto\Tools;
use const SEEK_SET;
trait SeekableTrait
{
/**
* Set the handle's internal pointer position.
*
* $whence values:
*
* SEEK_SET - Set position equal to offset bytes.
* SEEK_CUR - Set position to current location plus offset.
* SEEK_END - Set position to end-of-file plus offset.
*
* @return int New offset position.
*/
public function seek(int $position, int $whence = SEEK_SET): int
{
return Tools::call($this->__call('seek', [$position, $whence]));
}
}

View File

@ -358,7 +358,7 @@ class Logger
return;
}
if (Magic::$suspendPeriodicLogging) {
Magic::$suspendPeriodicLogging->getFuture()->onResolve(fn () => $this->logger($param, $level, $file));
Magic::$suspendPeriodicLogging->getFuture()->complete(fn () => $this->logger($param, $level, $file));
return;
}
if (!self::$printed) {

View File

@ -70,9 +70,11 @@ use Psr\Log\LoggerInterface;
use Throwable;
use const DEBUG_BACKTRACE_IGNORE_ARGS;
use function Amp\async;
use function Amp\File\getSize;
use function Amp\File\touch as touchAsync;
use function Amp\Future\await;
use function time;
/**
@ -1029,15 +1031,15 @@ class MTProto implements TLCallback
$db = [];
if (!isset($this->referenceDatabase)) {
$this->referenceDatabase = new ReferenceDatabase($this);
$db []= $this->referenceDatabase->init();
$db []= async($this->referenceDatabase->init(...));
} else {
$db []= $this->referenceDatabase->init();
$db []= async($this->referenceDatabase->init(...));
}
if (!isset($this->minDatabase)) {
$this->minDatabase = new MinDatabase($this);
$db []= $this->minDatabase->init();
$db []= async($this->minDatabase->init(...));
} else {
$db []= $this->minDatabase->init();
$db []= async($this->minDatabase->init(...));
}
if (!isset($this->TL)) {
$this->TL = new TL($this);
@ -1049,7 +1051,7 @@ class MTProto implements TLCallback
}
$db []= $this->initDb($this);
Tools::all($db);
await($db);
$this->fillUsernamesCache();
if (!$this->settings->getDb()->getEnableFullPeerDb()) {
@ -1523,16 +1525,16 @@ class MTProto implements TLCallback
$this->datacenter->__construct($this, $this->dcList, $this->settings->getConnection(), $reconnectAll);
$dcs = [];
foreach ($this->datacenter->getDcs() as $new_dc) {
$dcs[] = $this->datacenter->dcConnect($new_dc);
$dcs[] = async($this->datacenter->dcConnect(...), $new_dc);
}
Tools::all($dcs);
await($dcs);
$this->initAuthorization();
$this->parseConfig();
$dcs = [];
foreach ($this->datacenter->getDcs(false) as $new_dc) {
$dcs[] = $this->datacenter->dcConnect($new_dc);
$dcs[] = async($this->datacenter->dcConnect(...), $new_dc);
}
Tools::all($dcs);
await($dcs);
$this->initAuthorization();
$this->parseConfig();
$this->getPhoneConfig();

View File

@ -20,8 +20,10 @@ declare(strict_types=1);
namespace danog\MadelineProto\MTProto;
use Amp\DeferredFuture;
use danog\MadelineProto\Tools;
use Amp\Future;
use function Amp\async;
use function Amp\Future\await;
/**
* Incoming message.
@ -74,9 +76,9 @@ class IncomingMessage extends Message
/**
* DB side effects to be resolved before using the content.
*
* @var Promise[]
* @var list<Future>
*/
private $sideEffects = [];
private array $sideEffects = [];
/**
* Constructor.
@ -186,7 +188,7 @@ class IncomingMessage extends Message
/**
* Set DB side effects to be resolved before using the content.
*
* @param Promise[] $sideEffects DB side effects to be resolved before using the content
* @param list<Future> $sideEffects DB side effects to be resolved before using the content
*/
public function setSideEffects(array $sideEffects): self
{
@ -199,41 +201,20 @@ class IncomingMessage extends Message
* Get DB side effects to be resolved before using the specified content.
*
* @template T
* @param T $return Return value
* @psalm-return ?Promise<T>
* @param T $return
* @return ?Future<T>
*/
public function getSideEffects($return): ?Promise
public function getSideEffects(mixed $return): ?Future
{
if (!$this->sideEffects) {
return null;
}
$deferred = new DeferredFuture;
$result = $deferred->getFuture();
$pending = \count($this->sideEffects);
foreach ($this->sideEffects as $promise) {
$promise = Tools::call($promise);
$promise->onResolve(function ($exception, $value) use (&$deferred, &$pending, $return): void {
if ($pending === 0) {
return;
}
if ($exception) {
$pending = 0;
$deferred->fail($exception);
$deferred = null;
return;
}
if (0 === --$pending) {
$deferred->complete($return);
}
});
}
$sideEffects = $this->sideEffects;
$this->sideEffects = [];
return $result;
return async(static function () use ($sideEffects, $return) {
await($sideEffects);
return $return;
});
}
/**

View File

@ -189,7 +189,6 @@ class OutgoingMessage extends Message
/**
* Set reply to message.
*
* @param Promise|mixed $result
*/
public function reply($result): void
{

View File

@ -28,7 +28,7 @@ use danog\MadelineProto\Tools;
use Generator;
use function Amp\async;
use function Amp\Future\awaitAll;
use function Amp\Future\await;
/**
* Manages method and object calls.
@ -101,7 +101,7 @@ trait CallHandler
return null;
}
if (\is_array($readDeferred)) {
return awaitAll(\array_map(fn (DeferredFuture $value) => $value->getFuture(), $readDeferred));
return await(\array_map(fn (DeferredFuture $value) => $value->getFuture(), $readDeferred));
}
return $readDeferred->getFuture()->await();
}
@ -152,7 +152,7 @@ trait CallHandler
if (!isset($aargs['postpone'])) {
$this->writer->resume();
}
return awaitAll($promises);
return await($promises);
}
$args = $this->API->botAPIToMTProto($args);
if (isset($args['ping_id']) && \is_int($args['ping_id'])) {

View File

@ -20,9 +20,6 @@ declare(strict_types=1);
namespace danog\MadelineProto\MTProtoSession;
use Amp\DeferredFuture;
use Amp\DeferredFuture;
use Amp\Failure;
use danog\MadelineProto\Logger;
use danog\MadelineProto\Loop\Update\UpdateLoop;
@ -31,7 +28,6 @@ use danog\MadelineProto\MTProto\IncomingMessage;
use danog\MadelineProto\MTProto\OutgoingMessage;
use danog\MadelineProto\PTSException;
use danog\MadelineProto\RPCErrorException;
use danog\MadelineProto\Tools;
use phpseclib3\Math\BigInteger;
use Revolt\EventLoop;
use Throwable;
@ -259,22 +255,13 @@ trait ResponseHandler
if ($side = $message->getSideEffects($response)) {
if ($botAPI) {
$deferred = new DeferredFuture;
$promise = $deferred->getFuture();
$side->onResolve(function (?Throwable $error, $result) use ($deferred): void {
if ($error) {
$deferred->fail($error);
return;
}
$deferred->complete(Tools::call($this->API->MTProtoToBotAPI($result)));
});
$request->reply($promise);
async(fn () => $request->reply($this->API->MTProtoToBotAPI($side->await())));
} else {
$request->reply($side);
async(fn () => $request->reply($side->await()));
}
} else {
if ($botAPI) {
$request->reply(Tools::call($this->API->MTProtoToBotAPI($response)));
async(fn () => $request->reply($this->API->MTProtoToBotAPI($response)));
} else {
$request->reply($response);
}

View File

@ -7,10 +7,12 @@ namespace danog\MadelineProto\MTProtoTools;
use Amp\Sync\LocalMutex;
use danog\MadelineProto\DataCenter;
use danog\MadelineProto\Logger;
use danog\MadelineProto\Tools;
use Generator;
use phpseclib3\Math\BigInteger;
use function Amp\async;
use function Amp\Future\await;
/**
* @property DataCenter $datacenter
*/
@ -36,17 +38,16 @@ trait AuthKeyHandler
continue;
}
if ($socket->isMedia()) {
$media []= [$socket, 'initAuthorization'];
$media []= $socket->initAuthorization(...);
} else {
$main []= [$socket, 'initAuthorization'];
$main []= $socket->initAuthorization(...);
}
}
if ($main) {
$first = \array_shift($main)();
$first;
\array_shift($main)();
}
Tools::all(\array_map(fn ($cb) => $cb(), $main));
Tools::all(\array_map(fn ($cb) => $cb(), $media));
await(\array_map(async(...), $main));
await(\array_map(async(...), $media));
} finally {
$lock->release();
$this->logger("Done initing authorization!");

View File

@ -49,7 +49,7 @@ use function Amp\File\getSize;
use function Amp\File\openFile;
use function Amp\File\touch as touchAsync;
use function Amp\Future\all;
use function Amp\Future\await;
use function end;
/**
@ -199,7 +199,7 @@ trait Files
++$part_num;
if (!($part_num % $parallel_chunks)) {
// By default, 10 mb at a time, for a typical bandwidth of 1gbps (run the code in this every second)
Tools::all($promises);
await($promises);
$promises = [];
if ($exception) {
throw $exception;
@ -344,7 +344,7 @@ trait Files
$cb = [$bridge, 'callback'];
$read = $this->uploadFromCallable($reader, $size, $mime, '', $cb, true, $encrypted);
$write = $this->downloadToCallable($media, $writer, null, true, 0, -1, $chunk_size);
[$res] = Tools::all([$read, $write]);
[$res] = await([$read, $write]);
return $res;
}
@ -900,8 +900,8 @@ trait Files
$promises = [];
foreach ($params as $key => $param) {
$param['previous_promise'] = $previous_promise;
$previous_promise = Tools::call($this->downloadPart($messageMedia, $cdn, $datacenter, $old_dc, $ige, $cb, $param, $callable, $seekable));
$previous_promise->onResolve(static function ($e, $res) use (&$size): void {
$previous_promise = async($this->downloadPart(...), $messageMedia, $cdn, $datacenter, $old_dc, $ige, $cb, $param, $callable, $seekable);
$previous_promise->map(static function ($e, $res) use (&$size): void {
if ($res) {
$size += $res;
}
@ -909,7 +909,7 @@ trait Files
$promises[] = $previous_promise;
if (!($key % $parallel_chunks)) {
// 20 mb at a time, for a typical bandwidth of 1gbps
$res = Tools::all($promises);
$res = await($promises);
$promises = [];
foreach ($res as $r) {
if (!$r) {
@ -923,7 +923,7 @@ trait Files
}
}
if ($promises) {
Tools::all($promises);
await($promises);
}
}
$time = \microtime(true) - $start;

View File

@ -204,7 +204,7 @@ trait FilesLogic
$emit($payload);
return \strlen($payload);
};
Tools::call($this->downloadToCallable($messageMedia, $emit, $cb, false, ...$result->getServeRange()));
$this->downloadToCallable($messageMedia, $emit, $cb, false, ...$result->getServeRange());
},
),
);

View File

@ -38,8 +38,11 @@ use Webmozart\Assert\Assert;
use const danog\Decoder\PHOTOSIZE_SOURCE_DIALOGPHOTO_BIG;
use const danog\Decoder\PHOTOSIZE_SOURCE_DIALOGPHOTO_SMALL;
use const danog\Decoder\PROFILE_PHOTO;
use const SORT_NUMERIC;
use function Amp\async;
use function Amp\Future\await;
/**
* Manages peers.
@ -982,17 +985,17 @@ trait PeerHandler
$filters = ['channelParticipantsAdmins', 'channelParticipantsBots'];
$promises = [];
foreach ($filters as $filter) {
$promises []= $this->fetchParticipants($full['InputChannel'], $filter, '', $total_count, $res);
$promises []= async($this->fetchParticipants(...), $full['InputChannel'], $filter, '', $total_count, $res);
}
Tools::all($promises);
await($promises);
$q = '';
$filters = ['channelParticipantsSearch', 'channelParticipantsKicked', 'channelParticipantsBanned'];
$promises = [];
foreach ($filters as $filter) {
$promises []= $this->recurseAlphabetSearchParticipants($full['InputChannel'], $filter, $q, $total_count, $res, 0);
$promises []= async($this->recurseAlphabetSearchParticipants(...), $full['InputChannel'], $filter, $q, $total_count, $res, 0);
}
Tools::all($promises);
await($promises);
$this->logger->logger('Fetched '.\count($res['participants'])." out of {$total_count}");
$res['participants'] = \array_values($res['participants']);
@ -1033,19 +1036,19 @@ trait PeerHandler
}
$promises = [];
for ($x = 'a'; $x !== 'aa' && $total_count > \count($res['participants']); $x++) {
$promises []= $this->recurseAlphabetSearchParticipants($channel, $filter, $q.$x, $total_count, $res, $depth + 1);
$promises []= async($this->recurseAlphabetSearchParticipants(...), $channel, $filter, $q.$x, $total_count, $res, $depth + 1);
}
if ($depth > 2) {
return $promises;
}
$yielded = [...Tools::all($promises)];
$yielded = [...await($promises)];
while ($yielded) {
$newYielded = [];
foreach (\array_chunk($yielded, 10) as $promises) {
$newYielded = \array_merge($newYielded, ...(Tools::all($promises)));
$newYielded = \array_merge($newYielded, ...(await($promises)));
}
$yielded = $newYielded;
@ -1082,7 +1085,7 @@ trait PeerHandler
}
$promises = [];
foreach ($gres['participants'] as $participant) {
$promises []= Tools::call((function () use (&$res, $participant): void {
$promises []= async(function () use (&$res, $participant): void {
$newres = [];
$newres['user'] = ($this->getPwrChat($participant['user_id'] ?? $participant['peer'], false));
if (isset($participant['inviter_id'])) {
@ -1124,9 +1127,9 @@ trait PeerHandler
break;
}
$res['participants'][$participant['user_id'] ?? $this->getId($participant['peer'])] = $newres;
})());
});
}
Tools::all($promises);
await($promises);
$this->logger->logger('Fetched '.\count($gres['participants'])." channel participants with filter {$filter}, query {$q}, offset {$offset}, limit {$limit}, hash {$hash}: ".($cached ? 'cached' : 'not cached').', '.($offset + \count($gres['participants'])).' participants out of '.$gres['count'].', in total fetched '.\count($res['participants']).' out of '.$total_count);
$offset += \count($gres['participants']);
} while (\count($gres['participants']));

View File

@ -224,10 +224,8 @@ class Magic
public static $signaled = false;
/**
* Whether to suspend certain stdout log printing, when reading input.
*
* @var ?Deferred
*/
public static $suspendPeriodicLogging;
public static ?DeferredFuture $suspendPeriodicLogging = null;
/**
* All mime types.
*

View File

@ -26,7 +26,7 @@ use danog\MadelineProto\MTProtoTools\Crypt;
use danog\MadelineProto\SecurityException;
use danog\MadelineProto\Tools;
use function Amp\Future\awaitAll;
use function Amp\Future\await;
/**
* Manages packing and unpacking of messages, and the list of sent and received messages.
@ -141,7 +141,7 @@ trait MessageHandler
}
[$deserialized, $sideEffects] = $this->TL->deserialize($message_data, ['type' => '']);
if ($sideEffects) {
awaitAll($sideEffects);
await($sideEffects);
}
$this->secret_chats[$message['message']['chat_id']]['ttr']--;
if (($this->secret_chats[$message['message']['chat_id']]['ttr'] <= 0 || \time() - $this->secret_chats[$message['message']['chat_id']]['updated'] > 7 * 24 * 60 * 60) && $this->secret_chats[$message['message']['chat_id']]['rekeying'][0] === 0) {

View File

@ -302,11 +302,11 @@ class VoIP
}
$this->callState = self::CALL_STATE_ACCEPTED;
Tools::call($this->MadelineProto->acceptCall($this->callID))->onResolve(function ($e, $res): void {
if ($e || !$res) {
$this->discard(['_' => 'phoneCallDiscardReasonDisconnect']);
}
});
$res = $this->MadelineProto->acceptCall($this->callID);
if (!$res) {
$this->discard(['_' => 'phoneCallDiscardReasonDisconnect']);
}
return $this;
}

View File

@ -28,7 +28,6 @@ use danog\MadelineProto\Magic;
use danog\MadelineProto\MTProtoTools\Crypt;
use danog\MadelineProto\RPCErrorException;
use danog\MadelineProto\SecurityException;
use danog\MadelineProto\Tools;
use danog\MadelineProto\VoIP;
use phpseclib3\Math\BigInteger;
@ -52,17 +51,11 @@ trait AuthKeyHandler
*/
public function acceptCallFrom(VoIP $instance, array $user)
{
$promise = Tools::call((function () use ($instance, $user) {
if (!$res = $this->acceptCall($user)) {
$instance->discard();
return false;
}
return $instance;
})());
if ($this->wrapper && $this->wrapper->isAsync()) {
return $promise;
if (!$res = $this->acceptCall($user)) {
$instance->discard();
return false;
}
return Tools::wait($promise);
return $instance;
}
/**
* Undocumented function.
@ -76,16 +69,10 @@ trait AuthKeyHandler
*/
public function discardCallFrom(VoIP $instance, array $call, array $reason, array $rating = [], bool $need_debug = true)
{
$promise = Tools::call(function () use ($instance, $call, $reason, $rating, $need_debug) {
if (!$res = $this->discardCall($call, $reason, $rating, $need_debug)) {
return false;
}
return $instance;
});
if ($this->wrapper && $this->wrapper->isAsync()) {
return $promise;
if (!$res = $this->discardCall($call, $reason, $rating, $need_debug)) {
return false;
}
return Tools::wait($promise);
return $instance;
}
/**
* Request VoIP call.