diff --git a/VoIPController.h b/VoIPController.h index d6657a6..1633813 100755 --- a/VoIPController.h +++ b/VoIPController.h @@ -138,7 +138,7 @@ struct CellularCarrierInfo class PacketSender; -class VoIPController : PacketManager +class VoIPController { friend class VoIPGroupController; friend class PacketSender; @@ -400,7 +400,7 @@ public: STREAM_TYPE_VIDEO }; - struct Stream : PacketManager + struct Stream { int32_t userID; uint8_t id; @@ -566,7 +566,7 @@ private: // More legacy bool legacyParsePacket(BufferInputStream &in, unsigned char &type, uint32_t &ackId, uint32_t &pseq, uint32_t &acks, unsigned char &pflags, size_t &packetInnerLen); - void legacyWritePacketHeader(uint32_t pseq, uint32_t acks, BufferOutputStream *s, unsigned char type, uint32_t length, PacketSender *source); + void legacyWritePacketHeader(uint32_t pseq, uint32_t acks, BufferOutputStream *s, unsigned char type, uint32_t length); void handleReliablePackets(); @@ -616,14 +616,15 @@ private: bool micMuted = false; uint32_t maxBitrate; - // Recent ougoing packets std::vector recentOutgoingPackets; - + // std::vector> outgoingStreams; std::vector> incomingStreams; + PacketManager &getBestPacketManager(); + unsigned char encryptionKey[256]; unsigned char keyFingerprint[8]; unsigned char callID[16]; @@ -699,6 +700,8 @@ private: bool needRate = false; BlockingQueue rawSendQueue; + PacketManager packetManager; + uint32_t initTimeoutID = MessageThread::INVALID_ID; uint32_t udpPingTimeoutID = MessageThread::INVALID_ID; diff --git a/VoIPGroupController.cpp b/VoIPGroupController.cpp index 2df0d04..8c6672d 100644 --- a/VoIPGroupController.cpp +++ b/VoIPGroupController.cpp @@ -438,7 +438,7 @@ void VoIPGroupController::SendPacket(unsigned char *data, size_t len, Endpoint & while (recentSentPackets.size() > 64) recentSentPackets.erase(recentSentPackets.begin()); } - lastSentSeq = srcPacket.seq; + packetManager.setLastSentSeq(srcPacket.seq); if (IS_MOBILE_NETWORK(networkType)) stats.bytesSentMobile += (uint64_t)out.GetLength(); @@ -599,7 +599,7 @@ std::string VoIPGroupController::GetDebugString() (int)(conctl.GetAverageRTT() * 1000), (int)(conctl.GetMinimumRTT() * 1000), int(conctl.GetInflightDataSize()), int(conctl.GetCongestionWindow()), keyFingerprint[0], keyFingerprint[1], keyFingerprint[2], keyFingerprint[3], keyFingerprint[4], keyFingerprint[5], keyFingerprint[6], keyFingerprint[7], - lastSentSeq, getLastAckedSeq(), + getBestPacketManager().getLastSentSeq(), getBestPacketManager().getLastAckedSeq(), conctl.GetSendLossCount(), recvLossCount, encoder ? encoder->GetPacketLoss() : 0, encoder ? (encoder->GetBitrate() / 1000) : 0, (long long unsigned int)(stats.bytesSentMobile + stats.bytesSentWifi), diff --git a/controller/audio/AudioPacketSender.cpp b/controller/audio/AudioPacketSender.cpp index 1430d62..89631cb 100644 --- a/controller/audio/AudioPacketSender.cpp +++ b/controller/audio/AudioPacketSender.cpp @@ -3,7 +3,7 @@ using namespace tgvoip; -AudioPacketSender::AudioPacketSender(VoIPController *controller, const std::shared_ptr &encoder, const std::shared_ptr &stream) : PacketSender(controller), encoder(encoder), stream(stream) +AudioPacketSender::AudioPacketSender(VoIPController *controller, const std::shared_ptr &encoder, const std::shared_ptr &stream) : PacketSender(controller, stream), encoder(encoder) { SetSource(encoder); } @@ -121,9 +121,16 @@ void AudioPacketSender::SendFrame(unsigned char *data, size_t len, unsigned char } else { - + PendingOutgoingPacket p{ + /*.seq=*/0, + /*.type=*/PKT_STREAM_DATA, + /*.len=*/pkt.GetLength(), + /*.data=*/Buffer(move(pkt)), + /*.endpoint=*/0, + }; + + SendPacket(move(p)); } - //SendOrEnqueuePacket(move(p)); if (PeerVersion() < 7 && secondaryLen && shittyInternetMode) { Buffer ecBuf(secondaryLen); diff --git a/controller/audio/AudioPacketSender.h b/controller/audio/AudioPacketSender.h index 6b31e05..028a4a9 100644 --- a/controller/audio/AudioPacketSender.h +++ b/controller/audio/AudioPacketSender.h @@ -45,7 +45,6 @@ private: void SendFrame(unsigned char *data, size_t len, unsigned char *secondaryData, size_t secondaryLen); std::shared_ptr encoder; - std::shared_ptr stream; uint32_t audioTimestampOut = 0; diff --git a/controller/controller/Init.cpp b/controller/controller/Init.cpp index 06e2c5b..9feeb31 100644 --- a/controller/controller/Init.cpp +++ b/controller/controller/Init.cpp @@ -66,9 +66,9 @@ void VoIPController::InitializeTimers() statsDump << std::setprecision(3) << GetCurrentTime() - connectionInitTime << endpoints.at(currentEndpoint).rtts[0] - << getLastRemoteSeq() - << (uint32_t)getLocalSeq() - << getLastAckedSeq() + << getBestPacketManager().getLastRemoteSeq() + << (uint32_t)getBestPacketManager().getLocalSeq() + << getBestPacketManager().getLastAckedSeq() << recvLossCount << conctl.GetSendLossCount() << (int)conctl.GetInflightDataSize() diff --git a/controller/controller/PublicAPI.cpp b/controller/controller/PublicAPI.cpp index 3160487..938f00c 100644 --- a/controller/controller/PublicAPI.cpp +++ b/controller/controller/PublicAPI.cpp @@ -218,6 +218,7 @@ string VoIPController::GetDebugString() jitterBuffer->GetAverageLateCount(avgLate); else memset(avgLate, 0, 3 * sizeof(double)); + PacketManager &manager = getBestPacketManager(); snprintf(buffer, sizeof(buffer), "Jitter buffer: %d/%.2f | %.1f, %.1f, %.1f\n" "RTT avg/min: %d/%d\n" @@ -237,7 +238,7 @@ string VoIPController::GetDebugString() int(conctl.GetInflightDataSize()), int(conctl.GetCongestionWindow()), keyFingerprint[0], keyFingerprint[1], keyFingerprint[2], keyFingerprint[3], keyFingerprint[4], keyFingerprint[5], keyFingerprint[6], keyFingerprint[7], useMTProto2 ? " (MTProto2.0)" : "", - lastSentSeq, getLastAckedSeq(), getLastRemoteSeq(), + manager.getLastSentSeq(), manager.getLastAckedSeq(), manager.getLastRemoteSeq(), sendLosses, recvLossCount, encoder ? encoder->GetPacketLoss() : 0, encoder ? (encoder->GetBitrate() / 1000) : 0, static_cast(unsentStreamPackets), @@ -413,7 +414,7 @@ string VoIPController::GetDebugLog() {"tcp_used", useTCP}, {"p2p_type", p2pType}, {"packet_stats", json11::Json::object{ - {"out", (int)getLocalSeq()}, + {"out", (int)getBestPacketManager().getLocalSeq()}, {"in", (int)packetsReceived}, {"lost_out", (int)conctl.GetSendLossCount()}, {"lost_in", (int)recvLossCount}}}, diff --git a/controller/net/PacketSender.h b/controller/net/PacketSender.h index 84df9b6..826246a 100644 --- a/controller/net/PacketSender.h +++ b/controller/net/PacketSender.h @@ -7,6 +7,7 @@ #include "../../VoIPController.h" #include "../protocol/PacketStructs.h" +#include "../protocol/PacketManager.h" #include #include @@ -15,11 +16,15 @@ namespace tgvoip class PacketSender { public: - PacketSender(VoIPController *controller) : controller(controller){}; + PacketSender(VoIPController *controller, const std::shared_ptr &stream) : controller(controller), stream(stream), packetManager(stream->id){}; virtual ~PacketSender() = default; virtual void PacketAcknowledged(uint32_t seq, double sendTime, double ackTime, uint8_t type, uint32_t size) = 0; virtual void PacketLost(uint32_t seq, uint8_t type, uint32_t size) = 0; + inline PacketManager &getPacketManager() + { + return packetManager; + } protected: inline void SendExtra(Buffer &data, unsigned char type) { @@ -33,13 +38,13 @@ protected: inline uint32_t SendPacket(PendingOutgoingPacket pkt) { - uint32_t seq = controller->nextLocalSeq(); + uint32_t seq = controller->peerVersion < PROTOCOL_RELIABLE ? controller->packetManager.nextLocalSeq() : packetManager.nextLocalSeq(); pkt.seq = seq; controller->SendOrEnqueuePacket(std::move(pkt), true, this); return seq; } - inline void SendPacketReliably(unsigned char type, unsigned char *data, size_t len, double retryInterval, double timeout, uint8_t tries = 0xFF) + inline void SendPacketReliably(unsigned char type, unsigned char *data, size_t len, double retryInterval, double timeout, uint8_t tries = 0xFF) { controller->SendPacketReliably(type, data, len, retryInterval, timeout, tries); } @@ -93,6 +98,10 @@ protected: } VoIPController *controller; + + std::shared_ptr stream; + + PacketManager packetManager; }; } // namespace tgvoip diff --git a/controller/protocol/Bandwidth.cpp b/controller/protocol/Bandwidth.cpp index 49af2ca..e0943da 100644 --- a/controller/protocol/Bandwidth.cpp +++ b/controller/protocol/Bandwidth.cpp @@ -9,10 +9,10 @@ using namespace std; double VoIPController::GetAverageRTT() { ENFORCE_MSG_THREAD; - - if (lastSentSeq >= getLastAckedSeq()) + PacketManager &pm = getBestPacketManager(); + if (pm.getLastSentSeq() >= pm.getLastAckedSeq()) { - uint32_t diff = lastSentSeq - getLastAckedSeq(); + uint32_t diff = pm.getLastSentSeq() - pm.getLastAckedSeq(); //LOGV("rtt diff=%u", diff); if (diff < 32) { diff --git a/controller/protocol/Legacy.cpp b/controller/protocol/Legacy.cpp index 69748ce..0be9503 100644 --- a/controller/protocol/Legacy.cpp +++ b/controller/protocol/Legacy.cpp @@ -82,7 +82,7 @@ bool VoIPController::legacyParsePacket(BufferInputStream &in, unsigned char &typ return true; } -void VoIPController::legacyWritePacketHeader(uint32_t pseq, uint32_t acks, BufferOutputStream *s, unsigned char type, uint32_t length, PacketSender *source) +void VoIPController::legacyWritePacketHeader(uint32_t pseq, uint32_t acks, BufferOutputStream *s, unsigned char type, uint32_t length) { if (state == STATE_WAIT_INIT || state == STATE_WAIT_INIT_ACK) @@ -109,7 +109,7 @@ void VoIPController::legacyWritePacketHeader(uint32_t pseq, uint32_t acks, Buffe { s->WriteBytes(callID, 16); } - s->WriteInt32(getLastRemoteSeq()); + s->WriteInt32(packetManager.getLastRemoteSeq()); s->WriteInt32(pseq); s->WriteInt32(acks); if (pflags & PFLAG_HAS_PROTO) @@ -157,7 +157,7 @@ void VoIPController::legacyWritePacketHeader(uint32_t pseq, uint32_t acks, Buffe } } s->WriteByte(type); - s->WriteInt32(getLastRemoteSeq()); + s->WriteInt32(packetManager.getLastRemoteSeq()); s->WriteInt32(pseq); s->WriteInt32(acks); if (peerVersion >= 6) diff --git a/controller/protocol/Nack.cpp b/controller/protocol/Nack.cpp deleted file mode 100644 index 4a6edf7..0000000 --- a/controller/protocol/Nack.cpp +++ /dev/null @@ -1,8 +0,0 @@ -#include "Nack.h" - -void HandleJitterInput(uint32_t timestamp) -{ -} -void HandleJitterOutput(uint32_t timestamp) -{ -} diff --git a/controller/protocol/Nack.h b/controller/protocol/Nack.h deleted file mode 100644 index 49f9499..0000000 --- a/controller/protocol/Nack.h +++ /dev/null @@ -1,17 +0,0 @@ -namespace tgvoip -{ -// Handle nack window -class Nack -{ -public: - Nack() = default; - virtual ~Nack() = default; - - // Do not provide seq, as there is a direct correlation between seqs and timestamps (in protocol >= 10) - void HandleJitterInput(uint32_t timestamp); - void HandleJitterOutput(uint32_t timestamp); - -private: - -}; -} // namespace tgvoip \ No newline at end of file diff --git a/controller/protocol/NetworkAPI.cpp b/controller/protocol/NetworkAPI.cpp index 572d44c..3a36512 100644 --- a/controller/protocol/NetworkAPI.cpp +++ b/controller/protocol/NetworkAPI.cpp @@ -102,7 +102,7 @@ void VoIPController::SendInit() { ENFORCE_MSG_THREAD; - uint32_t initSeq = nextLocalSeq(); + uint32_t initSeq = packetManager.nextLocalSeq(); for (pair &_e : endpoints) { Endpoint &e = _e.second; @@ -265,19 +265,6 @@ void VoIPController::TrySendOutgoingPackets() } } -bool VoIPController::WasOutgoingPacketAcknowledged(uint32_t seq, bool checkAll) -{ - bool res = wasLocalAcked(seq); - if (res || !checkAll) - { - return res; - } - - RecentOutgoingPacket *pkt = GetRecentOutgoingPacket(seq); - if (!pkt) - return false; - return pkt->ackTime != 0.0; -} RecentOutgoingPacket *VoIPController::GetRecentOutgoingPacket(uint32_t seq) { @@ -314,7 +301,7 @@ void VoIPController::SendRelayPings() { LOGV("Sending ping to %s", endpoint.GetAddress().ToString().c_str()); SendOrEnqueuePacket(PendingOutgoingPacket{ - /*.seq=*/(endpoint.lastPingSeq = nextLocalSeq()), + /*.seq=*/(endpoint.lastPingSeq = packetManager.nextLocalSeq()), /*.type=*/PKT_PING, /*.len=*/0, /*.data=*/Buffer(), @@ -381,7 +368,7 @@ void VoIPController::SendNopPacket() if (state != STATE_ESTABLISHED) return; SendOrEnqueuePacket(PendingOutgoingPacket{ - /*.seq=*/(firstSentPing = nextLocalSeq()), + /*.seq=*/(firstSentPing = packetManager.nextLocalSeq()), /*.type=*/PKT_NOP, /*.len=*/0, /*.data=*/Buffer(), diff --git a/controller/protocol/PacketManager.cpp b/controller/protocol/PacketManager.cpp index 0b5b8b5..2d2b0aa 100644 --- a/controller/protocol/PacketManager.cpp +++ b/controller/protocol/PacketManager.cpp @@ -4,6 +4,9 @@ using namespace tgvoip; using namespace std; +PacketManager::PacketManager(uint8_t transportId) : transportId(transportId) +{ +} void PacketManager::ackLocal(uint32_t ackId, uint32_t mask) { lastAckedSeq = ackId; diff --git a/controller/protocol/PacketManager.h b/controller/protocol/PacketManager.h index aea18ae..cf4fe83 100644 --- a/controller/protocol/PacketManager.h +++ b/controller/protocol/PacketManager.h @@ -22,9 +22,18 @@ inline bool seqgte(uint32_t s1, uint32_t s2) class PacketManager { public: - PacketManager() = default; + PacketManager(uint8_t transportId = 0xFF); virtual ~PacketManager() = default; + // Transport ID for multiplexing + inline uint8_t getTransportId() + { + return transportId; + } + + uint8_t transportId = 0xFF; // Default transport ID + +public: /* Local seqno generation */ // Get next local seqno @@ -44,6 +53,11 @@ public: return lastSentSeq; } + inline void setLastSentSeq(uint32_t lastSentSeq) + { + this->lastSentSeq = lastSentSeq; + } + // Seqno of last sent local packet uint32_t lastSentSeq = 0; diff --git a/controller/protocol/Protocol.cpp b/controller/protocol/Protocol.cpp index db94de8..804e6ea 100644 --- a/controller/protocol/Protocol.cpp +++ b/controller/protocol/Protocol.cpp @@ -1,10 +1,16 @@ #include "../PrivateDefines.cpp" +#include "PacketManager.h" using namespace tgvoip; using namespace std; #pragma mark - Networking & crypto +PacketManager &VoIPController::getBestPacketManager() +{ + return outgoingStreams.empty() ? packetManager : outgoingStreams[0]->packetSender->getPacketManager(); +} + void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcEndpoint) { ENFORCE_MSG_THREAD; @@ -176,10 +182,15 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE uint32_t pseq; // Incoming packet seqno uint32_t acks; // Ack mask unsigned char type, pflags; // Packet type, flags + uint8_t transportId = 0xFF; // Transport ID for reliable multiplexing size_t packetInnerLen = 0; if (peerVersion >= 8 || (!peerVersion && connectionMaxLayer >= 92)) { type = in.ReadByte(); + if (peerVersion >= PROTOCOL_RELIABLE) + { + transportId = in.ReadByte(); + } ackId = in.ReadUInt32(); pseq = in.ReadUInt32(); acks = in.ReadUInt32(); @@ -192,7 +203,11 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE } packetsReceived++; - if (!ackRemoteSeq(pseq)) { + // Use packet manager of outgoing streams on both sides (probably should separate in separate vector) + PacketManager *manager = transportId == 0xFF ? &packetManager : &outgoingStreams[transportId]->packetSender->getPacketManager(); + + if (!manager->ackRemoteSeq(pseq)) + { return; } @@ -215,9 +230,9 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE recvTS = in.ReadUInt32(); } - if (seqgt(ackId, getLastAckedSeq())) + if (seqgt(ackId, manager->getLastAckedSeq())) { - if (waitingForAcks && getLastAckedSeq() >= firstSentPing) + if (waitingForAcks && manager->getLastAckedSeq() >= firstSentPing) { rttHistory.Reset(); waitingForAcks = false; @@ -231,13 +246,13 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE } conctl.PacketAcknowledged(ackId); - ackLocal(ackId, acks); + manager->ackLocal(ackId, acks); for (auto &opkt : recentOutgoingPackets) { if (opkt.ackTime) continue; - if (wasLocalAcked(opkt.seq)) + if (manager->wasLocalAcked(opkt.seq)) { opkt.ackTime = GetCurrentTime(); opkt.rttTime = opkt.ackTime - opkt.sendTime; @@ -260,7 +275,7 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE { for (auto x = currentExtras.begin(); x != currentExtras.end();) { - if (x->firstContainingSeq != 0 && seqgte(getLastAckedSeq(), x->firstContainingSeq)) + if (x->firstContainingSeq != 0 && seqgte(manager->getLastAckedSeq(), x->firstContainingSeq)) { LOGV("Peer acknowledged extra type %u length %u", x->type, (unsigned int)x->data.Length()); ProcessAcknowledgedOutgoingExtra(*x); @@ -270,14 +285,14 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE ++x; } } - //if (peerVersion < PROTOCOL_RELIABLE) - handleReliablePackets(); // Use old reliability logic + if (peerVersion < PROTOCOL_RELIABLE) + handleReliablePackets(); // Use old reliability logic } Endpoint &_currentEndpoint = endpoints.at(currentEndpoint); if (srcEndpoint.id != currentEndpoint && srcEndpoint.IsReflector() && (_currentEndpoint.IsP2P() || _currentEndpoint.averageRTT == 0)) { - if (seqgt(lastSentSeq - 32, getLastAckedSeq())) + if (seqgt(manager->getLastSentSeq() - 32, manager->getLastAckedSeq())) { currentEndpoint = srcEndpoint.id; _currentEndpoint = srcEndpoint; @@ -308,11 +323,11 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE SendNopPacket(); } -//#ifdef LOG_PACKETS + //#ifdef LOG_PACKETS LOGV("Received: from=%s:%u, seq=%u, length=%u, type=%s", srcEndpoint.GetAddress().ToString().c_str(), srcEndpoint.port, pseq, (unsigned int)packet.data.Length(), GetPacketTypeString(type).c_str()); -//#endif + //#endif - //LOGV("acks: %u -> %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf", getLastAckedSeq(), remoteAcks[0], remoteAcks[1], remoteAcks[2], remoteAcks[3], remoteAcks[4], remoteAcks[5], remoteAcks[6], remoteAcks[7]); + //LOGV("acks: %u -> %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf", manager.getLastAckedSeq()(), remoteAcks[0], remoteAcks[1], remoteAcks[2], remoteAcks[3], remoteAcks[4], remoteAcks[5], remoteAcks[6], remoteAcks[7]); //LOGD("recv: %u -> %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf", getLastRemoteSeq(), recvPacketTimes[0], recvPacketTimes[1], recvPacketTimes[2], recvPacketTimes[3], recvPacketTimes[4], recvPacketTimes[5], recvPacketTimes[6], recvPacketTimes[7]); //LOGI("RTT = %.3lf", GetAverageRTT()); //LOGV("Packet %u type is %d", pseq, type); @@ -399,7 +414,7 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE LOGI("Sending init ack"); size_t outLength = out.GetLength(); SendOrEnqueuePacket(PendingOutgoingPacket{ - /*.seq=*/nextLocalSeq(), + /*.seq=*/packetManager.nextLocalSeq(), /*.type=*/PKT_INIT_ACK, /*.len=*/outLength, /*.data=*/Buffer(move(out)), @@ -607,6 +622,9 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE { if (stm->jitterBuffer) { + if (peerVersion >= PROTOCOL_RELIABLE) { + manager->ackRemoteSeqsOlderThan(stm->jitterBuffer->GetSeqTooLate(rttHistory[0])); + } stm->jitterBuffer->HandleInput(static_cast(buffer + in.GetOffset()), sdlen, pts, false); if (extraFEC) { @@ -679,7 +697,7 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE pkt.WriteInt32(pseq); size_t pktLength = pkt.GetLength(); SendOrEnqueuePacket(PendingOutgoingPacket{ - /*.seq=*/nextLocalSeq(), + /*.seq=*/packetManager.nextLocalSeq(), /*.type=*/PKT_PONG, /*.len=*/pktLength, /*.data=*/Buffer(move(pkt)), @@ -977,12 +995,17 @@ void VoIPController::ProcessAcknowledgedOutgoingExtra(UnacknowledgedExtraData &e void VoIPController::WritePacketHeader(uint32_t pseq, BufferOutputStream *s, unsigned char type, uint32_t length, PacketSender *source) { - uint32_t acks = getRemoteAckMask(); + PacketManager &manager = peerVersion >= PROTOCOL_RELIABLE ? source->getPacketManager() : packetManager; + uint32_t acks = manager.getRemoteAckMask(); if (peerVersion >= 8 || (!peerVersion && connectionMaxLayer >= 92)) { s->WriteByte(type); - s->WriteInt32(getLastRemoteSeq()); + if (peerVersion >= PROTOCOL_RELIABLE) + { + s->WriteByte(manager.getTransportId()); + } + s->WriteInt32(manager.getLastRemoteSeq()); s->WriteInt32(pseq); s->WriteInt32(acks); @@ -1015,7 +1038,7 @@ void VoIPController::WritePacketHeader(uint32_t pseq, BufferOutputStream *s, uns } else { - legacyWritePacketHeader(pseq, acks, s, type, length, source); + legacyWritePacketHeader(pseq, acks, s, type, length); } unacknowledgedIncomingPacketCount = 0; @@ -1033,6 +1056,6 @@ void VoIPController::WritePacketHeader(uint32_t pseq, BufferOutputStream *s, uns { recentOutgoingPackets.erase(recentOutgoingPackets.begin()); } - lastSentSeq = pseq; + manager.setLastSentSeq(pseq); //LOGI("packet header size %d", s->GetLength()); } \ No newline at end of file diff --git a/controller/protocol/Reliable.cpp b/controller/protocol/Reliable.cpp index 030baa3..8cc9667 100644 --- a/controller/protocol/Reliable.cpp +++ b/controller/protocol/Reliable.cpp @@ -49,7 +49,7 @@ void VoIPController::UpdateReliablePackets() if (GetCurrentTime() - qp->lastSentTime >= qp->retryInterval) { messageThread.Post(std::bind(&VoIPController::UpdateReliablePackets, this), qp->retryInterval); - uint32_t seq = nextLocalSeq(); + uint32_t seq = packetManager.nextLocalSeq(); qp->seqs.Add(seq); qp->lastSentTime = GetCurrentTime(); //LOGD("Sending queued packet, seq=%u, type=%u, len=%u", seq, qp.type, qp.data.Length()); @@ -91,4 +91,18 @@ void VoIPController::handleReliablePackets() } ++it; } +} + +bool VoIPController::WasOutgoingPacketAcknowledged(uint32_t seq, bool checkAll) +{ + bool res = getBestPacketManager().wasLocalAcked(seq); + if (res || !checkAll) + { + return res; + } + + RecentOutgoingPacket *pkt = GetRecentOutgoingPacket(seq); + if (!pkt) + return false; + return pkt->ackTime != 0.0; } \ No newline at end of file diff --git a/video/VideoPacketSender.cpp b/video/VideoPacketSender.cpp index f6edfc3..37d5024 100644 --- a/video/VideoPacketSender.cpp +++ b/video/VideoPacketSender.cpp @@ -11,7 +11,7 @@ using namespace tgvoip; using namespace tgvoip::video; -VideoPacketSender::VideoPacketSender(VoIPController *controller, VideoSource *videoSource, std::shared_ptr stream) : PacketSender(controller), stm(stream) +VideoPacketSender::VideoPacketSender(VoIPController *controller, VideoSource *videoSource, const std::shared_ptr &stream) : PacketSender(controller, stream) { SetSource(videoSource); } @@ -110,13 +110,13 @@ void VideoPacketSender::SetSource(VideoSource *source) uint32_t bitrate = videoCongestionControl.GetBitrate(); currentVideoBitrate = bitrate; source->SetBitrate(bitrate); - source->Reset(stm->codec, stm->resolution = GetVideoResolutionForCurrentBitrate()); + source->Reset(stream->codec, stream->resolution = GetVideoResolutionForCurrentBitrate()); source->Start(); source->SetCallback(std::bind(&VideoPacketSender::SendFrame, this, placeholders::_1, placeholders::_2, placeholders::_3)); source->SetStreamStateCallback([this](bool paused) { - stm->paused = paused; + stream->paused = paused; GetMessageThread().Post([this] { - SendStreamFlags(*stm); + SendStreamFlags(*stream); }); }); } @@ -141,13 +141,13 @@ void VideoPacketSender::SendFrame(const Buffer &_frame, uint32_t flags, uint32_t source->SetBitrate(bitrate); } int resolutionFromBitrate = GetVideoResolutionForCurrentBitrate(); - if (resolutionFromBitrate != stm->resolution && currentTime - lastVideoResolutionChangeTime > 3.0 && currentTime - sourceChangeTime > 10.0) + if (resolutionFromBitrate != stream->resolution && currentTime - lastVideoResolutionChangeTime > 3.0 && currentTime - sourceChangeTime > 10.0) { - LOGI("Changing video resolution: %d -> %d", stm->resolution, resolutionFromBitrate); - stm->resolution = resolutionFromBitrate; + LOGI("Changing video resolution: %d -> %d", stream->resolution, resolutionFromBitrate); + stream->resolution = resolutionFromBitrate; GetMessageThread().Post([this, resolutionFromBitrate] { - source->Reset(stm->codec, resolutionFromBitrate); - stm->csdIsValid = false; + source->Reset(stream->codec, resolutionFromBitrate); + stream->csdIsValid = false; }); lastVideoResolutionChangeTime = currentTime; return; @@ -167,18 +167,18 @@ void VideoPacketSender::SendFrame(const Buffer &_frame, uint32_t flags, uint32_t } uint32_t pts = videoFrameCount++; - bool csdInvalidated = !stm->csdIsValid; - if (!stm->csdIsValid) + bool csdInvalidated = !stream->csdIsValid; + if (!stream->csdIsValid) { vector &csd = source->GetCodecSpecificData(); - stm->codecSpecificData.clear(); + stream->codecSpecificData.clear(); for (Buffer &b : csd) { - stm->codecSpecificData.push_back(Buffer::CopyOf(b)); + stream->codecSpecificData.push_back(Buffer::CopyOf(b)); } - stm->csdIsValid = true; - stm->width = source->GetFrameWidth(); - stm->height = source->GetFrameHeight(); + stream->csdIsValid = true; + stream->width = source->GetFrameWidth(); + stream->height = source->GetFrameHeight(); //SendStreamCSD(); } @@ -186,13 +186,13 @@ void VideoPacketSender::SendFrame(const Buffer &_frame, uint32_t flags, uint32_t if (flags & VIDEO_FRAME_FLAG_KEYFRAME) { BufferOutputStream os(256); - os.WriteInt16((int16_t)stm->width); - os.WriteInt16((int16_t)stm->height); - unsigned char sizeAndFlag = (unsigned char)stm->codecSpecificData.size(); + os.WriteInt16((int16_t)stream->width); + os.WriteInt16((int16_t)stream->height); + unsigned char sizeAndFlag = (unsigned char)stream->codecSpecificData.size(); if (csdInvalidated) sizeAndFlag |= 0x80; os.WriteByte(sizeAndFlag); - for (Buffer &b : stm->codecSpecificData) + for (Buffer &b : stream->codecSpecificData) { assert(b.Length() < 255); os.WriteByte(static_cast(b.Length())); @@ -226,7 +226,7 @@ void VideoPacketSender::SendFrame(const Buffer &_frame, uint32_t flags, uint32_t } unsigned char pflags = STREAM_DATA_FLAG_LEN16; //pflags |= STREAM_DATA_FLAG_HAS_MORE_FLAGS; - pkt.WriteByte((unsigned char)(stm->id | pflags)); // streamID + flags + pkt.WriteByte((unsigned char)(stream->id | pflags)); // streamID + flags int16_t lengthAndFlags = static_cast(len & 0x7FF); if (segmentCount > 1) lengthAndFlags |= STREAM_DATA_XFLAG_FRAGMENTED; @@ -302,7 +302,7 @@ void VideoPacketSender::SendFrame(const Buffer &_frame, uint32_t flags, uint32_t fecFrameCount = 0; LOGV("FEC packet length: %u", (unsigned int)fecPacket.Length()); BufferOutputStream out(1500); - out.WriteByte(stm->id); + out.WriteByte(stream->id); out.WriteByte((uint8_t)frameSeq); out.WriteByte(FEC_SCHEME_XOR); out.WriteByte(3); @@ -329,7 +329,7 @@ int VideoPacketSender::GetVideoResolutionForCurrentBitrate() if (VoIPController::GetCurrentTime() - sourceChangeTime > 10.0) { // TODO: probably move this to server config - if (stm->codec == CODEC_AVC || stm->codec == CODEC_VP8) + if (stream->codec == CODEC_AVC || stream->codec == CODEC_VP8) { if (currentVideoBitrate > 400000) { @@ -344,7 +344,7 @@ int VideoPacketSender::GetVideoResolutionForCurrentBitrate() resolutionFromBitrate = INIT_VIDEO_RES_360; } } - else if (stm->codec == CODEC_HEVC || stm->codec == CODEC_VP9) + else if (stream->codec == CODEC_HEVC || stream->codec == CODEC_VP9) { if (currentVideoBitrate > 400000) { @@ -366,9 +366,9 @@ int VideoPacketSender::GetVideoResolutionForCurrentBitrate() } else { - if (stm->codec == CODEC_AVC || stm->codec == CODEC_VP8) + if (stream->codec == CODEC_AVC || stream->codec == CODEC_VP8) resolutionFromBitrate = INIT_VIDEO_RES_720; - else if (stm->codec == CODEC_HEVC || stm->codec == CODEC_VP9) + else if (stream->codec == CODEC_HEVC || stream->codec == CODEC_VP9) resolutionFromBitrate = INIT_VIDEO_RES_1080; } return std::min(peerMaxVideoResolution, resolutionFromBitrate); diff --git a/video/VideoPacketSender.h b/video/VideoPacketSender.h index 6002b2c..6f44655 100644 --- a/video/VideoPacketSender.h +++ b/video/VideoPacketSender.h @@ -21,7 +21,7 @@ class VideoSource; class VideoPacketSender : public PacketSender { public: - VideoPacketSender(VoIPController *controller, VideoSource *videoSource, std::shared_ptr stream); + VideoPacketSender(VoIPController *controller, VideoSource *videoSource, const std::shared_ptr &stream); virtual ~VideoPacketSender(); virtual void PacketAcknowledged(uint32_t seq, double sendTime, double ackTime, uint8_t type, uint32_t size) override; virtual void PacketLost(uint32_t seq, uint8_t type, uint32_t size) override; @@ -45,7 +45,6 @@ private: int GetVideoResolutionForCurrentBitrate(); VideoSource *source = NULL; - std::shared_ptr stm; video::ScreamCongestionController videoCongestionControl; double firstVideoFrameTime = 0.0; uint32_t videoFrameCount = 0;