1
0
mirror of https://github.com/danog/libtgvoip.git synced 2024-11-30 04:39:03 +01:00

Finish adaptation

This commit is contained in:
Daniil Gentili 2020-01-29 15:52:43 +01:00
parent d5154a5b42
commit 4af8a89491
Signed by: danog
GPG Key ID: 8C1BE3B34B230CA7
17 changed files with 184 additions and 104 deletions

View File

@ -437,8 +437,8 @@ public:
protected:
virtual void ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcEndpoint);
virtual void ProcessExtraData(Buffer &data);
virtual void WritePacketHeader(uint32_t seq, BufferOutputStream *s, unsigned char type, uint32_t length, PacketSender *source);
virtual void SendPacket(unsigned char *data, size_t len, Endpoint &ep, PendingOutgoingPacket &srcPacket);
virtual uint8_t WritePacketHeader(PendingOutgoingPacket &pkt, BufferOutputStream &s, PacketSender *source);
virtual void SendPacket(unsigned char *data, size_t len, Endpoint &ep, uint32_t seq, uint8_t type, uint8_t transportId);
virtual void SendInit();
virtual void SendUdpPing(Endpoint &endpoint);
virtual void SendRelayPings();
@ -451,6 +451,7 @@ protected:
std::shared_ptr<Stream> GetStreamByType(StreamType type, bool outgoing);
std::shared_ptr<Stream> GetStreamByID(unsigned char id, bool outgoing);
Endpoint *GetEndpointForPacket(const PendingOutgoingPacket &pkt);
Endpoint *GetEndpointById(const int64_t id);
bool SendOrEnqueuePacket(PendingOutgoingPacket pkt, bool enqueue = true, PacketSender *source = NULL);
CellularCarrierInfo GetCarrierInfo();
@ -502,7 +503,7 @@ private:
void UpdateAudioBitrate();
void UpdateSignalBars();
void UpdateReliablePackets();
void SendNopPacket();
void SendNopPacket(PacketManager &pm);
void TickJitterBufferAndCongestionControl();
void ResetUdpAvailability();
inline static std::string NetworkTypeToString(int type)
@ -572,7 +573,6 @@ private:
void SetupOutgoingVideoStream();
bool WasOutgoingPacketAcknowledged(uint32_t seq, bool checkAll = true);
RecentOutgoingPacket *GetRecentOutgoingPacket(uint32_t seq);
void NetworkPacketReceived(std::shared_ptr<NetworkPacket> packet);
void TrySendOutgoingPackets();
@ -616,9 +616,6 @@ private:
bool micMuted = false;
uint32_t maxBitrate;
// Recent ougoing packets
std::vector<RecentOutgoingPacket> recentOutgoingPackets;
//
std::vector<std::shared_ptr<Stream>> outgoingStreams;
std::vector<std::shared_ptr<Stream>> incomingStreams;

View File

@ -34,7 +34,7 @@ enum ProtocolVersions
};
#define PROTOCOL_NAME 0x50567247 // "GrVP" in little endian (reversed here)
#define PROTOCOL_VERSION 10
#define PROTOCOL_VERSION 9
#define MIN_PROTOCOL_VERSION 3
#define STREAM_DATA_FLAG_LEN16 0x40
@ -72,6 +72,7 @@ enum ProtocolVersions
#define XPFLAG_HAS_EXTRA 1
#define XPFLAG_HAS_RECV_TS 2
#define XPFLAG_HAS_TRANSPORT_ID 4
#define EXTRA_TYPE_STREAM_FLAGS 1
#define EXTRA_TYPE_STREAM_CSD 2

View File

