diff --git a/src/Connection.php b/src/Connection.php index 42e3b5079..037e9a065 100644 --- a/src/Connection.php +++ b/src/Connection.php @@ -580,7 +580,7 @@ final class Connection } $this->pendingOutgoing[$this->pendingOutgoingKey++] = $message; $this->outgoingCtr?->inc(); - $this->pendingOutgoingGauge?->set(count($this->pendingOutgoing)); + $this->pendingOutgoingGauge?->set(\count($this->pendingOutgoing)); if (isset($this->writer)) { $this->writer->resume(); } diff --git a/src/Loop/Connection/ReadLoop.php b/src/Loop/Connection/ReadLoop.php index 652fd5d00..0f06dd2ab 100644 --- a/src/Loop/Connection/ReadLoop.php +++ b/src/Loop/Connection/ReadLoop.php @@ -139,10 +139,13 @@ final class ReadLoop extends Loop } throw $e; } + /** @var int $payload_length */ if ($payload_length & (1 << 31)) { $this->API->logger("Received quick ACK $payload_length from DC ".$this->datacenter, Logger::ULTRA_VERBOSE); + $this->connection->incomingBytesCtr?->incBy(4); return null; } + $this->connection->incomingBytesCtr?->incBy(4+$payload_length); if ($payload_length <= 16) { $code = Tools::unpackSignedInt($buffer->bufferRead(4)); if ($code === -1 && $payload_length >= 8) { @@ -177,6 +180,7 @@ final class ReadLoop extends Loop if ($left < (-$message_length & 15)) { $this->API->logger('Protocol padded unencrypted message', Logger::ULTRA_VERBOSE); } + $this->connection->incomingBytesCtr?->incBy($left); $buffer->bufferRead($left); } } elseif ($auth_key_id === $this->shared->getTempAuthKey()->getID()) { @@ -187,6 +191,7 @@ final class ReadLoop extends Loop $payload_length -= $left; $decrypted_data = Crypt::igeDecrypt($buffer->bufferRead($payload_length), $aes_key, $aes_iv); if ($left) { + $this->connection->incomingBytesCtr?->incBy($left); $buffer->bufferRead($left); } if ($message_key != substr(hash('sha256', substr($this->shared->getTempAuthKey()->getAuthKey(), 96, 32).$decrypted_data, true), 8, 16)) { diff --git a/src/Loop/Connection/WriteLoop.php b/src/Loop/Connection/WriteLoop.php index b0b384c4b..66dd0b47f 100644 --- a/src/Loop/Connection/WriteLoop.php +++ b/src/Loop/Connection/WriteLoop.php @@ -102,7 +102,7 @@ final class WriteLoop extends Loop } if ($message->getState() & MTProtoOutgoingMessage::STATE_REPLIED) { unset($this->connection->pendingOutgoing[$k]); - $this->connection->pendingOutgoingGauge?->set(count($this->connection->pendingOutgoing)); + $this->connection->pendingOutgoingGauge?->set(\count($this->connection->pendingOutgoing)); continue; } $skipped_all = false; @@ -112,14 +112,15 @@ final class WriteLoop extends Loop $pad_length = -$length & 15; $pad_length += 16 * Tools::randomInt(modulus: 16); $pad = Tools::random($pad_length); - $buffer = $this->connection->stream->getWriteBuffer(8 + 8 + 4 + $pad_length + $length); + $buffer = $this->connection->stream->getWriteBuffer($total_len = 8 + 8 + 4 + $pad_length + $length); $buffer->bufferWrite("\0\0\0\0\0\0\0\0".Tools::packSignedLong($message_id).Tools::packUnsignedInt($length).$message->getSerializedBody().$pad); $this->connection->httpSent(); + $this->connection->outgoingBytesCtr?->incBy($total_len); $this->API->logger("Sent $message as unencrypted message to DC $this->datacenter!", Logger::ULTRA_VERBOSE); unset($this->connection->pendingOutgoing[$k]); - $this->connection->pendingOutgoingGauge?->set(count($this->connection->pendingOutgoing)); + $this->connection->pendingOutgoingGauge?->set(\count($this->connection->pendingOutgoing)); $message->setMsgId($message_id); $this->connection->outgoing_messages[$message_id] = $message; $this->connection->new_outgoing[$message_id] = $message; @@ -162,13 +163,13 @@ final class WriteLoop extends Loop } if ($message->getState() & MTProtoOutgoingMessage::STATE_REPLIED) { unset($this->connection->pendingOutgoing[$k]); - $this->connection->pendingOutgoingGauge?->set(count($this->connection->pendingOutgoing)); + $this->connection->pendingOutgoingGauge?->set(\count($this->connection->pendingOutgoing)); $this->API->logger("Skipping resending of $message, we already got a reply in DC $this->datacenter"); continue; } if ($message instanceof Container) { unset($this->connection->pendingOutgoing[$k]); - $this->connection->pendingOutgoingGauge?->set(count($this->connection->pendingOutgoing)); + $this->connection->pendingOutgoingGauge?->set(\count($this->connection->pendingOutgoing)); continue; } $constructor = $message->constructor; @@ -326,7 +327,7 @@ final class WriteLoop extends Loop $message_id = $this->connection->msgIdHandler->generateMessageId(); $this->connection->pendingOutgoing[$this->connection->pendingOutgoingKey] = new Container(array_values($keys)); $this->connection->outgoingCtr?->inc(); - $this->connection->pendingOutgoingGauge?->set(count($this->connection->pendingOutgoing)); + $this->connection->pendingOutgoingGauge?->set(\count($this->connection->pendingOutgoing)); $keys[$this->connection->pendingOutgoingKey++] = $message_id; $message_data = $this->API->getTL()->serializeObject(['type' => ''], ['_' => 'msg_container', 'messages' => $messages], 'container'); $message_data_length = \strlen($message_data); @@ -353,9 +354,10 @@ final class WriteLoop extends Loop //$ack = unpack('V', substr($message_key_large, 0, 4))[1] | (1 << 31); [$aes_key, $aes_iv] = Crypt::kdf($message_key, $this->shared->getTempAuthKey()->getAuthKey()); $message = $this->shared->getTempAuthKey()->getID().$message_key.Crypt::igeEncrypt($plaintext.$padding, $aes_key, $aes_iv); - $buffer = $this->connection->stream->getWriteBuffer(\strlen($message)); + $buffer = $this->connection->stream->getWriteBuffer($total_len = \strlen($message)); $buffer->bufferWrite($message); $this->connection->httpSent(); + $this->connection->outgoingBytesCtr?->incBy($total_len); $this->API->logger("Sent encrypted payload to DC {$this->datacenter}", Logger::ULTRA_VERBOSE); if ($ackCount) { @@ -380,7 +382,7 @@ final class WriteLoop extends Loop } }); } - $this->connection->pendingOutgoingGauge?->set(count($this->connection->pendingOutgoing)); + $this->connection->pendingOutgoingGauge?->set(\count($this->connection->pendingOutgoing)); } while ($this->connection->pendingOutgoing && !$skipped); if (empty($this->connection->pendingOutgoing)) { $this->connection->pendingOutgoing = []; diff --git a/src/MTProtoSession/ResponseHandler.php b/src/MTProtoSession/ResponseHandler.php index e607e6e87..7b7e32b5e 100644 --- a/src/MTProtoSession/ResponseHandler.php +++ b/src/MTProtoSession/ResponseHandler.php @@ -145,7 +145,7 @@ trait ResponseHandler $this->checkInSeqNo($newMessage); $newMessage->setSeqNo(null); $tmp->enqueue($newMessage); - $this->connection->incomingCtr?->inc(); + $this->incomingCtr?->inc(); $this->incoming_messages[$msg['msg_id']] = $newMessage; } $this->checkInSeqNo($message); @@ -161,7 +161,7 @@ trait ResponseHandler } else { $this->msgIdHandler->checkIncomingMessageId($referencedMsgId, true); $message = new MTProtoIncomingMessage($content['orig_message'], $referencedMsgId, $message->unencrypted); - $this->connection->incomingCtr?->inc(); + $this->incomingCtr?->inc(); $this->incoming_messages[$referencedMsgId] = $message; $this->handleMessages([$message]); } @@ -276,10 +276,16 @@ trait ResponseHandler EventLoop::queue($request->reply(...), $response); } /** + * @param array{error_message: string, error_code: int} $response * @return (callable(): Throwable)|null */ private function handleRpcError(MTProtoOutgoingMessage $request, array $response): ?callable { + if ($response['error_code'] === 420) { + $this->rpcErrors?->inc(['message' => preg_replace('/\d+/', '', $response['error_message']), 'code' => (string) $response['error_code']]); + } else { + $this->rpcErrors?->inc(['message' => $response['error_message'], 'code' => (string) $response['error_code']]); + } if ($request->isMethod && $request->constructor !== 'auth.bindTempAuthKey' && $this->shared->hasTempAuthKey() diff --git a/src/MTProtoSession/Session.php b/src/MTProtoSession/Session.php index 95d7f2e95..c33b64cdf 100644 --- a/src/MTProtoSession/Session.php +++ b/src/MTProtoSession/Session.php @@ -42,9 +42,12 @@ trait Session use SeqNoHandler; use CallHandler; use Reliable; - public ?BetterGauge $pendingOutgoingGauge; - public ?BetterCounter $incomingCtr; - public ?BetterCounter $outgoingCtr; + public ?BetterGauge $pendingOutgoingGauge = null; + public ?BetterCounter $incomingCtr = null; + public ?BetterCounter $outgoingCtr = null; + public ?BetterCounter $incomingBytesCtr = null; + public ?BetterCounter $outgoingBytesCtr = null; + public ?BetterCounter $rpcErrors = null; /** * Incoming message array. * @@ -176,9 +179,13 @@ trait Session */ public function createSession(): void { - $this->pendingOutgoingGauge = $this->API->getPromGauge("MadelineProto", "pending_outgoing_mtproto_messages", "Number of not-yet sent outgoing MTProto messages", ['datacenter' => $this->datacenter, 'connection' => $this->id]); - $this->incomingCtr = $this->API->getPromCounter("MadelineProto", "outgoing_mtproto_messages", "Number of received MTProto messages", ['datacenter' => $this->datacenter, 'connection' => $this->id]); - $this->outgoingCtr = $this->API->getPromCounter("MadelineProto", "incoming_mtproto_messages", "Number of sent MTProto messages", ['datacenter' => $this->datacenter, 'connection' => $this->id]); + $labels = ['datacenter' => (string) $this->datacenter, 'connection' => (string) $this->id]; + $this->pendingOutgoingGauge = $this->API->getPromGauge("MadelineProto", "pending_outgoing_mtproto_messages", "Number of not-yet sent outgoing MTProto messages", $labels); + $this->incomingCtr = $this->API->getPromCounter("MadelineProto", "incoming_mtproto_messages", "Number of received MTProto messages", $labels); + $this->outgoingCtr = $this->API->getPromCounter("MadelineProto", "outgoing_mtproto_messages", "Number of sent MTProto messages", $labels); + $this->incomingBytesCtr = $this->API->getPromCounter("MadelineProto", "incoming_bytes", "Number of received bytes", $labels); + $this->outgoingBytesCtr = $this->API->getPromCounter("MadelineProto", "outgoing_bytes", "Number of sent bytes", $labels); + $this->rpcErrors = $this->API->getPromCounter("MadelineProto", "rpc_errors", "Number of received RPC errors by type", $labels); if ($this->session_id === null) { $this->resetSession("creating initial session"); }