1
0
mirror of https://github.com/danog/libtgvoip.git synced 2024-11-26 20:24:38 +01:00

Write very simple nack logic

This commit is contained in:
Daniil Gentili 2020-01-28 21:45:56 +01:00
parent 0fb5c23b77
commit 202d4fa94d
Signed by: danog
GPG Key ID: 8C1BE3B34B230CA7
15 changed files with 158 additions and 90 deletions

View File

@ -616,6 +616,10 @@ private:
bool micMuted = false;
uint32_t maxBitrate;
// Recent ougoing packets
std::vector<RecentOutgoingPacket> recentOutgoingPackets;
//
std::vector<std::shared_ptr<Stream>> outgoingStreams;
std::vector<std::shared_ptr<Stream>> incomingStreams;

View File

@ -599,7 +599,7 @@ std::string VoIPGroupController::GetDebugString()
(int)(conctl.GetAverageRTT() * 1000), (int)(conctl.GetMinimumRTT() * 1000),
int(conctl.GetInflightDataSize()), int(conctl.GetCongestionWindow()),
keyFingerprint[0], keyFingerprint[1], keyFingerprint[2], keyFingerprint[3], keyFingerprint[4], keyFingerprint[5], keyFingerprint[6], keyFingerprint[7],
lastSentSeq, peerAcks[0],
lastSentSeq, getLastAckedSeq(),
conctl.GetSendLossCount(), recvLossCount, encoder ? encoder->GetPacketLoss() : 0,
encoder ? (encoder->GetBitrate() / 1000) : 0,
(long long unsigned int)(stats.bytesSentMobile + stats.bytesSentWifi),

View File

