From 4af8a894910a608d079b3f92a7812455b9775632 Mon Sep 17 00:00:00 2001 From: Daniil Gentili Date: Wed, 29 Jan 2020 15:52:43 +0100 Subject: [PATCH] Finish adaptation --- VoIPController.h | 11 +- controller/PrivateDefines.h | 3 +- controller/audio/AudioPacketSender.cpp | 5 + controller/controller/Init.cpp | 2 - controller/net/JitterBuffer.cpp | 8 +- controller/net/JitterBuffer.h | 9 +- controller/net/PacketSender.h | 4 +- controller/protocol/Bandwidth.cpp | 2 +- controller/protocol/Endpoints.cpp | 23 ++--- controller/protocol/Loop.cpp | 8 +- controller/protocol/NetworkAPI.cpp | 36 +++---- controller/protocol/PacketManager.cpp | 15 ++- controller/protocol/PacketManager.h | 14 ++- controller/protocol/PacketStructs.h | 4 + controller/protocol/Protocol.cpp | 138 ++++++++++++++++++------- controller/protocol/Reliable.cpp | 4 +- controller/protocol/Tick.cpp | 2 +- 17 files changed, 184 insertions(+), 104 deletions(-) diff --git a/VoIPController.h b/VoIPController.h index 1633813..4432ff2 100755 --- a/VoIPController.h +++ b/VoIPController.h @@ -437,8 +437,8 @@ public: protected: virtual void ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcEndpoint); virtual void ProcessExtraData(Buffer &data); - virtual void WritePacketHeader(uint32_t seq, BufferOutputStream *s, unsigned char type, uint32_t length, PacketSender *source); - virtual void SendPacket(unsigned char *data, size_t len, Endpoint &ep, PendingOutgoingPacket &srcPacket); + virtual uint8_t WritePacketHeader(PendingOutgoingPacket &pkt, BufferOutputStream &s, PacketSender *source); + virtual void SendPacket(unsigned char *data, size_t len, Endpoint &ep, uint32_t seq, uint8_t type, uint8_t transportId); virtual void SendInit(); virtual void SendUdpPing(Endpoint &endpoint); virtual void SendRelayPings(); @@ -451,6 +451,7 @@ protected: std::shared_ptr GetStreamByType(StreamType type, bool outgoing); std::shared_ptr GetStreamByID(unsigned char id, bool outgoing); Endpoint *GetEndpointForPacket(const PendingOutgoingPacket &pkt); + Endpoint *GetEndpointById(const int64_t id); bool SendOrEnqueuePacket(PendingOutgoingPacket pkt, bool enqueue = true, PacketSender *source = NULL); CellularCarrierInfo GetCarrierInfo(); @@ -502,7 +503,7 @@ private: void UpdateAudioBitrate(); void UpdateSignalBars(); void UpdateReliablePackets(); - void SendNopPacket(); + void SendNopPacket(PacketManager &pm); void TickJitterBufferAndCongestionControl(); void ResetUdpAvailability(); inline static std::string NetworkTypeToString(int type) @@ -572,7 +573,6 @@ private: void SetupOutgoingVideoStream(); bool WasOutgoingPacketAcknowledged(uint32_t seq, bool checkAll = true); - RecentOutgoingPacket *GetRecentOutgoingPacket(uint32_t seq); void NetworkPacketReceived(std::shared_ptr packet); void TrySendOutgoingPackets(); @@ -616,9 +616,6 @@ private: bool micMuted = false; uint32_t maxBitrate; - // Recent ougoing packets - std::vector recentOutgoingPackets; - // std::vector> outgoingStreams; std::vector> incomingStreams; diff --git a/controller/PrivateDefines.h b/controller/PrivateDefines.h index 0dea0df..d4d984e 100644 --- a/controller/PrivateDefines.h +++ b/controller/PrivateDefines.h @@ -34,7 +34,7 @@ enum ProtocolVersions }; #define PROTOCOL_NAME 0x50567247 // "GrVP" in little endian (reversed here) -#define PROTOCOL_VERSION 10 +#define PROTOCOL_VERSION 9 #define MIN_PROTOCOL_VERSION 3 #define STREAM_DATA_FLAG_LEN16 0x40 @@ -72,6 +72,7 @@ enum ProtocolVersions #define XPFLAG_HAS_EXTRA 1 #define XPFLAG_HAS_RECV_TS 2 +#define XPFLAG_HAS_TRANSPORT_ID 4 #define EXTRA_TYPE_STREAM_FLAGS 1 #define EXTRA_TYPE_STREAM_CSD 2 diff --git a/controller/audio/AudioPacketSender.cpp b/controller/audio/AudioPacketSender.cpp index 89631cb..e1e6c23 100644 --- a/controller/audio/AudioPacketSender.cpp +++ b/controller/audio/AudioPacketSender.cpp @@ -74,6 +74,8 @@ void AudioPacketSender::SendFrame(unsigned char *data, size_t len, unsigned char pkt.WriteInt32(audioTimestampOut); pkt.WriteBytes(*dataBufPtr, 0, len); + //LOGE("SEND: For pts %u = seq %u, using seq %u", audioTimestampOut, audioTimestampOut/60 + 1, packetManager.getLocalSeq()); + if (hasExtraFEC) { Buffer ecBuf(secondaryLen); @@ -108,6 +110,9 @@ void AudioPacketSender::SendFrame(unsigned char *data, size_t len, unsigned char if (PeerVersion() < PROTOCOL_RELIABLE) { + // Need to increase this anyway to go hand in hand with timestamp + packetManager.nextLocalSeq(); + double rtt = LastRtt(); rtt = !rtt || rtt > 0.3 ? 0.5 : rtt; // Tweak this (a lot) later diff --git a/controller/controller/Init.cpp b/controller/controller/Init.cpp index 9feeb31..bbdbfb1 100644 --- a/controller/controller/Init.cpp +++ b/controller/controller/Init.cpp @@ -42,8 +42,6 @@ VoIPController::VoIPController() : rawSendQueue(64) stm->packetSender = std::make_unique(this, nullptr, stm); outgoingStreams.push_back(stm); - - recentOutgoingPackets.reserve(MAX_RECENT_PACKETS); } void VoIPController::InitializeTimers() diff --git a/controller/net/JitterBuffer.cpp b/controller/net/JitterBuffer.cpp index 1e035ab..c61abd1 100755 --- a/controller/net/JitterBuffer.cpp +++ b/controller/net/JitterBuffer.cpp @@ -96,7 +96,7 @@ void JitterBuffer::PutInternal(jitter_packet_t *pkt, bool overwriteExisting) { if (!slots[i].buffer.IsEmpty() && slots[i].timestamp == pkt->timestamp) { - //LOGV("Found existing packet for timestamp %u, overwrite %d", pkt->timestamp, overwriteExisting); + LOGV("Found existing packet for timestamp %u, overwrite %d", pkt->timestamp, overwriteExisting); if (overwriteExisting) { slots[i].buffer.CopyFrom(pkt->buffer, pkt->size); @@ -113,7 +113,7 @@ void JitterBuffer::PutInternal(jitter_packet_t *pkt, bool overwriteExisting) outstandingDelayChange = 0; nextFetchTimestamp = static_cast(static_cast(pkt->timestamp) - step * minDelay); first = true; - LOGI("jitter: resyncing, next timeDecodeNextFramestamp = %lld (step=%d, minDelay=%f)", (long long int)nextFetchTimestamp, step, minDelay); + LOGI("jitter: resyncing, next timestamp = %lld (step=%d, minDelay=%f)", (long long int)nextFetchTimestamp, step, minDelay); } for (i = 0; i < JITTER_SLOT_COUNT; i++) @@ -154,13 +154,13 @@ void JitterBuffer::PutInternal(jitter_packet_t *pkt, bool overwriteExisting) // Late packet check if (pkt->timestamp < nextFetchTimestamp) { - //LOGW("jitter: would drop packet with timestamp %d because it is late but not hopelessly", pkt->timestamp); + LOGW("jitter: would drop packet with timestamp %d because it is late but not hopelessly", pkt->timestamp); latePacketCount++; lostPackets--; } else if (pkt->timestamp < nextFetchTimestamp - 1) { - //LOGW("jitter: dropping packet with timestamp %d because it is too late", pkt->timestamp); + LOGW("jitter: dropping packet with timestamp %d because it is too late", pkt->timestamp); latePacketCount++; return; } diff --git a/controller/net/JitterBuffer.h b/controller/net/JitterBuffer.h index 8211385..dc517de 100644 --- a/controller/net/JitterBuffer.h +++ b/controller/net/JitterBuffer.h @@ -10,11 +10,13 @@ #include #include #include +#include #include #include "controller/media/MediaStreamItf.h" #include "tools/BlockingQueue.h" #include "tools/Buffers.h" #include "tools/threading.h" +#include "tools/logging.h" #define JITTER_SLOT_COUNT 64 #define JITTER_SLOT_SIZE 1024 @@ -48,9 +50,10 @@ public: // Any sequence numbers smaller than this cannot possibly arrive in time for playing. inline uint32_t GetSeqTooLate(double rtt) { + //LOGE("Next fetch timestamp: %ld, rtt %lf, step %d", nextFetchTimestamp.load(), rtt * 1000, step) // The absolute minimum time(stamp) that will (barely) be accepted by the jitter buffer in time + RTT time - // Then convert timestamp into a seqno: remember, in protocol >= PROTOCOL_RELIABLE, seq = ts * step - return ((nextFetchTimestamp + (rtt * 1000)) / static_cast(step)) / 1000; + // Then convert timestamp into a seqno: remember, in protocol >= PROTOCOL_RELIABLE, seq = ts * step + 1 + return ((nextFetchTimestamp + (rtt * 1000)) / static_cast(step) + 1) - lostCount; // Seqs start at 1 } private: @@ -70,7 +73,7 @@ private: Mutex mutex; uint32_t step; std::array slots; - int64_t nextFetchTimestamp = 0; // What frame to read next + std::atomic nextFetchTimestamp = ATOMIC_VAR_INIT(0); // What frame to read next double minDelay = 6; uint32_t minMinDelay; uint32_t maxMinDelay; diff --git a/controller/net/PacketSender.h b/controller/net/PacketSender.h index 826246a..03cd2d6 100644 --- a/controller/net/PacketSender.h +++ b/controller/net/PacketSender.h @@ -16,7 +16,7 @@ namespace tgvoip class PacketSender { public: - PacketSender(VoIPController *controller, const std::shared_ptr &stream) : controller(controller), stream(stream), packetManager(stream->id){}; + PacketSender(VoIPController *controller, const std::shared_ptr &stream) : controller(controller), stream(stream), packetManager(stream->id - 1){}; virtual ~PacketSender() = default; virtual void PacketAcknowledged(uint32_t seq, double sendTime, double ackTime, uint8_t type, uint32_t size) = 0; virtual void PacketLost(uint32_t seq, uint8_t type, uint32_t size) = 0; @@ -102,6 +102,8 @@ protected: std::shared_ptr stream; PacketManager packetManager; + + std::vector reliableQueue; }; } // namespace tgvoip diff --git a/controller/protocol/Bandwidth.cpp b/controller/protocol/Bandwidth.cpp index e0943da..6cf1a9f 100644 --- a/controller/protocol/Bandwidth.cpp +++ b/controller/protocol/Bandwidth.cpp @@ -18,7 +18,7 @@ double VoIPController::GetAverageRTT() { double res = 0; int count = 0; - for (const auto &packet : recentOutgoingPackets) + for (const auto &packet : pm.getRecentOutgoingPackets()) { if (packet.rttTime) { diff --git a/controller/protocol/Endpoints.cpp b/controller/protocol/Endpoints.cpp index 32a7eee..db8f83a 100644 --- a/controller/protocol/Endpoints.cpp +++ b/controller/protocol/Endpoints.cpp @@ -10,31 +10,31 @@ Endpoint &VoIPController::GetRemoteEndpoint() Endpoint *VoIPController::GetEndpointForPacket(const PendingOutgoingPacket &pkt) { - Endpoint *endpoint = nullptr; - if (pkt.endpoint) + return GetEndpointById(pkt.endpoint); +} + +Endpoint *VoIPController::GetEndpointById(const int64_t id) +{ + if (id) { try { - endpoint = &endpoints.at(pkt.endpoint); + return &endpoints.at(id); } catch (out_of_range &x) { - LOGW("Unable to send packet via nonexistent endpoint %" PRIu64, pkt.endpoint); + LOGW("Unable to send packet via nonexistent endpoint %" PRIu64, id); return NULL; } } - if (!endpoint) - endpoint = &endpoints.at(currentEndpoint); - return endpoint; + return &endpoints.at(currentEndpoint); } - int64_t VoIPController::GetPreferredRelayID() { return preferredRelay; } - void VoIPController::SetRemoteEndpoints(vector endpoints, bool allowP2p, int32_t connectionMaxLayer) { LOGW("Set remote endpoints, allowP2P=%d, connectionMaxLayer=%u", allowP2p ? 1 : 0, connectionMaxLayer); @@ -69,10 +69,6 @@ void VoIPController::SetRemoteEndpoints(vector endpoints, bool allowP2 AddIPv6Relays(); } - - - - void VoIPController::AddIPv6Relays() { if (!myIPv6.IsEmpty() && !didAddIPv6Relays) @@ -149,4 +145,3 @@ void VoIPController::AddTCPRelays() didAddTcpRelays = true; } } - diff --git a/controller/protocol/Loop.cpp b/controller/protocol/Loop.cpp index e0a2f2c..2308918 100644 --- a/controller/protocol/Loop.cpp +++ b/controller/protocol/Loop.cpp @@ -216,12 +216,12 @@ void VoIPController::NetworkPacketReceived(shared_ptr _packet) else stats.bytesRecvdWifi += (uint64_t)packet.data.Length(); - try - { + /*try + {*/ ProcessIncomingPacket(packet, endpoints.at(srcEndpointID)); - } + /*} catch (out_of_range &x) { LOGW("Error parsing packet: %s", x.what()); - } + }*/ } diff --git a/controller/protocol/NetworkAPI.cpp b/controller/protocol/NetworkAPI.cpp index 3a36512..0a1cb86 100644 --- a/controller/protocol/NetworkAPI.cpp +++ b/controller/protocol/NetworkAPI.cpp @@ -41,7 +41,6 @@ bool VoIPController::SendOrEnqueuePacket(PendingOutgoingPacket pkt, bool enqueue } canSend = endpoint->socket && endpoint->socket->IsReadyToSend(); } - conctl.PacketSent(pkt.seq, pkt.len); if (!canSend) { if (enqueue) @@ -51,13 +50,12 @@ bool VoIPController::SendOrEnqueuePacket(PendingOutgoingPacket pkt, bool enqueue } return false; } + conctl.PacketSent(pkt.seq, pkt.len); if ((endpoint->type == Endpoint::Type::TCP_RELAY && useTCP) || (endpoint->type != Endpoint::Type::TCP_RELAY && useUDP)) { - //BufferOutputStream p(buf, sizeof(buf)); - BufferOutputStream p(1500); - WritePacketHeader(pkt.seq, &p, pkt.type, (uint32_t)pkt.len, source); - p.WriteBytes(pkt.data); - SendPacket(p.GetBuffer(), p.GetLength(), *endpoint, pkt); + BufferOutputStream out(1500); + uint8_t transportId = WritePacketHeader(pkt, out, source); + SendPacket(out.GetBuffer(), out.GetLength(), *endpoint, pkt.seq, pkt.type, transportId); if (pkt.type == PKT_STREAM_DATA) { unsentStreamPackets--; @@ -66,7 +64,7 @@ bool VoIPController::SendOrEnqueuePacket(PendingOutgoingPacket pkt, bool enqueue return true; } -void VoIPController::SendPacket(unsigned char *data, size_t len, Endpoint &ep, PendingOutgoingPacket &srcPacket) +void VoIPController::SendPacket(unsigned char *data, size_t len, Endpoint &ep, uint32_t seq, uint8_t type, uint8_t transportId) { if (stopping) return; @@ -84,9 +82,9 @@ void VoIPController::SendPacket(unsigned char *data, size_t len, Endpoint &ep, P } //LOGV("Sending %d bytes to %s:%d", out.GetLength(), ep.address.ToString().c_str(), ep.port); - //#ifdef LOG_PACKETS - LOGV("Sending: to=%s:%u, seq=%u, length=%u, type=%s", ep.GetAddress().ToString().c_str(), ep.port, srcPacket.seq, (unsigned int)out.GetLength(), GetPacketTypeString(srcPacket.type).c_str()); - //#endif + #ifdef LOG_PACKETS + LOGV("Sending: to=%s:%u, seq=%u, length=%u, type=%s, transportId=%hhu", ep.GetAddress().ToString().c_str(), ep.port, seq, (unsigned int)out.GetLength(), GetPacketTypeString(type).c_str(), transportId); + #endif rawSendQueue.Put( RawPendingOutgoingPacket{ @@ -266,17 +264,6 @@ void VoIPController::TrySendOutgoingPackets() } -RecentOutgoingPacket *VoIPController::GetRecentOutgoingPacket(uint32_t seq) -{ - for (RecentOutgoingPacket &opkt : recentOutgoingPackets) - { - if (opkt.seq == seq) - { - return &opkt; - } - } - return nullptr; -} void VoIPController::SendRelayPings() { @@ -363,16 +350,17 @@ void VoIPController::SendRelayPings() } } -void VoIPController::SendNopPacket() +void VoIPController::SendNopPacket(PacketManager &pm) { if (state != STATE_ESTABLISHED) return; + PacketSender *source = pm.getTransportId() == 0xFF ? nullptr : outgoingStreams[pm.getTransportId()]->packetSender.get(); SendOrEnqueuePacket(PendingOutgoingPacket{ - /*.seq=*/(firstSentPing = packetManager.nextLocalSeq()), + /*.seq=*/(firstSentPing = pm.nextLocalSeq()), /*.type=*/PKT_NOP, /*.len=*/0, /*.data=*/Buffer(), - /*.endpoint=*/0}); + /*.endpoint=*/0}, source); } void VoIPController::SendPublicEndpointsRequest() diff --git a/controller/protocol/PacketManager.cpp b/controller/protocol/PacketManager.cpp index 2d2b0aa..d8cf0bf 100644 --- a/controller/protocol/PacketManager.cpp +++ b/controller/protocol/PacketManager.cpp @@ -6,6 +6,7 @@ using namespace std; PacketManager::PacketManager(uint8_t transportId) : transportId(transportId) { + recentOutgoingPackets.reserve(MAX_RECENT_PACKETS); } void PacketManager::ackLocal(uint32_t ackId, uint32_t mask) { @@ -60,4 +61,16 @@ bool PacketManager::ackRemoteSeq(uint32_t ackId) return false; } return true; -} \ No newline at end of file +} + +RecentOutgoingPacket *PacketManager::GetRecentOutgoingPacket(uint32_t seq) +{ + for (RecentOutgoingPacket &opkt : recentOutgoingPackets) + { + if (opkt.seq == seq) + { + return &opkt; + } + } + return nullptr; +} diff --git a/controller/protocol/PacketManager.h b/controller/protocol/PacketManager.h index cf4fe83..0da9ec3 100644 --- a/controller/protocol/PacketManager.h +++ b/controller/protocol/PacketManager.h @@ -26,7 +26,7 @@ public: virtual ~PacketManager() = default; // Transport ID for multiplexing - inline uint8_t getTransportId() + inline uint8_t getTransportId() { return transportId; } @@ -114,5 +114,17 @@ private: // Recent incoming remote packets uint32_t lastRemoteSeqsMask; + +public: // Recent outgoing packet list + inline std::vector &getRecentOutgoingPackets() + { + return recentOutgoingPackets; + } + + RecentOutgoingPacket *GetRecentOutgoingPacket(uint32_t seq); + +private: + // Recent ougoing packets + std::vector recentOutgoingPackets; }; } // namespace tgvoip \ No newline at end of file diff --git a/controller/protocol/PacketStructs.h b/controller/protocol/PacketStructs.h index 50ed8c9..b09078d 100644 --- a/controller/protocol/PacketStructs.h +++ b/controller/protocol/PacketStructs.h @@ -8,6 +8,10 @@ namespace tgvoip class PacketSender; struct RecentOutgoingPacket { + // For simple NACK reliable resending + int64_t endpoint; + Buffer data; + uint32_t seq; uint16_t id; // for group calls only double sendTime; diff --git a/controller/protocol/Protocol.cpp b/controller/protocol/Protocol.cpp index 804e6ea..ad8ef86 100644 --- a/controller/protocol/Protocol.cpp +++ b/controller/protocol/Protocol.cpp @@ -187,15 +187,16 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE if (peerVersion >= 8 || (!peerVersion && connectionMaxLayer >= 92)) { type = in.ReadByte(); - if (peerVersion >= PROTOCOL_RELIABLE) - { - transportId = in.ReadByte(); - } ackId = in.ReadUInt32(); pseq = in.ReadUInt32(); acks = in.ReadUInt32(); pflags = in.ReadByte(); packetInnerLen = innerLen - 14; + + if (pflags & XPFLAG_HAS_TRANSPORT_ID) + { + transportId = in.ReadByte(); + } } else if (!legacyParsePacket(in, type, ackId, pseq, acks, pflags, packetInnerLen)) { @@ -211,6 +212,10 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE return; } + #ifdef LOG_PACKETS + LOGV("Received: from=%s:%u, seq=%u, length=%u, type=%s, transportId=%hhu", srcEndpoint.GetAddress().ToString().c_str(), srcEndpoint.port, pseq, (unsigned int)packet.data.Length(), GetPacketTypeString(type).c_str(), transportId); + #endif + // Extra data if (pflags & XPFLAG_HAS_EXTRA) { @@ -247,18 +252,29 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE conctl.PacketAcknowledged(ackId); manager->ackLocal(ackId, acks); + + if (transportId != 0xFF && !incomingStreams.empty() && incomingStreams[transportId]->jitterBuffer) + { + // Technically I should be using the specific packet manager's rtt history but will separate later + uint32_t tooOldSeq = incomingStreams[transportId]->jitterBuffer->GetSeqTooLate(rttHistory[0]); + LOGW("Reverse acking seqs older than %u, newest seq received from remote %u (transportId %hhu)", tooOldSeq, manager->getLastRemoteSeq(), transportId); + manager->ackRemoteSeqsOlderThan(tooOldSeq); + } - for (auto &opkt : recentOutgoingPackets) + for (auto &opkt : manager->getRecentOutgoingPackets()) { if (opkt.ackTime) + { continue; + } if (manager->wasLocalAcked(opkt.seq)) { + opkt.data = Buffer(); opkt.ackTime = GetCurrentTime(); opkt.rttTime = opkt.ackTime - opkt.sendTime; if (opkt.lost) { - LOGW("acknowledged lost packet %u", opkt.seq); + LOGW("acknowledged lost packet %u (transportId %hhu)", opkt.seq, transportId); sendLosses--; } if (opkt.sender && !opkt.lost) @@ -269,6 +285,34 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE // TODO move this to a PacketSender conctl.PacketAcknowledged(opkt.seq); } + else if (peerVersion >= PROTOCOL_RELIABLE && opkt.data.Length() && opkt.seq < manager->getLastAckedSeq()) + { + if (manager->getLastAckedSeq() - opkt.seq > 32) + { + LOGW("Marking reliable NACK packet %u as lost, since is more than 32 seqs old (last acked %u, transportId %hhu)", opkt.seq, manager->getLastAckedSeq(), transportId); + opkt.lost = true; + opkt.data = Buffer(); + continue; + } + if (opkt.sendTime + rttHistory[0] < VoIPController::GetCurrentTime()) + { + LOGW("It is possibly a bit too early to resend NACK packet %u (transportId %hhu)", opkt.seq, transportId); + } + LOGW("Resending reliable NACK packet %u, (transportId %hhu)", opkt.seq, transportId); + BufferOutputStream copy(opkt.data.Length()); + copy.WriteBytes(opkt.data); + Endpoint *endpoint = GetEndpointById(opkt.endpoint); + if (!endpoint) + { + LOGE("Recent NACK queue contained packet (%u) for nonexistent endpoint, (transportId %hhu)", opkt.seq, transportId); + opkt.lost = true; + opkt.data = Buffer(); + continue; + } + opkt.sendTime = GetCurrentTime(); + SendPacket(copy.GetBuffer(), copy.GetLength(), *endpoint, opkt.seq, opkt.type, transportId); + //SendOrEnqueuePacket() + } } if (peerVersion >= 6) @@ -316,17 +360,12 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE } }*/ - unacknowledgedIncomingPacketCount++; - if (unacknowledgedIncomingPacketCount > unackNopThreshold) + if (unacknowledgedIncomingPacketCount++ > unackNopThreshold) { //LOGV("Sending nop packet as ack"); - SendNopPacket(); + SendNopPacket(packetManager); } - //#ifdef LOG_PACKETS - LOGV("Received: from=%s:%u, seq=%u, length=%u, type=%s", srcEndpoint.GetAddress().ToString().c_str(), srcEndpoint.port, pseq, (unsigned int)packet.data.Length(), GetPacketTypeString(type).c_str()); - //#endif - //LOGV("acks: %u -> %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf", manager.getLastAckedSeq()(), remoteAcks[0], remoteAcks[1], remoteAcks[2], remoteAcks[3], remoteAcks[4], remoteAcks[5], remoteAcks[6], remoteAcks[7]); //LOGD("recv: %u -> %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf", getLastRemoteSeq(), recvPacketTimes[0], recvPacketTimes[1], recvPacketTimes[2], recvPacketTimes[3], recvPacketTimes[4], recvPacketTimes[5], recvPacketTimes[6], recvPacketTimes[7]); //LOGI("RTT = %.3lf", GetAverageRTT()); @@ -588,6 +627,8 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE uint32_t pts = in.ReadUInt32(); unsigned char fragmentCount = 1; unsigned char fragmentIndex = 0; + //LOGE("RECV: For pts %u = seq %u, got seq %u", pts, pts/60 + 1, pseq); + //LOGD("stream data, pts=%d, len=%d, rem=%d", pts, sdlen, in.Remaining()); audioTimestampIn = pts; if (!audioOutStarted && audioOutput) @@ -622,10 +663,14 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE { if (stm->jitterBuffer) { - if (peerVersion >= PROTOCOL_RELIABLE) { - manager->ackRemoteSeqsOlderThan(stm->jitterBuffer->GetSeqTooLate(rttHistory[0])); - } stm->jitterBuffer->HandleInput(static_cast(buffer + in.GetOffset()), sdlen, pts, false); + /*if (peerVersion >= PROTOCOL_RELIABLE) + { + // Technically I should be using the specific packet manager's rtt history but will separate later + uint32_t tooOldSeq = stm->jitterBuffer->GetSeqTooLate(rttHistory[0]) - 1; + LOGW("Reverse acking seqs older than %u, newest acked seq %u (transportId %hhu)", tooOldSeq, manager->getLastRemoteSeq(), transportId); + manager->ackRemoteSeqsOlderThan(tooOldSeq); + }*/ if (extraFEC) { in.Seek(in.GetOffset() + sdlen); @@ -993,21 +1038,17 @@ void VoIPController::ProcessAcknowledgedOutgoingExtra(UnacknowledgedExtraData &e } } -void VoIPController::WritePacketHeader(uint32_t pseq, BufferOutputStream *s, unsigned char type, uint32_t length, PacketSender *source) +uint8_t VoIPController::WritePacketHeader(PendingOutgoingPacket &pkt, BufferOutputStream &s, PacketSender *source) { - PacketManager &manager = peerVersion >= PROTOCOL_RELIABLE ? source->getPacketManager() : packetManager; + PacketManager &manager = peerVersion >= PROTOCOL_RELIABLE && source ? source->getPacketManager() : packetManager; uint32_t acks = manager.getRemoteAckMask(); if (peerVersion >= 8 || (!peerVersion && connectionMaxLayer >= 92)) { - s->WriteByte(type); - if (peerVersion >= PROTOCOL_RELIABLE) - { - s->WriteByte(manager.getTransportId()); - } - s->WriteInt32(manager.getLastRemoteSeq()); - s->WriteInt32(pseq); - s->WriteInt32(acks); + s.WriteByte(pkt.type); + s.WriteInt32(manager.getLastRemoteSeq()); + s.WriteInt32(pkt.seq); + s.WriteInt32(acks); unsigned char flags = currentExtras.empty() ? 0 : XPFLAG_HAS_EXTRA; @@ -1015,47 +1056,68 @@ void VoIPController::WritePacketHeader(uint32_t pseq, BufferOutputStream *s, uns if (peerVersion >= 9 && videoStream && videoStream->enabled) flags |= XPFLAG_HAS_RECV_TS; - s->WriteByte(flags); + if (peerVersion >= PROTOCOL_RELIABLE && manager.getTransportId() != 0xFF) + flags |= XPFLAG_HAS_TRANSPORT_ID; + + s.WriteByte(flags); + + if (flags & XPFLAG_HAS_TRANSPORT_ID) + { + s.WriteByte(manager.getTransportId()); + } if (!currentExtras.empty()) { - s->WriteByte(static_cast(currentExtras.size())); + s.WriteByte(static_cast(currentExtras.size())); for (auto &x : currentExtras) { LOGV("Writing extra into header: type %u, length %d", x.type, int(x.data.Length())); assert(x.data.Length() <= 254); - s->WriteByte(static_cast(x.data.Length() + 1)); - s->WriteByte(x.type); - s->WriteBytes(*x.data, x.data.Length()); + s.WriteByte(static_cast(x.data.Length() + 1)); + s.WriteByte(x.type); + s.WriteBytes(*x.data, x.data.Length()); if (x.firstContainingSeq == 0) - x.firstContainingSeq = pseq; + x.firstContainingSeq = pkt.seq; } } if (peerVersion >= 9 && videoStream && videoStream->enabled) { - s->WriteUInt32((lastRecvPacketTime - connectionInitTime) * 1000.0); + s.WriteUInt32((lastRecvPacketTime - connectionInitTime) * 1000.0); } } else { - legacyWritePacketHeader(pseq, acks, s, type, length); + legacyWritePacketHeader(pkt.seq, acks, &s, pkt.type, pkt.len); } + s.WriteBytes(pkt.data); + + Buffer copyBuf(s.GetLength()); + if (peerVersion >= PROTOCOL_RELIABLE) + copyBuf.CopyFrom(s.GetBuffer(), 0, s.GetLength()); + unacknowledgedIncomingPacketCount = 0; + + auto &recentOutgoingPackets = manager.getRecentOutgoingPackets(); + recentOutgoingPackets.push_back(RecentOutgoingPacket{ - pseq, + pkt.endpoint, + std::move(copyBuf), + pkt.seq, 0, GetCurrentTime(), 0, 0, - type, - length, + pkt.type, + static_cast(pkt.len), source, false}); while (recentOutgoingPackets.size() > MAX_RECENT_PACKETS) { recentOutgoingPackets.erase(recentOutgoingPackets.begin()); } - manager.setLastSentSeq(pseq); + manager.setLastSentSeq(pkt.seq); + + return manager.getTransportId(); //LOGI("packet header size %d", s->GetLength()); } \ No newline at end of file diff --git a/controller/protocol/Reliable.cpp b/controller/protocol/Reliable.cpp index 8cc9667..de12146 100644 --- a/controller/protocol/Reliable.cpp +++ b/controller/protocol/Reliable.cpp @@ -95,13 +95,13 @@ void VoIPController::handleReliablePackets() bool VoIPController::WasOutgoingPacketAcknowledged(uint32_t seq, bool checkAll) { - bool res = getBestPacketManager().wasLocalAcked(seq); + bool res = packetManager.wasLocalAcked(seq); if (res || !checkAll) { return res; } - RecentOutgoingPacket *pkt = GetRecentOutgoingPacket(seq); + RecentOutgoingPacket *pkt = packetManager.GetRecentOutgoingPacket(seq); if (!pkt) return false; return pkt->ackTime != 0.0; diff --git a/controller/protocol/Tick.cpp b/controller/protocol/Tick.cpp index be50216..efee4b5 100644 --- a/controller/protocol/Tick.cpp +++ b/controller/protocol/Tick.cpp @@ -136,7 +136,7 @@ void VoIPController::TickJitterBufferAndCongestionControl() double currentTime = GetCurrentTime(); double rtt = GetAverageRTT(); double packetLossTimeout = std::max(rtt * 2.0, 0.1); - for (RecentOutgoingPacket &pkt : recentOutgoingPackets) + for (RecentOutgoingPacket &pkt : getBestPacketManager().getRecentOutgoingPackets()) { if (pkt.ackTime != 0.0 || pkt.lost) continue;