From ec9b8863bd59bdd2dc02ec8acf3b5e66af2734c1 Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Sun, 26 Jan 2020 21:06:16 +0100 Subject: [PATCH] Add reliability (1) --- VoIPController.h | 14 +++++++------- controller/controller/PublicAPI.cpp | 2 ++ controller/media/Audio.cpp | 22 +++++++++++++--------- controller/net/JitterBuffer.cpp | 4 ++++ controller/net/JitterBuffer.h | 2 ++ controller/protocol/Legacy.cpp | 18 +++++++++--------- controller/protocol/Loop.cpp | 2 +- controller/protocol/NetworkAPI.cpp | 27 +++++++++++++++++---------- controller/protocol/Protocol.cpp | 6 +++--- 9 files changed, 58 insertions(+), 39 deletions(-) diff --git a/VoIPController.h b/VoIPController.h index dbb967b..635e4b6 100755 --- a/VoIPController.h +++ b/VoIPController.h @@ -482,9 +482,8 @@ protected: uint32_t size; PacketSender *sender; bool lost; - uint8_t retries = 0; }; - struct QueuedPacket + struct ReliableOutgoingPacket { Buffer data; unsigned char type; @@ -493,6 +492,7 @@ protected: double lastSentTime; double retryInterval; double timeout; + uint8_t tries; }; virtual void ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcEndpoint); virtual void ProcessExtraData(Buffer &data); @@ -556,7 +556,7 @@ private: void SendPublicEndpointsRequest(); void SendPublicEndpointsRequest(const Endpoint &relay); Endpoint &GetEndpointByType(const Endpoint::Type type); - void SendPacketReliably(unsigned char type, unsigned char *data, size_t len, double retryInterval, double timeout); + void SendPacketReliably(unsigned char type, unsigned char *data, size_t len, double retryInterval, double timeout, uint8_t tries = 0xFF); inline uint32_t GenerateOutSeq() { return seq++; @@ -573,7 +573,7 @@ private: void UpdateCongestion(); void UpdateAudioBitrate(); void UpdateSignalBars(); - void UpdateQueuedPackets(); + void UpdateReliablePackets(); void SendNopPacket(); void TickJitterBufferAndCongestionControl(); void ResetUdpAvailability(); @@ -638,13 +638,13 @@ private: // More legacy bool legacyParsePacket(BufferInputStream &in, unsigned char &type, uint32_t &ackId, uint32_t &pseq, uint32_t &acks, unsigned char &pflags, size_t &packetInnerLen); - void legacyHandleQueuedPackets(); + void legacyHandleReliablePackets(); void SetupOutgoingVideoStream(); bool WasOutgoingPacketAcknowledged(uint32_t seq); RecentOutgoingPacket *GetRecentOutgoingPacket(uint32_t seq); void NetworkPacketReceived(std::shared_ptr packet); - void TrySendQueuedPackets(); + void TrySendOutgoingPackets(); int state = STATE_WAIT_INIT; std::map endpoints; @@ -713,7 +713,7 @@ private: bool dataSavingRequestedByPeer = false; std::string activeNetItfName; double publicEndpointsReqTime = 0; - std::vector queuedPackets; + std::vector reliablePackets; double connectionInitTime = 0; double lastRecvPacketTime = 0; Config config; diff --git a/controller/controller/PublicAPI.cpp b/controller/controller/PublicAPI.cpp index e30ed76..c76aa80 100644 --- a/controller/controller/PublicAPI.cpp +++ b/controller/controller/PublicAPI.cpp @@ -44,6 +44,8 @@ VoIPController::VoIPController() : ecAudioPackets(4), stm->enabled = 1; stm->frameDuration = 60; outgoingStreams.push_back(stm); + + recentOutgoingPackets.reserve(MAX_RECENT_PACKETS); } VoIPController::~VoIPController() diff --git a/controller/media/Audio.cpp b/controller/media/Audio.cpp index 1dbc7a6..52c49e7 100644 --- a/controller/media/Audio.cpp +++ b/controller/media/Audio.cpp @@ -79,17 +79,21 @@ void VoIPController::HandleAudioInput(unsigned char *data, size_t len, unsigned } unsentStreamPackets++; - PendingOutgoingPacket p{ - /*.seq=*/GenerateOutSeq(), - /*.type=*/PKT_STREAM_DATA, - /*.len=*/pkt.GetLength(), - /*.data=*/Buffer(move(pkt)), - /*.endpoint=*/0, - }; - conctl.PacketSent(p.seq, p.len); + //PendingOutgoingPacket p{ + // /*.seq=*/GenerateOutSeq(), + // /*.type=*/PKT_STREAM_DATA, + // /*.len=*/pkt.GetLength(), + // /*.data=*/Buffer(move(pkt)), + // /*.endpoint=*/0, + //}; - SendOrEnqueuePacket(move(p)); + //conctl.PacketSent(p.seq, p.len); + + shared_ptr outgoingAudioStream = GetStreamByType(STREAM_TYPE_AUDIO, true); + + SendPacketReliably(PKT_STREAM_DATA, pkt.GetBuffer(), pkt.GetLength(), GetAverageRTT(), outgoingAudioStream && outgoingAudioStream->jitterBuffer ? outgoingAudioStream->jitterBuffer->GetTimeoutWindow() : (GetAverageRTT() * 2.0), 10); // Todo Optimize RTT + //SendOrEnqueuePacket(move(p)); if (peerVersion < 7 && secondaryLen && shittyInternetMode) { Buffer ecBuf(secondaryLen); diff --git a/controller/net/JitterBuffer.cpp b/controller/net/JitterBuffer.cpp index afd2e03..ba006d4 100755 --- a/controller/net/JitterBuffer.cpp +++ b/controller/net/JitterBuffer.cpp @@ -66,6 +66,10 @@ int JitterBuffer::GetMinPacketCount() { return (int)minDelay; } +double JitterBuffer::GetTimeoutWindow() +{ + return (lossesToReset * step) / 1000.0; +} void JitterBuffer::HandleInput(unsigned char *data, size_t len, uint32_t timestamp, bool isEC) { diff --git a/controller/net/JitterBuffer.h b/controller/net/JitterBuffer.h index 3b4800e..dce7059 100644 --- a/controller/net/JitterBuffer.h +++ b/controller/net/JitterBuffer.h @@ -42,6 +42,8 @@ public: double GetLastMeasuredJitter(); double GetLastMeasuredDelay(); + double GetTimeoutWindow(); + private: struct jitter_packet_t { diff --git a/controller/protocol/Legacy.cpp b/controller/protocol/Legacy.cpp index 9a6cbd3..28a74ce 100644 --- a/controller/protocol/Legacy.cpp +++ b/controller/protocol/Legacy.cpp @@ -81,22 +81,22 @@ bool VoIPController::legacyParsePacket(BufferInputStream &in, unsigned char &typ } return true; } -void VoIPController::legacyHandleQueuedPackets() +void VoIPController::legacyHandleReliablePackets() { - for (auto it = queuedPackets.begin(); it != queuedPackets.end();) + for (auto it = reliablePackets.begin(); it != reliablePackets.end();) { - QueuedPacket &qp = *it; + ReliableOutgoingPacket &qp = *it; bool didAck = false; for (uint8_t j = 0; j < 16; j++) { - LOGD("queued packet %ld, seq %u=%u", queuedPackets.end() - it, j, qp.seqs[j]); + LOGD("queued packet %ld, seq %u=%u", reliablePackets.end() - it, j, qp.seqs[j]); if (qp.seqs[j] == 0) break; - int remoteAcksIndex = lastRemoteAckSeq - qp.seqs[j]; - //LOGV("remote acks index %u, value %f", remoteAcksIndex, remoteAcksIndex>=0 && remoteAcksIndex<32 ? remoteAcks[remoteAcksIndex] : -1); - if (seqgt(lastRemoteAckSeq, qp.seqs[j]) && remoteAcksIndex >= 0 && remoteAcksIndex < 32) + int distance = lastRemoteAckSeq - qp.seqs[j]; + //LOGV("remote acks index %u, value %f", distance, distance>=0 && distance<32 ? distance[remoteAcksIndex] : -1); + if (seqgt(lastRemoteAckSeq, qp.seqs[j]) && distance >= 0 && distance < 32) { - for (const auto &opkt : recentOutgoingPackets) + for (const auto &opkt : recentOutgoingPackets) // Optimize this { if (opkt.seq == qp.seqs[j] && opkt.ackTime > 0) { @@ -111,7 +111,7 @@ void VoIPController::legacyHandleQueuedPackets() } if (didAck) { - it = queuedPackets.erase(it); + it = reliablePackets.erase(it); continue; } ++it; diff --git a/controller/protocol/Loop.cpp b/controller/protocol/Loop.cpp index d78a429..e0a2f2c 100644 --- a/controller/protocol/Loop.cpp +++ b/controller/protocol/Loop.cpp @@ -115,7 +115,7 @@ void VoIPController::RunRecvThread() if (!writeSockets.empty()) { - messageThread.Post(bind(&VoIPController::TrySendQueuedPackets, this)); + messageThread.Post(bind(&VoIPController::TrySendOutgoingPackets, this)); } } LOGI("=== recv thread exiting ==="); diff --git a/controller/protocol/NetworkAPI.cpp b/controller/protocol/NetworkAPI.cpp index 9a75d0e..486430e 100644 --- a/controller/protocol/NetworkAPI.cpp +++ b/controller/protocol/NetworkAPI.cpp @@ -42,6 +42,7 @@ bool VoIPController::SendOrEnqueuePacket(PendingOutgoingPacket pkt, bool enqueue } canSend = endpoint->socket && endpoint->socket->IsReadyToSend(); } + conctl.PacketSent(pkt.seq, pkt.len); if (!canSend) { if (enqueue) @@ -236,7 +237,7 @@ void VoIPController::InitUDPProxy() } -void VoIPController::TrySendQueuedPackets() +void VoIPController::TrySendOutgoingPackets() { ENFORCE_MSG_THREAD; @@ -373,20 +374,25 @@ void VoIPController::SendRelayPings() } -void VoIPController::UpdateQueuedPackets() +void VoIPController::UpdateReliablePackets() { vector packetsToSend; - for (std::vector::iterator qp = queuedPackets.begin(); qp != queuedPackets.end();) + for (std::vector::iterator qp = reliablePackets.begin(); qp != reliablePackets.end();) { if (qp->timeout > 0 && qp->firstSentTime > 0 && GetCurrentTime() - qp->firstSentTime >= qp->timeout) { LOGD("Removing queued packet because of timeout"); - qp = queuedPackets.erase(qp); + qp = reliablePackets.erase(qp); + continue; + } + if (!qp->tries--) { + LOGD("Removing queued packet because of no more tries"); + qp = reliablePackets.erase(qp); continue; } if (GetCurrentTime() - qp->lastSentTime >= qp->retryInterval) { - messageThread.Post(std::bind(&VoIPController::UpdateQueuedPackets, this), qp->retryInterval); + messageThread.Post(std::bind(&VoIPController::UpdateReliablePackets, this), qp->retryInterval); uint32_t seq = GenerateOutSeq(); qp->seqs.Add(seq); qp->lastSentTime = GetCurrentTime(); @@ -486,12 +492,12 @@ Endpoint &VoIPController::GetEndpointByType(const Endpoint::Type type) throw out_of_range("no endpoint"); } -void VoIPController::SendPacketReliably(unsigned char type, unsigned char *data, size_t len, double retryInterval, double timeout) +void VoIPController::SendPacketReliably(unsigned char type, unsigned char *data, size_t len, double retryInterval, double timeout, uint8_t tries) { ENFORCE_MSG_THREAD; LOGD("Send reliably, type=%u, len=%u, retry=%.3f, timeout=%.3f", type, unsigned(len), retryInterval, timeout); - QueuedPacket pkt; + ReliableOutgoingPacket pkt; if (data) { Buffer b(len); @@ -501,13 +507,14 @@ void VoIPController::SendPacketReliably(unsigned char type, unsigned char *data, pkt.type = type; pkt.retryInterval = retryInterval; pkt.timeout = timeout; + pkt.tries = tries; pkt.firstSentTime = 0; pkt.lastSentTime = 0; - queuedPackets.push_back(move(pkt)); - messageThread.Post(std::bind(&VoIPController::UpdateQueuedPackets, this)); + reliablePackets.push_back(move(pkt)); + messageThread.Post(std::bind(&VoIPController::UpdateReliablePackets, this)); if (timeout > 0.0) { - messageThread.Post(std::bind(&VoIPController::UpdateQueuedPackets, this), timeout); + messageThread.Post(std::bind(&VoIPController::UpdateReliablePackets, this), timeout); } } diff --git a/controller/protocol/Protocol.cpp b/controller/protocol/Protocol.cpp index abb9edc..89de524 100644 --- a/controller/protocol/Protocol.cpp +++ b/controller/protocol/Protocol.cpp @@ -231,7 +231,7 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE recvTS = in.ReadUInt32(); } - if (seqgt(ackId, lastRemoteAckSeq)) // If is **not** out of order or retransmission + if (seqgt(ackId, lastRemoteAckSeq)) { if (waitingForAcks && lastRemoteAckSeq >= firstSentPing) { @@ -296,8 +296,8 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE ++x; } } - else - legacyHandleQueuedPackets(); + //else + legacyHandleReliablePackets(); } Endpoint &_currentEndpoint = endpoints.at(currentEndpoint);