mirror of
https://github.com/danog/MadelineProto.git
synced 2024-11-30 09:58:59 +01:00
Disconnect from media DCs as needed
This commit is contained in:
parent
b03bf26e10
commit
407c07c315
@ -249,17 +249,18 @@ final class Connection
|
||||
}
|
||||
/**
|
||||
* Connects to a telegram DC using the specified protocol, proxy and connection parameters.
|
||||
*
|
||||
* @param ConnectionContext $ctx Connection context
|
||||
*/
|
||||
public function connect(ConnectionContext $ctx): void
|
||||
public function connect(): self
|
||||
{
|
||||
if ($this->stream) {
|
||||
return $this;
|
||||
}
|
||||
$ctx = $this->ctx;
|
||||
$this->ctx = $ctx->getCtx();
|
||||
$this->datacenter = $ctx->getDc();
|
||||
$this->datacenterId = $this->datacenter . '.' . $this->id;
|
||||
$this->API->logger->logger("Connecting to DC {$this->datacenterId}", Logger::WARNING);
|
||||
|
||||
$this->createSession();
|
||||
$this->stream = ($ctx->getStream());
|
||||
$this->API->logger->logger("Connecting to DC {$this->datacenterId}", Logger::WARNING);
|
||||
$this->stream = $ctx->getStream();
|
||||
$this->API->logger->logger("Connected to DC {$this->datacenterId}!", Logger::WARNING);
|
||||
if ($this->needsReconnect) {
|
||||
$this->needsReconnect = false;
|
||||
@ -290,6 +291,7 @@ final class Connection
|
||||
if ($this->pinger) {
|
||||
Assert::true($this->pinger->start(), "Could not start pinger stream");
|
||||
}
|
||||
return $this;
|
||||
}
|
||||
/**
|
||||
* Apply method abstractions.
|
||||
@ -495,12 +497,15 @@ final class Connection
|
||||
* @param DataCenterConnection $extra Shared instance
|
||||
* @param int $id Connection ID
|
||||
*/
|
||||
public function setExtra(DataCenterConnection $extra, int $id): void
|
||||
public function setExtra(DataCenterConnection $extra, int $id, ConnectionContext $ctx): void
|
||||
{
|
||||
$this->shared = $extra;
|
||||
$this->id = $id;
|
||||
$this->API = $extra->getExtra();
|
||||
$this->logger = $this->API->logger;
|
||||
$this->ctx = $ctx->getCtx();
|
||||
$this->datacenter = $ctx->getDc();
|
||||
$this->datacenterId = $this->datacenter . '.' . $this->id;
|
||||
}
|
||||
/**
|
||||
* Get main instance.
|
||||
@ -527,7 +532,9 @@ final class Connection
|
||||
$this->needsReconnect = true;
|
||||
if ($this->stream) {
|
||||
try {
|
||||
$this->stream->disconnect();
|
||||
$stream = $this->stream;
|
||||
$this->stream = null;
|
||||
$stream->disconnect();
|
||||
} catch (ClosedException $e) {
|
||||
$this->API->logger->logger($e);
|
||||
}
|
||||
|
@ -470,7 +470,7 @@ final class DataCenterConnection implements JsonSerializable
|
||||
} else {
|
||||
$this->ctx = $ctx->getCtx();
|
||||
$this->availableConnections[$id] = 0;
|
||||
$this->connections[$id]->connect($ctx);
|
||||
$this->connections[$id]->setExtra($this, $id, $ctx);
|
||||
}
|
||||
}
|
||||
/**
|
||||
@ -484,8 +484,7 @@ final class DataCenterConnection implements JsonSerializable
|
||||
$count += $previousCount = \count($this->connections);
|
||||
for ($x = $previousCount; $x < $count; $x++) {
|
||||
$connection = new Connection();
|
||||
$connection->setExtra($this, $x);
|
||||
$connection->connect($ctx);
|
||||
$connection->setExtra($this, $x, $ctx);
|
||||
$this->connections[$x] = $connection;
|
||||
$this->availableConnections[$x] = 0;
|
||||
$ctx = $this->ctx->getCtx();
|
||||
@ -575,9 +574,9 @@ final class DataCenterConnection implements JsonSerializable
|
||||
/**
|
||||
* Get connection for authorization.
|
||||
*/
|
||||
public function getAuthConnection(): Connection
|
||||
private function getAuthConnection(): Connection
|
||||
{
|
||||
return $this->connections[0];
|
||||
return $this->connections[0]->connect();
|
||||
}
|
||||
/**
|
||||
* Check if any connection is available.
|
||||
@ -596,7 +595,7 @@ final class DataCenterConnection implements JsonSerializable
|
||||
if (empty($this->availableConnections)) {
|
||||
$this->connectionsPromise->await();
|
||||
}
|
||||
return $this->getConnection();
|
||||
return $this->getConnection()->connect();
|
||||
}
|
||||
/**
|
||||
* Get best socket in round robin.
|
||||
|
@ -58,7 +58,6 @@ final class CheckLoop extends Loop
|
||||
if (!$this->connection->hasPendingCalls()) {
|
||||
return $this->timeout;
|
||||
}
|
||||
$last_msgid = $this->connection->msgIdHandler->getMaxId(true);
|
||||
if ($this->shared->hasTempAuthKey()) {
|
||||
$full_message_ids = $this->connection->getPendingCalls();
|
||||
foreach (\array_chunk($full_message_ids, 8192) as $message_ids) {
|
||||
|
@ -61,11 +61,17 @@ final class ReadLoop extends Loop
|
||||
return self::STOP;
|
||||
}
|
||||
EventLoop::queue(function () use ($e): void {
|
||||
if (!$e instanceof NothingInTheSocketException) {
|
||||
if ($e instanceof NothingInTheSocketException
|
||||
&& !$this->connection->hasPendingCalls()
|
||||
&& $this->shared->isMedia()
|
||||
) {
|
||||
$this->logger->logger("Got NothingInTheSocketException in DC {$this->datacenter}, disconnecting because we have nothing to do...", Logger::ERROR);
|
||||
$this->connection->disconnect(true);
|
||||
} else {
|
||||
$this->logger->logger($e);
|
||||
}
|
||||
$this->logger->logger("Got nothing in the socket in DC {$this->datacenter}, reconnecting...", Logger::ERROR);
|
||||
$this->logger->logger("Got exception in DC {$this->datacenter}, reconnecting...", Logger::ERROR);
|
||||
$this->connection->reconnect();
|
||||
}
|
||||
});
|
||||
return self::STOP;
|
||||
} catch (SecurityException $e) {
|
||||
|
@ -66,15 +66,9 @@ final class WriteLoop extends Loop
|
||||
}
|
||||
$this->connection->writing(true);
|
||||
try {
|
||||
$this->logger->logger("About to write in $this ".($this->shared->hasTempAuthKey() ? 'encrypted' : 'unencrypted'));
|
||||
$please_wait = $this->shared->hasTempAuthKey()
|
||||
? $this->encryptedWriteLoop()
|
||||
: $this->unencryptedWriteLoop();
|
||||
if ($please_wait) {
|
||||
$this->logger->logger("Did not write anything in $this");
|
||||
} else {
|
||||
$this->logger->logger("Wrote in $this");
|
||||
}
|
||||
} catch (StreamException $e) {
|
||||
if ($this->connection->shouldReconnect()) {
|
||||
$this->logger->logger("Stopping $this because we have to reconnect");
|
||||
|
Loading…
Reference in New Issue
Block a user