1
0
mirror of https://github.com/danog/libtgvoip.git synced 2024-11-26 20:24:38 +01:00

Refactor jitterBuffer

This commit is contained in:
Daniil Gentili 2020-03-12 18:40:13 +01:00
parent c6fa5e0edf
commit 3b70f17223
Signed by: danog
GPG Key ID: 8C1BE3B34B230CA7
3 changed files with 76 additions and 106 deletions

View File

@ -187,7 +187,7 @@ void tgvoip::OpusEncoder::RunThread()
LOGV("starting encoder, packets per frame=%d", packetsPerFrame);
int16_t *frame;
if (packetsPerFrame > 1)
frame = reinterpret_cast<uint16_t *>(std::malloc(960 * 2 * packetsPerFrame));
frame = reinterpret_cast<int16_t *>(std::malloc(960 * 2 * packetsPerFrame));
else
frame = NULL;
bool frameHasVoice = false;

View File

@ -79,59 +79,55 @@ void JitterBuffer::HandleInput(unsigned char *data, size_t len, uint32_t timesta
pkt.buffer = Buffer::Wrap(data, len, [](void *) {}, [](void *a, size_t) -> void * { return a; });
pkt.timestamp = timestamp;
pkt.isEC = isEC;
PutInternal(&pkt, !isEC);
PutInternal(pkt, !isEC);
//LOGV("in, ts=%d, ec=%d", timestamp, isEC);
}
void JitterBuffer::PutInternal(jitter_packet_t *pkt, bool overwriteExisting)
void JitterBuffer::PutInternal(jitter_packet_t &pkt, bool overwriteExisting)
{
if (pkt->size > JITTER_SLOT_SIZE)
if (pkt.size > JITTER_SLOT_SIZE)
{
LOGE("The packet is too big to fit into the jitter buffer");
return;
}
int i;
for (i = 0; i < JITTER_SLOT_COUNT; i++)
{
if (!slots[i].buffer.IsEmpty() && slots[i].timestamp == pkt->timestamp)
{
//LOGV("Found existing packet for timestamp %u, overwrite %d", pkt->timestamp, overwriteExisting);
if (overwriteExisting)
{
slots[i].buffer.CopyFromOtherBuffer(pkt->buffer, pkt->size);
slots[i].size = pkt->size;
slots[i].isEC = pkt->isEC;
}
for (auto &slot : slots)
{
if (!slot.buffer.IsEmpty() && slot.timestamp == pkt.timestamp)
{
slot.buffer.CopyFromOtherBuffer(pkt.buffer, pkt.size);
slot.size = pkt.size;
slot.isEC = pkt.isEC;
return;
}
}
}
gotSinceReset++;
if (wasReset)
{
wasReset = false;
outstandingDelayChange = 0;
nextFetchTimestamp = static_cast<int64_t>(static_cast<int64_t>(pkt->timestamp) - step * minDelay);
nextFetchTimestamp = static_cast<int64_t>(static_cast<int64_t>(pkt.timestamp) - step * minDelay);
first = true;
LOGI("jitter: resyncing, next timestamp = %lld (step=%d, minDelay=%f)", (long long int)nextFetchTimestamp, step, (double)minDelay);
}
for (i = 0; i < JITTER_SLOT_COUNT; i++)
for (auto &slot : slots)
{
if (!slots[i].buffer.IsEmpty())
// Clear packets older than the last packet pulled from jitter buffer
if (!slot.buffer.IsEmpty() && slot.timestamp < nextFetchTimestamp - 1)
{
// Clear packets older than the last fetched packet
if (slots[i].timestamp < nextFetchTimestamp - 1)
{
slots[i].buffer = Buffer();
}
slot.buffer = Buffer();
}
}
/*double prevTime=0;
uint32_t closestTime=0;
for(i=0;i<JITTER_SLOT_COUNT;i++){
if(slots[i].buffer!=NULL && pkt->timestamp-slots[i].timestamp<pkt->timestamp-closestTime){
if(slots[i].buffer!=NULL && pkt.timestamp-slots[i].timestamp<pkt.timestamp-closestTime){
closestTime=slots[i].timestamp;
prevTime=slots[i].recvTime;
}
@ -139,11 +135,9 @@ void JitterBuffer::PutInternal(jitter_packet_t *pkt, bool overwriteExisting)
// Time deviation check
double time = VoIPController::GetCurrentTime();
if (expectNextAtTime != 0)
if (expectNextAtTime)
{
double dev = expectNextAtTime - time;
//LOGV("packet dev %f", dev);
deviationHistory.Add(dev);
deviationHistory.Add(expectNextAtTime - time);
expectNextAtTime += step / 1000.0;
}
else
@ -152,67 +146,56 @@ void JitterBuffer::PutInternal(jitter_packet_t *pkt, bool overwriteExisting)
}
// Late packet check
if (pkt->timestamp < nextFetchTimestamp)
if (pkt.timestamp < nextFetchTimestamp)
{
//LOGW("jitter: would drop packet with timestamp %d because it is late but not hopelessly", pkt->timestamp);
//LOGW("jitter: would drop packet with timestamp %d because it is late but not hopelessly", pkt.timestamp);
latePacketCount++;
lostPackets--;
}
else if (pkt->timestamp < nextFetchTimestamp - 1)
else if (pkt.timestamp < nextFetchTimestamp - 1)
{
//LOGW("jitter: dropping packet with timestamp %d because it is too late", pkt->timestamp);
//LOGW("jitter: dropping packet with timestamp %d because it is too late", pkt.timestamp);
latePacketCount++;
return;
}
if (pkt->timestamp > lastPutTimestamp)
lastPutTimestamp = pkt->timestamp;
if (pkt.timestamp > lastPutTimestamp)
lastPutTimestamp = pkt.timestamp;
for (i = 0; i < JITTER_SLOT_COUNT; i++)
// If no free slots or too many used up slots to be useful
auto slot = GetCurrentDelay() >= maxUsedSlots ? slots.end() : std::find_if(slots.begin(), slots.end(), [](const jitter_packet_t &a) -> bool {
return !a.buffer.IsEmpty();
});
if (slot == slots.end())
{
if (slots[i].buffer.IsEmpty())
break;
}
if (i == JITTER_SLOT_COUNT || GetCurrentDelay() >= maxUsedSlots)
{
int toRemove = JITTER_SLOT_COUNT;
uint32_t oldestTimestamp = 0xFFFFFFFF;
for (i = 0; i < JITTER_SLOT_COUNT; i++)
{
if (!slots[i].buffer.IsEmpty() && slots[i].timestamp < oldestTimestamp)
{
toRemove = i;
oldestTimestamp = slots[i].timestamp;
}
}
slot = std::min_element(slots.begin(), slots.end(), [](const jitter_packet_t &a, const jitter_packet_t &b) -> bool {
return !a.buffer.IsEmpty() && a.timestamp < b.timestamp;
});
slot->buffer = Buffer();
Advance();
slots[toRemove].buffer = Buffer();
i = toRemove;
}
slots[i].timestamp = pkt->timestamp;
slots[i].size = pkt->size;
slots[i].buffer = bufferPool.Get();
slots[i].recvTimeDiff = time - prevRecvTime;
slots[i].isEC = pkt->isEC;
slots[i].buffer.CopyFromOtherBuffer(pkt->buffer, pkt->size);
slot->timestamp = pkt.timestamp;
slot->size = pkt.size;
slot->buffer = bufferPool.Get();
slot->recvTimeDiff = time - prevRecvTime;
slot->isEC = pkt.isEC;
slot->buffer.CopyFromOtherBuffer(pkt.buffer, pkt.size);
#ifdef TGVOIP_DUMP_JITTER_STATS
fprintf(dump, "%u\t%.03f\t%d\t%.03f\t%.03f\t%.03f\n", pkt->timestamp, time, GetCurrentDelay(), lastMeasuredJitter, lastMeasuredDelay, minDelay);
fprintf(dump, "%u\t%.03f\t%d\t%.03f\t%.03f\t%.03f\n", pkt.timestamp, time, GetCurrentDelay(), lastMeasuredJitter, lastMeasuredDelay, minDelay);
#endif
prevRecvTime = time;
}
void JitterBuffer::Reset()
{
wasReset = true;
needBuffering = true;
lastPutTimestamp = 0;
int i;
for (i = 0; i < JITTER_SLOT_COUNT; i++)
{
if (!slots[i].buffer.IsEmpty())
{
slots[i].buffer = Buffer();
}
}
std::for_each(slots.begin(), slots.end(), [](jitter_packet_t &t) {
t.buffer = Buffer();
});
delayHistory.Reset();
lateHistory.Reset();
adjustingDelay = false;
@ -229,32 +212,32 @@ size_t JitterBuffer::HandleOutput(unsigned char *buffer, size_t len, int offsetI
jitter_packet_t pkt;
pkt.buffer = Buffer::Wrap(buffer, len, [](void *) {}, [](void *a, size_t) -> void * { return a; });
pkt.size = len;
MutexGuard m(mutex);
if (first)
{
first = false;
unsigned int delay = GetCurrentDelay();
if (GetCurrentDelay() > 5)
if (delay > 5)
{
LOGW("jitter: delay too big upon start (%u), dropping packets", delay);
for (; delay > GetMinPacketCount(); --delay)
{
for (int i = 0; i < JITTER_SLOT_COUNT; i++)
auto slot = std::find_if(slots.begin(), slots.end(), [&](const jitter_packet_t &a) -> bool {
return a.timestamp == nextFetchTimestamp;
});
if (!slot->buffer.IsEmpty())
{
if (slots[i].timestamp == nextFetchTimestamp)
{
if (!slots[i].buffer.IsEmpty())
{
slots[i].buffer = Buffer();
}
break;
}
slot->buffer = Buffer();
}
Advance();
}
}
}
int result = GetInternal(&pkt, offsetInSteps, advance);
int result = GetInternal(pkt, offsetInSteps, advance);
if (outstandingDelayChange != 0)
{
if (outstandingDelayChange < 0)
@ -289,7 +272,7 @@ size_t JitterBuffer::HandleOutput(unsigned char *buffer, size_t len, int offsetI
}
}
int JitterBuffer::GetInternal(jitter_packet_t *pkt, int offset, bool advance)
int JitterBuffer::GetInternal(jitter_packet_t &pkt, int offset, bool advance)
{
/*if(needBuffering && lastPutTimestamp<nextFetchTimestamp){
LOGV("jitter: don't have timestamp %lld, buffering", (long long int)nextFetchTimestamp);
@ -301,32 +284,24 @@ int JitterBuffer::GetInternal(jitter_packet_t *pkt, int offset, bool advance)
int64_t timestampToGet = nextFetchTimestamp + offset * (int32_t)step;
int i;
for (i = 0; i < JITTER_SLOT_COUNT; i++)
{
if (!slots[i].buffer.IsEmpty() && slots[i].timestamp == timestampToGet)
{
break;
}
}
auto slot = std::find_if(slots.begin(), slots.end(), [timestampToGet](const jitter_packet_t &a) -> bool {
return a.timestamp == timestampToGet && !a.buffer.IsEmpty();
});
if (i < JITTER_SLOT_COUNT)
if (slot != slots.end())
{
if (pkt && pkt->size < slots[i].size)
if (pkt.size < slot->size)
{
LOGE("jitter: packet won't fit into provided buffer of %d (need %d)", int(slots[i].size), int(pkt->size));
LOGE("jitter: packet won't fit into provided buffer of %d (need %d)", int(slot->size), int(pkt.size));
}
else
{
if (pkt)
{
pkt->size = slots[i].size;
pkt->timestamp = slots[i].timestamp;
pkt->buffer.CopyFromOtherBuffer(slots[i].buffer, slots[i].size);
pkt->isEC = slots[i].isEC;
pkt.size = slot->size;
pkt.timestamp = slot->timestamp;
pkt.buffer.CopyFromOtherBuffer(slot->buffer, slot->size);
pkt.isEC = slot->isEC;
}
}
slots[i].buffer = Buffer();
slot->buffer = Buffer();
if (offset == 0)
Advance();
lostCount = 0;
@ -371,14 +346,9 @@ void JitterBuffer::Advance()
unsigned int JitterBuffer::GetCurrentDelay()
{
unsigned int delay = 0;
int i;
for (i = 0; i < JITTER_SLOT_COUNT; i++)
{
if (!slots[i].buffer.IsEmpty())
delay++;
}
return delay;
return std::count_if(slots.begin(), slots.end(), [](const jitter_packet_t &a) -> bool {
return !a.buffer.IsEmpty();
});
}
void JitterBuffer::Tick()

View File

@ -65,8 +65,8 @@ private:
bool isEC = 0;
double recvTimeDiff = 0.0;
};
void PutInternal(jitter_packet_t *pkt, bool overwriteExisting);
int GetInternal(jitter_packet_t *pkt, int offset, bool advance);
void PutInternal(jitter_packet_t &pkt, bool overwriteExisting);
int GetInternal(jitter_packet_t &pkt, int offset, bool advance);
void Advance();
BufferPool<JITTER_SLOT_SIZE, JITTER_SLOT_COUNT> bufferPool;