1
0
mirror of https://github.com/danog/libtgvoip.git synced 2024-12-02 09:37:52 +01:00

Add reliability (1)

This commit is contained in:
Daniil Gentili 2020-01-26 21:06:16 +01:00
parent c1ec091f80
commit ec9b8863bd
Signed by: danog
GPG Key ID: 8C1BE3B34B230CA7
9 changed files with 58 additions and 39 deletions

View File

@ -482,9 +482,8 @@ protected:
uint32_t size; uint32_t size;
PacketSender *sender; PacketSender *sender;
bool lost; bool lost;
uint8_t retries = 0;
}; };
struct QueuedPacket struct ReliableOutgoingPacket
{ {
Buffer data; Buffer data;
unsigned char type; unsigned char type;
@ -493,6 +492,7 @@ protected:
double lastSentTime; double lastSentTime;
double retryInterval; double retryInterval;
double timeout; double timeout;
uint8_t tries;
}; };
virtual void ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcEndpoint); virtual void ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcEndpoint);
virtual void ProcessExtraData(Buffer &data); virtual void ProcessExtraData(Buffer &data);
@ -556,7 +556,7 @@ private:
void SendPublicEndpointsRequest(); void SendPublicEndpointsRequest();
void SendPublicEndpointsRequest(const Endpoint &relay); void SendPublicEndpointsRequest(const Endpoint &relay);
Endpoint &GetEndpointByType(const Endpoint::Type type); Endpoint &GetEndpointByType(const Endpoint::Type type);
void SendPacketReliably(unsigned char type, unsigned char *data, size_t len, double retryInterval, double timeout); void SendPacketReliably(unsigned char type, unsigned char *data, size_t len, double retryInterval, double timeout, uint8_t tries = 0xFF);
inline uint32_t GenerateOutSeq() inline uint32_t GenerateOutSeq()
{ {
return seq++; return seq++;
@ -573,7 +573,7 @@ private:
void UpdateCongestion(); void UpdateCongestion();
void UpdateAudioBitrate(); void UpdateAudioBitrate();
void UpdateSignalBars(); void UpdateSignalBars();
void UpdateQueuedPackets(); void UpdateReliablePackets();
void SendNopPacket(); void SendNopPacket();
void TickJitterBufferAndCongestionControl(); void TickJitterBufferAndCongestionControl();
void ResetUdpAvailability(); void ResetUdpAvailability();
@ -638,13 +638,13 @@ private:
// More legacy // More legacy
bool legacyParsePacket(BufferInputStream &in, unsigned char &type, uint32_t &ackId, uint32_t &pseq, uint32_t &acks, unsigned char &pflags, size_t &packetInnerLen); bool legacyParsePacket(BufferInputStream &in, unsigned char &type, uint32_t &ackId, uint32_t &pseq, uint32_t &acks, unsigned char &pflags, size_t &packetInnerLen);
void legacyHandleQueuedPackets(); void legacyHandleReliablePackets();
void SetupOutgoingVideoStream(); void SetupOutgoingVideoStream();
bool WasOutgoingPacketAcknowledged(uint32_t seq); bool WasOutgoingPacketAcknowledged(uint32_t seq);
RecentOutgoingPacket *GetRecentOutgoingPacket(uint32_t seq); RecentOutgoingPacket *GetRecentOutgoingPacket(uint32_t seq);
void NetworkPacketReceived(std::shared_ptr<NetworkPacket> packet); void NetworkPacketReceived(std::shared_ptr<NetworkPacket> packet);
void TrySendQueuedPackets(); void TrySendOutgoingPackets();
int state = STATE_WAIT_INIT; int state = STATE_WAIT_INIT;
std::map<int64_t, Endpoint> endpoints; std::map<int64_t, Endpoint> endpoints;
@ -713,7 +713,7 @@ private:
bool dataSavingRequestedByPeer = false; bool dataSavingRequestedByPeer = false;
std::string activeNetItfName; std::string activeNetItfName;
double publicEndpointsReqTime = 0; double publicEndpointsReqTime = 0;
std::vector<QueuedPacket> queuedPackets; std::vector<ReliableOutgoingPacket> reliablePackets;
double connectionInitTime = 0; double connectionInitTime = 0;
double lastRecvPacketTime = 0; double lastRecvPacketTime = 0;
Config config; Config config;

View File

@ -44,6 +44,8 @@ VoIPController::VoIPController() : ecAudioPackets(4),
stm->enabled = 1; stm->enabled = 1;
stm->frameDuration = 60; stm->frameDuration = 60;
outgoingStreams.push_back(stm); outgoingStreams.push_back(stm);
recentOutgoingPackets.reserve(MAX_RECENT_PACKETS);
} }
VoIPController::~VoIPController() VoIPController::~VoIPController()

View File

