mirror of
https://github.com/danog/MadelineProto.git
synced 2024-11-30 04:08:59 +01:00
Add more counters
This commit is contained in:
parent
3209f8c60e
commit
d9dcdfeb47
@ -580,7 +580,7 @@ final class Connection
|
||||
}
|
||||
$this->pendingOutgoing[$this->pendingOutgoingKey++] = $message;
|
||||
$this->outgoingCtr?->inc();
|
||||
$this->pendingOutgoingGauge?->set(count($this->pendingOutgoing));
|
||||
$this->pendingOutgoingGauge?->set(\count($this->pendingOutgoing));
|
||||
if (isset($this->writer)) {
|
||||
$this->writer->resume();
|
||||
}
|
||||
|
@ -139,10 +139,13 @@ final class ReadLoop extends Loop
|
||||
}
|
||||
throw $e;
|
||||
}
|
||||
/** @var int $payload_length */
|
||||
if ($payload_length & (1 << 31)) {
|
||||
$this->API->logger("Received quick ACK $payload_length from DC ".$this->datacenter, Logger::ULTRA_VERBOSE);
|
||||
$this->connection->incomingBytesCtr?->incBy(4);
|
||||
return null;
|
||||
}
|
||||
$this->connection->incomingBytesCtr?->incBy(4+$payload_length);
|
||||
if ($payload_length <= 16) {
|
||||
$code = Tools::unpackSignedInt($buffer->bufferRead(4));
|
||||
if ($code === -1 && $payload_length >= 8) {
|
||||
@ -177,6 +180,7 @@ final class ReadLoop extends Loop
|
||||
if ($left < (-$message_length & 15)) {
|
||||
$this->API->logger('Protocol padded unencrypted message', Logger::ULTRA_VERBOSE);
|
||||
}
|
||||
$this->connection->incomingBytesCtr?->incBy($left);
|
||||
$buffer->bufferRead($left);
|
||||
}
|
||||
} elseif ($auth_key_id === $this->shared->getTempAuthKey()->getID()) {
|
||||
@ -187,6 +191,7 @@ final class ReadLoop extends Loop
|
||||
$payload_length -= $left;
|
||||
$decrypted_data = Crypt::igeDecrypt($buffer->bufferRead($payload_length), $aes_key, $aes_iv);
|
||||
if ($left) {
|
||||
$this->connection->incomingBytesCtr?->incBy($left);
|
||||
$buffer->bufferRead($left);
|
||||
}
|
||||
if ($message_key != substr(hash('sha256', substr($this->shared->getTempAuthKey()->getAuthKey(), 96, 32).$decrypted_data, true), 8, 16)) {
|
||||
|
@ -102,7 +102,7 @@ final class WriteLoop extends Loop
|
||||
}
|
||||
if ($message->getState() & MTProtoOutgoingMessage::STATE_REPLIED) {
|
||||
unset($this->connection->pendingOutgoing[$k]);
|
||||
$this->connection->pendingOutgoingGauge?->set(count($this->connection->pendingOutgoing));
|
||||
$this->connection->pendingOutgoingGauge?->set(\count($this->connection->pendingOutgoing));
|
||||
continue;
|
||||
}
|
||||
$skipped_all = false;
|
||||
@ -112,14 +112,15 @@ final class WriteLoop extends Loop
|
||||
$pad_length = -$length & 15;
|
||||
$pad_length += 16 * Tools::randomInt(modulus: 16);
|
||||
$pad = Tools::random($pad_length);
|
||||
$buffer = $this->connection->stream->getWriteBuffer(8 + 8 + 4 + $pad_length + $length);
|
||||
$buffer = $this->connection->stream->getWriteBuffer($total_len = 8 + 8 + 4 + $pad_length + $length);
|
||||
$buffer->bufferWrite("\0\0\0\0\0\0\0\0".Tools::packSignedLong($message_id).Tools::packUnsignedInt($length).$message->getSerializedBody().$pad);
|
||||
$this->connection->httpSent();
|
||||
$this->connection->outgoingBytesCtr?->incBy($total_len);
|
||||
|
||||
$this->API->logger("Sent $message as unencrypted message to DC $this->datacenter!", Logger::ULTRA_VERBOSE);
|
||||
|
||||
unset($this->connection->pendingOutgoing[$k]);
|
||||
$this->connection->pendingOutgoingGauge?->set(count($this->connection->pendingOutgoing));
|
||||
$this->connection->pendingOutgoingGauge?->set(\count($this->connection->pendingOutgoing));
|
||||
$message->setMsgId($message_id);
|
||||
$this->connection->outgoing_messages[$message_id] = $message;
|
||||
$this->connection->new_outgoing[$message_id] = $message;
|
||||
@ -162,13 +163,13 @@ final class WriteLoop extends Loop
|
||||
}
|
||||
if ($message->getState() & MTProtoOutgoingMessage::STATE_REPLIED) {
|
||||
unset($this->connection->pendingOutgoing[$k]);
|
||||
$this->connection->pendingOutgoingGauge?->set(count($this->connection->pendingOutgoing));
|
||||
$this->connection->pendingOutgoingGauge?->set(\count($this->connection->pendingOutgoing));
|
||||
$this->API->logger("Skipping resending of $message, we already got a reply in DC $this->datacenter");
|
||||
continue;
|
||||
}
|
||||
if ($message instanceof Container) {
|
||||
unset($this->connection->pendingOutgoing[$k]);
|
||||
$this->connection->pendingOutgoingGauge?->set(count($this->connection->pendingOutgoing));
|
||||
$this->connection->pendingOutgoingGauge?->set(\count($this->connection->pendingOutgoing));
|
||||
continue;
|
||||
}
|
||||
$constructor = $message->constructor;
|
||||
@ -326,7 +327,7 @@ final class WriteLoop extends Loop
|
||||
$message_id = $this->connection->msgIdHandler->generateMessageId();
|
||||
$this->connection->pendingOutgoing[$this->connection->pendingOutgoingKey] = new Container(array_values($keys));
|
||||
$this->connection->outgoingCtr?->inc();
|
||||
$this->connection->pendingOutgoingGauge?->set(count($this->connection->pendingOutgoing));
|
||||
$this->connection->pendingOutgoingGauge?->set(\count($this->connection->pendingOutgoing));
|
||||
$keys[$this->connection->pendingOutgoingKey++] = $message_id;
|
||||
$message_data = $this->API->getTL()->serializeObject(['type' => ''], ['_' => 'msg_container', 'messages' => $messages], 'container');
|
||||
$message_data_length = \strlen($message_data);
|
||||
@ -353,9 +354,10 @@ final class WriteLoop extends Loop
|
||||
//$ack = unpack('V', substr($message_key_large, 0, 4))[1] | (1 << 31);
|
||||
[$aes_key, $aes_iv] = Crypt::kdf($message_key, $this->shared->getTempAuthKey()->getAuthKey());
|
||||
$message = $this->shared->getTempAuthKey()->getID().$message_key.Crypt::igeEncrypt($plaintext.$padding, $aes_key, $aes_iv);
|
||||
$buffer = $this->connection->stream->getWriteBuffer(\strlen($message));
|
||||
$buffer = $this->connection->stream->getWriteBuffer($total_len = \strlen($message));
|
||||
$buffer->bufferWrite($message);
|
||||
$this->connection->httpSent();
|
||||
$this->connection->outgoingBytesCtr?->incBy($total_len);
|
||||
$this->API->logger("Sent encrypted payload to DC {$this->datacenter}", Logger::ULTRA_VERBOSE);
|
||||
|
||||
if ($ackCount) {
|
||||
@ -380,7 +382,7 @@ final class WriteLoop extends Loop
|
||||
}
|
||||
});
|
||||
}
|
||||
$this->connection->pendingOutgoingGauge?->set(count($this->connection->pendingOutgoing));
|
||||
$this->connection->pendingOutgoingGauge?->set(\count($this->connection->pendingOutgoing));
|
||||
} while ($this->connection->pendingOutgoing && !$skipped);
|
||||
if (empty($this->connection->pendingOutgoing)) {
|
||||
$this->connection->pendingOutgoing = [];
|
||||
|
@ -145,7 +145,7 @@ trait ResponseHandler
|
||||
$this->checkInSeqNo($newMessage);
|
||||
$newMessage->setSeqNo(null);
|
||||
$tmp->enqueue($newMessage);
|
||||
$this->connection->incomingCtr?->inc();
|
||||
$this->incomingCtr?->inc();
|
||||
$this->incoming_messages[$msg['msg_id']] = $newMessage;
|
||||
}
|
||||
$this->checkInSeqNo($message);
|
||||
@ -161,7 +161,7 @@ trait ResponseHandler
|
||||
} else {
|
||||
$this->msgIdHandler->checkIncomingMessageId($referencedMsgId, true);
|
||||
$message = new MTProtoIncomingMessage($content['orig_message'], $referencedMsgId, $message->unencrypted);
|
||||
$this->connection->incomingCtr?->inc();
|
||||
$this->incomingCtr?->inc();
|
||||
$this->incoming_messages[$referencedMsgId] = $message;
|
||||
$this->handleMessages([$message]);
|
||||
}
|
||||
@ -276,10 +276,16 @@ trait ResponseHandler
|
||||
EventLoop::queue($request->reply(...), $response);
|
||||
}
|
||||
/**
|
||||
* @param array{error_message: string, error_code: int} $response
|
||||
* @return (callable(): Throwable)|null
|
||||
*/
|
||||
private function handleRpcError(MTProtoOutgoingMessage $request, array $response): ?callable
|
||||
{
|
||||
if ($response['error_code'] === 420) {
|
||||
$this->rpcErrors?->inc(['message' => preg_replace('/\d+/', '', $response['error_message']), 'code' => (string) $response['error_code']]);
|
||||
} else {
|
||||
$this->rpcErrors?->inc(['message' => $response['error_message'], 'code' => (string) $response['error_code']]);
|
||||
}
|
||||
if ($request->isMethod
|
||||
&& $request->constructor !== 'auth.bindTempAuthKey'
|
||||
&& $this->shared->hasTempAuthKey()
|
||||
|
@ -42,9 +42,12 @@ trait Session
|
||||
use SeqNoHandler;
|
||||
use CallHandler;
|
||||
use Reliable;
|
||||
public ?BetterGauge $pendingOutgoingGauge;
|
||||
public ?BetterCounter $incomingCtr;
|
||||
public ?BetterCounter $outgoingCtr;
|
||||
public ?BetterGauge $pendingOutgoingGauge = null;
|
||||
public ?BetterCounter $incomingCtr = null;
|
||||
public ?BetterCounter $outgoingCtr = null;
|
||||
public ?BetterCounter $incomingBytesCtr = null;
|
||||
public ?BetterCounter $outgoingBytesCtr = null;
|
||||
public ?BetterCounter $rpcErrors = null;
|
||||
/**
|
||||
* Incoming message array.
|
||||
*
|
||||
@ -176,9 +179,13 @@ trait Session
|
||||
*/
|
||||
public function createSession(): void
|
||||
{
|
||||
$this->pendingOutgoingGauge = $this->API->getPromGauge("MadelineProto", "pending_outgoing_mtproto_messages", "Number of not-yet sent outgoing MTProto messages", ['datacenter' => $this->datacenter, 'connection' => $this->id]);
|
||||
$this->incomingCtr = $this->API->getPromCounter("MadelineProto", "outgoing_mtproto_messages", "Number of received MTProto messages", ['datacenter' => $this->datacenter, 'connection' => $this->id]);
|
||||
$this->outgoingCtr = $this->API->getPromCounter("MadelineProto", "incoming_mtproto_messages", "Number of sent MTProto messages", ['datacenter' => $this->datacenter, 'connection' => $this->id]);
|
||||
$labels = ['datacenter' => (string) $this->datacenter, 'connection' => (string) $this->id];
|
||||
$this->pendingOutgoingGauge = $this->API->getPromGauge("MadelineProto", "pending_outgoing_mtproto_messages", "Number of not-yet sent outgoing MTProto messages", $labels);
|
||||
$this->incomingCtr = $this->API->getPromCounter("MadelineProto", "incoming_mtproto_messages", "Number of received MTProto messages", $labels);
|
||||
$this->outgoingCtr = $this->API->getPromCounter("MadelineProto", "outgoing_mtproto_messages", "Number of sent MTProto messages", $labels);
|
||||
$this->incomingBytesCtr = $this->API->getPromCounter("MadelineProto", "incoming_bytes", "Number of received bytes", $labels);
|
||||
$this->outgoingBytesCtr = $this->API->getPromCounter("MadelineProto", "outgoing_bytes", "Number of sent bytes", $labels);
|
||||
$this->rpcErrors = $this->API->getPromCounter("MadelineProto", "rpc_errors", "Number of received RPC errors by type", $labels);
|
||||
if ($this->session_id === null) {
|
||||
$this->resetSession("creating initial session");
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user