@ -1,11 +1,11 @@
#include "AudioPacketSender.h"
#include "../PrivateDefines.h"
using namespace tgvoip;
AudioPacketSender::AudioPacketSender(VoIPController *controller, const std::shared_ptr<OpusEncoder> &encoder, const std::shared_ptr<VoIPController::Stream> &stream) : PacketSender(controller), encoder(encoder), stream(stream)
{
SetSource(encoder);
SetSource(encoder);
}
AudioPacketSender::~AudioPacketSender()
@ -105,17 +105,24 @@ void AudioPacketSender::SendFrame(unsigned char *data, size_t len, unsigned char
//conctl.PacketSent(p.seq, p.len);
//shared_ptr<VoIPController::Stream> outgoingAudioStream = GetStreamByType(STREAM_TYPE_AUDIO, false);
double rtt = LastRtt();
rtt = !rtt || rtt > 0.3 ? 0.5 : rtt; // Tweak this (a lot) later
if (PeerVersion() < PROTOCOL_RELIABLE)
{
double rtt = LastRtt();
double timeout = 0; //(outgoingAudioStream && outgoingAudioStream->jitterBuffer ? outgoingAudioStream->jitterBuffer->GetTimeoutWindow() : 0) - rtt;
LOGE("TIMEOUT %lf", timeout + rtt);
rtt = !rtt || rtt > 0.3 ? 0.5 : rtt; // Tweak this (a lot) later
timeout = timeout <= 0 ? rtt : timeout;
SendPacketReliably(PKT_STREAM_DATA, pkt.GetBuffer(), pkt.GetLength(), rtt, timeout, 10); // Todo Optimize RTT
double timeout = 0; //(outgoingAudioStream && outgoingAudioStream->jitterBuffer ? outgoingAudioStream->jitterBuffer->GetTimeoutWindow() : 0) - rtt;
LOGE("TIMEOUT %lf", timeout + rtt);
timeout = timeout <= 0 ? rtt : timeout;
SendPacketReliably(PKT_STREAM_DATA, pkt.GetBuffer(), pkt.GetLength(), rtt, timeout, 10); // Todo Optimize RTT
}
else
{
}
//SendOrEnqueuePacket(move(p));
if (PeerVersion() < 7 && secondaryLen && shittyInternetMode)
{

View File

@ -66,9 +66,9 @@ void VoIPController::InitializeTimers()
statsDump << std::setprecision(3)
<< GetCurrentTime() - connectionInitTime
<< endpoints.at(currentEndpoint).rtts[0]
<< lastRemoteSeq
<< getLastRemoteSeq()
<< (uint32_t)getLocalSeq()
<< peerAcks[0]
<< getLastAckedSeq()
<< recvLossCount
<< conctl.GetSendLossCount()
<< (int)conctl.GetInflightDataSize()

View File

@ -237,7 +237,7 @@ string VoIPController::GetDebugString()
int(conctl.GetInflightDataSize()), int(conctl.GetCongestionWindow()),
keyFingerprint[0], keyFingerprint[1], keyFingerprint[2], keyFingerprint[3], keyFingerprint[4], keyFingerprint[5], keyFingerprint[6], keyFingerprint[7],
useMTProto2 ? " (MTProto2.0)" : "",
lastSentSeq, peerAcks[0], lastRemoteSeq,
lastSentSeq, getLastAckedSeq(), getLastRemoteSeq(),
sendLosses, recvLossCount, encoder ? encoder->GetPacketLoss() : 0,
encoder ? (encoder->GetBitrate() / 1000) : 0,
static_cast<unsigned int>(unsentStreamPackets),

View File

@ -12,8 +12,8 @@
using namespace tgvoip;
JitterBuffer::JitterBuffer(uint32_t _step) : step(_step),
slots{}
JitterBuffer::JitterBuffer(uint32_t step) : step(step),
slots{}
{
if (step < 30)
{
@ -83,7 +83,6 @@ void JitterBuffer::HandleInput(unsigned char *data, size_t len, uint32_t timesta
//LOGV("in, ts=%d, ec=%d", timestamp, isEC);
}
void JitterBuffer::PutInternal(jitter_packet_t *pkt, bool overwriteExisting)
{
if (pkt->size > JITTER_SLOT_SIZE)
@ -114,13 +113,14 @@ void JitterBuffer::PutInternal(jitter_packet_t *pkt, bool overwriteExisting)
outstandingDelayChange = 0;
nextFetchTimestamp = static_cast<int64_t>(static_cast<int64_t>(pkt->timestamp) - step * minDelay);
first = true;
LOGI("jitter: resyncing, next timestamp = %lld (step=%d, minDelay=%f)", (long long int)nextFetchTimestamp, step, minDelay);
LOGI("jitter: resyncing, next timeDecodeNextFramestamp = %lld (step=%d, minDelay=%f)", (long long int)nextFetchTimestamp, step, minDelay);
}
for (i = 0; i < JITTER_SLOT_COUNT; i++)
{
if (!slots[i].buffer.IsEmpty())
{
// Clear packets older than the last fetched packet
if (slots[i].timestamp < nextFetchTimestamp - 1)
{
slots[i].buffer = Buffer();
@ -237,7 +237,7 @@ size_t JitterBuffer::HandleOutput(unsigned char *buffer, size_t len, int offsetI
if (GetCurrentDelay() > 5)
{
LOGW("jitter: delay too big upon start (%u), dropping packets", delay);
for (;delay > GetMinPacketCount(); --delay)
for (; delay > GetMinPacketCount(); --delay)
{
for (int i = 0; i < JITTER_SLOT_COUNT; i++)
{
@ -445,7 +445,11 @@ void JitterBuffer::Tick()
lastMeasuredJitter = stddev;
lastMeasuredDelay = stddevDelay;
//LOGV("stddev=%.3f, avg=%.3f, ndelay=%d, dontDec=%u", stddev, avgdev, stddevDelay, dontDecMinDelay);
if (dontChangeDelay == 0)
if (dontChangeDelay)
{
--dontChangeDelay;
}
else
{
if (avgDelay > minDelay + 0.5)
{
@ -458,8 +462,6 @@ void JitterBuffer::Tick()
dontChangeDelay += 10;
}
}
if (dontChangeDelay > 0)
dontChangeDelay--;
//LOGV("jitter: avg delay=%d, delay=%d, late16=%.1f, dontDecMinDelay=%d", avgDelay, delayHistory[0], avgLate16, dontDecMinDelay);
/*if(!adjustingDelay) {

View File

@ -44,6 +44,15 @@ public:
double GetTimeoutWindow();
// Get minimum refetchable seq for (reverse) NACK logic.
// Any sequence numbers smaller than this cannot possibly arrive in time for playing.
inline uint32_t GetSeqTooLate(double rtt)
{
// The absolute minimum time(stamp) that will (barely) be accepted by the jitter buffer in time + RTT time
// Then convert timestamp into a seqno: remember, in protocol >= PROTOCOL_RELIABLE, seq = ts * step
return ((nextFetchTimestamp + (rtt * 1000)) / static_cast<uint64_t>(step)) / 1000;
}
private:
struct jitter_packet_t
{

View File

@ -10,9 +10,9 @@ double VoIPController::GetAverageRTT()
{
ENFORCE_MSG_THREAD;
if (lastSentSeq >= peerAcks[0])
if (lastSentSeq >= getLastAckedSeq())
{
uint32_t diff = lastSentSeq - peerAcks[0];
uint32_t diff = lastSentSeq - getLastAckedSeq();
//LOGV("rtt diff=%u", diff);
if (diff < 32)
{

View File

@ -109,7 +109,7 @@ void VoIPController::legacyWritePacketHeader(uint32_t pseq, uint32_t acks, Buffe
{
s->WriteBytes(callID, 16);
}
s->WriteInt32(lastRemoteSeq);
s->WriteInt32(getLastRemoteSeq());
s->WriteInt32(pseq);
s->WriteInt32(acks);
if (pflags & PFLAG_HAS_PROTO)
@ -157,7 +157,7 @@ void VoIPController::legacyWritePacketHeader(uint32_t pseq, uint32_t acks, Buffe
}
}
s->WriteByte(type);
s->WriteInt32(lastRemoteSeq);
s->WriteInt32(getLastRemoteSeq());
s->WriteInt32(pseq);
s->WriteInt32(acks);
if (peerVersion >= 6)

View File

@ -0,0 +1,8 @@
#include "Nack.h"
void HandleJitterInput(uint32_t timestamp)
{
}
void HandleJitterOutput(uint32_t timestamp)
{
}

View File

@ -0,0 +1,17 @@
namespace tgvoip
{
// Handle nack window
class Nack
{
public:
Nack() = default;
virtual ~Nack() = default;
// Do not provide seq, as there is a direct correlation between seqs and timestamps (in protocol >= 10)
void HandleJitterInput(uint32_t timestamp);
void HandleJitterOutput(uint32_t timestamp);
private:
};
} // namespace tgvoip

View File

@ -84,9 +84,9 @@ void VoIPController::SendPacket(unsigned char *data, size_t len, Endpoint &ep, P
}
//LOGV("Sending %d bytes to %s:%d", out.GetLength(), ep.address.ToString().c_str(), ep.port);
//#ifdef LOG_PACKETS
//#ifdef LOG_PACKETS
LOGV("Sending: to=%s:%u, seq=%u, length=%u, type=%s", ep.GetAddress().ToString().c_str(), ep.port, srcPacket.seq, (unsigned int)out.GetLength(), GetPacketTypeString(srcPacket.type).c_str());
//#endif
//#endif
rawSendQueue.Put(
RawPendingOutgoingPacket{
@ -268,7 +268,8 @@ void VoIPController::TrySendOutgoingPackets()
bool VoIPController::WasOutgoingPacketAcknowledged(uint32_t seq, bool checkAll)
{
bool res = wasLocalAcked(seq);
if (res || !checkAll) {
if (res || !checkAll)
{
return res;
}
@ -375,7 +376,6 @@ void VoIPController::SendRelayPings()
}
}
void VoIPController::SendNopPacket()
{
if (state != STATE_ESTABLISHED)
@ -449,7 +449,6 @@ Endpoint &VoIPController::GetEndpointByType(const Endpoint::Type type)
throw out_of_range("no endpoint");
}
void VoIPController::SendExtra(Buffer &data, unsigned char type)
{
ENFORCE_MSG_THREAD;

View File

@ -4,25 +4,22 @@
using namespace tgvoip;
using namespace std;
PacketManager::PacketManager() : recentIncomingSeqs(MAX_RECENT_PACKETS) {}
void PacketManager::ackLocal(uint32_t ackId, uint32_t mask)
{
peerAcks[0] = ackId;
for (unsigned int i = 1; i <= 32; i++)
{
peerAcks[i] = (mask >> (32 - i)) & 1 ? ackId - i : 0;
}
lastAckedSeq = ackId;
lastAckedSeqsMask = mask;
}
bool PacketManager::wasLocalAcked(uint32_t seq)
{
if (seqgt(seq, peerAcks[0]))
if (seq == lastAckedSeq)
return true;
if (seq > lastAckedSeq)
return false;
uint32_t distance = peerAcks[0] - seq;
if (distance >= 0 && distance <= 32)
uint32_t distance = lastAckedSeq - seq;
if (distance > 0 && distance <= 32)
{
return peerAcks[distance];
return (lastAckedSeqsMask >> (32 - distance)) & 1;
}
return false;
@ -30,19 +27,29 @@ bool PacketManager::wasLocalAcked(uint32_t seq)
bool PacketManager::ackRemoteSeq(uint32_t ackId)
{
// Duplicate and moving window check
if (seqgt(ackId, lastRemoteSeq - MAX_RECENT_PACKETS))
{
if (find(recentIncomingSeqs.begin(), recentIncomingSeqs.end(), ackId) != recentIncomingSeqs.end())
if (ackId == lastRemoteSeq)
{
LOGW("Received duplicated packet for seq %u", ackId);
return false;
}
recentIncomingSeqs.push_front(ackId);
recentIncomingSeqs.pop_back();
if (seqgt(ackId, lastRemoteSeq))
else if (ackId > lastRemoteSeq)
{
lastRemoteSeqsMask = (lastRemoteSeqsMask << 1) | 1;
lastRemoteSeqsMask <<= (ackId - lastRemoteSeq) - 1;
lastRemoteSeq = ackId;
}
else
{
uint32_t pos = 1 << ((lastRemoteSeq - ackId) - 1);
if (lastRemoteSeqsMask & pos)
{
LOGW("Received duplicated packet for seq %u", ackId);
return false;
}
lastRemoteSeqsMask |= pos;
}
}
else
{
@ -50,20 +57,4 @@ bool PacketManager::ackRemoteSeq(uint32_t ackId)
return false;
}
return true;
}
uint32_t PacketManager::getRemoteAckMask()
{
uint32_t acks = 0;
uint32_t distance;
for (const uint32_t &seq : recentIncomingSeqs)
{
if (!seq)
break;
distance = lastRemoteSeq - seq;
if (distance > 0 && distance <= 32)
{
acks |= (1 << (32 - distance));
}
}
return acks;
}

View File

@ -1,6 +1,6 @@
#pragma once
#include <atomic>
#include <list>
#include <bitset>
#include "PacketStructs.h"
namespace tgvoip
@ -19,16 +19,13 @@ inline bool seqgte(uint32_t s1, uint32_t s2)
}
// Local and remote packet history management
struct PacketManager
class PacketManager
{
PacketManager();
public:
PacketManager() = default;
virtual ~PacketManager() = default;
/* Local seqno ack */
// Ack specified local seq + up to 32 seqs ago, specified by mask
void ackLocal(uint32_t ackId, uint32_t mask);
// Check if local seq was acked
bool wasLocalAcked(uint32_t seq);
/* Local seqno generation */
// Get next local seqno
inline uint32_t nextLocalSeq()
@ -42,32 +39,66 @@ struct PacketManager
return seq;
}
inline uint32_t getLastSentSeq()
{
return lastSentSeq;
}
// Seqno of last sent local packet
uint32_t lastSentSeq = 0;
private:
// Stream-specific local seqno
std::atomic<uint32_t> seq = ATOMIC_VAR_INIT(1);
public:
// Recent ougoing packets
std::vector<RecentOutgoingPacket> recentOutgoingPackets;
/* Local seqno acks */
// Seqno of last sent local packet
uint32_t lastSentSeq = 0;
// Ack specified local seq + up to 32 seqs ago, specified by mask
void ackLocal(uint32_t ackId, uint32_t mask);
// Status list of acked local seqnos, starting from the seq explicitly present in the packet + up to 32 seqs ago
std::array<uint32_t, 33> peerAcks{0};
// Check if local seq was acked
bool wasLocalAcked(uint32_t seq);
inline uint32_t getLastAckedSeq() const
{
return lastAckedSeq;
}
private:
// Seqno of last acked packet
uint32_t lastAckedSeq = 0;
// Status list of acked local seqnos, excluding the seq explicitly present in the packet, up to 32 seqs ago
uint32_t lastAckedSeqsMask = 0;
public:
/* Remote seqno ack */
// Ack specified remote seq, returns false if too old or dupe
bool ackRemoteSeq(uint32_t ackId);
// Get ack mask for remote packets
uint32_t getRemoteAckMask();
// Ack remote seqs older than the specified seq
inline void ackRemoteSeqsOlderThan(uint32_t seq)
{
lastRemoteSeqsMask |= 0xFFFFFFFF << (lastRemoteSeq - seq);
}
// Get ack mask for remote packets
inline uint32_t getRemoteAckMask()
{
return lastRemoteSeqsMask;
}
inline uint32_t getLastRemoteSeq()
{
return lastRemoteSeq;
}
private:
// Seqno of last received remote packet
uint32_t lastRemoteSeq = 0;
private: // Slowly encapsulate all the things
// Recent incoming remote packets
std::list<uint32_t> recentIncomingSeqs;
uint32_t lastRemoteSeqsMask;
};
} // namespace tgvoip

View File

@ -215,9 +215,9 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE
recvTS = in.ReadUInt32();
}
if (seqgt(ackId, peerAcks[0]))
if (seqgt(ackId, getLastAckedSeq()))
{
if (waitingForAcks && peerAcks[0] >= firstSentPing)
if (waitingForAcks && getLastAckedSeq() >= firstSentPing)
{
rttHistory.Reset();
waitingForAcks = false;
@ -260,7 +260,7 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE
{
for (auto x = currentExtras.begin(); x != currentExtras.end();)
{
if (x->firstContainingSeq != 0 && seqgte(peerAcks[0], x->firstContainingSeq))
if (x->firstContainingSeq != 0 && seqgte(getLastAckedSeq(), x->firstContainingSeq))
{
LOGV("Peer acknowledged extra type %u length %u", x->type, (unsigned int)x->data.Length());
ProcessAcknowledgedOutgoingExtra(*x);
@ -277,7 +277,7 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE
Endpoint &_currentEndpoint = endpoints.at(currentEndpoint);
if (srcEndpoint.id != currentEndpoint && srcEndpoint.IsReflector() && (_currentEndpoint.IsP2P() || _currentEndpoint.averageRTT == 0))
{
if (seqgt(lastSentSeq - 32, peerAcks[0]))
if (seqgt(lastSentSeq - 32, getLastAckedSeq()))
{
currentEndpoint = srcEndpoint.id;
_currentEndpoint = srcEndpoint;
@ -312,8 +312,8 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE
LOGV("Received: from=%s:%u, seq=%u, length=%u, type=%s", srcEndpoint.GetAddress().ToString().c_str(), srcEndpoint.port, pseq, (unsigned int)packet.data.Length(), GetPacketTypeString(type).c_str());
//#endif
//LOGV("acks: %u -> %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf", peerAcks[0], remoteAcks[0], remoteAcks[1], remoteAcks[2], remoteAcks[3], remoteAcks[4], remoteAcks[5], remoteAcks[6], remoteAcks[7]);
//LOGD("recv: %u -> %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf", lastRemoteSeq, recvPacketTimes[0], recvPacketTimes[1], recvPacketTimes[2], recvPacketTimes[3], recvPacketTimes[4], recvPacketTimes[5], recvPacketTimes[6], recvPacketTimes[7]);
//LOGV("acks: %u -> %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf", getLastAckedSeq(), remoteAcks[0], remoteAcks[1], remoteAcks[2], remoteAcks[3], remoteAcks[4], remoteAcks[5], remoteAcks[6], remoteAcks[7]);
//LOGD("recv: %u -> %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf", getLastRemoteSeq(), recvPacketTimes[0], recvPacketTimes[1], recvPacketTimes[2], recvPacketTimes[3], recvPacketTimes[4], recvPacketTimes[5], recvPacketTimes[6], recvPacketTimes[7]);
//LOGI("RTT = %.3lf", GetAverageRTT());
//LOGV("Packet %u type is %d", pseq, type);
if (type == PKT_INIT)
@ -982,7 +982,7 @@ void VoIPController::WritePacketHeader(uint32_t pseq, BufferOutputStream *s, uns
if (peerVersion >= 8 || (!peerVersion && connectionMaxLayer >= 92))
{
s->WriteByte(type);
s->WriteInt32(lastRemoteSeq);
s->WriteInt32(getLastRemoteSeq());
s->WriteInt32(pseq);
s->WriteInt32(acks);