1
0
mirror of https://github.com/danog/MadelineProto.git synced 2024-11-30 06:39:01 +01:00

Multiple improvements

This commit is contained in:
Daniil Gentili 2023-09-05 17:51:32 +02:00
parent 0fbf22f956
commit f4bc2b51f5
Signed by: danog
GPG Key ID: 8C1BE3B34B230CA7
5 changed files with 84 additions and 59 deletions

View File

@ -34,7 +34,6 @@ use danog\MadelineProto\MTProto\MTProtoOutgoingMessage;
use danog\MadelineProto\MTProtoSession\Session;
use danog\MadelineProto\Stream\BufferedStreamInterface;
use danog\MadelineProto\Stream\ConnectionContext;
use danog\MadelineProto\Stream\ContextIterator;
use danog\MadelineProto\Stream\MTProtoBufferInterface;
use danog\MadelineProto\TL\Conversion\Extension;
use Webmozart\Assert\Assert;
@ -91,7 +90,6 @@ final class Connection
* Connection context.
*/
private ?ConnectionContext $chosenCtx = null;
private ?ContextIterator $ctxs = null;
/**
* HTTP request count.
*
@ -239,14 +237,14 @@ final class Connection
*/
public function isMedia(): bool
{
return $this->ctxs->isMedia();
return DataCenter::isMedia($this->datacenter);
}
/**
* Check if is a CDN connection.
*/
public function isCDN(): bool
{
return $this->ctxs->isCDN();
return $this->API->isCDN($this->datacenter);
}
private ?LocalMutex $connectMutex = null;
/**
@ -264,7 +262,7 @@ final class Connection
return $this;
}
$this->createSession();
foreach ($this->ctxs as $ctx) {
foreach ($this->shared->getCtxs() as $ctx) {
$this->API->logger->logger("Connecting to DC {$this->datacenterId} via $ctx ", Logger::WARNING);
try {
$this->stream = $ctx->getStream();
@ -518,14 +516,13 @@ final class Connection
* @param DataCenterConnection $extra Shared instance
* @param int $id Connection ID
*/
public function setExtra(DataCenterConnection $extra, int $id, ContextIterator $ctx): void
public function setExtra(DataCenterConnection $extra, int $datacenter, int $id): void
{
$this->shared = $extra;
$this->id = $id;
$this->API = $extra->getExtra();
$this->logger = $this->API->logger;
$this->ctxs = $ctx;
$this->datacenter = $ctx->getDc();
$this->datacenter = $datacenter;
$this->datacenterId = $this->datacenter . '.' . $this->id;
}
/**

View File

@ -22,7 +22,6 @@ namespace danog\MadelineProto;
use Amp\Dns\DnsResolver;
use Amp\Http\Client\HttpClient;
use Amp\Http\Client\Request;
use Amp\Socket\ConnectContext;
use Amp\Socket\InternetAddress;
use Amp\Socket\InternetAddressVersion;
@ -95,6 +94,13 @@ final class DataCenter
unset($this->dohWrapper);
}
$this->dohWrapper ??= new DoHWrapper($API);
if ($this->getSettings()->hasChanged()) {
foreach ($this->sockets as $dc => $socket) {
$socket->setExtra($this->API, $dc, $this->generateContexts($dc));
$socket->reconnect();
}
$this->getSettings()->applyChanges();
}
}
public function __sleep()
@ -113,12 +119,6 @@ final class DataCenter
{
return $this->API->getSettings()->getConnection();
}
public function isCdn(int $dc): bool
{
$test = $this->getSettings()->getTestMode() ? 'test' : 'main';
$ipv6 = $this->getSettings()->getIpv6() ? 'ipv6' : 'ipv4';
return $this->API->dcList[$test][$ipv6][$dc]['cdn'] ?? false;
}
public function getHTTPClient(): HttpClient
{
return $this->dohWrapper->HTTPClient;
@ -129,13 +129,58 @@ final class DataCenter
return $this->dohWrapper->DoHClient;
}
/**
* Normalizes "bindto" options to add a ":0" in case no port is present, otherwise PHP will silently ignore those.
*
* @throws \Error If an invalid option has been passed.
*
* @internal
*/
private static function normalizeBindToOption(string $bindTo = null): ?string
{
if ($bindTo === null) {
return null;
}
if (\preg_match("/\\[(?P<ip>[0-9a-f:]+)](:(?P<port>\\d+))?$/", $bindTo, $match)) {
$ip = $match['ip'];
$port = (int) ($match['port'] ?? 0);
if (\inet_pton($ip) === false) {
throw new \Error("Invalid IPv6 address: $ip");
}
if ($port < 0 || $port > 65535) {
throw new \Error("Invalid port: $port");
}
return "[$ip]:$port";
}
if (\preg_match("/(?P<ip>\\d+\\.\\d+\\.\\d+\\.\\d+)(:(?P<port>\\d+))?$/", $bindTo, $match)) {
$ip = $match['ip'];
$port = (int) ($match['port'] ?? 0);
if (\inet_pton($ip) === false) {
throw new \Error("Invalid IPv4 address: $ip");
}
if ($port < 0 || $port > 65535) {
throw new \Error("Invalid port: $port");
}
return "$ip:$port";
}
throw new \Error("Invalid bindTo value: $bindTo");
}
/**
* Generate contexts.
*
* @param integer $dc_number DC ID to generate contexts for
* @return non-empty-list<ConnectionContext>
*/
private function generateContexts(int $dc_number): array
private function generateContexts(int $dc_number): ContextIterator
{
$test = $this->getSettings()->getTestMode() ? 'test' : 'main';
$ipv6 = $this->getSettings()->getIpv6() ? 'ipv6' : 'ipv4';
@ -232,7 +277,7 @@ final class DataCenter
}
$combos = \array_unique($combos, SORT_REGULAR);
$bind = $this->getSettings()->getBindTo();
$bind = self::normalizeBindToOption($this->getSettings()->getBindTo());
$onlyIPv6 = null;
if ($bind !== null) {
$onlyIPv6 = InternetAddress::fromString($bind)->getVersion() === InternetAddressVersion::IPv6
@ -277,7 +322,7 @@ final class DataCenter
}
$ctx = (new ConnectionContext())
->setDc($dc_number)
->setCdn($this->isCdn($dc_number))
->setCdn($this->API->isCdn($dc_number))
->setSocketContext($context)
->setUri($uri)
->setIpv6($ipv6 === 'ipv6');
@ -315,16 +360,7 @@ final class DataCenter
if (empty($ctxs)) {
throw new AssertionError("No info for DC $dc_number!");
}
return $ctxs;
}
/**
* Get contents of file.
*
* @param string $url URL to fetch
*/
public function fileGetContents(string $url): string
{
return ($this->dohWrapper->HTTPClient->request(new Request($url)))->getBody()->buffer();
return new ContextIterator($ctxs);
}
public function waitGetConnection(int $dc): Connection
{
@ -348,7 +384,7 @@ final class DataCenter
$this->API->logger->logger("Connecting to DC {$dc}", Logger::WARNING);
$this->sockets[$dc] ??= new DataCenterConnection();
$this->sockets[$dc]->setExtra($this->API, new ContextIterator($ctxs));
$this->sockets[$dc]->setExtra($this->API, $dc, $ctxs);
$this->sockets[$dc]->connect();
} finally {
$lock->release();

View File

@ -133,6 +133,11 @@ final class DataCenterConnection implements JsonSerializable
{
return $this->needsReconnect;
}
public function getCtxs(): ContextIterator
{
\assert($this->ctx !== null);
return $this->ctx;
}
private ?LocalMutex $initingAuth = null;
/**
* Init auth keys for single DC.
@ -256,7 +261,7 @@ final class DataCenterConnection implements JsonSerializable
&& $authorized_socket->isAuthorized()
&& $this->API->authorized === \danog\MadelineProto\API::LOGGED_IN
&& !$this->isAuthorized()
&& !$authorized_socket->ctx->isCDN()
&& !$this->API->isCDN($authorized_dc_id)
) {
try {
$logger->logger('Trying to copy authorization from DC '.$authorized_dc_id.' to DC '.$this->datacenter);
@ -423,8 +428,7 @@ final class DataCenterConnection implements JsonSerializable
*/
public function connect(int $id = -1): void
{
$this->datacenter = $this->ctx->getDc();
$media = $this->ctx->isMedia() || $this->ctx->isCDN();
$media = DataCenter::isMedia($this->datacenter) || $this->API->isCDN($this->datacenter);
if ($media) {
if (!$this->robinLoop) {
$this->robinLoop = new PeriodicLoopInternal(
@ -455,7 +459,7 @@ final class DataCenterConnection implements JsonSerializable
$this->restoreBackup();
} else {
$this->availableConnections[$id] = 0;
$this->connections[$id]->setExtra($this, $id, $this->ctx);
$this->connections[$id]->setExtra($this, $this->datacenter, $id);
}
}
/**
@ -468,7 +472,7 @@ final class DataCenterConnection implements JsonSerializable
$count += $previousCount = \count($this->connections);
for ($x = $previousCount; $x < $count; $x++) {
$connection = new Connection();
$connection->setExtra($this, $x, $this->ctx);
$connection->setExtra($this, $this->datacenter, $x);
$this->connections[$x] = $connection;
$this->availableConnections[$x] = 0;
}
@ -613,7 +617,7 @@ final class DataCenterConnection implements JsonSerializable
$count += 50;
}
} elseif ($min < 100) {
$max = $this->ctx->isMedia() || $this->ctx->isCDN() ? $this->API->getSettings()->getConnection()->getMaxMediaSocketCount() : 1;
$max = DataCenter::isMedia($this->datacenter) || $this->API->isCDN($this->datacenter) ? $this->API->getSettings()->getConnection()->getMaxMediaSocketCount() : 1;
if (\count($this->availableConnections) < $max) {
$this->connectMore(2);
} else {
@ -654,8 +658,9 @@ final class DataCenterConnection implements JsonSerializable
*
* @param MTProto $API Main instance
*/
public function setExtra(MTProto $API, ContextIterator $ctx): void
public function setExtra(MTProto $API, int $datacenter, ContextIterator $ctx): void
{
$this->datacenter = $datacenter;
$this->API = $API;
$this->ctx = $ctx;
}

View File

@ -26,6 +26,7 @@ use Amp\DeferredFuture;
use Amp\Dns\DnsResolver;
use Amp\Future;
use Amp\Http\Client\HttpClient;
use Amp\Http\Client\Request;
use Amp\Sync\LocalMutex;
use danog\MadelineProto\Broadcast\Broadcast;
use danog\MadelineProto\Db\DbArray;
@ -740,16 +741,7 @@ final class MTProto implements TLCallback, LoggerGetter
*/
public function fileGetContents(string $url): string
{
return $this->datacenter->fileGetContents($url);
}
/**
* Get all datacenter connections.
* @internal
* @return array<DataCenterConnection>
*/
public function getDataCenterConnections(): array
{
return $this->datacenter->getDataCenterConnections();
return $this->getHTTPClient()->request(new Request($url))->getBody()->buffer();
}
/**
* Get main DC ID.
@ -1023,6 +1015,13 @@ final class MTProto implements TLCallback, LoggerGetter
$this->wrapper->getSession()->delete();
}
}
/** @internal */
public function isCdn(int $dc): bool
{
$test = $this->settings->getConnection()->getTestMode() ? 'test' : 'main';
$ipv6 = $this->settings->getConnection()->getIpv6() ? 'ipv6' : 'ipv4';
return $this->dcList[$test][$ipv6][$dc]['cdn'] ?? false;
}
/**
* Destructor.
*/

View File

@ -18,18 +18,6 @@ final class ContextIterator implements IteratorAggregate
) {
}
public function getDc(): int
{
return $this->ctxs[0]->getDc();
}
public function isMedia(): bool
{
return $this->ctxs[0]->isMedia();
}
public function isCDN(): bool
{
return $this->ctxs[0]->isCDN();
}
public function getIterator(): Traversable
{
foreach ($this->ctxs as $ctx) {