1
0
mirror of https://github.com/danog/libtgvoip.git synced 2024-11-26 20:24:38 +01:00

Refactoring

This commit is contained in:
Daniil Gentili 2020-01-27 16:02:59 +01:00
parent ec9b8863bd
commit f9bba1effa
Signed by: danog
GPG Key ID: 8C1BE3B34B230CA7
9 changed files with 142 additions and 237 deletions

View File

@ -16,5 +16,6 @@
#include "controller/protocol/NetworkAPI.cpp"
#include "controller/protocol/Protocol.cpp"
#include "controller/protocol/Tick.cpp"
#include "controller/protocol/Reliable.cpp"
#include "controller/protocol/Legacy.cpp"

View File

@ -638,10 +638,12 @@ private:
// More legacy
bool legacyParsePacket(BufferInputStream &in, unsigned char &type, uint32_t &ackId, uint32_t &pseq, uint32_t &acks, unsigned char &pflags, size_t &packetInnerLen);
void legacyHandleReliablePackets();
void legacyWritePacketHeader(uint32_t pseq, uint32_t acks, BufferOutputStream *s, unsigned char type, uint32_t length, PacketSender *source);
void handleReliablePackets();
void SetupOutgoingVideoStream();
bool WasOutgoingPacketAcknowledged(uint32_t seq);
bool WasOutgoingPacketAcknowledged(uint32_t seq, bool checkAll = true);
RecentOutgoingPacket *GetRecentOutgoingPacket(uint32_t seq);
void NetworkPacketReceived(std::shared_ptr<NetworkPacket> packet);
void TrySendOutgoingPackets();
@ -663,7 +665,13 @@ private:
// Seqno of last sent packet
uint32_t lastSentSeq = 0;
// Status list of acked seqnos, starting from the seq explicitly present in the packet + up to 32 seqs ago
std::array<uint32_t, 33> peerAcks{0};
// Recent ougoing packets
std::vector<RecentOutgoingPacket> recentOutgoingPackets;
// Recent incoming packets
std::array<uint32_t, MAX_RECENT_PACKETS> recentIncomingSeqs{};
size_t recentIncomingSeqIdx = 0;

View File

