GramJs: Fix reconnecting after sudden WS connection close (#1320)

This commit is contained in:
Alexander Zinchuk 2021-07-27 20:14:08 +03:00
parent 66e9b35f50
commit 73910b1b4b
6 changed files with 37 additions and 19 deletions

View File

@ -174,7 +174,7 @@ class TelegramClient {
});
}
// set defaults vars
this._sender.userDisconnected = true;
this._sender.userDisconnected = false;
this._sender._user_connected = false;
this._sender._reconnecting = false;
this._sender._disconnected = true;
@ -317,6 +317,9 @@ class TelegramClient {
// export region
_cleanupExportedSender(dcId) {
if (this.session.dcId !== dcId) {
this.session.setAuthKey(undefined, dcId);
}
this._exportedSenderPromises[dcId] = undefined;
}
@ -343,6 +346,7 @@ class TelegramClient {
sender._authenticated = true;
}
sender.dcId = dcId;
sender.userDisconnected = false;
return sender;
} catch (err) {

View File

@ -107,7 +107,6 @@ export async function downloadFile(
// used to populate the sender
await client.getSender(dcId);
// eslint-disable-next-line no-constant-condition
while (true) {
let limit = partSize;
@ -129,12 +128,8 @@ export async function downloadFile(
promises.push((async (offsetMemo: number) => {
// eslint-disable-next-line no-constant-condition
while (true) {
const sender = await client.getSender(dcId);
try {
if (!sender._user_connected) {
await sleep(DISCONNECT_SLEEP);
continue;
}
const sender = await client.getSender(dcId);
const result = await sender.send(new Api.upload.GetFile({
location: inputLocation,
offset: offsetMemo,

View File

@ -38,8 +38,8 @@ export async function uploadFile(
const partCount = Math.floor((size + partSize - 1) / partSize);
const buffer = Buffer.from(await fileToBuffer(file));
// We always upload from the DC we are in.
const sender = await client.getSender(client.session.dcId);
// Make sure a new sender can be created before starting upload
await client.getSender(client.session.dcId);
if (!workers || !size) {
workers = 1;
@ -66,11 +66,9 @@ export async function uploadFile(
// eslint-disable-next-line no-loop-func
sendingParts.push((async (jMemo: number, bytesMemo: Buffer) => {
while (true) {
if (!sender._user_connected) {
await sleep(DISCONNECT_SLEEP);
continue;
}
try {
// We always upload from the DC we are in
const sender = await client.getSender(client.session.dcId);
await sender.send(
isLarge
? new Api.upload.SaveBigFilePart({

View File

@ -7,7 +7,7 @@ const WebSocketClient = require('websocket').w3cwebsocket;
const closeError = new Error('WebSocket was closed');
class PromisedWebSockets {
constructor() {
constructor(disconnectedCallback) {
/* CONTEST
this.isBrowser = typeof process === 'undefined' ||
process.type === 'renderer' ||
@ -17,6 +17,7 @@ class PromisedWebSockets {
*/
this.client = undefined;
this.closed = true;
this.disconnectedCallback = disconnectedCallback;
}
async readExactly(number) {
@ -96,6 +97,9 @@ class PromisedWebSockets {
console.error(`Socket ${ip} closed. Code: ${code}, reason: ${reason}, was clean: ${wasClean}`);
this.resolveRead(false);
this.closed = true;
if (this.disconnectedCallback) {
this.disconnectedCallback();
}
};
// CONTEST
// Seems to not be working, at least in a web worker

View File

@ -826,7 +826,7 @@ class MTProtoSender {
async _reconnect() {
this._log.debug('Closing current connection...');
try {
await this.disconnect();
await this._disconnect();
} catch (err) {
this._log.warn(err);
}
@ -834,7 +834,14 @@ class MTProtoSender {
this._send_queue.append(undefined);
this._state.reset();
await this.connect(this._connection, true);
// For some reason reusing existing connection caused stuck requests
const newConnection = new this._connection.constructor(
this._connection._ip,
this._connection._port,
this._connection._dcId,
this._connection._log,
);
await this.connect(newConnection, true);
this._reconnecting = false;
// uncomment this if you want to resend

View File

@ -29,7 +29,11 @@ class Connection {
this._recvArray = new AsyncQueue();
// this.socket = new PromiseSocket(new Socket())
this.socket = new PromisedWebSockets();
this.socket = new PromisedWebSockets(this.disconnectCallback.bind(this));
}
async disconnectCallback() {
await this.disconnect(true);
}
async _connect() {
@ -51,10 +55,16 @@ class Connection {
this._recvTask = this._recvLoop();
}
async disconnect() {
async disconnect(fromCallback = false) {
if (!this._connected) {
return;
}
this._connected = false;
void this._recvArray.push(undefined);
await this.socket.close();
if (!fromCallback) {
await this.socket.close();
}
}
async send(data) {