1
0
mirror of https://github.com/danog/libtgvoip.git synced 2024-11-26 20:24:38 +01:00

Package managers

This commit is contained in:
Daniil Gentili 2020-01-28 23:45:47 +01:00
parent 202d4fa94d
commit d5154a5b42
Signed by: danog
GPG Key ID: 8C1BE3B34B230CA7
18 changed files with 148 additions and 114 deletions

View File

@ -138,7 +138,7 @@ struct CellularCarrierInfo
class PacketSender; class PacketSender;
class VoIPController : PacketManager class VoIPController
{ {
friend class VoIPGroupController; friend class VoIPGroupController;
friend class PacketSender; friend class PacketSender;
@ -400,7 +400,7 @@ public:
STREAM_TYPE_VIDEO STREAM_TYPE_VIDEO
}; };
struct Stream : PacketManager struct Stream
{ {
int32_t userID; int32_t userID;
uint8_t id; uint8_t id;
@ -566,7 +566,7 @@ private:
// More legacy // More legacy
bool legacyParsePacket(BufferInputStream &in, unsigned char &type, uint32_t &ackId, uint32_t &pseq, uint32_t &acks, unsigned char &pflags, size_t &packetInnerLen); bool legacyParsePacket(BufferInputStream &in, unsigned char &type, uint32_t &ackId, uint32_t &pseq, uint32_t &acks, unsigned char &pflags, size_t &packetInnerLen);
void legacyWritePacketHeader(uint32_t pseq, uint32_t acks, BufferOutputStream *s, unsigned char type, uint32_t length, PacketSender *source); void legacyWritePacketHeader(uint32_t pseq, uint32_t acks, BufferOutputStream *s, unsigned char type, uint32_t length);
void handleReliablePackets(); void handleReliablePackets();
@ -616,7 +616,6 @@ private:
bool micMuted = false; bool micMuted = false;
uint32_t maxBitrate; uint32_t maxBitrate;
// Recent ougoing packets // Recent ougoing packets
std::vector<RecentOutgoingPacket> recentOutgoingPackets; std::vector<RecentOutgoingPacket> recentOutgoingPackets;
@ -624,6 +623,8 @@ private:
std::vector<std::shared_ptr<Stream>> outgoingStreams; std::vector<std::shared_ptr<Stream>> outgoingStreams;
std::vector<std::shared_ptr<Stream>> incomingStreams; std::vector<std::shared_ptr<Stream>> incomingStreams;
PacketManager &getBestPacketManager();
unsigned char encryptionKey[256]; unsigned char encryptionKey[256];
unsigned char keyFingerprint[8]; unsigned char keyFingerprint[8];
unsigned char callID[16]; unsigned char callID[16];
@ -699,6 +700,8 @@ private:
bool needRate = false; bool needRate = false;
BlockingQueue<RawPendingOutgoingPacket> rawSendQueue; BlockingQueue<RawPendingOutgoingPacket> rawSendQueue;
PacketManager packetManager;
uint32_t initTimeoutID = MessageThread::INVALID_ID; uint32_t initTimeoutID = MessageThread::INVALID_ID;
uint32_t udpPingTimeoutID = MessageThread::INVALID_ID; uint32_t udpPingTimeoutID = MessageThread::INVALID_ID;

View File

@ -438,7 +438,7 @@ void VoIPGroupController::SendPacket(unsigned char *data, size_t len, Endpoint &
while (recentSentPackets.size() > 64) while (recentSentPackets.size() > 64)
recentSentPackets.erase(recentSentPackets.begin()); recentSentPackets.erase(recentSentPackets.begin());
} }
lastSentSeq = srcPacket.seq; packetManager.setLastSentSeq(srcPacket.seq);
if (IS_MOBILE_NETWORK(networkType)) if (IS_MOBILE_NETWORK(networkType))
stats.bytesSentMobile += (uint64_t)out.GetLength(); stats.bytesSentMobile += (uint64_t)out.GetLength();
@ -599,7 +599,7 @@ std::string VoIPGroupController::GetDebugString()
(int)(conctl.GetAverageRTT() * 1000), (int)(conctl.GetMinimumRTT() * 1000), (int)(conctl.GetAverageRTT() * 1000), (int)(conctl.GetMinimumRTT() * 1000),
int(conctl.GetInflightDataSize()), int(conctl.GetCongestionWindow()), int(conctl.GetInflightDataSize()), int(conctl.GetCongestionWindow()),
keyFingerprint[0], keyFingerprint[1], keyFingerprint[2], keyFingerprint[3], keyFingerprint[4], keyFingerprint[5], keyFingerprint[6], keyFingerprint[7], keyFingerprint[0], keyFingerprint[1], keyFingerprint[2], keyFingerprint[3], keyFingerprint[4], keyFingerprint[5], keyFingerprint[6], keyFingerprint[7],
lastSentSeq, getLastAckedSeq(), getBestPacketManager().getLastSentSeq(), getBestPacketManager().getLastAckedSeq(),
conctl.GetSendLossCount(), recvLossCount, encoder ? encoder->GetPacketLoss() : 0, conctl.GetSendLossCount(), recvLossCount, encoder ? encoder->GetPacketLoss() : 0,
encoder ? (encoder->GetBitrate() / 1000) : 0, encoder ? (encoder->GetBitrate() / 1000) : 0,
(long long unsigned int)(stats.bytesSentMobile + stats.bytesSentWifi), (long long unsigned int)(stats.bytesSentMobile + stats.bytesSentWifi),

View File

@ -3,7 +3,7 @@
using namespace tgvoip; using namespace tgvoip;
AudioPacketSender::AudioPacketSender(VoIPController *controller, const std::shared_ptr<OpusEncoder> &encoder, const std::shared_ptr<VoIPController::Stream> &stream) : PacketSender(controller), encoder(encoder), stream(stream) AudioPacketSender::AudioPacketSender(VoIPController *controller, const std::shared_ptr<OpusEncoder> &encoder, const std::shared_ptr<VoIPController::Stream> &stream) : PacketSender(controller, stream), encoder(encoder)
{ {
SetSource(encoder); SetSource(encoder);
} }
@ -121,9 +121,16 @@ void AudioPacketSender::SendFrame(unsigned char *data, size_t len, unsigned char
} }
else else
{ {
PendingOutgoingPacket p{
/*.seq=*/0,
/*.type=*/PKT_STREAM_DATA,
/*.len=*/pkt.GetLength(),
/*.data=*/Buffer(move(pkt)),
/*.endpoint=*/0,
};
SendPacket(move(p));
} }
//SendOrEnqueuePacket(move(p));
if (PeerVersion() < 7 && secondaryLen && shittyInternetMode) if (PeerVersion() < 7 && secondaryLen && shittyInternetMode)
{ {
Buffer ecBuf(secondaryLen); Buffer ecBuf(secondaryLen);

View File

@ -45,7 +45,6 @@ private:
void SendFrame(unsigned char *data, size_t len, unsigned char *secondaryData, size_t secondaryLen); void SendFrame(unsigned char *data, size_t len, unsigned char *secondaryData, size_t secondaryLen);
std::shared_ptr<OpusEncoder> encoder; std::shared_ptr<OpusEncoder> encoder;
std::shared_ptr<VoIPController::Stream> stream;
uint32_t audioTimestampOut = 0; uint32_t audioTimestampOut = 0;

View File

@ -66,9 +66,9 @@ void VoIPController::InitializeTimers()
statsDump << std::setprecision(3) statsDump << std::setprecision(3)
<< GetCurrentTime() - connectionInitTime << GetCurrentTime() - connectionInitTime
<< endpoints.at(currentEndpoint).rtts[0] << endpoints.at(currentEndpoint).rtts[0]
<< getLastRemoteSeq() << getBestPacketManager().getLastRemoteSeq()
<< (uint32_t)getLocalSeq() << (uint32_t)getBestPacketManager().getLocalSeq()
<< getLastAckedSeq() << getBestPacketManager().getLastAckedSeq()
<< recvLossCount << recvLossCount
<< conctl.GetSendLossCount() << conctl.GetSendLossCount()
<< (int)conctl.GetInflightDataSize() << (int)conctl.GetInflightDataSize()

View File

@ -218,6 +218,7 @@ string VoIPController::GetDebugString()
jitterBuffer->GetAverageLateCount(avgLate); jitterBuffer->GetAverageLateCount(avgLate);
else else
memset(avgLate, 0, 3 * sizeof(double)); memset(avgLate, 0, 3 * sizeof(double));
PacketManager &manager = getBestPacketManager();
snprintf(buffer, sizeof(buffer), snprintf(buffer, sizeof(buffer),
"Jitter buffer: %d/%.2f | %.1f, %.1f, %.1f\n" "Jitter buffer: %d/%.2f | %.1f, %.1f, %.1f\n"
"RTT avg/min: %d/%d\n" "RTT avg/min: %d/%d\n"
@ -237,7 +238,7 @@ string VoIPController::GetDebugString()
int(conctl.GetInflightDataSize()), int(conctl.GetCongestionWindow()), int(conctl.GetInflightDataSize()), int(conctl.GetCongestionWindow()),
keyFingerprint[0], keyFingerprint[1], keyFingerprint[2], keyFingerprint[3], keyFingerprint[4], keyFingerprint[5], keyFingerprint[6], keyFingerprint[7], keyFingerprint[0], keyFingerprint[1], keyFingerprint[2], keyFingerprint[3], keyFingerprint[4], keyFingerprint[5], keyFingerprint[6], keyFingerprint[7],
useMTProto2 ? " (MTProto2.0)" : "", useMTProto2 ? " (MTProto2.0)" : "",
lastSentSeq, getLastAckedSeq(), getLastRemoteSeq(), manager.getLastSentSeq(), manager.getLastAckedSeq(), manager.getLastRemoteSeq(),
sendLosses, recvLossCount, encoder ? encoder->GetPacketLoss() : 0, sendLosses, recvLossCount, encoder ? encoder->GetPacketLoss() : 0,
encoder ? (encoder->GetBitrate() / 1000) : 0, encoder ? (encoder->GetBitrate() / 1000) : 0,
static_cast<unsigned int>(unsentStreamPackets), static_cast<unsigned int>(unsentStreamPackets),
@ -413,7 +414,7 @@ string VoIPController::GetDebugLog()
{"tcp_used", useTCP}, {"tcp_used", useTCP},
{"p2p_type", p2pType}, {"p2p_type", p2pType},
{"packet_stats", json11::Json::object{ {"packet_stats", json11::Json::object{
{"out", (int)getLocalSeq()}, {"out", (int)getBestPacketManager().getLocalSeq()},
{"in", (int)packetsReceived}, {"in", (int)packetsReceived},
{"lost_out", (int)conctl.GetSendLossCount()}, {"lost_out", (int)conctl.GetSendLossCount()},
{"lost_in", (int)recvLossCount}}}, {"lost_in", (int)recvLossCount}}},

View File

@ -7,6 +7,7 @@
#include "../../VoIPController.h" #include "../../VoIPController.h"
#include "../protocol/PacketStructs.h" #include "../protocol/PacketStructs.h"
#include "../protocol/PacketManager.h"
#include <functional> #include <functional>
#include <stdint.h> #include <stdint.h>
@ -15,11 +16,15 @@ namespace tgvoip
class PacketSender class PacketSender
{ {
public: public:
PacketSender(VoIPController *controller) : controller(controller){}; PacketSender(VoIPController *controller, const std::shared_ptr<VoIPController::Stream> &stream) : controller(controller), stream(stream), packetManager(stream->id){};
virtual ~PacketSender() = default; virtual ~PacketSender() = default;
virtual void PacketAcknowledged(uint32_t seq, double sendTime, double ackTime, uint8_t type, uint32_t size) = 0; virtual void PacketAcknowledged(uint32_t seq, double sendTime, double ackTime, uint8_t type, uint32_t size) = 0;
virtual void PacketLost(uint32_t seq, uint8_t type, uint32_t size) = 0; virtual void PacketLost(uint32_t seq, uint8_t type, uint32_t size) = 0;
inline PacketManager &getPacketManager()
{
return packetManager;
}
protected: protected:
inline void SendExtra(Buffer &data, unsigned char type) inline void SendExtra(Buffer &data, unsigned char type)
{ {
@ -33,13 +38,13 @@ protected:
inline uint32_t SendPacket(PendingOutgoingPacket pkt) inline uint32_t SendPacket(PendingOutgoingPacket pkt)
{ {
uint32_t seq = controller->nextLocalSeq(); uint32_t seq = controller->peerVersion < PROTOCOL_RELIABLE ? controller->packetManager.nextLocalSeq() : packetManager.nextLocalSeq();
pkt.seq = seq; pkt.seq = seq;
controller->SendOrEnqueuePacket(std::move(pkt), true, this); controller->SendOrEnqueuePacket(std::move(pkt), true, this);
return seq; return seq;
} }
inline void SendPacketReliably(unsigned char type, unsigned char *data, size_t len, double retryInterval, double timeout, uint8_t tries = 0xFF) inline void SendPacketReliably(unsigned char type, unsigned char *data, size_t len, double retryInterval, double timeout, uint8_t tries = 0xFF)
{ {
controller->SendPacketReliably(type, data, len, retryInterval, timeout, tries); controller->SendPacketReliably(type, data, len, retryInterval, timeout, tries);
} }
@ -93,6 +98,10 @@ protected:
} }
VoIPController *controller; VoIPController *controller;
std::shared_ptr<VoIPController::Stream> stream;
PacketManager packetManager;
}; };
} // namespace tgvoip } // namespace tgvoip

View File

@ -9,10 +9,10 @@ using namespace std;
double VoIPController::GetAverageRTT() double VoIPController::GetAverageRTT()
{ {
ENFORCE_MSG_THREAD; ENFORCE_MSG_THREAD;
PacketManager &pm = getBestPacketManager();
if (lastSentSeq >= getLastAckedSeq()) if (pm.getLastSentSeq() >= pm.getLastAckedSeq())
{ {
uint32_t diff = lastSentSeq - getLastAckedSeq(); uint32_t diff = pm.getLastSentSeq() - pm.getLastAckedSeq();
//LOGV("rtt diff=%u", diff); //LOGV("rtt diff=%u", diff);
if (diff < 32) if (diff < 32)
{ {

View File

@ -82,7 +82,7 @@ bool VoIPController::legacyParsePacket(BufferInputStream &in, unsigned char &typ
return true; return true;
} }
void VoIPController::legacyWritePacketHeader(uint32_t pseq, uint32_t acks, BufferOutputStream *s, unsigned char type, uint32_t length, PacketSender *source) void VoIPController::legacyWritePacketHeader(uint32_t pseq, uint32_t acks, BufferOutputStream *s, unsigned char type, uint32_t length)
{ {
if (state == STATE_WAIT_INIT || state == STATE_WAIT_INIT_ACK) if (state == STATE_WAIT_INIT || state == STATE_WAIT_INIT_ACK)
@ -109,7 +109,7 @@ void VoIPController::legacyWritePacketHeader(uint32_t pseq, uint32_t acks, Buffe
{ {
s->WriteBytes(callID, 16); s->WriteBytes(callID, 16);
} }
s->WriteInt32(getLastRemoteSeq()); s->WriteInt32(packetManager.getLastRemoteSeq());
s->WriteInt32(pseq); s->WriteInt32(pseq);
s->WriteInt32(acks); s->WriteInt32(acks);
if (pflags & PFLAG_HAS_PROTO) if (pflags & PFLAG_HAS_PROTO)
@ -157,7 +157,7 @@ void VoIPController::legacyWritePacketHeader(uint32_t pseq, uint32_t acks, Buffe
} }
} }
s->WriteByte(type); s->WriteByte(type);
s->WriteInt32(getLastRemoteSeq()); s->WriteInt32(packetManager.getLastRemoteSeq());
s->WriteInt32(pseq); s->WriteInt32(pseq);
s->WriteInt32(acks); s->WriteInt32(acks);
if (peerVersion >= 6) if (peerVersion >= 6)

View File

@ -1,8 +0,0 @@
#include "Nack.h"
void HandleJitterInput(uint32_t timestamp)
{
}
void HandleJitterOutput(uint32_t timestamp)
{
}

View File

@ -1,17 +0,0 @@
namespace tgvoip
{
// Handle nack window
class Nack
{
public:
Nack() = default;
virtual ~Nack() = default;
// Do not provide seq, as there is a direct correlation between seqs and timestamps (in protocol >= 10)
void HandleJitterInput(uint32_t timestamp);
void HandleJitterOutput(uint32_t timestamp);
private:
};
} // namespace tgvoip

View File

@ -102,7 +102,7 @@ void VoIPController::SendInit()
{ {
ENFORCE_MSG_THREAD; ENFORCE_MSG_THREAD;
uint32_t initSeq = nextLocalSeq(); uint32_t initSeq = packetManager.nextLocalSeq();
for (pair<const int64_t, Endpoint> &_e : endpoints) for (pair<const int64_t, Endpoint> &_e : endpoints)
{ {
Endpoint &e = _e.second; Endpoint &e = _e.second;
@ -265,19 +265,6 @@ void VoIPController::TrySendOutgoingPackets()
} }
} }
bool VoIPController::WasOutgoingPacketAcknowledged(uint32_t seq, bool checkAll)
{
bool res = wasLocalAcked(seq);
if (res || !checkAll)
{
return res;
}
RecentOutgoingPacket *pkt = GetRecentOutgoingPacket(seq);
if (!pkt)
return false;
return pkt->ackTime != 0.0;
}
RecentOutgoingPacket *VoIPController::GetRecentOutgoingPacket(uint32_t seq) RecentOutgoingPacket *VoIPController::GetRecentOutgoingPacket(uint32_t seq)
{ {
@ -314,7 +301,7 @@ void VoIPController::SendRelayPings()
{ {
LOGV("Sending ping to %s", endpoint.GetAddress().ToString().c_str()); LOGV("Sending ping to %s", endpoint.GetAddress().ToString().c_str());
SendOrEnqueuePacket(PendingOutgoingPacket{ SendOrEnqueuePacket(PendingOutgoingPacket{
/*.seq=*/(endpoint.lastPingSeq = nextLocalSeq()), /*.seq=*/(endpoint.lastPingSeq = packetManager.nextLocalSeq()),
/*.type=*/PKT_PING, /*.type=*/PKT_PING,
/*.len=*/0, /*.len=*/0,
/*.data=*/Buffer(), /*.data=*/Buffer(),
@ -381,7 +368,7 @@ void VoIPController::SendNopPacket()
if (state != STATE_ESTABLISHED) if (state != STATE_ESTABLISHED)
return; return;
SendOrEnqueuePacket(PendingOutgoingPacket{ SendOrEnqueuePacket(PendingOutgoingPacket{
/*.seq=*/(firstSentPing = nextLocalSeq()), /*.seq=*/(firstSentPing = packetManager.nextLocalSeq()),
/*.type=*/PKT_NOP, /*.type=*/PKT_NOP,
/*.len=*/0, /*.len=*/0,
/*.data=*/Buffer(), /*.data=*/Buffer(),

View File

@ -4,6 +4,9 @@
using namespace tgvoip; using namespace tgvoip;
using namespace std; using namespace std;
PacketManager::PacketManager(uint8_t transportId) : transportId(transportId)
{
}
void PacketManager::ackLocal(uint32_t ackId, uint32_t mask) void PacketManager::ackLocal(uint32_t ackId, uint32_t mask)
{ {
lastAckedSeq = ackId; lastAckedSeq = ackId;

View File

@ -22,9 +22,18 @@ inline bool seqgte(uint32_t s1, uint32_t s2)
class PacketManager class PacketManager
{ {
public: public:
PacketManager() = default; PacketManager(uint8_t transportId = 0xFF);
virtual ~PacketManager() = default; virtual ~PacketManager() = default;
// Transport ID for multiplexing
inline uint8_t getTransportId()
{
return transportId;
}
uint8_t transportId = 0xFF; // Default transport ID
public:
/* Local seqno generation */ /* Local seqno generation */
// Get next local seqno // Get next local seqno
@ -44,6 +53,11 @@ public:
return lastSentSeq; return lastSentSeq;
} }
inline void setLastSentSeq(uint32_t lastSentSeq)
{
this->lastSentSeq = lastSentSeq;
}
// Seqno of last sent local packet // Seqno of last sent local packet
uint32_t lastSentSeq = 0; uint32_t lastSentSeq = 0;

View File

@ -1,10 +1,16 @@
#include "../PrivateDefines.cpp" #include "../PrivateDefines.cpp"
#include "PacketManager.h"
using namespace tgvoip; using namespace tgvoip;
using namespace std; using namespace std;
#pragma mark - Networking & crypto #pragma mark - Networking & crypto
PacketManager &VoIPController::getBestPacketManager()
{
return outgoingStreams.empty() ? packetManager : outgoingStreams[0]->packetSender->getPacketManager();
}
void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcEndpoint) void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcEndpoint)
{ {
ENFORCE_MSG_THREAD; ENFORCE_MSG_THREAD;
@ -176,10 +182,15 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE
uint32_t pseq; // Incoming packet seqno uint32_t pseq; // Incoming packet seqno
uint32_t acks; // Ack mask uint32_t acks; // Ack mask
unsigned char type, pflags; // Packet type, flags unsigned char type, pflags; // Packet type, flags
uint8_t transportId = 0xFF; // Transport ID for reliable multiplexing
size_t packetInnerLen = 0; size_t packetInnerLen = 0;
if (peerVersion >= 8 || (!peerVersion && connectionMaxLayer >= 92)) if (peerVersion >= 8 || (!peerVersion && connectionMaxLayer >= 92))
{ {
type = in.ReadByte(); type = in.ReadByte();
if (peerVersion >= PROTOCOL_RELIABLE)
{
transportId = in.ReadByte();
}
ackId = in.ReadUInt32(); ackId = in.ReadUInt32();
pseq = in.ReadUInt32(); pseq = in.ReadUInt32();
acks = in.ReadUInt32(); acks = in.ReadUInt32();
@ -192,7 +203,11 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE
} }
packetsReceived++; packetsReceived++;
if (!ackRemoteSeq(pseq)) { // Use packet manager of outgoing streams on both sides (probably should separate in separate vector)
PacketManager *manager = transportId == 0xFF ? &packetManager : &outgoingStreams[transportId]->packetSender->getPacketManager();
if (!manager->ackRemoteSeq(pseq))
{
return; return;
} }
@ -215,9 +230,9 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE
recvTS = in.ReadUInt32(); recvTS = in.ReadUInt32();
} }
if (seqgt(ackId, getLastAckedSeq())) if (seqgt(ackId, manager->getLastAckedSeq()))
{ {
if (waitingForAcks && getLastAckedSeq() >= firstSentPing) if (waitingForAcks && manager->getLastAckedSeq() >= firstSentPing)
{ {
rttHistory.Reset(); rttHistory.Reset();
waitingForAcks = false; waitingForAcks = false;
@ -231,13 +246,13 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE
} }
conctl.PacketAcknowledged(ackId); conctl.PacketAcknowledged(ackId);
ackLocal(ackId, acks); manager->ackLocal(ackId, acks);
for (auto &opkt : recentOutgoingPackets) for (auto &opkt : recentOutgoingPackets)
{ {
if (opkt.ackTime) if (opkt.ackTime)
continue; continue;
if (wasLocalAcked(opkt.seq)) if (manager->wasLocalAcked(opkt.seq))
{ {
opkt.ackTime = GetCurrentTime(); opkt.ackTime = GetCurrentTime();
opkt.rttTime = opkt.ackTime - opkt.sendTime; opkt.rttTime = opkt.ackTime - opkt.sendTime;
@ -260,7 +275,7 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE
{ {
for (auto x = currentExtras.begin(); x != currentExtras.end();) for (auto x = currentExtras.begin(); x != currentExtras.end();)
{ {
if (x->firstContainingSeq != 0 && seqgte(getLastAckedSeq(), x->firstContainingSeq)) if (x->firstContainingSeq != 0 && seqgte(manager->getLastAckedSeq(), x->firstContainingSeq))
{ {
LOGV("Peer acknowledged extra type %u length %u", x->type, (unsigned int)x->data.Length()); LOGV("Peer acknowledged extra type %u length %u", x->type, (unsigned int)x->data.Length());
ProcessAcknowledgedOutgoingExtra(*x); ProcessAcknowledgedOutgoingExtra(*x);
@ -270,14 +285,14 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE
++x; ++x;
} }
} }
//if (peerVersion < PROTOCOL_RELIABLE) if (peerVersion < PROTOCOL_RELIABLE)
handleReliablePackets(); // Use old reliability logic handleReliablePackets(); // Use old reliability logic
} }
Endpoint &_currentEndpoint = endpoints.at(currentEndpoint); Endpoint &_currentEndpoint = endpoints.at(currentEndpoint);
if (srcEndpoint.id != currentEndpoint && srcEndpoint.IsReflector() && (_currentEndpoint.IsP2P() || _currentEndpoint.averageRTT == 0)) if (srcEndpoint.id != currentEndpoint && srcEndpoint.IsReflector() && (_currentEndpoint.IsP2P() || _currentEndpoint.averageRTT == 0))
{ {
if (seqgt(lastSentSeq - 32, getLastAckedSeq())) if (seqgt(manager->getLastSentSeq() - 32, manager->getLastAckedSeq()))
{ {
currentEndpoint = srcEndpoint.id; currentEndpoint = srcEndpoint.id;
_currentEndpoint = srcEndpoint; _currentEndpoint = srcEndpoint;
@ -308,11 +323,11 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE
SendNopPacket(); SendNopPacket();
} }
//#ifdef LOG_PACKETS //#ifdef LOG_PACKETS
LOGV("Received: from=%s:%u, seq=%u, length=%u, type=%s", srcEndpoint.GetAddress().ToString().c_str(), srcEndpoint.port, pseq, (unsigned int)packet.data.Length(), GetPacketTypeString(type).c_str()); LOGV("Received: from=%s:%u, seq=%u, length=%u, type=%s", srcEndpoint.GetAddress().ToString().c_str(), srcEndpoint.port, pseq, (unsigned int)packet.data.Length(), GetPacketTypeString(type).c_str());
//#endif //#endif
//LOGV("acks: %u -> %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf", getLastAckedSeq(), remoteAcks[0], remoteAcks[1], remoteAcks[2], remoteAcks[3], remoteAcks[4], remoteAcks[5], remoteAcks[6], remoteAcks[7]); //LOGV("acks: %u -> %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf", manager.getLastAckedSeq()(), remoteAcks[0], remoteAcks[1], remoteAcks[2], remoteAcks[3], remoteAcks[4], remoteAcks[5], remoteAcks[6], remoteAcks[7]);
//LOGD("recv: %u -> %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf", getLastRemoteSeq(), recvPacketTimes[0], recvPacketTimes[1], recvPacketTimes[2], recvPacketTimes[3], recvPacketTimes[4], recvPacketTimes[5], recvPacketTimes[6], recvPacketTimes[7]); //LOGD("recv: %u -> %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf", getLastRemoteSeq(), recvPacketTimes[0], recvPacketTimes[1], recvPacketTimes[2], recvPacketTimes[3], recvPacketTimes[4], recvPacketTimes[5], recvPacketTimes[6], recvPacketTimes[7]);
//LOGI("RTT = %.3lf", GetAverageRTT()); //LOGI("RTT = %.3lf", GetAverageRTT());
//LOGV("Packet %u type is %d", pseq, type); //LOGV("Packet %u type is %d", pseq, type);
@ -399,7 +414,7 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE
LOGI("Sending init ack"); LOGI("Sending init ack");
size_t outLength = out.GetLength(); size_t outLength = out.GetLength();
SendOrEnqueuePacket(PendingOutgoingPacket{ SendOrEnqueuePacket(PendingOutgoingPacket{
/*.seq=*/nextLocalSeq(), /*.seq=*/packetManager.nextLocalSeq(),
/*.type=*/PKT_INIT_ACK, /*.type=*/PKT_INIT_ACK,
/*.len=*/outLength, /*.len=*/outLength,
/*.data=*/Buffer(move(out)), /*.data=*/Buffer(move(out)),
@ -607,6 +622,9 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE
{ {
if (stm->jitterBuffer) if (stm->jitterBuffer)
{ {
if (peerVersion >= PROTOCOL_RELIABLE) {
manager->ackRemoteSeqsOlderThan(stm->jitterBuffer->GetSeqTooLate(rttHistory[0]));
}
stm->jitterBuffer->HandleInput(static_cast<unsigned char *>(buffer + in.GetOffset()), sdlen, pts, false); stm->jitterBuffer->HandleInput(static_cast<unsigned char *>(buffer + in.GetOffset()), sdlen, pts, false);
if (extraFEC) if (extraFEC)
{ {
@ -679,7 +697,7 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE
pkt.WriteInt32(pseq); pkt.WriteInt32(pseq);
size_t pktLength = pkt.GetLength(); size_t pktLength = pkt.GetLength();
SendOrEnqueuePacket(PendingOutgoingPacket{ SendOrEnqueuePacket(PendingOutgoingPacket{
/*.seq=*/nextLocalSeq(), /*.seq=*/packetManager.nextLocalSeq(),
/*.type=*/PKT_PONG, /*.type=*/PKT_PONG,
/*.len=*/pktLength, /*.len=*/pktLength,
/*.data=*/Buffer(move(pkt)), /*.data=*/Buffer(move(pkt)),
@ -977,12 +995,17 @@ void VoIPController::ProcessAcknowledgedOutgoingExtra(UnacknowledgedExtraData &e
void VoIPController::WritePacketHeader(uint32_t pseq, BufferOutputStream *s, unsigned char type, uint32_t length, PacketSender *source) void VoIPController::WritePacketHeader(uint32_t pseq, BufferOutputStream *s, unsigned char type, uint32_t length, PacketSender *source)
{ {
uint32_t acks = getRemoteAckMask(); PacketManager &manager = peerVersion >= PROTOCOL_RELIABLE ? source->getPacketManager() : packetManager;
uint32_t acks = manager.getRemoteAckMask();
if (peerVersion >= 8 || (!peerVersion && connectionMaxLayer >= 92)) if (peerVersion >= 8 || (!peerVersion && connectionMaxLayer >= 92))
{ {
s->WriteByte(type); s->WriteByte(type);
s->WriteInt32(getLastRemoteSeq()); if (peerVersion >= PROTOCOL_RELIABLE)
{
s->WriteByte(manager.getTransportId());
}
s->WriteInt32(manager.getLastRemoteSeq());
s->WriteInt32(pseq); s->WriteInt32(pseq);
s->WriteInt32(acks); s->WriteInt32(acks);
@ -1015,7 +1038,7 @@ void VoIPController::WritePacketHeader(uint32_t pseq, BufferOutputStream *s, uns
} }
else else
{ {
legacyWritePacketHeader(pseq, acks, s, type, length, source); legacyWritePacketHeader(pseq, acks, s, type, length);
} }
unacknowledgedIncomingPacketCount = 0; unacknowledgedIncomingPacketCount = 0;
@ -1033,6 +1056,6 @@ void VoIPController::WritePacketHeader(uint32_t pseq, BufferOutputStream *s, uns
{ {
recentOutgoingPackets.erase(recentOutgoingPackets.begin()); recentOutgoingPackets.erase(recentOutgoingPackets.begin());
} }
lastSentSeq = pseq; manager.setLastSentSeq(pseq);
//LOGI("packet header size %d", s->GetLength()); //LOGI("packet header size %d", s->GetLength());
} }

View File

@ -49,7 +49,7 @@ void VoIPController::UpdateReliablePackets()
if (GetCurrentTime() - qp->lastSentTime >= qp->retryInterval) if (GetCurrentTime() - qp->lastSentTime >= qp->retryInterval)
{ {
messageThread.Post(std::bind(&VoIPController::UpdateReliablePackets, this), qp->retryInterval); messageThread.Post(std::bind(&VoIPController::UpdateReliablePackets, this), qp->retryInterval);
uint32_t seq = nextLocalSeq(); uint32_t seq = packetManager.nextLocalSeq();
qp->seqs.Add(seq); qp->seqs.Add(seq);
qp->lastSentTime = GetCurrentTime(); qp->lastSentTime = GetCurrentTime();
//LOGD("Sending queued packet, seq=%u, type=%u, len=%u", seq, qp.type, qp.data.Length()); //LOGD("Sending queued packet, seq=%u, type=%u, len=%u", seq, qp.type, qp.data.Length());
@ -92,3 +92,17 @@ void VoIPController::handleReliablePackets()
++it; ++it;
} }
} }
bool VoIPController::WasOutgoingPacketAcknowledged(uint32_t seq, bool checkAll)
{
bool res = getBestPacketManager().wasLocalAcked(seq);
if (res || !checkAll)
{
return res;
}
RecentOutgoingPacket *pkt = GetRecentOutgoingPacket(seq);
if (!pkt)
return false;
return pkt->ackTime != 0.0;
}

View File

@ -11,7 +11,7 @@
using namespace tgvoip; using namespace tgvoip;
using namespace tgvoip::video; using namespace tgvoip::video;
VideoPacketSender::VideoPacketSender(VoIPController *controller, VideoSource *videoSource, std::shared_ptr<VoIPController::Stream> stream) : PacketSender(controller), stm(stream) VideoPacketSender::VideoPacketSender(VoIPController *controller, VideoSource *videoSource, const std::shared_ptr<VoIPController::Stream> &stream) : PacketSender(controller, stream)
{ {
SetSource(videoSource); SetSource(videoSource);
} }
@ -110,13 +110,13 @@ void VideoPacketSender::SetSource(VideoSource *source)
uint32_t bitrate = videoCongestionControl.GetBitrate(); uint32_t bitrate = videoCongestionControl.GetBitrate();
currentVideoBitrate = bitrate; currentVideoBitrate = bitrate;
source->SetBitrate(bitrate); source->SetBitrate(bitrate);
source->Reset(stm->codec, stm->resolution = GetVideoResolutionForCurrentBitrate()); source->Reset(stream->codec, stream->resolution = GetVideoResolutionForCurrentBitrate());
source->Start(); source->Start();
source->SetCallback(std::bind(&VideoPacketSender::SendFrame, this, placeholders::_1, placeholders::_2, placeholders::_3)); source->SetCallback(std::bind(&VideoPacketSender::SendFrame, this, placeholders::_1, placeholders::_2, placeholders::_3));
source->SetStreamStateCallback([this](bool paused) { source->SetStreamStateCallback([this](bool paused) {
stm->paused = paused; stream->paused = paused;
GetMessageThread().Post([this] { GetMessageThread().Post([this] {
SendStreamFlags(*stm); SendStreamFlags(*stream);
}); });
}); });
} }
@ -141,13 +141,13 @@ void VideoPacketSender::SendFrame(const Buffer &_frame, uint32_t flags, uint32_t
source->SetBitrate(bitrate); source->SetBitrate(bitrate);
} }
int resolutionFromBitrate = GetVideoResolutionForCurrentBitrate(); int resolutionFromBitrate = GetVideoResolutionForCurrentBitrate();
if (resolutionFromBitrate != stm->resolution && currentTime - lastVideoResolutionChangeTime > 3.0 && currentTime - sourceChangeTime > 10.0) if (resolutionFromBitrate != stream->resolution && currentTime - lastVideoResolutionChangeTime > 3.0 && currentTime - sourceChangeTime > 10.0)
{ {
LOGI("Changing video resolution: %d -> %d", stm->resolution, resolutionFromBitrate); LOGI("Changing video resolution: %d -> %d", stream->resolution, resolutionFromBitrate);
stm->resolution = resolutionFromBitrate; stream->resolution = resolutionFromBitrate;
GetMessageThread().Post([this, resolutionFromBitrate] { GetMessageThread().Post([this, resolutionFromBitrate] {
source->Reset(stm->codec, resolutionFromBitrate); source->Reset(stream->codec, resolutionFromBitrate);
stm->csdIsValid = false; stream->csdIsValid = false;
}); });
lastVideoResolutionChangeTime = currentTime; lastVideoResolutionChangeTime = currentTime;
return; return;
@ -167,18 +167,18 @@ void VideoPacketSender::SendFrame(const Buffer &_frame, uint32_t flags, uint32_t
} }
uint32_t pts = videoFrameCount++; uint32_t pts = videoFrameCount++;
bool csdInvalidated = !stm->csdIsValid; bool csdInvalidated = !stream->csdIsValid;
if (!stm->csdIsValid) if (!stream->csdIsValid)
{ {
vector<Buffer> &csd = source->GetCodecSpecificData(); vector<Buffer> &csd = source->GetCodecSpecificData();
stm->codecSpecificData.clear(); stream->codecSpecificData.clear();
for (Buffer &b : csd) for (Buffer &b : csd)
{ {
stm->codecSpecificData.push_back(Buffer::CopyOf(b)); stream->codecSpecificData.push_back(Buffer::CopyOf(b));
} }
stm->csdIsValid = true; stream->csdIsValid = true;
stm->width = source->GetFrameWidth(); stream->width = source->GetFrameWidth();
stm->height = source->GetFrameHeight(); stream->height = source->GetFrameHeight();
//SendStreamCSD(); //SendStreamCSD();
} }
@ -186,13 +186,13 @@ void VideoPacketSender::SendFrame(const Buffer &_frame, uint32_t flags, uint32_t
if (flags & VIDEO_FRAME_FLAG_KEYFRAME) if (flags & VIDEO_FRAME_FLAG_KEYFRAME)
{ {
BufferOutputStream os(256); BufferOutputStream os(256);
os.WriteInt16((int16_t)stm->width); os.WriteInt16((int16_t)stream->width);
os.WriteInt16((int16_t)stm->height); os.WriteInt16((int16_t)stream->height);
unsigned char sizeAndFlag = (unsigned char)stm->codecSpecificData.size(); unsigned char sizeAndFlag = (unsigned char)stream->codecSpecificData.size();
if (csdInvalidated) if (csdInvalidated)
sizeAndFlag |= 0x80; sizeAndFlag |= 0x80;
os.WriteByte(sizeAndFlag); os.WriteByte(sizeAndFlag);
for (Buffer &b : stm->codecSpecificData) for (Buffer &b : stream->codecSpecificData)
{ {
assert(b.Length() < 255); assert(b.Length() < 255);
os.WriteByte(static_cast<unsigned char>(b.Length())); os.WriteByte(static_cast<unsigned char>(b.Length()));
@ -226,7 +226,7 @@ void VideoPacketSender::SendFrame(const Buffer &_frame, uint32_t flags, uint32_t
} }
unsigned char pflags = STREAM_DATA_FLAG_LEN16; unsigned char pflags = STREAM_DATA_FLAG_LEN16;
//pflags |= STREAM_DATA_FLAG_HAS_MORE_FLAGS; //pflags |= STREAM_DATA_FLAG_HAS_MORE_FLAGS;
pkt.WriteByte((unsigned char)(stm->id | pflags)); // streamID + flags pkt.WriteByte((unsigned char)(stream->id | pflags)); // streamID + flags
int16_t lengthAndFlags = static_cast<int16_t>(len & 0x7FF); int16_t lengthAndFlags = static_cast<int16_t>(len & 0x7FF);
if (segmentCount > 1) if (segmentCount > 1)
lengthAndFlags |= STREAM_DATA_XFLAG_FRAGMENTED; lengthAndFlags |= STREAM_DATA_XFLAG_FRAGMENTED;
@ -302,7 +302,7 @@ void VideoPacketSender::SendFrame(const Buffer &_frame, uint32_t flags, uint32_t
fecFrameCount = 0; fecFrameCount = 0;
LOGV("FEC packet length: %u", (unsigned int)fecPacket.Length()); LOGV("FEC packet length: %u", (unsigned int)fecPacket.Length());
BufferOutputStream out(1500); BufferOutputStream out(1500);
out.WriteByte(stm->id); out.WriteByte(stream->id);
out.WriteByte((uint8_t)frameSeq); out.WriteByte((uint8_t)frameSeq);
out.WriteByte(FEC_SCHEME_XOR); out.WriteByte(FEC_SCHEME_XOR);
out.WriteByte(3); out.WriteByte(3);
@ -329,7 +329,7 @@ int VideoPacketSender::GetVideoResolutionForCurrentBitrate()
if (VoIPController::GetCurrentTime() - sourceChangeTime > 10.0) if (VoIPController::GetCurrentTime() - sourceChangeTime > 10.0)
{ {
// TODO: probably move this to server config // TODO: probably move this to server config
if (stm->codec == CODEC_AVC || stm->codec == CODEC_VP8) if (stream->codec == CODEC_AVC || stream->codec == CODEC_VP8)
{ {
if (currentVideoBitrate > 400000) if (currentVideoBitrate > 400000)
{ {
@ -344,7 +344,7 @@ int VideoPacketSender::GetVideoResolutionForCurrentBitrate()
resolutionFromBitrate = INIT_VIDEO_RES_360; resolutionFromBitrate = INIT_VIDEO_RES_360;
} }
} }
else if (stm->codec == CODEC_HEVC || stm->codec == CODEC_VP9) else if (stream->codec == CODEC_HEVC || stream->codec == CODEC_VP9)
{ {
if (currentVideoBitrate > 400000) if (currentVideoBitrate > 400000)
{ {
@ -366,9 +366,9 @@ int VideoPacketSender::GetVideoResolutionForCurrentBitrate()
} }
else else
{ {
if (stm->codec == CODEC_AVC || stm->codec == CODEC_VP8) if (stream->codec == CODEC_AVC || stream->codec == CODEC_VP8)
resolutionFromBitrate = INIT_VIDEO_RES_720; resolutionFromBitrate = INIT_VIDEO_RES_720;
else if (stm->codec == CODEC_HEVC || stm->codec == CODEC_VP9) else if (stream->codec == CODEC_HEVC || stream->codec == CODEC_VP9)
resolutionFromBitrate = INIT_VIDEO_RES_1080; resolutionFromBitrate = INIT_VIDEO_RES_1080;
} }
return std::min(peerMaxVideoResolution, resolutionFromBitrate); return std::min(peerMaxVideoResolution, resolutionFromBitrate);

View File

@ -21,7 +21,7 @@ class VideoSource;
class VideoPacketSender : public PacketSender class VideoPacketSender : public PacketSender
{ {
public: public:
VideoPacketSender(VoIPController *controller, VideoSource *videoSource, std::shared_ptr<VoIPController::Stream> stream); VideoPacketSender(VoIPController *controller, VideoSource *videoSource, const std::shared_ptr<VoIPController::Stream> &stream);
virtual ~VideoPacketSender(); virtual ~VideoPacketSender();
virtual void PacketAcknowledged(uint32_t seq, double sendTime, double ackTime, uint8_t type, uint32_t size) override; virtual void PacketAcknowledged(uint32_t seq, double sendTime, double ackTime, uint8_t type, uint32_t size) override;
virtual void PacketLost(uint32_t seq, uint8_t type, uint32_t size) override; virtual void PacketLost(uint32_t seq, uint8_t type, uint32_t size) override;
@ -45,7 +45,6 @@ private:
int GetVideoResolutionForCurrentBitrate(); int GetVideoResolutionForCurrentBitrate();
VideoSource *source = NULL; VideoSource *source = NULL;
std::shared_ptr<VoIPController::Stream> stm;
video::ScreamCongestionController videoCongestionControl; video::ScreamCongestionController videoCongestionControl;
double firstVideoFrameTime = 0.0; double firstVideoFrameTime = 0.0;
uint32_t videoFrameCount = 0; uint32_t videoFrameCount = 0;