1
0
mirror of https://github.com/danog/libtgvoip.git synced 2024-11-30 04:39:03 +01:00

Move ack handling

This commit is contained in:
Daniil Gentili 2020-01-27 17:18:33 +01:00
parent f9bba1effa
commit 1005735298
Signed by: danog
GPG Key ID: 8C1BE3B34B230CA7
15 changed files with 195 additions and 43 deletions

View File

@ -18,6 +18,7 @@ controller/net/Endpoint.cpp \
controller/audio/OpusDecoder.cpp \
controller/audio/OpusEncoder.cpp \
controller/net/PacketReassembler.cpp \
controller/protocol/Ack.cpp \
VoIPGroupController.cpp \
VoIPServerConfig.cpp \
audio/AudioIO.cpp \

View File

@ -816,7 +816,8 @@ am__libtgvoip_la_SOURCES_DIST = TgVoip.cpp VoIPController.cpp \
controller/net/NetworkSocket.cpp controller/net/Endpoint.cpp \
controller/audio/OpusDecoder.cpp \
controller/audio/OpusEncoder.cpp \
controller/net/PacketReassembler.cpp VoIPGroupController.cpp \
controller/net/PacketReassembler.cpp \
controller/protocol/Ack.cpp VoIPGroupController.cpp \
VoIPServerConfig.cpp audio/AudioIO.cpp audio/AudioInput.cpp \
audio/AudioOutput.cpp audio/Resampler.cpp \
audio/AudioInputTester.cpp os/posix/NetworkSocketPosix.cpp \
@ -1755,9 +1756,9 @@ am__objects_12 = TgVoip.lo VoIPController.lo tools/Buffers.lo \
controller/net/NetworkSocket.lo controller/net/Endpoint.lo \
controller/audio/OpusDecoder.lo \
controller/audio/OpusEncoder.lo \
controller/net/PacketReassembler.lo VoIPGroupController.lo \
VoIPServerConfig.lo audio/AudioIO.lo audio/AudioInput.lo \
audio/AudioOutput.lo audio/Resampler.lo \
controller/net/PacketReassembler.lo controller/protocol/Ack.lo \
VoIPGroupController.lo VoIPServerConfig.lo audio/AudioIO.lo \
audio/AudioInput.lo audio/AudioOutput.lo audio/Resampler.lo \
audio/AudioInputTester.lo os/posix/NetworkSocketPosix.lo \
video/VideoSource.lo video/VideoRenderer.lo \
video/VideoPacketSender.lo video/VideoFEC.lo \
@ -2070,6 +2071,7 @@ am__depfiles_remade = ./$(DEPDIR)/TgVoip.Plo \
controller/net/$(DEPDIR)/JitterBuffer.Plo \
controller/net/$(DEPDIR)/NetworkSocket.Plo \
controller/net/$(DEPDIR)/PacketReassembler.Plo \
controller/protocol/$(DEPDIR)/Ack.Plo \
os/darwin/$(DEPDIR)/AudioInputAudioUnit.Plo \
os/darwin/$(DEPDIR)/AudioInputAudioUnitOSX.Plo \
os/darwin/$(DEPDIR)/AudioOutputAudioUnit.Plo \
@ -2385,7 +2387,8 @@ SRC = TgVoip.cpp VoIPController.cpp tools/Buffers.cpp \
controller/net/NetworkSocket.cpp controller/net/Endpoint.cpp \
controller/audio/OpusDecoder.cpp \
controller/audio/OpusEncoder.cpp \
controller/net/PacketReassembler.cpp VoIPGroupController.cpp \
controller/net/PacketReassembler.cpp \
controller/protocol/Ack.cpp VoIPGroupController.cpp \
VoIPServerConfig.cpp audio/AudioIO.cpp audio/AudioInput.cpp \
audio/AudioOutput.cpp audio/Resampler.cpp \
audio/AudioInputTester.cpp os/posix/NetworkSocketPosix.cpp \
@ -2551,6 +2554,14 @@ controller/audio/OpusEncoder.lo: controller/audio/$(am__dirstamp) \
controller/audio/$(DEPDIR)/$(am__dirstamp)
controller/net/PacketReassembler.lo: controller/net/$(am__dirstamp) \
controller/net/$(DEPDIR)/$(am__dirstamp)
controller/protocol/$(am__dirstamp):
@$(MKDIR_P) controller/protocol
@: > controller/protocol/$(am__dirstamp)
controller/protocol/$(DEPDIR)/$(am__dirstamp):
@$(MKDIR_P) controller/protocol/$(DEPDIR)
@: > controller/protocol/$(DEPDIR)/$(am__dirstamp)
controller/protocol/Ack.lo: controller/protocol/$(am__dirstamp) \
controller/protocol/$(DEPDIR)/$(am__dirstamp)
audio/$(am__dirstamp):
@$(MKDIR_P) audio
@: > audio/$(am__dirstamp)
@ -3705,6 +3716,8 @@ mostlyclean-compile:
-rm -f controller/media/*.lo
-rm -f controller/net/*.$(OBJEXT)
-rm -f controller/net/*.lo
-rm -f controller/protocol/*.$(OBJEXT)
-rm -f controller/protocol/*.lo
-rm -f os/darwin/*.$(OBJEXT)
-rm -f os/darwin/*.lo
-rm -f os/linux/*.$(OBJEXT)
@ -4015,6 +4028,7 @@ distclean-compile:
@AMDEP_TRUE@@am__include@ @am__quote@controller/net/$(DEPDIR)/JitterBuffer.Plo@am__quote@ # am--include-marker
@AMDEP_TRUE@@am__include@ @am__quote@controller/net/$(DEPDIR)/NetworkSocket.Plo@am__quote@ # am--include-marker
@AMDEP_TRUE@@am__include@ @am__quote@controller/net/$(DEPDIR)/PacketReassembler.Plo@am__quote@ # am--include-marker
@AMDEP_TRUE@@am__include@ @am__quote@controller/protocol/$(DEPDIR)/Ack.Plo@am__quote@ # am--include-marker
@AMDEP_TRUE@@am__include@ @am__quote@os/darwin/$(DEPDIR)/AudioInputAudioUnit.Plo@am__quote@ # am--include-marker
@AMDEP_TRUE@@am__include@ @am__quote@os/darwin/$(DEPDIR)/AudioInputAudioUnitOSX.Plo@am__quote@ # am--include-marker
@AMDEP_TRUE@@am__include@ @am__quote@os/darwin/$(DEPDIR)/AudioOutputAudioUnit.Plo@am__quote@ # am--include-marker
@ -4223,6 +4237,7 @@ clean-libtool:
-rm -rf controller/audio/.libs controller/audio/_libs
-rm -rf controller/media/.libs controller/media/_libs
-rm -rf controller/net/.libs controller/net/_libs
-rm -rf controller/protocol/.libs controller/protocol/_libs
-rm -rf os/darwin/.libs os/darwin/_libs
-rm -rf os/linux/.libs os/linux/_libs
-rm -rf os/posix/.libs os/posix/_libs
@ -4529,6 +4544,8 @@ distclean-generic:
-rm -f controller/media/$(am__dirstamp)
-rm -f controller/net/$(DEPDIR)/$(am__dirstamp)
-rm -f controller/net/$(am__dirstamp)
-rm -f controller/protocol/$(DEPDIR)/$(am__dirstamp)
-rm -f controller/protocol/$(am__dirstamp)
-rm -f os/darwin/$(DEPDIR)/$(am__dirstamp)
-rm -f os/darwin/$(am__dirstamp)
-rm -f os/linux/$(DEPDIR)/$(am__dirstamp)
@ -4900,6 +4917,7 @@ distclean: distclean-am
-rm -f controller/net/$(DEPDIR)/JitterBuffer.Plo
-rm -f controller/net/$(DEPDIR)/NetworkSocket.Plo
-rm -f controller/net/$(DEPDIR)/PacketReassembler.Plo
-rm -f controller/protocol/$(DEPDIR)/Ack.Plo
-rm -f os/darwin/$(DEPDIR)/AudioInputAudioUnit.Plo
-rm -f os/darwin/$(DEPDIR)/AudioInputAudioUnitOSX.Plo
-rm -f os/darwin/$(DEPDIR)/AudioOutputAudioUnit.Plo
@ -5268,6 +5286,7 @@ maintainer-clean: maintainer-clean-am
-rm -f controller/net/$(DEPDIR)/JitterBuffer.Plo
-rm -f controller/net/$(DEPDIR)/NetworkSocket.Plo
-rm -f controller/net/$(DEPDIR)/PacketReassembler.Plo
-rm -f controller/protocol/$(DEPDIR)/Ack.Plo
-rm -f os/darwin/$(DEPDIR)/AudioInputAudioUnit.Plo
-rm -f os/darwin/$(DEPDIR)/AudioInputAudioUnitOSX.Plo
-rm -f os/darwin/$(DEPDIR)/AudioOutputAudioUnit.Plo

View File

@ -39,6 +39,7 @@
#include "controller/audio/EchoCanceller.h"
#include "controller/net/CongestionControl.h"
#include "controller/net/NetworkSocket.h"
#include "controller/protocol/Ack.h"
#include "tools/Buffers.h"
#include "controller/net/PacketReassembler.h"
#include "tools/MessageThread.h"
@ -141,7 +142,7 @@ namespace video
class VideoPacketSender;
}
class VoIPController
class VoIPController : Ack
{
friend class VoIPGroupController;
friend class PacketSender;
@ -397,7 +398,7 @@ public:
void SetAudioOutputDuckingEnabled(bool enabled);
#endif
struct Stream
struct Stream : Ack
{
int32_t userID;
uint8_t id;
@ -414,6 +415,13 @@ public:
bool csdIsValid = false;
bool paused = false;
int resolution;
// Stream-specific seqno
std::atomic<uint32_t> seq = ATOMIC_VAR_INIT(1);
// Status list of acked seqnos, starting from the seq explicitly present in the packet + up to 32 seqs ago
std::array<uint32_t, 33> peerAcks{0};
unsigned int width = 0;
unsigned int height = 0;
uint16_t rotation = 0;
@ -659,14 +667,10 @@ private:
// Seqno of last received packet
uint32_t lastRemoteSeq = 0;
// Seqno of last sent packet acked by remote
uint32_t lastRemoteAckSeq = 0;
// Seqno of last sent packet
uint32_t lastSentSeq = 0;
// Status list of acked seqnos, starting from the seq explicitly present in the packet + up to 32 seqs ago
std::array<uint32_t, 33> peerAcks{0};
// Acks now handled in Ack
// Recent ougoing packets
std::vector<RecentOutgoingPacket> recentOutgoingPackets;

View File

@ -598,7 +598,7 @@ std::string VoIPGroupController::GetDebugString()
(int)(conctl.GetAverageRTT() * 1000), (int)(conctl.GetMinimumRTT() * 1000),
int(conctl.GetInflightDataSize()), int(conctl.GetCongestionWindow()),
keyFingerprint[0], keyFingerprint[1], keyFingerprint[2], keyFingerprint[3], keyFingerprint[4], keyFingerprint[5], keyFingerprint[6], keyFingerprint[7],
lastSentSeq, lastRemoteAckSeq,
lastSentSeq, peerAcks[0],
conctl.GetSendLossCount(), recvLossCount, encoder ? encoder->GetPacketLoss() : 0,
encoder ? (encoder->GetBitrate() / 1000) : 0,
(long long unsigned int)(stats.bytesSentMobile + stats.bytesSentWifi),

View File

@ -3,6 +3,7 @@
//
#pragma once
#include <array>
#define PKT_INIT 1
#define PKT_INIT_ACK 2
@ -29,7 +30,7 @@
enum ProtocolVersions
{
PROTOCOL_OLD = 9,
PROTOCOL_DANOG = 10
PROTOCOL_RELIABLE = 10
};
#define PROTOCOL_NAME 0x50567247 // "GrVP" in little endian (reversed here)

View File

@ -27,7 +27,7 @@ void VoIPController::InitializeTimers()
<< endpoints.at(currentEndpoint).rtts[0]
<< lastRemoteSeq
<< (uint32_t)seq
<< lastRemoteAckSeq
<< peerAcks[0]
<< recvLossCount
<< conctl.GetSendLossCount()
<< (int)conctl.GetInflightDataSize()

View File

@ -277,7 +277,7 @@ string VoIPController::GetDebugString()
int(conctl.GetInflightDataSize()), int(conctl.GetCongestionWindow()),
keyFingerprint[0], keyFingerprint[1], keyFingerprint[2], keyFingerprint[3], keyFingerprint[4], keyFingerprint[5], keyFingerprint[6], keyFingerprint[7],
useMTProto2 ? " (MTProto2.0)" : "",
lastSentSeq, lastRemoteAckSeq, lastRemoteSeq,
lastSentSeq, peerAcks[0], lastRemoteSeq,
sendLosses, recvLossCount, encoder ? encoder->GetPacketLoss() : 0,
encoder ? (encoder->GetBitrate() / 1000) : 0,
static_cast<unsigned int>(unsentStreamPackets),

View File

@ -0,0 +1,27 @@
#include "Ack.h"
#include "../PrivateDefines.h"
using namespace tgvoip;
using namespace std;
void Ack::ack(uint32_t ackId, uint32_t mask)
{
peerAcks[0] = ackId;
for (unsigned int i = 1; i <= 32; i++)
{
peerAcks[i] = (mask >> (32 - i)) & 1 ? ackId - i : 0;
}
}
bool Ack::wasAcked(uint32_t seq)
{
if (seqgt(seq, peerAcks[0]))
return false;
uint32_t distance = peerAcks[0] - seq;
if (distance >= 0 && distance <= 32)
{
return peerAcks[distance];
}
return false;
}

19
controller/protocol/Ack.h Normal file
View File

@ -0,0 +1,19 @@
#include <atomic>
#include <array>
namespace tgvoip
{
struct Ack
{
// Ack specified ID + up to 32 seqs ago, specified by mask
void ack(uint32_t ackId, uint32_t mask);
// Check if seq was acked
bool wasAcked(uint32_t seq);
// Stream-specific seqno
std::atomic<uint32_t> seq = ATOMIC_VAR_INIT(1);
// Status list of acked seqnos, starting from the seq explicitly present in the packet + up to 32 seqs ago
std::array<uint32_t, 33> peerAcks{0};
};
}

View File

@ -10,9 +10,9 @@ double VoIPController::GetAverageRTT()
{
ENFORCE_MSG_THREAD;
if (lastSentSeq >= lastRemoteAckSeq)
if (lastSentSeq >= peerAcks[0])
{
uint32_t diff = lastSentSeq - lastRemoteAckSeq;
uint32_t diff = lastSentSeq - peerAcks[0];
//LOGV("rtt diff=%u", diff);
if (diff < 32)
{

View File

View File

View File

@ -267,19 +267,11 @@ void VoIPController::TrySendOutgoingPackets()
bool VoIPController::WasOutgoingPacketAcknowledged(uint32_t seq, bool checkAll)
{
if (seqgt(seq, lastRemoteAckSeq))
return false;
if (seq == lastRemoteAckSeq)
return true;
uint32_t distance = lastRemoteAckSeq - seq;
if (distance > 0 && distance <= 32) {
return peerAcks[distance];
bool res = wasAcked(seq);
if (res || !checkAll) {
return res;
}
if (!checkAll)
return false;
RecentOutgoingPacket *pkt = GetRecentOutgoingPacket(seq);
if (!pkt)
return false;

View File

@ -231,9 +231,9 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE
recvTS = in.ReadUInt32();
}
if (seqgt(ackId, lastRemoteAckSeq))
if (seqgt(ackId, peerAcks[0]))
{
if (waitingForAcks && lastRemoteAckSeq >= firstSentPing)
if (waitingForAcks && peerAcks[0] >= firstSentPing)
{
rttHistory.Reset();
waitingForAcks = false;
@ -245,20 +245,15 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE
1.0);
LOGI("resuming sending");
}
lastRemoteAckSeq = ackId;
conctl.PacketAcknowledged(ackId);
peerAcks[0] = ackId;
for (unsigned int i = 1; i <= 32; i++)
{
peerAcks[i] = (acks >> (32 - i)) & 1 ? ackId - i : 0;
}
ack(ackId, acks);
for (auto &opkt : recentOutgoingPackets)
{
if (opkt.ackTime)
continue;
if (find(peerAcks.begin(), peerAcks.end(), opkt.seq) != peerAcks.end())
if (wasAcked(opkt.seq))
{
opkt.ackTime = GetCurrentTime();
opkt.rttTime = opkt.ackTime - opkt.sendTime;
@ -281,7 +276,7 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE
{
for (auto x = currentExtras.begin(); x != currentExtras.end();)
{
if (x->firstContainingSeq != 0 && seqgte(lastRemoteAckSeq, x->firstContainingSeq))
if (x->firstContainingSeq != 0 && seqgte(peerAcks[0], x->firstContainingSeq))
{
LOGV("Peer acknowledged extra type %u length %u", x->type, (unsigned int)x->data.Length());
ProcessAcknowledgedOutgoingExtra(*x);
@ -291,14 +286,14 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE
++x;
}
}
//else
handleReliablePackets();
//if (peerVersion < PROTOCOL_RELIABLE)
handleReliablePackets(); // Use old reliability logic
}
Endpoint &_currentEndpoint = endpoints.at(currentEndpoint);
if (srcEndpoint.id != currentEndpoint && srcEndpoint.IsReflector() && (_currentEndpoint.IsP2P() || _currentEndpoint.averageRTT == 0))
{
if (seqgt(lastSentSeq - 32, lastRemoteAckSeq))
if (seqgt(lastSentSeq - 32, peerAcks[0]))
{
currentEndpoint = srcEndpoint.id;
_currentEndpoint = srcEndpoint;
@ -333,7 +328,7 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE
LOGV("Received: from=%s:%u, seq=%u, length=%u, type=%s", srcEndpoint.GetAddress().ToString().c_str(), srcEndpoint.port, pseq, (unsigned int)packet.data.Length(), GetPacketTypeString(type).c_str());
#endif
//LOGV("acks: %u -> %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf", lastRemoteAckSeq, remoteAcks[0], remoteAcks[1], remoteAcks[2], remoteAcks[3], remoteAcks[4], remoteAcks[5], remoteAcks[6], remoteAcks[7]);
//LOGV("acks: %u -> %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf", peerAcks[0], remoteAcks[0], remoteAcks[1], remoteAcks[2], remoteAcks[3], remoteAcks[4], remoteAcks[5], remoteAcks[6], remoteAcks[7]);
//LOGD("recv: %u -> %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf", lastRemoteSeq, recvPacketTimes[0], recvPacketTimes[1], recvPacketTimes[2], recvPacketTimes[3], recvPacketTimes[4], recvPacketTimes[5], recvPacketTimes[6], recvPacketTimes[7]);
//LOGI("RTT = %.3lf", GetAverageRTT());
//LOGV("Packet %u type is %d", pseq, type);

View File

@ -0,0 +1,94 @@
#include "../PrivateDefines.cpp"
using namespace tgvoip;
using namespace std;
void VoIPController::SendPacketReliably(unsigned char type, unsigned char *data, size_t len, double retryInterval, double timeout, uint8_t tries)
{
ENFORCE_MSG_THREAD;
LOGV("Send reliably, type=%u, len=%u, retry=%.3f, timeout=%.3f, tries=%hhu", type, unsigned(len), retryInterval, timeout, tries);
ReliableOutgoingPacket pkt;
if (data)
{
Buffer b(len);
b.CopyFrom(data, 0, len);
pkt.data = move(b);
}
pkt.type = type;
pkt.retryInterval = retryInterval;
pkt.timeout = timeout;
pkt.tries = tries;
pkt.firstSentTime = 0;
pkt.lastSentTime = 0;
reliablePackets.push_back(move(pkt));
messageThread.Post(std::bind(&VoIPController::UpdateReliablePackets, this));
if (timeout > 0.0)
{
messageThread.Post(std::bind(&VoIPController::UpdateReliablePackets, this), timeout);
}
}
void VoIPController::UpdateReliablePackets()
{
vector<PendingOutgoingPacket> packetsToSend;
for (std::vector<ReliableOutgoingPacket>::iterator qp = reliablePackets.begin(); qp != reliablePackets.end();)
{
if (qp->timeout > 0 && qp->firstSentTime > 0 && GetCurrentTime() - qp->firstSentTime >= qp->timeout)
{
LOGD("Removing queued packet because of timeout");
qp = reliablePackets.erase(qp);
continue;
}
if (!qp->tries--)
{
LOGD("Removing queued packet because of no more tries");
qp = reliablePackets.erase(qp);
continue;
}
if (GetCurrentTime() - qp->lastSentTime >= qp->retryInterval)
{
messageThread.Post(std::bind(&VoIPController::UpdateReliablePackets, this), qp->retryInterval);
uint32_t seq = GenerateOutSeq();
qp->seqs.Add(seq);
qp->lastSentTime = GetCurrentTime();
//LOGD("Sending queued packet, seq=%u, type=%u, len=%u", seq, qp.type, qp.data.Length());
Buffer buf(qp->data.Length());
if (qp->firstSentTime == 0)
qp->firstSentTime = qp->lastSentTime;
if (qp->data.Length())
buf.CopyFrom(qp->data, qp->data.Length());
packetsToSend.push_back(PendingOutgoingPacket{
/*.seq=*/seq,
/*.type=*/qp->type,
/*.len=*/qp->data.Length(),
/*.data=*/move(buf),
/*.endpoint=*/0});
}
++qp;
}
for (PendingOutgoingPacket &pkt : packetsToSend)
{
SendOrEnqueuePacket(move(pkt));
}
}
void VoIPController::handleReliablePackets()
{
for (auto it = reliablePackets.begin(); it != reliablePackets.end();)
{
ReliableOutgoingPacket &qp = *it;
bool didAck = false;
for (uint8_t i = 0; i < qp.seqs.Size(); ++i)
{
if (!qp.seqs[i] || (didAck = WasOutgoingPacketAcknowledged(qp.seqs[i], false)))
break;
}
if (didAck)
{
LOGV("Acked queued packet with %hhu tries left", qp.tries);
it = reliablePackets.erase(it);
continue;
}
++it;
}
}