@ -24,6 +24,7 @@ void VoIPController::HandleAudioInput(unsigned char *data, size_t len, unsigned
shared_ptr<Buffer> secondaryDataBufPtr = make_shared<Buffer>(move(secondaryDataBuf));
messageThread.Post([this, dataBufPtr, secondaryDataBufPtr, len, secondaryLen]() {
/*
unsentStreamPacketsHistory.Add(static_cast<unsigned int>(unsentStreamPackets));
if (unsentStreamPacketsHistory.Average() >= maxUnsentStreamPackets && !videoPacketSender)
{
@ -32,11 +33,11 @@ void VoIPController::HandleAudioInput(unsigned char *data, size_t len, unsigned
unsentStreamPacketsHistory.Reset();
unsentStreamPackets = 0;
}
if (waitingForAcks || dontSendPackets > 0 || ((unsigned int)unsentStreamPackets >= maxUnsentStreamPackets /*&& endpoints[currentEndpoint].type==Endpoint::Type::TCP_RELAY*/))
{
//if (waitingForAcks || dontSendPackets > 0 || ((unsigned int)unsentStreamPackets >= maxUnsentStreamPackets))
/*{
LOGV("waiting for queue, dropping outgoing audio packet, %d %d %d [%d]", (unsigned int)unsentStreamPackets, waitingForAcks, dontSendPackets, maxUnsentStreamPackets);
return;
}
}*/
//LOGV("Audio packet size %u", (unsigned int)len);
if (!receivedInitAck)
return;

View File

@ -81,39 +81,106 @@ bool VoIPController::legacyParsePacket(BufferInputStream &in, unsigned char &typ
}
return true;
}
void VoIPController::legacyHandleReliablePackets()
void VoIPController::legacyWritePacketHeader(uint32_t pseq, uint32_t acks, BufferOutputStream *s, unsigned char type, uint32_t length, PacketSender *source)
{
for (auto it = reliablePackets.begin(); it != reliablePackets.end();)
if (state == STATE_WAIT_INIT || state == STATE_WAIT_INIT_ACK)
{
ReliableOutgoingPacket &qp = *it;
bool didAck = false;
for (uint8_t j = 0; j < 16; j++)
s->WriteInt32(TLID_DECRYPTED_AUDIO_BLOCK);
int64_t randomID;
crypto.rand_bytes((uint8_t *)&randomID, 8);
s->WriteInt64(randomID);
unsigned char randBytes[7];
crypto.rand_bytes(randBytes, 7);
s->WriteByte(7);
s->WriteBytes(randBytes, 7);
uint32_t pflags = PFLAG_HAS_RECENT_RECV | PFLAG_HAS_SEQ;
if (length > 0)
pflags |= PFLAG_HAS_DATA;
if (state == STATE_WAIT_INIT || state == STATE_WAIT_INIT_ACK)
{
LOGD("queued packet %ld, seq %u=%u", reliablePackets.end() - it, j, qp.seqs[j]);
if (qp.seqs[j] == 0)
break;
int distance = lastRemoteAckSeq - qp.seqs[j];
//LOGV("remote acks index %u, value %f", distance, distance>=0 && distance<32 ? distance[remoteAcksIndex] : -1);
if (seqgt(lastRemoteAckSeq, qp.seqs[j]) && distance >= 0 && distance < 32)
pflags |= PFLAG_HAS_CALL_ID | PFLAG_HAS_PROTO;
}
pflags |= ((uint32_t)type) << 24;
s->WriteInt32(pflags);
if (pflags & PFLAG_HAS_CALL_ID)
{
s->WriteBytes(callID, 16);
}
s->WriteInt32(lastRemoteSeq);
s->WriteInt32(pseq);
s->WriteInt32(acks);
if (pflags & PFLAG_HAS_PROTO)
{
s->WriteInt32(PROTOCOL_NAME);
}
if (length > 0)
{
if (length <= 253)
{
for (const auto &opkt : recentOutgoingPackets) // Optimize this
{
if (opkt.seq == qp.seqs[j] && opkt.ackTime > 0)
{
LOGD("did ack seq %u, removing", qp.seqs[j]);
didAck = true;
break;
}
}
if (didAck)
break;
s->WriteByte((unsigned char)length);
}
else
{
s->WriteByte(254);
s->WriteByte((unsigned char)(length & 0xFF));
s->WriteByte((unsigned char)((length >> 8) & 0xFF));
s->WriteByte((unsigned char)((length >> 16) & 0xFF));
}
}
if (didAck)
{
it = reliablePackets.erase(it);
continue;
}
++it;
}
}
else
{
s->WriteInt32(TLID_SIMPLE_AUDIO_BLOCK);
int64_t randomID;
crypto.rand_bytes((uint8_t *)&randomID, 8);
s->WriteInt64(randomID);
unsigned char randBytes[7];
crypto.rand_bytes(randBytes, 7);
s->WriteByte(7);
s->WriteBytes(randBytes, 7);
uint32_t lenWithHeader = length + 13;
if (lenWithHeader > 0)
{
if (lenWithHeader <= 253)
{
s->WriteByte((unsigned char)lenWithHeader);
}
else
{
s->WriteByte(254);
s->WriteByte((unsigned char)(lenWithHeader & 0xFF));
s->WriteByte((unsigned char)((lenWithHeader >> 8) & 0xFF));
s->WriteByte((unsigned char)((lenWithHeader >> 16) & 0xFF));
}
}
s->WriteByte(type);
s->WriteInt32(lastRemoteSeq);
s->WriteInt32(pseq);
s->WriteInt32(acks);
if (peerVersion >= 6)
{
if (currentExtras.empty())
{
s->WriteByte(0);
}
else
{
s->WriteByte(XPFLAG_HAS_EXTRA);
s->WriteByte(static_cast<unsigned char>(currentExtras.size()));
for (vector<UnacknowledgedExtraData>::iterator x = currentExtras.begin(); x != currentExtras.end(); ++x)
{
LOGV("Writing extra into header: type %u, length %d", x->type, int(x->data.Length()));
assert(x->data.Length() <= 254);
s->WriteByte(static_cast<unsigned char>(x->data.Length() + 1));
s->WriteByte(x->type);
s->WriteBytes(*x->data, x->data.Length());
if (x->firstContainingSeq == 0)
x->firstContainingSeq = pseq;
}
}
}
}
}

View File

