1
0
mirror of https://github.com/danog/libtgvoip.git synced 2024-12-02 17:51:06 +01:00
libtgvoip/controller/protocol/Protocol.cpp

992 lines
36 KiB
C++
Raw Normal View History

2020-01-25 20:45:43 +01:00
#include "../PrivateDefines.cpp"
2020-03-21 21:33:51 +01:00
#include "packets/PacketManager.h"
2020-01-25 20:45:43 +01:00
using namespace tgvoip;
using namespace std;
#pragma mark - Networking & crypto
2020-01-28 23:45:47 +01:00
PacketManager &VoIPController::getBestPacketManager()
{
2020-03-22 20:09:44 +01:00
return outgoingStreams[ver.isNew() ? StreamId::Audio : StreamId::Signaling]->packetManager;
2020-01-28 23:45:47 +01:00
}
2020-03-22 20:09:44 +01:00
void VoIPController::ProcessIncomingPacket(NetworkPacket &npacket, Endpoint &srcEndpoint)
2020-01-25 20:45:43 +01:00
{
ENFORCE_MSG_THREAD;
// Initial packet decryption and recognition
2020-03-22 20:09:44 +01:00
unsigned char *buffer = *npacket.data;
size_t len = npacket.data.Length();
BufferInputStream in(npacket.data);
if (ver.peerVersion < 9 || srcEndpoint.IsReflector())
2020-01-25 20:45:43 +01:00
{
if (in.Remaining() < 16)
{
LOGW("Received packet has wrong (no) peerTag");
return;
}
if (memcmp(buffer, srcEndpoint.IsReflector() ? (void *)srcEndpoint.peerTag : (void *)callID, 16) != 0)
{
LOGW("Received packet has wrong peerTag");
return;
}
in.Seek(16);
}
2020-03-16 16:07:13 +01:00
if (in.Remaining() >= 16 && srcEndpoint.IsReflector() && parseRelayPacket(in, srcEndpoint))
2020-01-25 20:45:43 +01:00
{
return;
}
if (in.Remaining() < 40)
{
LOGV("Received packet is too small");
return;
}
size_t innerLen = decryptPacket(buffer, in);
2020-03-16 16:07:13 +01:00
if (!innerLen) // Decryption failed
{
2020-01-25 20:45:43 +01:00
return;
}
lastRecvPacketTime = GetCurrentTime();
if (state == STATE_RECONNECTING)
{
LOGI("Received a valid packet while reconnecting - setting state to established");
SetState(STATE_ESTABLISHED);
}
if (srcEndpoint.type == Endpoint::Type::UDP_P2P_INET && !srcEndpoint.IsIPv6Only())
{
2020-03-22 20:09:44 +01:00
if (srcEndpoint.port != npacket.port || srcEndpoint.address != npacket.address)
2020-01-25 20:45:43 +01:00
{
2020-03-22 20:09:44 +01:00
if (!npacket.address.isIPv6)
2020-01-25 20:45:43 +01:00
{
2020-03-22 20:09:44 +01:00
LOGI("Incoming packet was decrypted successfully, changing P2P endpoint to %s:%u", npacket.address.ToString().c_str(), npacket.port);
srcEndpoint.address = npacket.address;
srcEndpoint.port = npacket.port;
2020-01-25 20:45:43 +01:00
}
}
}
2020-03-22 20:09:44 +01:00
Packet packet;
if (!packet.parse(in, ver)) {
LOGW("Failure parsing incoming packet!");
2020-01-25 20:45:43 +01:00
}
2020-03-22 20:09:44 +01:00
ProcessIncomingPacket(packet, srcEndpoint);
if (packet.otherPackets.size()) { // Legacy
for (Packet &packet : packet.otherPackets) {
ProcessIncomingPacket(packet, srcEndpoint);
}
2020-01-25 20:45:43 +01:00
}
2020-03-22 20:09:44 +01:00
}
void VoIPController::ProcessIncomingPacket(Packet &packet, Endpoint &srcEndpoint)
{
2020-01-25 20:45:43 +01:00
packetsReceived++;
2020-01-28 23:45:47 +01:00
// Use packet manager of outgoing streams on both sides (probably should separate in separate vector)
PacketManager *manager = transportId == 0xFF ? &packetManager : &outgoingStreams[transportId]->packetSender->getPacketManager();
if (!manager->ackRemoteSeq(pseq))
{
2020-01-25 20:45:43 +01:00
return;
}
2020-01-29 19:12:12 +01:00
#ifdef LOG_PACKETS
2020-01-29 15:52:43 +01:00
LOGV("Received: from=%s:%u, seq=%u, length=%u, type=%s, transportId=%hhu", srcEndpoint.GetAddress().ToString().c_str(), srcEndpoint.port, pseq, (unsigned int)packet.data.Length(), GetPacketTypeString(type).c_str(), transportId);
2020-01-29 19:12:12 +01:00
#endif
2020-01-29 15:52:43 +01:00
2020-01-25 20:45:43 +01:00
// Extra data
if (pflags & XPFLAG_HAS_EXTRA)
{
unsigned char extraCount = in.ReadByte();
for (int i = 0; i < extraCount; i++)
{
size_t extraLen = in.ReadByte();
Buffer xbuffer(extraLen);
in.ReadBytes(*xbuffer, extraLen);
ProcessExtraData(xbuffer);
}
}
uint32_t recvTS = 0;
if (pflags & XPFLAG_HAS_RECV_TS)
{
2020-01-26 11:18:18 +01:00
recvTS = in.ReadUInt32();
2020-01-25 20:45:43 +01:00
}
2020-01-26 10:28:30 +01:00
2020-01-28 23:45:47 +01:00
if (seqgt(ackId, manager->getLastAckedSeq()))
2020-01-25 20:45:43 +01:00
{
2020-01-28 23:45:47 +01:00
if (waitingForAcks && manager->getLastAckedSeq() >= firstSentPing)
2020-01-25 20:45:43 +01:00
{
rttHistory.Reset();
waitingForAcks = false;
dontSendPackets = 10;
messageThread.Post(
[this] {
dontSendPackets = 0;
},
1.0);
LOGI("resuming sending");
}
conctl.PacketAcknowledged(ackId);
2020-01-28 23:45:47 +01:00
manager->ackLocal(ackId, acks);
2020-01-29 19:12:12 +01:00
2020-01-29 15:52:43 +01:00
if (transportId != 0xFF && !incomingStreams.empty() && incomingStreams[transportId]->jitterBuffer)
{
// Technically I should be using the specific packet manager's rtt history but will separate later
uint32_t tooOldSeq = incomingStreams[transportId]->jitterBuffer->GetSeqTooLate(rttHistory[0]);
LOGW("Reverse acking seqs older than %u, newest seq received from remote %u (transportId %hhu)", tooOldSeq, manager->getLastRemoteSeq(), transportId);
manager->ackRemoteSeqsOlderThan(tooOldSeq);
}
2020-01-25 20:45:43 +01:00
2020-01-29 15:52:43 +01:00
for (auto &opkt : manager->getRecentOutgoingPackets())
2020-01-25 20:45:43 +01:00
{
2020-01-26 11:18:18 +01:00
if (opkt.ackTime)
2020-01-29 15:52:43 +01:00
{
2020-01-25 20:45:43 +01:00
continue;
2020-01-29 15:52:43 +01:00
}
2020-01-28 23:45:47 +01:00
if (manager->wasLocalAcked(opkt.seq))
2020-01-25 20:45:43 +01:00
{
2020-01-29 15:52:43 +01:00
opkt.data = Buffer();
2020-01-25 20:45:43 +01:00
opkt.ackTime = GetCurrentTime();
2020-01-25 21:31:26 +01:00
opkt.rttTime = opkt.ackTime - opkt.sendTime;
2020-01-25 20:45:43 +01:00
if (opkt.lost)
{
2020-01-29 15:52:43 +01:00
LOGW("acknowledged lost packet %u (transportId %hhu)", opkt.seq, transportId);
2020-01-25 20:45:43 +01:00
sendLosses--;
}
if (opkt.sender && !opkt.lost)
{ // don't report lost packets as acknowledged to PacketSenders
opkt.sender->PacketAcknowledged(opkt.seq, opkt.sendTime, recvTS / 1000.0f, opkt.type, opkt.size);
}
// TODO move this to a PacketSender
conctl.PacketAcknowledged(opkt.seq);
}
2020-03-22 20:09:44 +01:00
else if (ver.peerVersion >= PROTOCOL_RELIABLE && opkt.data.Length() && opkt.seq < manager->getLastAckedSeq())
2020-01-29 15:52:43 +01:00
{
if (manager->getLastAckedSeq() - opkt.seq > 32)
{
LOGW("Marking reliable NACK packet %u as lost, since is more than 32 seqs old (last acked %u, transportId %hhu)", opkt.seq, manager->getLastAckedSeq(), transportId);
opkt.lost = true;
opkt.data = Buffer();
continue;
}
if (opkt.sendTime + rttHistory[0] < VoIPController::GetCurrentTime())
{
LOGW("It is possibly a bit too early to resend NACK packet %u (transportId %hhu)", opkt.seq, transportId);
}
LOGW("Resending reliable NACK packet %u, (transportId %hhu)", opkt.seq, transportId);
BufferOutputStream copy(opkt.data.Length());
copy.WriteBytes(opkt.data);
Endpoint *endpoint = GetEndpointById(opkt.endpoint);
if (!endpoint)
{
LOGE("Recent NACK queue contained packet (%u) for nonexistent endpoint, (transportId %hhu)", opkt.seq, transportId);
opkt.lost = true;
opkt.data = Buffer();
continue;
}
opkt.sendTime = GetCurrentTime();
SendPacket(copy.GetBuffer(), copy.GetLength(), *endpoint, opkt.seq, opkt.type, transportId);
//SendOrEnqueuePacket()
}
2020-01-25 20:45:43 +01:00
}
2020-03-22 20:09:44 +01:00
if (ver.peerVersion >= 6)
2020-01-25 20:45:43 +01:00
{
for (auto x = currentExtras.begin(); x != currentExtras.end();)
{
2020-01-28 23:45:47 +01:00
if (x->firstContainingSeq != 0 && seqgte(manager->getLastAckedSeq(), x->firstContainingSeq))
2020-01-25 20:45:43 +01:00
{
LOGV("Peer acknowledged extra type %u length %u", x->type, (unsigned int)x->data.Length());
ProcessAcknowledgedOutgoingExtra(*x);
x = currentExtras.erase(x);
continue;
}
++x;
}
}
2020-03-22 20:09:44 +01:00
if (ver.peerVersion < PROTOCOL_RELIABLE)
2020-01-28 23:45:47 +01:00
handleReliablePackets(); // Use old reliability logic
2020-01-25 20:45:43 +01:00
}
Endpoint &_currentEndpoint = endpoints.at(currentEndpoint);
if (srcEndpoint.id != currentEndpoint && srcEndpoint.IsReflector() && (_currentEndpoint.IsP2P() || _currentEndpoint.averageRTT == 0))
{
2020-01-28 23:45:47 +01:00
if (seqgt(manager->getLastSentSeq() - 32, manager->getLastAckedSeq()))
2020-01-25 20:45:43 +01:00
{
currentEndpoint = srcEndpoint.id;
_currentEndpoint = srcEndpoint;
LOGI("Peer network address probably changed, switching to relay");
if (allowP2p)
SendPublicEndpointsRequest();
}
}
2020-01-26 10:28:30 +01:00
/*
2020-01-25 20:45:43 +01:00
if (config.logPacketStats)
{
DebugLoggedPacket dpkt = {
static_cast<int32_t>(pseq),
GetCurrentTime() - connectionInitTime,
static_cast<int32_t>(packet.data.Length())};
debugLoggedPackets.push_back(dpkt);
if (debugLoggedPackets.size() >= 2500)
{
debugLoggedPackets.erase(debugLoggedPackets.begin(), debugLoggedPackets.begin() + 500);
}
2020-01-26 10:28:30 +01:00
}*/
2020-01-25 20:45:43 +01:00
2020-01-29 15:52:43 +01:00
if (unacknowledgedIncomingPacketCount++ > unackNopThreshold)
2020-01-25 20:45:43 +01:00
{
//LOGV("Sending nop packet as ack");
2020-01-29 15:52:43 +01:00
SendNopPacket(packetManager);
2020-01-25 20:45:43 +01:00
}
2020-01-28 23:45:47 +01:00
//LOGV("acks: %u -> %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf", manager.getLastAckedSeq()(), remoteAcks[0], remoteAcks[1], remoteAcks[2], remoteAcks[3], remoteAcks[4], remoteAcks[5], remoteAcks[6], remoteAcks[7]);
2020-01-28 21:45:56 +01:00
//LOGD("recv: %u -> %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf, %.2lf", getLastRemoteSeq(), recvPacketTimes[0], recvPacketTimes[1], recvPacketTimes[2], recvPacketTimes[3], recvPacketTimes[4], recvPacketTimes[5], recvPacketTimes[6], recvPacketTimes[7]);
2020-01-25 20:45:43 +01:00
//LOGI("RTT = %.3lf", GetAverageRTT());
//LOGV("Packet %u type is %d", pseq, type);
if (type == PKT_INIT)
{
LOGD("Received init");
uint32_t ver = in.ReadUInt32();
if (!receivedInit)
2020-03-22 20:09:44 +01:00
ver.peerVersion = ver;
2020-01-25 20:45:43 +01:00
LOGI("Peer version is %d", peerVersion);
uint32_t minVer = in.ReadUInt32();
2020-03-22 20:09:44 +01:00
if (minVer > PROTOCOL_VERSION || ver.peerVersion < MIN_PROTOCOL_VERSION)
2020-01-25 20:45:43 +01:00
{
lastError = ERROR_INCOMPATIBLE;
SetState(STATE_FAILED);
return;
}
uint32_t flags = in.ReadUInt32();
if (!receivedInit)
{
2020-03-21 15:08:03 +01:00
if (flags & ExtraInit::Flags::DataSavingEnabled)
2020-01-25 20:45:43 +01:00
{
dataSavingRequestedByPeer = true;
UpdateDataSavingState();
UpdateAudioBitrateLimit();
}
2020-03-21 15:08:03 +01:00
if (flags & ExtraInit::Flags::GroupCallSupported)
2020-01-25 20:45:43 +01:00
{
peerCapabilities |= TGVOIP_PEER_CAP_GROUP_CALLS;
}
2020-03-21 15:08:03 +01:00
if (flags & ExtraInit::Flags::VideoRecvSupported)
2020-01-25 20:45:43 +01:00
{
peerCapabilities |= TGVOIP_PEER_CAP_VIDEO_DISPLAY;
}
2020-03-21 15:08:03 +01:00
if (flags & ExtraInit::Flags::VideoSendSupported)
2020-01-25 20:45:43 +01:00
{
peerCapabilities |= TGVOIP_PEER_CAP_VIDEO_CAPTURE;
}
}
2020-01-26 18:51:45 +01:00
unsigned char numSupportedAudioCodecs = in.ReadByte();
2020-03-18 19:54:56 +01:00
for (auto i = 0; i < numSupportedAudioCodecs; i++)
2020-01-25 20:45:43 +01:00
{
2020-03-22 20:09:44 +01:00
if (ver.peerVersion < 5)
2020-01-25 20:45:43 +01:00
in.ReadByte(); // ignore for now
else
in.ReadInt32();
}
2020-03-21 15:08:03 +01:00
if (!receivedInit && ((flags & ExtraInit::Flags::VideoSendSupported && config.enableVideoReceive) || (flags & ExtraInit::Flags::VideoRecvSupported && config.enableVideoSend)))
2020-01-25 20:45:43 +01:00
{
LOGD("Peer video decoders:");
2020-03-21 15:08:03 +01:00
uint8_t numSupportedVideoDecoders = in.ReadByte();
2020-03-18 19:54:56 +01:00
for (auto i = 0; i < numSupportedVideoDecoders; i++)
2020-01-25 20:45:43 +01:00
{
2020-01-26 11:18:18 +01:00
uint32_t id = in.ReadUInt32();
2020-01-25 20:45:43 +01:00
peerVideoDecoders.push_back(id);
char *_id = reinterpret_cast<char *>(&id);
LOGD("%c%c%c%c", _id[3], _id[2], _id[1], _id[0]);
}
protocolInfo.maxVideoResolution = in.ReadByte();
SetupOutgoingVideoStream();
}
BufferOutputStream out(1024);
out.WriteInt32(PROTOCOL_VERSION);
out.WriteInt32(MIN_PROTOCOL_VERSION);
out.WriteByte((unsigned char)outgoingStreams.size());
2020-03-15 19:43:35 +01:00
for (const auto &stream : outgoingStreams)
2020-01-25 20:45:43 +01:00
{
2020-03-15 19:43:35 +01:00
out.WriteByte(stream->id);
out.WriteByte(stream->type);
2020-03-22 20:09:44 +01:00
if (ver.peerVersion < 5)
2020-03-21 15:08:03 +01:00
out.WriteByte((unsigned char)(stream->codec == Codec::Opus ? CODEC_OPUS_OLD : 0));
2020-01-25 20:45:43 +01:00
else
2020-03-15 19:43:35 +01:00
out.WriteInt32(stream->codec);
out.WriteInt16(stream->frameDuration);
out.WriteByte((unsigned char)(stream->enabled ? 1 : 0));
2020-01-25 20:45:43 +01:00
}
LOGI("Sending init ack");
2020-03-18 19:54:56 +01:00
size_t outLength = out.GetOffset();
2020-01-25 20:45:43 +01:00
SendOrEnqueuePacket(PendingOutgoingPacket{
2020-01-28 23:45:47 +01:00
/*.seq=*/packetManager.nextLocalSeq(),
2020-01-25 20:45:43 +01:00
/*.type=*/PKT_INIT_ACK,
/*.len=*/outLength,
/*.data=*/Buffer(move(out)),
/*.endpoint=*/0});
if (!receivedInit)
{
receivedInit = true;
if ((srcEndpoint.type == Endpoint::Type::UDP_RELAY && udpConnectivityState != UDP_BAD && udpConnectivityState != UDP_NOT_AVAILABLE) || srcEndpoint.type == Endpoint::Type::TCP_RELAY)
{
currentEndpoint = srcEndpoint.id;
if (srcEndpoint.type == Endpoint::Type::UDP_RELAY || (useTCP && srcEndpoint.type == Endpoint::Type::TCP_RELAY))
preferredRelay = srcEndpoint.id;
}
}
if (!audioStarted && receivedInitAck)
{
StartAudio();
audioStarted = true;
}
}
if (type == PKT_INIT_ACK)
{
LOGD("Received init ack");
if (!receivedInitAck)
{
receivedInitAck = true;
messageThread.Cancel(initTimeoutID);
initTimeoutID = MessageThread::INVALID_ID;
if (packetInnerLen > 10)
{
2020-03-22 20:09:44 +01:00
ver.peerVersion = in.ReadInt32();
2020-01-25 20:45:43 +01:00
uint32_t minVer = in.ReadUInt32();
2020-03-22 20:09:44 +01:00
if (minVer > PROTOCOL_VERSION || ver.peerVersion < MIN_PROTOCOL_VERSION)
2020-01-25 20:45:43 +01:00
{
lastError = ERROR_INCOMPATIBLE;
SetState(STATE_FAILED);
return;
}
}
else
{
2020-03-22 20:09:44 +01:00
ver.peerVersion = 1;
2020-01-25 20:45:43 +01:00
}
2020-03-22 20:09:44 +01:00
LOGI("peer version from init ack %d", ver.peerVersion);
2020-01-25 20:45:43 +01:00
unsigned char streamCount = in.ReadByte();
if (streamCount == 0)
return;
int i;
shared_ptr<Stream> incomingAudioStream = nullptr;
for (i = 0; i < streamCount; i++)
{
shared_ptr<Stream> stm = make_shared<Stream>();
stm->id = in.ReadByte();
2020-03-21 15:08:03 +01:00
stm->type = static_cast<StreamInfo::Type>(in.ReadByte());
2020-03-22 20:09:44 +01:00
if (ver.peerVersion < 5)
2020-01-25 20:45:43 +01:00
{
unsigned char codec = in.ReadByte();
if (codec == CODEC_OPUS_OLD)
2020-03-21 15:08:03 +01:00
stm->codec = Codec::Opus;
2020-01-25 20:45:43 +01:00
}
else
{
stm->codec = in.ReadUInt32();
}
in.ReadInt16();
stm->frameDuration = 60;
stm->enabled = in.ReadByte() == 1;
2020-03-22 20:09:44 +01:00
if (stm->type == StreamInfo::Type::Video && ver.peerVersion < 9)
2020-01-25 20:45:43 +01:00
{
LOGV("Skipping video stream for old protocol version");
continue;
}
2020-03-21 15:08:03 +01:00
if (stm->type == StreamInfo::Type::Audio)
2020-01-25 20:45:43 +01:00
{
stm->jitterBuffer = make_shared<JitterBuffer>(stm->frameDuration);
if (stm->frameDuration > 50)
stm->jitterBuffer->SetMinPacketCount(ServerConfig::GetSharedInstance()->GetUInt("jitter_initial_delay_60", 2));
else if (stm->frameDuration > 30)
stm->jitterBuffer->SetMinPacketCount(ServerConfig::GetSharedInstance()->GetUInt("jitter_initial_delay_40", 4));
else
stm->jitterBuffer->SetMinPacketCount(ServerConfig::GetSharedInstance()->GetUInt("jitter_initial_delay_20", 6));
stm->decoder = nullptr;
}
2020-03-21 15:08:03 +01:00
else if (stm->type == StreamInfo::Type::Video)
2020-01-25 20:45:43 +01:00
{
if (!stm->packetReassembler)
{
stm->packetReassembler = make_shared<PacketReassembler>();
stm->packetReassembler->SetCallback(bind(&VoIPController::ProcessIncomingVideoFrame, this, placeholders::_1, placeholders::_2, placeholders::_3, placeholders::_4));
}
}
else
{
LOGW("Unknown incoming stream type: %d", stm->type);
continue;
}
incomingStreams.push_back(stm);
2020-03-21 15:08:03 +01:00
if (stm->type == StreamInfo::Type::Audio && !incomingAudioStream)
2020-01-25 20:45:43 +01:00
incomingAudioStream = stm;
}
if (!incomingAudioStream)
return;
2020-03-22 20:09:44 +01:00
if (ver.peerVersion >= 5 && !useMTProto2)
2020-01-25 20:45:43 +01:00
{
useMTProto2 = true;
LOGD("MTProto2 wasn't initially enabled for whatever reason but peer supports it; upgrading");
}
if (!audioStarted && receivedInit)
{
StartAudio();
audioStarted = true;
}
messageThread.Post(
[this] {
if (state == STATE_WAIT_INIT_ACK)
{
SetState(STATE_ESTABLISHED);
}
},
ServerConfig::GetSharedInstance()->GetDouble("established_delay_if_no_stream_data", 1.5));
if (allowP2p)
SendPublicEndpointsRequest();
}
}
if (type == PKT_STREAM_DATA || type == PKT_STREAM_DATA_X2 || type == PKT_STREAM_DATA_X3)
{
if (!receivedFirstStreamPacket)
{
receivedFirstStreamPacket = true;
if (state != STATE_ESTABLISHED && receivedInitAck)
{
messageThread.Post(
[this]() {
SetState(STATE_ESTABLISHED);
},
.5);
LOGW("First audio packet - setting state to ESTABLISHED");
}
}
2020-01-26 18:51:45 +01:00
if (srcEndpoint.type == Endpoint::Type::UDP_RELAY && srcEndpoint.id != peerPreferredRelay)
{
peerPreferredRelay = srcEndpoint.id;
}
uint8_t count = 1;
2020-01-25 20:45:43 +01:00
switch (type)
{
case PKT_STREAM_DATA_X2:
count = 2;
break;
case PKT_STREAM_DATA_X3:
count = 3;
break;
}
2020-01-26 18:51:45 +01:00
uint8_t i;
2020-01-25 20:45:43 +01:00
for (i = 0; i < count; i++)
{
unsigned char streamID = in.ReadByte();
unsigned char flags = (unsigned char)(streamID & 0xC0);
streamID &= 0x3F;
uint16_t sdlen = (uint16_t)(flags & STREAM_DATA_FLAG_LEN16 ? in.ReadInt16() : in.ReadByte());
uint32_t pts = in.ReadUInt32();
unsigned char fragmentCount = 1;
unsigned char fragmentIndex = 0;
2020-01-29 15:52:43 +01:00
//LOGE("RECV: For pts %u = seq %u, got seq %u", pts, pts/60 + 1, pseq);
2020-01-25 20:45:43 +01:00
//LOGD("stream data, pts=%d, len=%d, rem=%d", pts, sdlen, in.Remaining());
audioTimestampIn = pts;
if (!audioOutStarted && audioOutput)
{
MutexGuard m(audioIOMutex);
audioOutput->Start();
audioOutStarted = true;
}
bool fragmented = static_cast<bool>(sdlen & STREAM_DATA_XFLAG_FRAGMENTED);
bool extraFEC = static_cast<bool>(sdlen & STREAM_DATA_XFLAG_EXTRA_FEC);
bool keyframe = static_cast<bool>(sdlen & STREAM_DATA_XFLAG_KEYFRAME);
if (fragmented)
{
fragmentIndex = in.ReadByte();
fragmentCount = in.ReadByte();
}
sdlen &= 0x7FF;
if (in.GetOffset() + sdlen > len)
{
return;
}
shared_ptr<Stream> stm;
for (shared_ptr<Stream> &ss : incomingStreams)
{
if (ss->id == streamID)
{
stm = ss;
break;
}
}
2020-03-21 15:08:03 +01:00
if (stm && stm->type == StreamInfo::Type::Audio)
2020-01-25 20:45:43 +01:00
{
if (stm->jitterBuffer)
{
stm->jitterBuffer->HandleInput(static_cast<unsigned char *>(buffer + in.GetOffset()), sdlen, pts, false);
2020-01-29 15:52:43 +01:00
/*if (peerVersion >= PROTOCOL_RELIABLE)
{
// Technically I should be using the specific packet manager's rtt history but will separate later
uint32_t tooOldSeq = stm->jitterBuffer->GetSeqTooLate(rttHistory[0]) - 1;
LOGW("Reverse acking seqs older than %u, newest acked seq %u (transportId %hhu)", tooOldSeq, manager->getLastRemoteSeq(), transportId);
manager->ackRemoteSeqsOlderThan(tooOldSeq);
}*/
2020-01-25 20:45:43 +01:00
if (extraFEC)
{
in.Seek(in.GetOffset() + sdlen);
unsigned int fecCount = in.ReadByte();
for (unsigned int j = 0; j < fecCount; j++)
{
unsigned char dlen = in.ReadByte();
unsigned char data[256];
in.ReadBytes(data, dlen);
2020-03-10 20:31:58 +01:00
stm->jitterBuffer->HandleInput(data, dlen, pts - (fecCount - j) * stm->frameDuration, true);
2020-01-25 20:45:43 +01:00
}
}
}
}
2020-03-21 15:08:03 +01:00
else if (stm && stm->type == StreamInfo::Type::Video)
2020-01-25 20:45:43 +01:00
{
if (stm->packetReassembler)
{
uint8_t frameSeq = in.ReadByte();
Buffer pdata(sdlen);
uint16_t rotation = 0;
if (fragmentIndex == 0)
{
unsigned char _rotation = in.ReadByte() & (unsigned char)VIDEO_ROTATION_MASK;
switch (_rotation)
{
case VIDEO_ROTATION_0:
rotation = 0;
break;
case VIDEO_ROTATION_90:
rotation = 90;
break;
case VIDEO_ROTATION_180:
rotation = 180;
break;
case VIDEO_ROTATION_270:
rotation = 270;
break;
default: // unreachable on sane CPUs
abort();
}
//if(rotation!=stm->rotation){
// stm->rotation=rotation;
// LOGI("Video rotation: %u", rotation);
//}
}
pdata.CopyFrom(buffer + in.GetOffset(), 0, sdlen);
stm->packetReassembler->AddFragment(std::move(pdata), fragmentIndex, fragmentCount, pts, frameSeq, keyframe, rotation);
}
//LOGV("Received video fragment %u of %u", fragmentIndex, fragmentCount);
}
else
{
LOGW("received packet for unknown stream %u", (unsigned int)streamID);
}
if (i < count - 1)
in.Seek(in.GetOffset() + sdlen);
}
}
if (type == PKT_PING)
{
//LOGD("Received ping from %s:%d", srcEndpoint.address.ToString().c_str(), srcEndpoint.port);
if (srcEndpoint.type != Endpoint::Type::UDP_RELAY && srcEndpoint.type != Endpoint::Type::TCP_RELAY && !allowP2p)
{
LOGW("Received p2p ping but p2p is disabled by manual override");
return;
}
BufferOutputStream pkt(128);
pkt.WriteInt32(pseq);
size_t pktLength = pkt.GetLength();
SendOrEnqueuePacket(PendingOutgoingPacket{
2020-01-28 23:45:47 +01:00
/*.seq=*/packetManager.nextLocalSeq(),
2020-01-25 20:45:43 +01:00
/*.type=*/PKT_PONG,
/*.len=*/pktLength,
/*.data=*/Buffer(move(pkt)),
/*.endpoint=*/srcEndpoint.id,
});
}
if (type == PKT_PONG)
{
if (packetInnerLen >= 4)
{
uint32_t pingSeq = in.ReadUInt32();
#ifdef LOG_PACKETS
LOGD("Received pong for ping in seq %u", pingSeq);
#endif
if (pingSeq == srcEndpoint.lastPingSeq)
{
srcEndpoint.rtts.Add(GetCurrentTime() - srcEndpoint.lastPingTime);
srcEndpoint.averageRTT = srcEndpoint.rtts.NonZeroAverage();
LOGD("Current RTT via %s: %.3f, average: %.3f", packet.address.ToString().c_str(), srcEndpoint.rtts[0], srcEndpoint.averageRTT);
if (srcEndpoint.averageRTT > rateMaxAcceptableRTT)
needRate = true;
}
}
}
if (type == PKT_STREAM_STATE)
{
unsigned char id = in.ReadByte();
unsigned char enabled = in.ReadByte();
LOGV("Peer stream state: id %u flags %u", (int)id, (int)enabled);
for (vector<shared_ptr<Stream>>::iterator s = incomingStreams.begin(); s != incomingStreams.end(); ++s)
{
if ((*s)->id == id)
{
(*s)->enabled = enabled == 1;
UpdateAudioOutputState();
break;
}
}
}
if (type == PKT_LAN_ENDPOINT)
{
LOGV("received lan endpoint");
uint32_t peerAddr = in.ReadUInt32();
uint16_t peerPort = (uint16_t)in.ReadInt32();
unsigned char peerTag[16];
2020-03-24 12:15:04 +01:00
Endpoint lan(Endpoint::ID::LANv4, peerPort, NetworkAddress::IPv4(peerAddr), NetworkAddress::Empty(), Endpoint::Type::UDP_P2P_LAN, peerTag);
2020-01-25 20:45:43 +01:00
2020-03-24 12:15:04 +01:00
if (currentEndpoint == Endpoint::ID::LANv4)
2020-01-25 20:45:43 +01:00
currentEndpoint = preferredRelay;
MutexGuard m(endpointsMutex);
2020-03-24 12:15:04 +01:00
endpoints[Endpoint::ID::LANv4] = lan;
2020-01-25 20:45:43 +01:00
}
if (type == PKT_NETWORK_CHANGED && _currentEndpoint.IsP2P())
{
currentEndpoint = preferredRelay;
if (allowP2p)
SendPublicEndpointsRequest();
if (peerVersion >= 2)
{
uint32_t flags = in.ReadUInt32();
2020-03-21 15:08:03 +01:00
dataSavingRequestedByPeer = flags & ExtraInit::Flags::DataSavingEnabled;
2020-01-25 20:45:43 +01:00
UpdateDataSavingState();
UpdateAudioBitrateLimit();
ResetEndpointPingStats();
}
}
if (type == PKT_STREAM_EC)
{
unsigned char streamID = in.ReadByte();
if (peerVersion < 7)
{
uint32_t lastTimestamp = in.ReadUInt32();
unsigned char count = in.ReadByte();
for (shared_ptr<Stream> &stm : incomingStreams)
{
if (stm->id == streamID)
{
for (unsigned int i = 0; i < count; i++)
{
unsigned char dlen = in.ReadByte();
unsigned char data[256];
in.ReadBytes(data, dlen);
if (stm->jitterBuffer)
{
2020-03-10 20:31:58 +01:00
stm->jitterBuffer->HandleInput(data, dlen, lastTimestamp - (count - i) * stm->frameDuration, true);
2020-01-25 20:45:43 +01:00
}
}
break;
}
}
}
else
{
shared_ptr<Stream> stm = GetStreamByID(streamID, false);
if (!stm)
{
LOGW("Received FEC packet for unknown stream %u", streamID);
return;
}
2020-03-21 15:08:03 +01:00
if (stm->type != StreamInfo::Type::Video)
2020-01-25 20:45:43 +01:00
{
LOGW("Received FEC packet for non-video stream %u", streamID);
return;
}
if (!stm->packetReassembler)
return;
uint8_t fseq = in.ReadByte();
unsigned char fecScheme = in.ReadByte();
unsigned char prevFrameCount = in.ReadByte();
uint16_t fecLen = in.ReadUInt16();
if (fecLen > in.Remaining())
return;
Buffer fecData(fecLen);
in.ReadBytes(fecData);
stm->packetReassembler->AddFEC(std::move(fecData), fseq, prevFrameCount, fecScheme);
}
}
}
void VoIPController::ProcessExtraData(Buffer &data)
{
BufferInputStream in(*data, data.Length());
unsigned char type = in.ReadByte();
unsigned char fullHash[SHA1_LENGTH];
crypto.sha1(*data, data.Length(), fullHash);
uint64_t hash = *reinterpret_cast<uint64_t *>(fullHash);
if (lastReceivedExtrasByType[type] == hash)
{
return;
}
LOGE("ProcessExtraData");
lastReceivedExtrasByType[type] = hash;
2020-03-21 15:08:03 +01:00
if (type == ExtraStreamFlags::ID)
2020-01-25 20:45:43 +01:00
{
unsigned char id = in.ReadByte();
2020-01-26 11:18:18 +01:00
uint32_t flags = in.ReadUInt32();
2020-01-25 20:45:43 +01:00
LOGV("Peer stream state: id %u flags %u", (unsigned int)id, (unsigned int)flags);
for (shared_ptr<Stream> &s : incomingStreams)
{
if (s->id == id)
{
bool prevEnabled = s->enabled;
bool prevPaused = s->paused;
2020-03-21 15:08:03 +01:00
s->enabled = flags & ExtraStreamFlags::Flags::Enabled;
s->paused = flags & ExtraStreamFlags::Flags::Paused;
if (flags & ExtraStreamFlags::Flags::ExtraEC)
2020-01-25 20:45:43 +01:00
{
if (!s->extraECEnabled)
{
s->extraECEnabled = true;
if (s->jitterBuffer)
s->jitterBuffer->SetMinPacketCount(4);
}
}
else
{
if (s->extraECEnabled)
{
s->extraECEnabled = false;
if (s->jitterBuffer)
s->jitterBuffer->SetMinPacketCount(2);
}
}
2020-03-21 15:08:03 +01:00
if (prevEnabled != s->enabled && s->type == StreamInfo::Type::Video && videoRenderer)
2020-01-25 20:45:43 +01:00
videoRenderer->SetStreamEnabled(s->enabled);
2020-03-21 15:08:03 +01:00
if (prevPaused != s->paused && s->type == StreamInfo::Type::Video && videoRenderer)
2020-01-25 20:45:43 +01:00
videoRenderer->SetStreamPaused(s->paused);
UpdateAudioOutputState();
break;
}
}
}
2020-03-21 15:08:03 +01:00
else if (type == ExtraStreamCsd::ID)
2020-01-25 20:45:43 +01:00
{
LOGI("Received codec specific data");
unsigned char streamID = in.ReadByte();
for (shared_ptr<Stream> &stm : incomingStreams)
{
if (stm->id == streamID)
{
stm->codecSpecificData.clear();
stm->csdIsValid = false;
stm->width = static_cast<unsigned int>(in.ReadInt16());
stm->height = static_cast<unsigned int>(in.ReadInt16());
2020-01-26 18:51:45 +01:00
uint8_t count = in.ReadByte();
for (uint8_t i = 0; i < count; i++)
2020-01-25 20:45:43 +01:00
{
size_t len = (size_t)in.ReadByte();
Buffer csd(len);
in.ReadBytes(*csd, len);
stm->codecSpecificData.push_back(move(csd));
}
break;
}
}
}
2020-03-21 15:08:03 +01:00
else if (type == ExtraLanEndpoint::ID)
2020-01-25 20:45:43 +01:00
{
if (!allowP2p)
return;
LOGV("received lan endpoint (extra)");
uint32_t peerAddr = in.ReadUInt32();
2020-01-26 18:51:45 +01:00
uint16_t peerPort = static_cast<uint16_t>(in.ReadInt32());
2020-03-24 12:15:04 +01:00
if (currentEndpoint == Endpoint::ID::LANv4)
2020-01-25 20:45:43 +01:00
currentEndpoint = preferredRelay;
unsigned char peerTag[16];
2020-03-24 12:15:04 +01:00
Endpoint lan(Endpoint::ID::LANv4, peerPort, NetworkAddress::IPv4(peerAddr), NetworkAddress::Empty(), Endpoint::Type::UDP_P2P_LAN, peerTag);
2020-01-25 20:45:43 +01:00
MutexGuard m(endpointsMutex);
2020-03-24 12:15:04 +01:00
endpoints[Endpoint::ID::LANv4] = lan;
2020-01-25 20:45:43 +01:00
}
2020-03-21 15:08:03 +01:00
else if (type == ExtraNetworkChanged::ID)
2020-01-25 20:45:43 +01:00
{
LOGI("Peer network changed");
wasNetworkHandover = true;
const Endpoint &_currentEndpoint = endpoints.at(currentEndpoint);
if (_currentEndpoint.type != Endpoint::Type::UDP_RELAY && _currentEndpoint.type != Endpoint::Type::TCP_RELAY)
currentEndpoint = preferredRelay;
if (allowP2p)
SendPublicEndpointsRequest();
uint32_t flags = in.ReadUInt32();
2020-03-21 15:08:03 +01:00
dataSavingRequestedByPeer = flags & ExtraInit::Flags::DataSavingEnabled;
2020-01-25 20:45:43 +01:00
UpdateDataSavingState();
UpdateAudioBitrateLimit();
ResetEndpointPingStats();
}
2020-03-21 15:08:03 +01:00
else if (type == ExtraGroupCallKey::ID)
2020-01-25 20:45:43 +01:00
{
if (!didReceiveGroupCallKey && !didSendGroupCallKey)
{
unsigned char groupKey[256];
in.ReadBytes(groupKey, 256);
messageThread.Post([this, &groupKey] {
if (callbacks.groupCallKeyReceived)
callbacks.groupCallKeyReceived(this, groupKey);
});
didReceiveGroupCallKey = true;
}
}
2020-03-21 15:08:03 +01:00
else if (type == ExtraGroupCallUpgrade::ID)
2020-01-25 20:45:43 +01:00
{
if (!didInvokeUpgradeCallback)
{
messageThread.Post([this] {
if (callbacks.upgradeToGroupCallRequested)
callbacks.upgradeToGroupCallRequested(this);
});
didInvokeUpgradeCallback = true;
}
}
2020-03-21 15:08:03 +01:00
else if (type == ExtraIpv6Endpoint::ID)
2020-01-25 20:45:43 +01:00
{
if (!allowP2p)
return;
2020-03-17 20:11:27 +01:00
NetworkAddress addr = NetworkAddress::IPv6(in);
uint16_t port = in.ReadUInt16();
2020-01-25 20:45:43 +01:00
peerIPv6Available = true;
LOGV("Received peer IPv6 endpoint [%s]:%u", addr.ToString().c_str(), port);
Endpoint ep;
ep.type = Endpoint::Type::UDP_P2P_INET;
ep.port = port;
ep.v6address = addr;
2020-03-24 12:15:04 +01:00
ep.id = Endpoint::ID::P2Pv6;
endpoints[Endpoint::ID::P2Pv6] = ep;
2020-01-25 20:45:43 +01:00
if (!myIPv6.IsEmpty())
2020-03-24 12:15:04 +01:00
currentEndpoint = Endpoint::ID::P2Pv6;
2020-01-25 20:45:43 +01:00
}
}
void VoIPController::ProcessAcknowledgedOutgoingExtra(UnacknowledgedExtraData &extra)
{
2020-03-21 15:08:03 +01:00
if (extra.type == ExtraGroupCallKey::ID)
2020-01-25 20:45:43 +01:00
{
if (!didReceiveGroupCallKeyAck)
{
didReceiveGroupCallKeyAck = true;
messageThread.Post([this] {
if (callbacks.groupCallKeySent)
callbacks.groupCallKeySent(this);
});
}
}
}
2020-01-29 15:52:43 +01:00
uint8_t VoIPController::WritePacketHeader(PendingOutgoingPacket &pkt, BufferOutputStream &s, PacketSender *source)
2020-01-25 20:45:43 +01:00
{
2020-03-22 20:09:44 +01:00
PacketManager &manager = ver.peerVersion >= PROTOCOL_RELIABLE && source ? source->getPacketManager() : packetManager;
2020-01-28 23:45:47 +01:00
uint32_t acks = manager.getRemoteAckMask();
2020-01-25 20:45:43 +01:00
2020-03-22 20:09:44 +01:00
if (ver.peerVersion >= 8 || (!peerVersion && ver.connectionMaxLayer >= 92))
2020-01-25 20:45:43 +01:00
{
2020-01-29 15:52:43 +01:00
s.WriteByte(pkt.type);
s.WriteInt32(manager.getLastRemoteSeq());
s.WriteInt32(pkt.seq);
s.WriteInt32(acks);
2020-01-27 16:02:59 +01:00
unsigned char flags = currentExtras.empty() ? 0 : XPFLAG_HAS_EXTRA;
2020-01-25 20:45:43 +01:00
2020-03-21 15:08:03 +01:00
shared_ptr<Stream> videoStream = GetStreamByType(StreamInfo::Type::Video, false);
2020-01-25 20:45:43 +01:00
if (peerVersion >= 9 && videoStream && videoStream->enabled)
flags |= XPFLAG_HAS_RECV_TS;
2020-01-29 15:52:43 +01:00
s.WriteByte(flags);
2020-01-25 20:45:43 +01:00
if (!currentExtras.empty())
{
2020-01-29 15:52:43 +01:00
s.WriteByte(static_cast<unsigned char>(currentExtras.size()));
2020-01-27 16:02:59 +01:00
for (auto &x : currentExtras)
2020-01-25 20:45:43 +01:00
{
2020-01-29 19:12:12 +01:00
//LOGV("Writing extra into header: type %u, length %d", x.type, int(x.data.Length()));
2020-01-27 16:02:59 +01:00
assert(x.data.Length() <= 254);
2020-01-29 15:52:43 +01:00
s.WriteByte(static_cast<unsigned char>(x.data.Length() + 1));
s.WriteByte(x.type);
s.WriteBytes(*x.data, x.data.Length());
2020-01-27 16:02:59 +01:00
if (x.firstContainingSeq == 0)
2020-01-29 15:52:43 +01:00
x.firstContainingSeq = pkt.seq;
2020-01-25 20:45:43 +01:00
}
}
if (peerVersion >= 9 && videoStream && videoStream->enabled)
{
2020-01-29 15:52:43 +01:00
s.WriteUInt32((lastRecvPacketTime - connectionInitTime) * 1000.0);
2020-01-25 20:45:43 +01:00
}
}
else
{
2020-01-29 15:52:43 +01:00
legacyWritePacketHeader(pkt.seq, acks, &s, pkt.type, pkt.len);
2020-01-25 20:45:43 +01:00
}
2020-01-29 15:52:43 +01:00
s.WriteBytes(pkt.data);
Buffer copyBuf(s.GetLength());
if (peerVersion >= PROTOCOL_RELIABLE)
copyBuf.CopyFrom(s.GetBuffer(), 0, s.GetLength());
2020-01-25 20:45:43 +01:00
unacknowledgedIncomingPacketCount = 0;
2020-01-29 15:52:43 +01:00
auto &recentOutgoingPackets = manager.getRecentOutgoingPackets();
2020-01-25 20:45:43 +01:00
recentOutgoingPackets.push_back(RecentOutgoingPacket{
2020-01-29 15:52:43 +01:00
pkt.endpoint,
std::move(copyBuf),
pkt.seq,
2020-01-25 20:45:43 +01:00
0,
GetCurrentTime(),
0,
0,
2020-01-29 15:52:43 +01:00
pkt.type,
static_cast<uint32_t>(pkt.len),
2020-01-25 20:45:43 +01:00
source,
false});
while (recentOutgoingPackets.size() > MAX_RECENT_PACKETS)
{
recentOutgoingPackets.erase(recentOutgoingPackets.begin());
}
2020-01-29 15:52:43 +01:00
manager.setLastSentSeq(pkt.seq);
return manager.getTransportId();
2020-01-25 20:45:43 +01:00
//LOGI("packet header size %d", s->GetLength());
}