@ -79,17 +79,21 @@ void VoIPController::HandleAudioInput(unsigned char *data, size_t len, unsigned
} }
unsentStreamPackets++; unsentStreamPackets++;
PendingOutgoingPacket p{
/*.seq=*/GenerateOutSeq(),
/*.type=*/PKT_STREAM_DATA,
/*.len=*/pkt.GetLength(),
/*.data=*/Buffer(move(pkt)),
/*.endpoint=*/0,
};
conctl.PacketSent(p.seq, p.len); //PendingOutgoingPacket p{
// /*.seq=*/GenerateOutSeq(),
// /*.type=*/PKT_STREAM_DATA,
// /*.len=*/pkt.GetLength(),
// /*.data=*/Buffer(move(pkt)),
// /*.endpoint=*/0,
//};
SendOrEnqueuePacket(move(p)); //conctl.PacketSent(p.seq, p.len);
shared_ptr<Stream> outgoingAudioStream = GetStreamByType(STREAM_TYPE_AUDIO, true);
SendPacketReliably(PKT_STREAM_DATA, pkt.GetBuffer(), pkt.GetLength(), GetAverageRTT(), outgoingAudioStream && outgoingAudioStream->jitterBuffer ? outgoingAudioStream->jitterBuffer->GetTimeoutWindow() : (GetAverageRTT() * 2.0), 10); // Todo Optimize RTT
//SendOrEnqueuePacket(move(p));
if (peerVersion < 7 && secondaryLen && shittyInternetMode) if (peerVersion < 7 && secondaryLen && shittyInternetMode)
{ {
Buffer ecBuf(secondaryLen); Buffer ecBuf(secondaryLen);

View File

@ -66,6 +66,10 @@ int JitterBuffer::GetMinPacketCount()
{ {
return (int)minDelay; return (int)minDelay;
} }
double JitterBuffer::GetTimeoutWindow()
{
return (lossesToReset * step) / 1000.0;
}
void JitterBuffer::HandleInput(unsigned char *data, size_t len, uint32_t timestamp, bool isEC) void JitterBuffer::HandleInput(unsigned char *data, size_t len, uint32_t timestamp, bool isEC)
{ {

View File

@ -42,6 +42,8 @@ public:
double GetLastMeasuredJitter(); double GetLastMeasuredJitter();
double GetLastMeasuredDelay(); double GetLastMeasuredDelay();
double GetTimeoutWindow();
private: private:
struct jitter_packet_t struct jitter_packet_t
{ {

View File

@ -81,22 +81,22 @@ bool VoIPController::legacyParsePacket(BufferInputStream &in, unsigned char &typ
} }
return true; return true;
} }
void VoIPController::legacyHandleQueuedPackets() void VoIPController::legacyHandleReliablePackets()
{ {
for (auto it = queuedPackets.begin(); it != queuedPackets.end();) for (auto it = reliablePackets.begin(); it != reliablePackets.end();)
{ {
QueuedPacket &qp = *it; ReliableOutgoingPacket &qp = *it;
bool didAck = false; bool didAck = false;
for (uint8_t j = 0; j < 16; j++) for (uint8_t j = 0; j < 16; j++)
{ {
LOGD("queued packet %ld, seq %u=%u", queuedPackets.end() - it, j, qp.seqs[j]); LOGD("queued packet %ld, seq %u=%u", reliablePackets.end() - it, j, qp.seqs[j]);
if (qp.seqs[j] == 0) if (qp.seqs[j] == 0)
break; break;
int remoteAcksIndex = lastRemoteAckSeq - qp.seqs[j]; int distance = lastRemoteAckSeq - qp.seqs[j];
//LOGV("remote acks index %u, value %f", remoteAcksIndex, remoteAcksIndex>=0 && remoteAcksIndex<32 ? remoteAcks[remoteAcksIndex] : -1); //LOGV("remote acks index %u, value %f", distance, distance>=0 && distance<32 ? distance[remoteAcksIndex] : -1);
if (seqgt(lastRemoteAckSeq, qp.seqs[j]) && remoteAcksIndex >= 0 && remoteAcksIndex < 32) if (seqgt(lastRemoteAckSeq, qp.seqs[j]) && distance >= 0 && distance < 32)
{ {
for (const auto &opkt : recentOutgoingPackets) for (const auto &opkt : recentOutgoingPackets) // Optimize this
{ {
if (opkt.seq == qp.seqs[j] && opkt.ackTime > 0) if (opkt.seq == qp.seqs[j] && opkt.ackTime > 0)
{ {
@ -111,7 +111,7 @@ void VoIPController::legacyHandleQueuedPackets()
} }
if (didAck) if (didAck)
{ {
it = queuedPackets.erase(it); it = reliablePackets.erase(it);
continue; continue;
} }
++it; ++it;

View File

@ -115,7 +115,7 @@ void VoIPController::RunRecvThread()
if (!writeSockets.empty()) if (!writeSockets.empty())
{ {
messageThread.Post(bind(&VoIPController::TrySendQueuedPackets, this)); messageThread.Post(bind(&VoIPController::TrySendOutgoingPackets, this));
} }
} }
LOGI("=== recv thread exiting ==="); LOGI("=== recv thread exiting ===");

View File

@ -42,6 +42,7 @@ bool VoIPController::SendOrEnqueuePacket(PendingOutgoingPacket pkt, bool enqueue
} }
canSend = endpoint->socket && endpoint->socket->IsReadyToSend(); canSend = endpoint->socket && endpoint->socket->IsReadyToSend();
} }
conctl.PacketSent(pkt.seq, pkt.len);
if (!canSend) if (!canSend)
{ {
if (enqueue) if (enqueue)
@ -236,7 +237,7 @@ void VoIPController::InitUDPProxy()
} }
void VoIPController::TrySendQueuedPackets() void VoIPController::TrySendOutgoingPackets()
{ {
ENFORCE_MSG_THREAD; ENFORCE_MSG_THREAD;
@ -373,20 +374,25 @@ void VoIPController::SendRelayPings()
} }
void VoIPController::UpdateQueuedPackets() void VoIPController::UpdateReliablePackets()
{ {
vector<PendingOutgoingPacket> packetsToSend; vector<PendingOutgoingPacket> packetsToSend;
for (std::vector<QueuedPacket>::iterator qp = queuedPackets.begin(); qp != queuedPackets.end();) for (std::vector<ReliableOutgoingPacket>::iterator qp = reliablePackets.begin(); qp != reliablePackets.end();)
{ {
if (qp->timeout > 0 && qp->firstSentTime > 0 && GetCurrentTime() - qp->firstSentTime >= qp->timeout) if (qp->timeout > 0 && qp->firstSentTime > 0 && GetCurrentTime() - qp->firstSentTime >= qp->timeout)
{ {
LOGD("Removing queued packet because of timeout"); LOGD("Removing queued packet because of timeout");
qp = queuedPackets.erase(qp); qp = reliablePackets.erase(qp);
continue;
}
if (!qp->tries--) {
LOGD("Removing queued packet because of no more tries");
qp = reliablePackets.erase(qp);
continue; continue;
} }
if (GetCurrentTime() - qp->lastSentTime >= qp->retryInterval) if (GetCurrentTime() - qp->lastSentTime >= qp->retryInterval)
{ {
messageThread.Post(std::bind(&VoIPController::UpdateQueuedPackets, this), qp->retryInterval); messageThread.Post(std::bind(&VoIPController::UpdateReliablePackets, this), qp->retryInterval);
uint32_t seq = GenerateOutSeq(); uint32_t seq = GenerateOutSeq();
qp->seqs.Add(seq); qp->seqs.Add(seq);
qp->lastSentTime = GetCurrentTime(); qp->lastSentTime = GetCurrentTime();
@ -486,12 +492,12 @@ Endpoint &VoIPController::GetEndpointByType(const Endpoint::Type type)
throw out_of_range("no endpoint"); throw out_of_range("no endpoint");
} }
void VoIPController::SendPacketReliably(unsigned char type, unsigned char *data, size_t len, double retryInterval, double timeout) void VoIPController::SendPacketReliably(unsigned char type, unsigned char *data, size_t len, double retryInterval, double timeout, uint8_t tries)
{ {
ENFORCE_MSG_THREAD; ENFORCE_MSG_THREAD;
LOGD("Send reliably, type=%u, len=%u, retry=%.3f, timeout=%.3f", type, unsigned(len), retryInterval, timeout); LOGD("Send reliably, type=%u, len=%u, retry=%.3f, timeout=%.3f", type, unsigned(len), retryInterval, timeout);
QueuedPacket pkt; ReliableOutgoingPacket pkt;
if (data) if (data)
{ {
Buffer b(len); Buffer b(len);
@ -501,13 +507,14 @@ void VoIPController::SendPacketReliably(unsigned char type, unsigned char *data,
pkt.type = type; pkt.type = type;
pkt.retryInterval = retryInterval; pkt.retryInterval = retryInterval;
pkt.timeout = timeout; pkt.timeout = timeout;
pkt.tries = tries;
pkt.firstSentTime = 0; pkt.firstSentTime = 0;
pkt.lastSentTime = 0; pkt.lastSentTime = 0;
queuedPackets.push_back(move(pkt)); reliablePackets.push_back(move(pkt));
messageThread.Post(std::bind(&VoIPController::UpdateQueuedPackets, this)); messageThread.Post(std::bind(&VoIPController::UpdateReliablePackets, this));
if (timeout > 0.0) if (timeout > 0.0)
{ {
messageThread.Post(std::bind(&VoIPController::UpdateQueuedPackets, this), timeout); messageThread.Post(std::bind(&VoIPController::UpdateReliablePackets, this), timeout);
} }
} }

View File

@ -231,7 +231,7 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE
recvTS = in.ReadUInt32(); recvTS = in.ReadUInt32();
} }
if (seqgt(ackId, lastRemoteAckSeq)) // If is **not** out of order or retransmission if (seqgt(ackId, lastRemoteAckSeq))
{ {
if (waitingForAcks && lastRemoteAckSeq >= firstSentPing) if (waitingForAcks && lastRemoteAckSeq >= firstSentPing)
{ {
@ -296,8 +296,8 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE
++x; ++x;
} }
} }
else //else
legacyHandleQueuedPackets(); legacyHandleReliablePackets();
} }
Endpoint &_currentEndpoint = endpoints.at(currentEndpoint); Endpoint &_currentEndpoint = endpoints.at(currentEndpoint);