@ -3,7 +3,6 @@
using namespace tgvoip;
using namespace std;
bool VoIPController::SendOrEnqueuePacket(PendingOutgoingPacket pkt, bool enqueue, PacketSender *source)
{
ENFORCE_MSG_THREAD;
@ -78,7 +77,7 @@ void VoIPController::SendPacket(unsigned char *data, size_t len, Endpoint &ep, P
out.WriteBytes((unsigned char *)ep.peerTag, 16);
else if (peerVersion < 9)
out.WriteBytes(callID, 16);
if (len > 0)
{
encryptPacket(data, len, out);
@ -99,7 +98,6 @@ void VoIPController::SendPacket(unsigned char *data, size_t len, Endpoint &ep, P
ep.type == Endpoint::Type::TCP_RELAY ? ep.socket : nullptr});
}
void VoIPController::SendInit()
{
ENFORCE_MSG_THREAD;
@ -236,7 +234,6 @@ void VoIPController::InitUDPProxy()
messageThread.Post(bind(&VoIPController::ResetUdpAvailability, this));
}
void VoIPController::TrySendOutgoingPackets()
{
ENFORCE_MSG_THREAD;
@ -268,8 +265,21 @@ void VoIPController::TrySendOutgoingPackets()
}
}
bool VoIPController::WasOutgoingPacketAcknowledged(uint32_t seq)
bool VoIPController::WasOutgoingPacketAcknowledged(uint32_t seq, bool checkAll)
{
if (seqgt(seq, lastRemoteAckSeq))
return false;
if (seq == lastRemoteAckSeq)
return true;
uint32_t distance = lastRemoteAckSeq - seq;
if (distance > 0 && distance <= 32) {
return peerAcks[distance];
}
if (!checkAll)
return false;
RecentOutgoingPacket *pkt = GetRecentOutgoingPacket(seq);
if (!pkt)
return false;
@ -374,49 +384,6 @@ void VoIPController::SendRelayPings()
}
void VoIPController::UpdateReliablePackets()
{
vector<PendingOutgoingPacket> packetsToSend;
for (std::vector<ReliableOutgoingPacket>::iterator qp = reliablePackets.begin(); qp != reliablePackets.end();)
{
if (qp->timeout > 0 && qp->firstSentTime > 0 && GetCurrentTime() - qp->firstSentTime >= qp->timeout)
{
LOGD("Removing queued packet because of timeout");
qp = reliablePackets.erase(qp);
continue;
}
if (!qp->tries--) {
LOGD("Removing queued packet because of no more tries");
qp = reliablePackets.erase(qp);
continue;
}
if (GetCurrentTime() - qp->lastSentTime >= qp->retryInterval)
{
messageThread.Post(std::bind(&VoIPController::UpdateReliablePackets, this), qp->retryInterval);
uint32_t seq = GenerateOutSeq();
qp->seqs.Add(seq);
qp->lastSentTime = GetCurrentTime();
//LOGD("Sending queued packet, seq=%u, type=%u, len=%u", seq, qp.type, qp.data.Length());
Buffer buf(qp->data.Length());
if (qp->firstSentTime == 0)
qp->firstSentTime = qp->lastSentTime;
if (qp->data.Length())
buf.CopyFrom(qp->data, qp->data.Length());
packetsToSend.push_back(PendingOutgoingPacket{
/*.seq=*/seq,
/*.type=*/qp->type,
/*.len=*/qp->data.Length(),
/*.data=*/move(buf),
/*.endpoint=*/0});
}
++qp;
}
for (PendingOutgoingPacket &pkt : packetsToSend)
{
SendOrEnqueuePacket(move(pkt));
}
}
void VoIPController::SendNopPacket()
{
if (state != STATE_ESTABLISHED)
@ -461,8 +428,6 @@ void VoIPController::SendPublicEndpointsRequest()
}
}
void VoIPController::SendPublicEndpointsRequest(const Endpoint &relay)
{
if (!useUDP)
@ -492,31 +457,6 @@ Endpoint &VoIPController::GetEndpointByType(const Endpoint::Type type)
throw out_of_range("no endpoint");
}
void VoIPController::SendPacketReliably(unsigned char type, unsigned char *data, size_t len, double retryInterval, double timeout, uint8_t tries)
{
ENFORCE_MSG_THREAD;
LOGD("Send reliably, type=%u, len=%u, retry=%.3f, timeout=%.3f", type, unsigned(len), retryInterval, timeout);
ReliableOutgoingPacket pkt;
if (data)
{
Buffer b(len);
b.CopyFrom(data, 0, len);
pkt.data = move(b);
}
pkt.type = type;
pkt.retryInterval = retryInterval;
pkt.timeout = timeout;
pkt.tries = tries;
pkt.firstSentTime = 0;
pkt.lastSentTime = 0;
reliablePackets.push_back(move(pkt));
messageThread.Post(std::bind(&VoIPController::UpdateReliablePackets, this));
if (timeout > 0.0)
{
messageThread.Post(std::bind(&VoIPController::UpdateReliablePackets, this), timeout);
}
}
void VoIPController::SendExtra(Buffer &data, unsigned char type)
{
@ -591,7 +531,6 @@ void VoIPController::ResetEndpointPingStats()
}
}
void VoIPController::SendStreamFlags(Stream &stream)
{
ENFORCE_MSG_THREAD;

View File

@ -248,15 +248,10 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE
lastRemoteAckSeq = ackId;
conctl.PacketAcknowledged(ackId);
// Status list of acked seqnos, starting from the seq explicitly present in the packet + up to 32 seqs ago
std::array<uint32_t, 33> peerAcks{0};
peerAcks[0] = ackId;
for (unsigned int i = 1; i <= 32; i++)
{
if ((acks >> (32 - i)) & 1)
{
peerAcks[i] = ackId - i;
}
peerAcks[i] = (acks >> (32 - i)) & 1 ? ackId - i : 0;
}
for (auto &opkt : recentOutgoingPackets)
@ -297,7 +292,7 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE
}
}
//else
legacyHandleReliablePackets();
handleReliablePackets();
}
Endpoint &_currentEndpoint = endpoints.at(currentEndpoint);
@ -1020,15 +1015,8 @@ void VoIPController::WritePacketHeader(uint32_t pseq, BufferOutputStream *s, uns
s->WriteInt32(lastRemoteSeq);
s->WriteInt32(pseq);
s->WriteInt32(acks);
unsigned char flags;
if (currentExtras.empty())
{
flags = 0;
}
else
{
flags = XPFLAG_HAS_EXTRA;
}
unsigned char flags = currentExtras.empty() ? 0 : XPFLAG_HAS_EXTRA;
shared_ptr<Stream> videoStream = GetStreamByType(STREAM_TYPE_VIDEO, false);
if (peerVersion >= 9 && videoStream && videoStream->enabled)
@ -1039,15 +1027,15 @@ void VoIPController::WritePacketHeader(uint32_t pseq, BufferOutputStream *s, uns
if (!currentExtras.empty())
{
s->WriteByte(static_cast<unsigned char>(currentExtras.size()));
for (vector<UnacknowledgedExtraData>::iterator x = currentExtras.begin(); x != currentExtras.end(); ++x)
for (auto &x : currentExtras)
{
LOGV("Writing extra into header: type %u, length %d", x->type, int(x->data.Length()));
assert(x->data.Length() <= 254);
s->WriteByte(static_cast<unsigned char>(x->data.Length() + 1));
s->WriteByte(x->type);
s->WriteBytes(*x->data, x->data.Length());
if (x->firstContainingSeq == 0)
x->firstContainingSeq = pseq;
LOGV("Writing extra into header: type %u, length %d", x.type, int(x.data.Length()));
assert(x.data.Length() <= 254);
s->WriteByte(static_cast<unsigned char>(x.data.Length() + 1));
s->WriteByte(x.type);
s->WriteBytes(*x.data, x.data.Length());
if (x.firstContainingSeq == 0)
x.firstContainingSeq = pseq;
}
}
if (peerVersion >= 9 && videoStream && videoStream->enabled)
@ -1057,104 +1045,7 @@ void VoIPController::WritePacketHeader(uint32_t pseq, BufferOutputStream *s, uns
}
else
{
if (state == STATE_WAIT_INIT || state == STATE_WAIT_INIT_ACK)
{
s->WriteInt32(TLID_DECRYPTED_AUDIO_BLOCK);
int64_t randomID;
crypto.rand_bytes((uint8_t *)&randomID, 8);
s->WriteInt64(randomID);
unsigned char randBytes[7];
crypto.rand_bytes(randBytes, 7);
s->WriteByte(7);
s->WriteBytes(randBytes, 7);
uint32_t pflags = PFLAG_HAS_RECENT_RECV | PFLAG_HAS_SEQ;
if (length > 0)
pflags |= PFLAG_HAS_DATA;
if (state == STATE_WAIT_INIT || state == STATE_WAIT_INIT_ACK)
{
pflags |= PFLAG_HAS_CALL_ID | PFLAG_HAS_PROTO;
}
pflags |= ((uint32_t)type) << 24;
s->WriteInt32(pflags);
if (pflags & PFLAG_HAS_CALL_ID)
{
s->WriteBytes(callID, 16);
}
s->WriteInt32(lastRemoteSeq);
s->WriteInt32(pseq);
s->WriteInt32(acks);
if (pflags & PFLAG_HAS_PROTO)
{
s->WriteInt32(PROTOCOL_NAME);
}
if (length > 0)
{
if (length <= 253)
{
s->WriteByte((unsigned char)length);
}
else
{
s->WriteByte(254);
s->WriteByte((unsigned char)(length & 0xFF));
s->WriteByte((unsigned char)((length >> 8) & 0xFF));
s->WriteByte((unsigned char)((length >> 16) & 0xFF));
}
}
}
else
{
s->WriteInt32(TLID_SIMPLE_AUDIO_BLOCK);
int64_t randomID;
crypto.rand_bytes((uint8_t *)&randomID, 8);
s->WriteInt64(randomID);
unsigned char randBytes[7];
crypto.rand_bytes(randBytes, 7);
s->WriteByte(7);
s->WriteBytes(randBytes, 7);
uint32_t lenWithHeader = length + 13;
if (lenWithHeader > 0)
{
if (lenWithHeader <= 253)
{
s->WriteByte((unsigned char)lenWithHeader);
}
else
{
s->WriteByte(254);
s->WriteByte((unsigned char)(lenWithHeader & 0xFF));
s->WriteByte((unsigned char)((lenWithHeader >> 8) & 0xFF));
s->WriteByte((unsigned char)((lenWithHeader >> 16) & 0xFF));
}
}
s->WriteByte(type);
s->WriteInt32(lastRemoteSeq);
s->WriteInt32(pseq);
s->WriteInt32(acks);
if (peerVersion >= 6)
{
if (currentExtras.empty())
{
s->WriteByte(0);
}
else
{
s->WriteByte(XPFLAG_HAS_EXTRA);
s->WriteByte(static_cast<unsigned char>(currentExtras.size()));
for (vector<UnacknowledgedExtraData>::iterator x = currentExtras.begin(); x != currentExtras.end(); ++x)
{
LOGV("Writing extra into header: type %u, length %d", x->type, int(x->data.Length()));
assert(x->data.Length() <= 254);
s->WriteByte(static_cast<unsigned char>(x->data.Length() + 1));
s->WriteByte(x->type);
s->WriteBytes(*x->data, x->data.Length());
if (x->firstContainingSeq == 0)
x->firstContainingSeq = pseq;
}
}
}
}
legacyWritePacketHeader(pseq, acks, s, type, length, source);
}
unacknowledgedIncomingPacketCount = 0;

View File

@ -161,7 +161,6 @@ void VoIPController::TickJitterBufferAndCongestionControl()
void VoIPController::UpdateRTT()
{
rttHistory.Add(GetAverageRTT());
//double v=rttHistory.Average();
if (rttHistory[0] > 10.0 && rttHistory[8] > 10.0 && (networkType == NET_TYPE_EDGE || networkType == NET_TYPE_GPRS))
{
waitingForAcks = true;

View File

@ -358,7 +358,7 @@ public:
return data[_i];
}
size_t Size() const
constexpr size_t Size() const
{
return size;
}

View File

@ -4,8 +4,7 @@
// you should have received with this source code distribution.
//
#ifndef LIBTGVOIP_UTILS_H
#define LIBTGVOIP_UTILS_H
#pragma once
#define TGVOIP_DISALLOW_COPY_AND_ASSIGN(TypeName) \
TypeName(const TypeName&) = delete; \
@ -16,4 +15,4 @@ TGVOIP_DISALLOW_COPY_AND_ASSIGN(TypeName); \
TypeName(TypeName&&) = default; \
TypeName& operator=(TypeName&&) = default
#endif /* LIBTGVOIP_UTILS_H */