@ -74,6 +74,8 @@ void AudioPacketSender::SendFrame(unsigned char *data, size_t len, unsigned char
pkt.WriteInt32(audioTimestampOut);
pkt.WriteBytes(*dataBufPtr, 0, len);
//LOGE("SEND: For pts %u = seq %u, using seq %u", audioTimestampOut, audioTimestampOut/60 + 1, packetManager.getLocalSeq());
if (hasExtraFEC)
{
Buffer ecBuf(secondaryLen);
@ -108,6 +110,9 @@ void AudioPacketSender::SendFrame(unsigned char *data, size_t len, unsigned char
if (PeerVersion() < PROTOCOL_RELIABLE)
{
// Need to increase this anyway to go hand in hand with timestamp
packetManager.nextLocalSeq();
double rtt = LastRtt();
rtt = !rtt || rtt > 0.3 ? 0.5 : rtt; // Tweak this (a lot) later

View File

@ -42,8 +42,6 @@ VoIPController::VoIPController() : rawSendQueue(64)
stm->packetSender = std::make_unique<AudioPacketSender>(this, nullptr, stm);
outgoingStreams.push_back(stm);
recentOutgoingPackets.reserve(MAX_RECENT_PACKETS);
}
void VoIPController::InitializeTimers()

View File

@ -96,7 +96,7 @@ void JitterBuffer::PutInternal(jitter_packet_t *pkt, bool overwriteExisting)
{
if (!slots[i].buffer.IsEmpty() && slots[i].timestamp == pkt->timestamp)
{
//LOGV("Found existing packet for timestamp %u, overwrite %d", pkt->timestamp, overwriteExisting);
LOGV("Found existing packet for timestamp %u, overwrite %d", pkt->timestamp, overwriteExisting);
if (overwriteExisting)
{
slots[i].buffer.CopyFrom(pkt->buffer, pkt->size);
@ -113,7 +113,7 @@ void JitterBuffer::PutInternal(jitter_packet_t *pkt, bool overwriteExisting)
outstandingDelayChange = 0;
nextFetchTimestamp = static_cast<int64_t>(static_cast<int64_t>(pkt->timestamp) - step * minDelay);
first = true;
LOGI("jitter: resyncing, next timeDecodeNextFramestamp = %lld (step=%d, minDelay=%f)", (long long int)nextFetchTimestamp, step, minDelay);
LOGI("jitter: resyncing, next timestamp = %lld (step=%d, minDelay=%f)", (long long int)nextFetchTimestamp, step, minDelay);
}
for (i = 0; i < JITTER_SLOT_COUNT; i++)
@ -154,13 +154,13 @@ void JitterBuffer::PutInternal(jitter_packet_t *pkt, bool overwriteExisting)
// Late packet check
if (pkt->timestamp < nextFetchTimestamp)
{
//LOGW("jitter: would drop packet with timestamp %d because it is late but not hopelessly", pkt->timestamp);
LOGW("jitter: would drop packet with timestamp %d because it is late but not hopelessly", pkt->timestamp);
latePacketCount++;
lostPackets--;
}
else if (pkt->timestamp < nextFetchTimestamp - 1)
{
//LOGW("jitter: dropping packet with timestamp %d because it is too late", pkt->timestamp);
LOGW("jitter: dropping packet with timestamp %d because it is too late", pkt->timestamp);
latePacketCount++;
return;
}

View File

@ -10,11 +10,13 @@
#include <stdlib.h>
#include <vector>
#include <algorithm>
#include <atomic>
#include <stdio.h>
#include "controller/media/MediaStreamItf.h"
#include "tools/BlockingQueue.h"
#include "tools/Buffers.h"
#include "tools/threading.h"
#include "tools/logging.h"
#define JITTER_SLOT_COUNT 64
#define JITTER_SLOT_SIZE 1024
@ -48,9 +50,10 @@ public:
// Any sequence numbers smaller than this cannot possibly arrive in time for playing.
inline uint32_t GetSeqTooLate(double rtt)
{
//LOGE("Next fetch timestamp: %ld, rtt %lf, step %d", nextFetchTimestamp.load(), rtt * 1000, step)
// The absolute minimum time(stamp) that will (barely) be accepted by the jitter buffer in time + RTT time
// Then convert timestamp into a seqno: remember, in protocol >= PROTOCOL_RELIABLE, seq = ts * step
return ((nextFetchTimestamp + (rtt * 1000)) / static_cast<uint64_t>(step)) / 1000;
// Then convert timestamp into a seqno: remember, in protocol >= PROTOCOL_RELIABLE, seq = ts * step + 1
return ((nextFetchTimestamp + (rtt * 1000)) / static_cast<uint64_t>(step) + 1) - lostCount; // Seqs start at 1
}
private:
@ -70,7 +73,7 @@ private:
Mutex mutex;
uint32_t step;
std::array<jitter_packet_t, JITTER_SLOT_COUNT> slots;
int64_t nextFetchTimestamp = 0; // What frame to read next
std::atomic<int64_t> nextFetchTimestamp = ATOMIC_VAR_INIT(0); // What frame to read next
double minDelay = 6;
uint32_t minMinDelay;
uint32_t maxMinDelay;

View File

@ -16,7 +16,7 @@ namespace tgvoip
class PacketSender
{
public:
PacketSender(VoIPController *controller, const std::shared_ptr<VoIPController::Stream> &stream) : controller(controller), stream(stream), packetManager(stream->id){};
PacketSender(VoIPController *controller, const std::shared_ptr<VoIPController::Stream> &stream) : controller(controller), stream(stream), packetManager(stream->id - 1){};
virtual ~PacketSender() = default;
virtual void PacketAcknowledged(uint32_t seq, double sendTime, double ackTime, uint8_t type, uint32_t size) = 0;
virtual void PacketLost(uint32_t seq, uint8_t type, uint32_t size) = 0;
@ -102,6 +102,8 @@ protected:
std::shared_ptr<VoIPController::Stream> stream;
PacketManager packetManager;
std::vector<PendingOutgoingPacket> reliableQueue;
};
} // namespace tgvoip

View File

@ -18,7 +18,7 @@ double VoIPController::GetAverageRTT()
{
double res = 0;
int count = 0;
for (const auto &packet : recentOutgoingPackets)
for (const auto &packet : pm.getRecentOutgoingPackets())
{
if (packet.rttTime)
{

View File

@ -10,31 +10,31 @@ Endpoint &VoIPController::GetRemoteEndpoint()
Endpoint *VoIPController::GetEndpointForPacket(const PendingOutgoingPacket &pkt)
{
Endpoint *endpoint = nullptr;
if (pkt.endpoint)
return GetEndpointById(pkt.endpoint);
}
Endpoint *VoIPController::GetEndpointById(const int64_t id)
{
if (id)
{
try
{
endpoint = &endpoints.at(pkt.endpoint);
return &endpoints.at(id);
}
catch (out_of_range &x)
{
LOGW("Unable to send packet via nonexistent endpoint %" PRIu64, pkt.endpoint);
LOGW("Unable to send packet via nonexistent endpoint %" PRIu64, id);
return NULL;
}
}
if (!endpoint)
endpoint = &endpoints.at(currentEndpoint);
return endpoint;
return &endpoints.at(currentEndpoint);
}
int64_t VoIPController::GetPreferredRelayID()
{
return preferredRelay;
}
void VoIPController::SetRemoteEndpoints(vector<Endpoint> endpoints, bool allowP2p, int32_t connectionMaxLayer)
{
LOGW("Set remote endpoints, allowP2P=%d, connectionMaxLayer=%u", allowP2p ? 1 : 0, connectionMaxLayer);
@ -69,10 +69,6 @@ void VoIPController::SetRemoteEndpoints(vector<Endpoint> endpoints, bool allowP2
AddIPv6Relays();
}
void VoIPController::AddIPv6Relays()
{
if (!myIPv6.IsEmpty() && !didAddIPv6Relays)
@ -149,4 +145,3 @@ void VoIPController::AddTCPRelays()
didAddTcpRelays = true;
}
}

View File

@ -216,12 +216,12 @@ void VoIPController::NetworkPacketReceived(shared_ptr<NetworkPacket> _packet)
else
stats.bytesRecvdWifi += (uint64_t)packet.data.Length();
try
{
/*try
{*/
ProcessIncomingPacket(packet, endpoints.at(srcEndpointID));
}
/*}
catch (out_of_range &x)
{
LOGW("Error parsing packet: %s", x.what());
}
}*/
}

View File

@ -41,7 +41,6 @@ bool VoIPController::SendOrEnqueuePacket(PendingOutgoingPacket pkt, bool enqueue
}
canSend = endpoint->socket && endpoint->socket->IsReadyToSend();
}
conctl.PacketSent(pkt.seq, pkt.len);
if (!canSend)
{
if (enqueue)
@ -51,13 +50,12 @@ bool VoIPController::SendOrEnqueuePacket(PendingOutgoingPacket pkt, bool enqueue
}
return false;
}
conctl.PacketSent(pkt.seq, pkt.len);
if ((endpoint->type == Endpoint::Type::TCP_RELAY && useTCP) || (endpoint->type != Endpoint::Type::TCP_RELAY && useUDP))
{
//BufferOutputStream p(buf, sizeof(buf));
BufferOutputStream p(1500);
WritePacketHeader(pkt.seq, &p, pkt.type, (uint32_t)pkt.len, source);
p.WriteBytes(pkt.data);
SendPacket(p.GetBuffer(), p.GetLength(), *endpoint, pkt);
BufferOutputStream out(1500);
uint8_t transportId = WritePacketHeader(pkt, out, source);
SendPacket(out.GetBuffer(), out.GetLength(), *endpoint, pkt.seq, pkt.type, transportId);
if (pkt.type == PKT_STREAM_DATA)
{
unsentStreamPackets--;
@ -66,7 +64,7 @@ bool VoIPController::SendOrEnqueuePacket(PendingOutgoingPacket pkt, bool enqueue
return true;
}
void VoIPController::SendPacket(unsigned char *data, size_t len, Endpoint &ep, PendingOutgoingPacket &srcPacket)
void VoIPController::SendPacket(unsigned char *data, size_t len, Endpoint &ep, uint32_t seq, uint8_t type, uint8_t transportId)
{
if (stopping)
return;
@ -84,9 +82,9 @@ void VoIPController::SendPacket(unsigned char *data, size_t len, Endpoint &ep, P
}
//LOGV("Sending %d bytes to %s:%d", out.GetLength(), ep.address.ToString().c_str(), ep.port);
//#ifdef LOG_PACKETS
LOGV("Sending: to=%s:%u, seq=%u, length=%u, type=%s", ep.GetAddress().ToString().c_str(), ep.port, srcPacket.seq, (unsigned int)out.GetLength(), GetPacketTypeString(srcPacket.type).c_str());
//#endif
#ifdef LOG_PACKETS
LOGV("Sending: to=%s:%u, seq=%u, length=%u, type=%s, transportId=%hhu", ep.GetAddress().ToString().c_str(), ep.port, seq, (unsigned int)out.GetLength(), GetPacketTypeString(type).c_str(), transportId);
#endif
rawSendQueue.Put(
RawPendingOutgoingPacket{
@ -266,17 +264,6 @@ void VoIPController::TrySendOutgoingPackets()
}
RecentOutgoingPacket *VoIPController::GetRecentOutgoingPacket(uint32_t seq)
{
for (RecentOutgoingPacket &opkt : recentOutgoingPackets)
{
if (opkt.seq == seq)
{
return &opkt;
}
}
return nullptr;
}
void VoIPController::SendRelayPings()
{
@ -363,16 +350,17 @@ void VoIPController::SendRelayPings()
}
}
void VoIPController::SendNopPacket()
void VoIPController::SendNopPacket(PacketManager &pm)
{
if (state != STATE_ESTABLISHED)
return;
PacketSender *source = pm.getTransportId() == 0xFF ? nullptr : outgoingStreams[pm.getTransportId()]->packetSender.get();
SendOrEnqueuePacket(PendingOutgoingPacket{
/*.seq=*/(firstSentPing = packetManager.nextLocalSeq()),
/*.seq=*/(firstSentPing = pm.nextLocalSeq()),
/*.type=*/PKT_NOP,
/*.len=*/0,
/*.data=*/Buffer(),
/*.endpoint=*/0});
/*.endpoint=*/0}, source);
}
void VoIPController::SendPublicEndpointsRequest()

View File

@ -6,6 +6,7 @@ using namespace std;
PacketManager::PacketManager(uint8_t transportId) : transportId(transportId)
{
recentOutgoingPackets.reserve(MAX_RECENT_PACKETS);
}
void PacketManager::ackLocal(uint32_t ackId, uint32_t mask)
{
@ -60,4 +61,16 @@ bool PacketManager::ackRemoteSeq(uint32_t ackId)
return false;
}
return true;
}
}
RecentOutgoingPacket *PacketManager::GetRecentOutgoingPacket(uint32_t seq)
{
for (RecentOutgoingPacket &opkt : recentOutgoingPackets)
{
if (opkt.seq == seq)
{
return &opkt;
}
}
return nullptr;
}

View File

@ -26,7 +26,7 @@ public:
virtual ~PacketManager() = default;
// Transport ID for multiplexing
inline uint8_t getTransportId()
inline uint8_t getTransportId()
{
return transportId;
}
@ -114,5 +114,17 @@ private:
// Recent incoming remote packets
uint32_t lastRemoteSeqsMask;
public: // Recent outgoing packet list
inline std::vector<RecentOutgoingPacket> &getRecentOutgoingPackets()
{
return recentOutgoingPackets;
}
RecentOutgoingPacket *GetRecentOutgoingPacket(uint32_t seq);
private:
// Recent ougoing packets
std::vector<RecentOutgoingPacket> recentOutgoingPackets;
};
} // namespace tgvoip

View File

@ -8,6 +8,10 @@ namespace tgvoip
class PacketSender;
struct RecentOutgoingPacket
{
// For simple NACK reliable resending
int64_t endpoint;
Buffer data;
uint32_t seq;
uint16_t id; // for group calls only
double sendTime;

View File

@ -187,15 +187,16 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE
if (peerVersion >= 8 || (!peerVersion && connectionMaxLayer >= 92))
{
type = in.ReadByte();
if (peerVersion >= PROTOCOL_RELIABLE)
{
transportId = in.ReadByte();
}
ackId = in.ReadUInt32();
pseq = in.ReadUInt32();
acks = in.ReadUInt32();
pflags = in.ReadByte();
packetInnerLen = innerLen - 14;
if (pflags & XPFLAG_HAS_TRANSPORT_ID)
{
transportId = in.ReadByte();
}
}
else if (!legacyParsePacket(in, type, ackId, pseq, acks, pflags, packetInnerLen))
{
@ -211,6 +212,10 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE
return;
}
#ifdef LOG_PACKETS
LOGV("Received: from=%s:%u, seq=%u, length=%u, type=%s, transportId=%hhu", srcEndpoint.GetAddress().ToString().c_str(), srcEndpoint.port, pseq, (unsigned int)packet.data.Length(), GetPacketTypeString(type).c_str(), transportId);
#endif
// Extra data
if (pflags & XPFLAG_HAS_EXTRA)
{
@ -247,18 +252,29 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE
conctl.PacketAcknowledged(ackId);
manager->ackLocal(ackId, acks);
if (transportId != 0xFF && !incomingStreams.empty() && incomingStreams[transportId]->jitterBuffer)
{
// Technically I should be using the specific packet manager's rtt history but will separate later
uint32_t tooOldSeq = incomingStreams[transportId]->jitterBuffer->GetSeqTooLate(rttHistory[0]);
LOGW("Reverse acking seqs older than %u, newest seq received from remote %u (transportId %hhu)", tooOldSeq, manager->getLastRemoteSeq(), transportId);
manager->ackRemoteSeqsOlderThan(tooOldSeq);
}
for (auto &opkt : recentOutgoingPackets)
for (auto &opkt : manager->getRecentOutgoingPackets())
{
if (opkt.ackTime)
{
continue;
}
if (manager->wasLocalAcked(opkt.seq))
{
opkt.data = Buffer();
opkt.ackTime = GetCurrentTime();
opkt.rttTime = opkt.ackTime - opkt.sendTime;
if (opkt.lost)
{
LOGW("acknowledged lost packet %u", opkt.seq);
LOGW("acknowledged lost packet %u (transportId %hhu)", opkt.seq, transportId);
sendLosses--;
}
if (opkt.sender && !opkt.lost)
@ -269,6 +285,34 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE
// TODO move this to a PacketSender
conctl.PacketAcknowledged(opkt.seq);
}
else if (peerVersion >= PROTOCOL_RELIABLE && opkt.data.Length() && opkt.seq < manager->getLastAckedSeq())
{
if (manager->getLastAckedSeq() - opkt.seq > 32)
{
LOGW("Marking reliable NACK packet %u as lost, since is more than 32 seqs old (last acked %u, transportId %hhu)", opkt.seq, manager->getLastAckedSeq(), transportId);
opkt.lost = true;
opkt.data = Buffer();
continue;
}
if (opkt.sendTime + rttHistory[0] < VoIPController::GetCurrentTime())
{
LOGW("It is possibly a bit too early to resend NACK packet %u (transportId %hhu)", opkt.seq, transportId);
}
LOGW("Resending reliable NACK packet %u, (transportId %hhu)", opkt.seq, transportId);
BufferOutputStream copy(opkt.data.Length());
copy.WriteBytes(opkt.data);
Endpoint *endpoint = GetEndpointById(opkt.endpoint);
if (!endpoint)
{
LOGE("Recent NACK queue contained packet (%u) for nonexistent endpoint, (transportId %hhu)", opkt.seq, transportId);
opkt.lost = true;
opkt.data = Buffer();
continue;
}
opkt.sendTime = GetCurrentTime();
SendPacket(copy.GetBuffer(), copy.GetLength(), *endpoint, opkt.seq, opkt.type, transportId);
//SendOrEnqueuePacket()
}
}
if (peerVersion >= 6)
@ -316,17 +360,12 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE
}
}*/
unacknowledgedIncomingPacketCount++;
if (unacknowledgedIncomingPacketCount > unackNopThreshold)
if (unacknowledgedIncomingPacketCount++ > unackNopThreshold)
{
//LOGV("Sending nop packet as ack");
SendNopPacket();
SendNopPacket(packetManager);
}
//#ifdef LOG_PACKETS
LOGV("Received: from=%s:%u, seq=%u, length=%u, type=%s", srcEndpoint.GetAddress().ToString().c_str(), srcEndpoint.port, pseq, (unsigned int)packet.data.Length(), GetPacketTypeString(type).c_str());
//#endif
//LOGV("acks: %u -> %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf", manager.getLastAckedSeq()(), remoteAcks[0], remoteAcks[1], remoteAcks[2], remoteAcks[3], remoteAcks[4], remoteAcks[5], remoteAcks[6], remoteAcks[7]);
//LOGD("recv: %u -> %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf", getLastRemoteSeq(), recvPacketTimes[0], recvPacketTimes[1], recvPacketTimes[2], recvPacketTimes[3], recvPacketTimes[4], recvPacketTimes[5], recvPacketTimes[6], recvPacketTimes[7]);
//LOGI("RTT = %.3lf", GetAverageRTT());
@ -588,6 +627,8 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE
uint32_t pts = in.ReadUInt32();
unsigned char fragmentCount = 1;
unsigned char fragmentIndex = 0;
//LOGE("RECV: For pts %u = seq %u, got seq %u", pts, pts/60 + 1, pseq);
//LOGD("stream data, pts=%d, len=%d, rem=%d", pts, sdlen, in.Remaining());
audioTimestampIn = pts;
if (!audioOutStarted && audioOutput)
@ -622,10 +663,14 @@ void VoIPController::ProcessIncomingPacket(NetworkPacket &packet, Endpoint &srcE
{
if (stm->jitterBuffer)
{
if (peerVersion >= PROTOCOL_RELIABLE) {
manager->ackRemoteSeqsOlderThan(stm->jitterBuffer->GetSeqTooLate(rttHistory[0]));
}
stm->jitterBuffer->HandleInput(static_cast<unsigned char *>(buffer + in.GetOffset()), sdlen, pts, false);
/*if (peerVersion >= PROTOCOL_RELIABLE)
{
// Technically I should be using the specific packet manager's rtt history but will separate later
uint32_t tooOldSeq = stm->jitterBuffer->GetSeqTooLate(rttHistory[0]) - 1;
LOGW("Reverse acking seqs older than %u, newest acked seq %u (transportId %hhu)", tooOldSeq, manager->getLastRemoteSeq(), transportId);
manager->ackRemoteSeqsOlderThan(tooOldSeq);
}*/
if (extraFEC)
{
in.Seek(in.GetOffset() + sdlen);
@ -993,21 +1038,17 @@ void VoIPController::ProcessAcknowledgedOutgoingExtra(UnacknowledgedExtraData &e
}
}
void VoIPController::WritePacketHeader(uint32_t pseq, BufferOutputStream *s, unsigned char type, uint32_t length, PacketSender *source)
uint8_t VoIPController::WritePacketHeader(PendingOutgoingPacket &pkt, BufferOutputStream &s, PacketSender *source)
{
PacketManager &manager = peerVersion >= PROTOCOL_RELIABLE ? source->getPacketManager() : packetManager;
PacketManager &manager = peerVersion >= PROTOCOL_RELIABLE && source ? source->getPacketManager() : packetManager;
uint32_t acks = manager.getRemoteAckMask();
if (peerVersion >= 8 || (!peerVersion && connectionMaxLayer >= 92))
{
s->WriteByte(type);
if (peerVersion >= PROTOCOL_RELIABLE)
{
s->WriteByte(manager.getTransportId());
}
s->WriteInt32(manager.getLastRemoteSeq());
s->WriteInt32(pseq);
s->WriteInt32(acks);
s.WriteByte(pkt.type);
s.WriteInt32(manager.getLastRemoteSeq());
s.WriteInt32(pkt.seq);
s.WriteInt32(acks);
unsigned char flags = currentExtras.empty() ? 0 : XPFLAG_HAS_EXTRA;
@ -1015,47 +1056,68 @@ void VoIPController::WritePacketHeader(uint32_t pseq, BufferOutputStream *s, uns
if (peerVersion >= 9 && videoStream && videoStream->enabled)
flags |= XPFLAG_HAS_RECV_TS;
s->WriteByte(flags);
if (peerVersion >= PROTOCOL_RELIABLE && manager.getTransportId() != 0xFF)
flags |= XPFLAG_HAS_TRANSPORT_ID;
s.WriteByte(flags);
if (flags & XPFLAG_HAS_TRANSPORT_ID)
{
s.WriteByte(manager.getTransportId());
}
if (!currentExtras.empty())
{
s->WriteByte(static_cast<unsigned char>(currentExtras.size()));
s.WriteByte(static_cast<unsigned char>(currentExtras.size()));
for (auto &x : currentExtras)
{
LOGV("Writing extra into header: type %u, length %d", x.type, int(x.data.Length()));
assert(x.data.Length() <= 254);
s->WriteByte(static_cast<unsigned char>(x.data.Length() + 1));
s->WriteByte(x.type);
s->WriteBytes(*x.data, x.data.Length());
s.WriteByte(static_cast<unsigned char>(x.data.Length() + 1));
s.WriteByte(x.type);
s.WriteBytes(*x.data, x.data.Length());
if (x.firstContainingSeq == 0)
x.firstContainingSeq = pseq;
x.firstContainingSeq = pkt.seq;
}
}
if (peerVersion >= 9 && videoStream && videoStream->enabled)
{
s->WriteUInt32((lastRecvPacketTime - connectionInitTime) * 1000.0);
s.WriteUInt32((lastRecvPacketTime - connectionInitTime) * 1000.0);
}
}
else
{
legacyWritePacketHeader(pseq, acks, s, type, length);
legacyWritePacketHeader(pkt.seq, acks, &s, pkt.type, pkt.len);
}
s.WriteBytes(pkt.data);
Buffer copyBuf(s.GetLength());
if (peerVersion >= PROTOCOL_RELIABLE)
copyBuf.CopyFrom(s.GetBuffer(), 0, s.GetLength());
unacknowledgedIncomingPacketCount = 0;
auto &recentOutgoingPackets = manager.getRecentOutgoingPackets();
recentOutgoingPackets.push_back(RecentOutgoingPacket{
pseq,
pkt.endpoint,
std::move(copyBuf),
pkt.seq,
0,
GetCurrentTime(),
0,
0,
type,
length,
pkt.type,
static_cast<uint32_t>(pkt.len),
source,
false});
while (recentOutgoingPackets.size() > MAX_RECENT_PACKETS)
{
recentOutgoingPackets.erase(recentOutgoingPackets.begin());
}
manager.setLastSentSeq(pseq);
manager.setLastSentSeq(pkt.seq);
return manager.getTransportId();
//LOGI("packet header size %d", s->GetLength());
}

View File

@ -95,13 +95,13 @@ void VoIPController::handleReliablePackets()
bool VoIPController::WasOutgoingPacketAcknowledged(uint32_t seq, bool checkAll)
{
bool res = getBestPacketManager().wasLocalAcked(seq);
bool res = packetManager.wasLocalAcked(seq);
if (res || !checkAll)
{
return res;
}
RecentOutgoingPacket *pkt = GetRecentOutgoingPacket(seq);
RecentOutgoingPacket *pkt = packetManager.GetRecentOutgoingPacket(seq);
if (!pkt)
return false;
return pkt->ackTime != 0.0;

View File

@ -136,7 +136,7 @@ void VoIPController::TickJitterBufferAndCongestionControl()
double currentTime = GetCurrentTime();
double rtt = GetAverageRTT();
double packetLossTimeout = std::max(rtt * 2.0, 0.1);
for (RecentOutgoingPacket &pkt : recentOutgoingPackets)
for (RecentOutgoingPacket &pkt : getBestPacketManager().getRecentOutgoingPackets())
{
if (pkt.ackTime != 0.0 || pkt.lost)
continue;