GramJs: Support reconnects when transferring files, avoid main loop race condition (#1319)

This commit is contained in:
Alexander Zinchuk 2021-07-24 02:17:36 +03:00
parent a2462708ca
commit 0e59ff9bd7
7 changed files with 233 additions and 157 deletions

View File

@ -332,7 +332,7 @@ function getAppropriatedPartSize(fileSize) {
if (fileSize <= 786432000) { // 750MB
return 256;
}
if (fileSize <= 1572864000) { // 1500MB
if (fileSize <= 2097152000) { // 2000MB
return 512;
}

View File

@ -25,7 +25,7 @@ const DEFAULT_DC_ID = 2;
const WEBDOCUMENT_DC_ID = 4;
const DEFAULT_IPV4_IP = 'zws4.web.telegram.org';
const DEFAULT_IPV6_IP = '[2001:67c:4e8:f002::a]';
const BORROWED_SENDER_RELEASE_TIMEOUT = 30000; // 30 sec
const EXPORTED_SENDER_RELEASE_TIMEOUT = 30000; // 30 sec
const WEBDOCUMENT_REQUEST_PART_SIZE = 131072; // 128kb
const PING_INTERVAL = 3000; // 3 sec
@ -139,9 +139,12 @@ class TelegramClient {
// These will be set later
this._config = undefined;
this.phoneCodeHashes = [];
this._borrowedSenderPromises = {};
this._borrowedSenderReleaseTimeouts = {};
this._exportedSenderPromises = {};
this._exportedSenderReleaseTimeouts = {};
this._additionalDcsDisabled = args.additionalDcsDisabled;
this._loopStarted = false;
this._reconnecting = false;
this._destroyed = false;
}
@ -156,30 +159,50 @@ class TelegramClient {
async connect() {
await this._initSession();
this._sender = new MTProtoSender(this.session.getAuthKey(), {
logger: this._log,
dcId: this.session.dcId,
retries: this._connectionRetries,
delay: this._retryDelay,
autoReconnect: this._autoReconnect,
connectTimeout: this._timeout,
authKeyCallback: this._authKeyCallback.bind(this),
updateCallback: this._handleUpdate.bind(this),
isMainSender: true,
});
if (this._sender === undefined) {
// only init sender once to avoid multiple loops.
this._sender = new MTProtoSender(this.session.getAuthKey(), {
logger: this._log,
dcId: this.session.dcId,
retries: this._connectionRetries,
delay: this._retryDelay,
autoReconnect: this._autoReconnect,
connectTimeout: this._timeout,
authKeyCallback: this._authKeyCallback.bind(this),
updateCallback: this._handleUpdate.bind(this),
isMainSender: true,
});
}
// set defaults vars
this._sender.userDisconnected = true;
this._sender._user_connected = false;
this._sender._reconnecting = false;
this._sender._disconnected = true;
const connection = new this._connection(
this.session.serverAddress, this.session.port, this.session.dcId, this._log,
);
await this._sender.connect(connection);
const newConnection = await this._sender.connect(connection);
if (!newConnection) {
// we're already connected so no need to reset auth key.
if (!this._loopStarted) {
this._updateLoop();
this._loopStarted = true;
}
return;
}
this.session.setAuthKey(this._sender.authKey);
await this._sender.send(this._initWith(
new requests.help.GetConfig({}),
));
this._updateLoop();
if (!this._loopStarted) {
this._updateLoop();
this._loopStarted = true;
}
this._reconnecting = false;
}
async _initSession() {
@ -192,8 +215,11 @@ class TelegramClient {
}
async _updateLoop() {
while (this.isConnected()) {
while (!this._destroyed) {
await Helpers.sleep(PING_INTERVAL);
if (this._reconnecting) {
continue;
}
try {
await attempts(() => {
@ -205,11 +231,12 @@ class TelegramClient {
} catch (err) {
// eslint-disable-next-line no-console
console.warn(err);
if (this._reconnecting) {
continue;
}
await this.disconnect();
this.connect();
return;
await this.connect();
}
// We need to send some content-related request at least hourly
@ -225,6 +252,7 @@ class TelegramClient {
}
}
}
await this.disconnect();
}
/**
@ -237,7 +265,7 @@ class TelegramClient {
}
await Promise.all(
Object.values(this._borrowedSenderPromises)
Object.values(this._exportedSenderPromises)
.map((promise) => {
return promise && promise.then((sender) => {
if (sender) {
@ -248,7 +276,7 @@ class TelegramClient {
}),
);
this._borrowedSenderPromises = {};
this._exportedSenderPromises = {};
}
/**
@ -256,6 +284,8 @@ class TelegramClient {
* @returns {Promise<void>}
*/
async destroy() {
this._destroyed = true;
try {
await this.disconnect();
} catch (err) {
@ -274,6 +304,7 @@ class TelegramClient {
// so it's not valid anymore. Set to None to force recreating it.
await this._sender.authKey.setKey(undefined);
this.session.setAuthKey(undefined);
this._reconnecting = true;
await this.disconnect();
return this.connect();
}
@ -285,58 +316,14 @@ class TelegramClient {
// endregion
// export region
_cleanupBorrowedSender(dcId) {
this._borrowedSenderPromises[dcId] = undefined;
_cleanupExportedSender(dcId) {
this._exportedSenderPromises[dcId] = undefined;
}
_borrowExportedSender(dcId) {
if (this._additionalDcsDisabled) {
return undefined;
}
if (!this._borrowedSenderPromises[dcId]) {
this._borrowedSenderPromises[dcId] = this._createExportedSender(dcId);
}
return this._borrowedSenderPromises[dcId].then((sender) => {
if (!sender) {
this._borrowedSenderPromises[dcId] = undefined;
return this._borrowExportedSender(dcId);
}
if (this._borrowedSenderReleaseTimeouts[dcId]) {
clearTimeout(this._borrowedSenderReleaseTimeouts[dcId]);
this._borrowedSenderReleaseTimeouts[dcId] = undefined;
}
this._borrowedSenderReleaseTimeouts[dcId] = setTimeout(() => {
this._borrowedSenderReleaseTimeouts[dcId] = undefined;
this._borrowedSenderPromises[dcId] = undefined;
// eslint-disable-next-line no-console
console.warn(`Disconnecting from file socket #${dcId}...`);
sender.disconnect();
}, BORROWED_SENDER_RELEASE_TIMEOUT);
return sender;
});
}
async _createExportedSender(dcId) {
async _connectSender(sender, dcId) {
const dc = utils.getDC(dcId);
const sender = new MTProtoSender(this.session.getAuthKey(dcId),
{
logger: this._log,
dcId,
retries: this._connectionRetries,
delay: this._retryDelay,
autoReconnect: this._autoReconnect,
connectTimeout: this._timeout,
authKeyCallback: this._authKeyCallback.bind(this),
isMainSender: dcId === this.session.dcId,
onConnectionBreak: this._cleanupBorrowedSender.bind(this),
});
for (let i = 0; i < 5; i++) {
while (true) {
try {
await sender.connect(new this._connection(
dc.ipAddress,
@ -344,7 +331,8 @@ class TelegramClient {
dcId,
this._log,
));
if (this.session.dcId !== dcId) {
if (this.session.dcId !== dcId && !sender._authenticated) {
this._log.info(`Exporting authorization for data center ${dc.ipAddress}`);
const auth = await this.invoke(new requests.auth.ExportAuthorization({ dcId }));
const req = this._initWith(new requests.auth.ImportAuthorization({
@ -352,14 +340,78 @@ class TelegramClient {
bytes: auth.bytes,
}));
await sender.send(req);
sender._authenticated = true;
}
sender.dcId = dcId;
return sender;
} catch (e) {
} catch (err) {
// eslint-disable-next-line no-console
console.error(err);
await Helpers.sleep(1000);
await sender.disconnect();
}
}
return undefined;
}
async _borrowExportedSender(dcId, shouldReconnect, existingSender) {
if (this._additionalDcsDisabled) {
return undefined;
}
if (!this._exportedSenderPromises[dcId] || shouldReconnect) {
this._exportedSenderPromises[dcId] = this._connectSender(
existingSender || this._createExportedSender(dcId),
dcId,
);
}
let sender;
try {
sender = await this._exportedSenderPromises[dcId];
if (!sender.isConnected()) {
return this._borrowExportedSender(dcId, true, sender);
}
} catch (err) {
// eslint-disable-next-line no-console
console.error(err);
return this._borrowExportedSender(dcId, true);
}
if (this._exportedSenderReleaseTimeouts[dcId]) {
clearTimeout(this._exportedSenderReleaseTimeouts[dcId]);
this._exportedSenderReleaseTimeouts[dcId] = undefined;
}
this._exportedSenderReleaseTimeouts[dcId] = setTimeout(() => {
// eslint-disable-next-line no-console
console.warn(`Disconnecting from file socket #${dcId}...`);
this._exportedSenderReleaseTimeouts[dcId] = undefined;
sender.disconnect();
}, EXPORTED_SENDER_RELEASE_TIMEOUT);
return sender;
}
_createExportedSender(dcId) {
return new MTProtoSender(this.session.getAuthKey(dcId), {
logger: this._log,
dcId,
retries: this._connectionRetries,
delay: this._retryDelay,
autoReconnect: this._autoReconnect,
connectTimeout: this._timeout,
authKeyCallback: this._authKeyCallback.bind(this),
isMainSender: dcId === this.session.dcId,
onConnectionBreak: this._cleanupExportedSender.bind(this),
});
}
getSender(dcId) {
return dcId ? this._borrowExportedSender(dcId) : Promise.resolve(this._sender);
}
// end region
@ -610,6 +662,7 @@ class TelegramClient {
}
}
}
// region Invoking Telegram request
/**
* Invokes a MTProtoRequest (sends and receives it) and returns its result

View File

@ -3,6 +3,7 @@ import { default as Api } from '../tl/api';
import TelegramClient from './TelegramClient';
import { getAppropriatedPartSize } from '../Utils';
import { sleep, createDeferred } from '../Helpers';
import errors from '../errors';
export interface progressCallback {
isCanceled?: boolean;
@ -33,7 +34,7 @@ interface Deferred {
const MIN_CHUNK_SIZE = 4096;
const DEFAULT_CHUNK_SIZE = 64; // kb
const ONE_MB = 1024 * 1024;
const REQUEST_TIMEOUT = 15000;
const DISCONNECT_SLEEP = 1000;
class Foreman {
@ -90,24 +91,6 @@ export async function downloadFile(
throw new Error(`The part size must be evenly divisible by ${MIN_CHUNK_SIZE}`);
}
let sender: any;
if (dcId) {
try {
sender = await client._borrowExportedSender(dcId);
} catch (e) {
// This should never raise
client._log.error(e);
if (e.message === 'DC_ID_INVALID') {
// Can't export a sender for the ID we are currently in
sender = client._sender;
} else {
throw e;
}
}
} else {
sender = client._sender;
}
client._log.info(`Downloading file in chunks of ${partSize} bytes`);
const foreman = new Foreman(workers);
@ -121,6 +104,10 @@ export async function downloadFile(
progressCallback(progress);
}
// used to populate the sender
await client.getSender(dcId);
// eslint-disable-next-line no-constant-condition
while (true) {
let limit = partSize;
@ -139,39 +126,55 @@ export async function downloadFile(
}
// eslint-disable-next-line no-loop-func
promises.push((async () => {
try {
const result = await Promise.race([
await sender.send(new Api.upload.GetFile({
promises.push((async (offsetMemo: number) => {
// eslint-disable-next-line no-constant-condition
while (true) {
const sender = await client.getSender(dcId);
try {
if (!sender._user_connected) {
await sleep(DISCONNECT_SLEEP);
continue;
}
const result = await sender.send(new Api.upload.GetFile({
location: inputLocation,
offset,
offset: offsetMemo,
limit,
precise: isPrecise || undefined,
})),
sleep(REQUEST_TIMEOUT).then(() => Promise.reject(new Error('REQUEST_TIMEOUT'))),
]);
}));
if (progressCallback) {
if (progressCallback.isCanceled) {
throw new Error('USER_CANCELED');
if (progressCallback) {
if (progressCallback.isCanceled) {
throw new Error('USER_CANCELED');
}
progress += (1 / partsCount);
progressCallback(progress);
}
progress += (1 / partsCount);
progressCallback(progress);
}
if (!end && (result.bytes.length < limit)) {
hasEnded = true;
}
return result.bytes;
} catch (err) {
if (err.message === 'Disconnect') {
await sleep(DISCONNECT_SLEEP);
continue;
} else if (err instanceof errors.FloodWaitError) {
await sleep(err.seconds * 1000);
continue;
} else if (err.message !== 'USER_CANCELED') {
// eslint-disable-next-line no-console
console.error(err);
}
if (!end && (result.bytes.length < limit)) {
hasEnded = true;
throw err;
} finally {
foreman.releaseWorker();
}
return result.bytes;
} catch (err) {
hasEnded = true;
throw err;
} finally {
foreman.releaseWorker();
}
})());
})(offset));
offset += limit;
@ -179,7 +182,6 @@ export async function downloadFile(
break;
}
}
const results = await Promise.all(promises);
const buffers = results.filter(Boolean);
const totalLength = end ? (end + 1) - start : undefined;

View File

@ -4,6 +4,7 @@ import { default as Api } from '../tl/api';
import TelegramClient from './TelegramClient';
import { generateRandomBytes, readBigIntFromBuffer, sleep } from '../Helpers';
import { getAppropriatedPartSize } from '../Utils';
import errors from '../errors';
interface OnProgress {
isCanceled?: boolean;
@ -20,7 +21,7 @@ export interface UploadFileParams {
const KB_TO_BYTES = 1024;
const LARGE_FILE_THRESHOLD = 10 * 1024 * 1024;
const UPLOAD_TIMEOUT = 15 * 1000;
const DISCONNECT_SLEEP = 1000;
export async function uploadFile(
client: TelegramClient,
@ -38,7 +39,7 @@ export async function uploadFile(
const buffer = Buffer.from(await fileToBuffer(file));
// We always upload from the DC we are in.
const sender = await client._borrowExportedSender(client.session.dcId);
const sender = await client.getSender(client.session.dcId);
if (!workers || !size) {
workers = 1;
@ -63,47 +64,51 @@ export async function uploadFile(
const bytes = buffer.slice(j * partSize, (j + 1) * partSize);
// eslint-disable-next-line no-loop-func
sendingParts.push((async () => {
await sender.send(
isLarge
? new Api.upload.SaveBigFilePart({
fileId,
filePart: j,
fileTotalParts: partCount,
bytes,
})
: new Api.upload.SaveFilePart({
fileId,
filePart: j,
bytes,
}),
);
if (onProgress) {
if (onProgress.isCanceled) {
throw new Error('USER_CANCELED');
sendingParts.push((async (jMemo: number, bytesMemo: Buffer) => {
while (true) {
if (!sender._user_connected) {
await sleep(DISCONNECT_SLEEP);
continue;
}
try {
await sender.send(
isLarge
? new Api.upload.SaveBigFilePart({
fileId,
filePart: jMemo,
fileTotalParts: partCount,
bytes: bytesMemo,
})
: new Api.upload.SaveFilePart({
fileId,
filePart: jMemo,
bytes: bytesMemo,
}),
);
} catch (err) {
if (err.message === 'Disconnect') {
await sleep(DISCONNECT_SLEEP);
continue;
} else if (err instanceof errors.FloodWaitError) {
await sleep(err.seconds * 1000);
continue;
}
throw err;
}
progress += (1 / partCount);
onProgress(progress);
}
})());
}
try {
await Promise.race([
await Promise.all(sendingParts),
sleep(UPLOAD_TIMEOUT * workers).then(() => Promise.reject(new Error('TIMEOUT'))),
]);
} catch (err) {
if (err.message === 'TIMEOUT') {
// eslint-disable-next-line no-console
console.warn('Upload timeout. Retrying...');
i -= workers;
continue;
}
if (onProgress) {
if (onProgress.isCanceled) {
throw new Error('USER_CANCELED');
}
throw err;
progress += (1 / partCount);
onProgress(progress);
}
break;
}
})(j, bytes));
}
await Promise.all(sendingParts);
}
return isLarge

View File

@ -51,6 +51,11 @@ class Logger {
* @param message {string}
*/
warn(message) {
// todo remove later
if (_level === 'debug') {
// eslint-disable-next-line no-console
console.error(new Error().stack);
}
this._log('warn', message, this.colors.warn);
}
@ -72,6 +77,11 @@ class Logger {
* @param message {string}
*/
error(message) {
// todo remove later
if (_level === 'debug') {
// eslint-disable-next-line no-console
console.error(new Error().stack);
}
this._log('error', message, this.colors.error);
}

View File

@ -86,6 +86,8 @@ class PromisedWebSockets {
resolve(this);
};
this.client.onerror = (error) => {
// eslint-disable-next-line no-console
console.error('WebSocket error', error);
reject(error);
};
this.client.onclose = (event) => {

View File

@ -7,6 +7,7 @@ const RPCResult = require('../tl/core/RPCResult');
const MessageContainer = require('../tl/core/MessageContainer');
const GZIPPacked = require('../tl/core/GZIPPacked');
const RequestState = require('./RequestState');
const {
MsgsAck,
upload,
@ -169,6 +170,7 @@ class MTProtoSender {
this._log.info('User is already connected!');
return false;
}
this.isConnecting = true;
this._connection = connection;
for (let attempt = 0; attempt < this._retries; attempt++) {
@ -188,6 +190,7 @@ class MTProtoSender {
await Helpers.sleep(this._delay);
}
}
this.isConnecting = false;
return true;
}
@ -287,6 +290,7 @@ class MTProtoSender {
await this._authKeyCallback(this.authKey, this._dcId);
}
} else {
this._authenticated = true;
this._log.debug('Already have an auth key ...');
}
this._user_connected = true;