1
0
mirror of https://github.com/danog/libtgvoip.git synced 2024-12-04 02:27:46 +01:00
libtgvoip/controller/protocol/NetworkAPI.cpp

538 lines
17 KiB
C++
Raw Normal View History

2020-01-25 18:36:49 +01:00
#include "../PrivateDefines.cpp"
2020-03-13 21:12:58 +01:00
//#include <random>
2020-01-25 18:11:15 +01:00
using namespace tgvoip;
using namespace std;
2020-03-13 21:12:58 +01:00
//std::random_device dev;
//std::mt19937 rng(dev());
//std::uniform_int_distribution<std::mt19937::result_type> dist6(0, 9); // distribution in range [1, 6]
2020-01-25 20:45:43 +01:00
/**
 * Sends a packet immediately if the socket for its endpoint is ready to send;
 * otherwise optionally appends it to sendQueue.
 *
 * For TCP relay endpoints without a socket, a socket is created and connected
 * here first (directly, or through the SOCKS5 proxy when proxyProtocol is
 * PROXY_SOCKS5) and the select canceller is triggered.
 *
 * @param pkt     packet to send; moved into sendQueue when it is queued
 * @param enqueue when true, a packet that cannot be sent right now is queued
 * @param source  sender handed to the legacy header writer
 * @return false when the socket was not ready (whether or not the packet was
 *         queued); true otherwise, including the case where the transport for
 *         this endpoint type is disabled and nothing is sent.
 */
bool VoIPController::SendOrEnqueuePacket(PendingOutgoingPacket pkt, bool enqueue, PacketSender *source)
{
    ENFORCE_MSG_THREAD;
    Endpoint *endpoint = GetEndpointForPacket(pkt);
    if (!endpoint)
    {
        abort();
        return false; // not reached; abort() terminates the process
    }

    const bool isTcpRelay = endpoint->type == Endpoint::Type::TCP_RELAY;
    bool canSend;
    if (!isTcpRelay)
    {
        canSend = realUdpSocket->IsReadyToSend();
    }
    else
    {
        if (!endpoint->socket)
        {
            LOGV("Connecting to %s:%u", endpoint->GetAddress().ToString().c_str(), endpoint->port);
            if (proxyProtocol == PROXY_NONE)
            {
                endpoint->socket = make_shared<NetworkSocketTCPObfuscated>(NetworkSocket::Create(NetworkProtocol::TCP));
                endpoint->socket->Connect(endpoint->GetAddress(), endpoint->port);
            }
            else if (proxyProtocol == PROXY_SOCKS5)
            {
                std::shared_ptr<NetworkSocket> tcp = NetworkSocket::Create(NetworkProtocol::TCP);
                tcp->Connect(resolvedProxyAddress, proxyPort);
                shared_ptr<NetworkSocketSOCKS5Proxy> proxy = make_shared<NetworkSocketSOCKS5Proxy>(tcp, nullptr, proxyUsername, proxyPassword);
                endpoint->socket = proxy;
                endpoint->socket->Connect(endpoint->GetAddress(), endpoint->port);
            }
            selectCanceller->CancelSelect();
        }
        // The socket stays null if proxyProtocol matched neither branch above
        canSend = endpoint->socket && endpoint->socket->IsReadyToSend();
    }

    if (!canSend)
    {
        if (enqueue)
        {
            LOGW("Not ready to send - enqueueing");
            sendQueue.push_back(move(pkt));
        }
        return false;
    }

    if ((isTcpRelay && useTCP) || (!isTcpRelay && useUDP))
    {
        if (ver.isNew())
        {
            //BufferOutputStream out(pkt.packet.getSize()); // Can precalc, should check if it's worth it
            BufferOutputStream out(1500);
            pkt.packet.serialize(out, ver);
            SendPacket(out.GetBuffer(), out.GetLength(), *endpoint);
        }
        else
        {
            // Legacy path: the stream and stream id this branch needs were only present
            // as commented-out code, so they are declared here again.
            // NOTE(review): confirm WritePacketHeader and the six-argument SendPacket
            // overload still exist after the packet refactor.
            BufferOutputStream out(1500);
            uint8_t streamId = WritePacketHeader(pkt, out, source);
            SendPacket(out.GetBuffer(), out.GetLength(), *endpoint, pkt.seq, pkt.type, streamId);
        }
        // TODO(review): `seq` and `len` are not declared in this scope, so this line does
        // not compile as written. It is left untouched on purpose: the right sequence
        // number and length for the new packet format cannot be determined from this file.
        conctl.PacketSent(seq, len);
        //LOGV("Sending %d bytes to %s:%d", out.GetLength(), ep.address.ToString().c_str(), ep.port);
#ifdef LOG_PACKETS
        // TODO(review): `ep`, `seq`, `out`, `type` and `streamId` are not in scope here either.
        LOGV("Sending: to=%s:%u, seq=%u, length=%u, type=%s, streamId=%hhu", ep.GetAddress().ToString().c_str(), ep.port, seq, (unsigned int)out.GetLength(), GetPacketTypeString(type).c_str(), streamId);
#endif
        /*if (!dist6(rng))
        {
            LOGW("DROPPING");
        }
        else*/
        /*if (pkt.type == PKT_STREAM_DATA)
        {
            unsentStreamPackets--;
        }*/
    }
    return true;
}
2020-03-22 20:09:44 +01:00
void VoIPController::SendPacket(unsigned char *data, size_t len, Endpoint &ep)
2020-01-25 20:45:43 +01:00
{
if (stopping)
return;
if (ep.type == Endpoint::Type::TCP_RELAY && !useTCP)
return;
BufferOutputStream out(len + 128);
if (ep.IsReflector())
out.WriteBytes((unsigned char *)ep.peerTag, 16);
2020-03-22 20:09:44 +01:00
else if (ver.peerVersion < 9)
2020-01-25 20:45:43 +01:00
out.WriteBytes(callID, 16);
2020-01-27 16:02:59 +01:00
2020-01-25 20:45:43 +01:00
if (len > 0)
{
2020-01-26 11:18:18 +01:00
encryptPacket(data, len, out);
2020-01-25 20:45:43 +01:00
}
2020-01-26 11:18:18 +01:00
2020-01-25 20:45:43 +01:00
rawSendQueue.Put(
RawPendingOutgoingPacket{
NetworkPacket{
Buffer(std::move(out)),
ep.GetAddress(),
ep.port,
ep.type == Endpoint::Type::TCP_RELAY ? NetworkProtocol::TCP : NetworkProtocol::UDP},
ep.type == Endpoint::Type::TCP_RELAY ? ep.socket : nullptr});
}
void VoIPController::SendInit()
{
ENFORCE_MSG_THREAD;
2020-01-28 23:45:47 +01:00
uint32_t initSeq = packetManager.nextLocalSeq();
2020-01-25 20:45:43 +01:00
for (pair<const int64_t, Endpoint> &_e : endpoints)
{
Endpoint &e = _e.second;
if (e.type == Endpoint::Type::TCP_RELAY && !useTCP)
continue;
BufferOutputStream out(1024);
out.WriteInt32(PROTOCOL_VERSION);
out.WriteInt32(MIN_PROTOCOL_VERSION);
uint32_t flags = 0;
if (config.enableCallUpgrade)
2020-03-21 15:08:03 +01:00
flags |= ExtraInit::Flags::GroupCallSupported;
2020-01-25 20:45:43 +01:00
if (config.enableVideoReceive)
2020-03-21 15:08:03 +01:00
flags |= ExtraInit::Flags::VideoRecvSupported;
2020-01-25 20:45:43 +01:00
if (config.enableVideoSend)
2020-03-21 15:08:03 +01:00
flags |= ExtraInit::Flags::VideoSendSupported;
2020-01-25 20:45:43 +01:00
if (dataSavingMode)
2020-03-21 15:08:03 +01:00
flags |= ExtraInit::Flags::DataSavingEnabled;
2020-01-25 20:45:43 +01:00
out.WriteInt32(flags);
if (connectionMaxLayer < 74)
{
out.WriteByte(2); // audio codecs count
out.WriteByte(CODEC_OPUS_OLD);
out.WriteByte(0);
2020-03-18 19:54:56 +01:00
out.WriteByte(0); // idk, stuff I guess
2020-01-25 20:45:43 +01:00
out.WriteByte(0);
2020-03-21 15:08:03 +01:00
out.WriteInt32(Codec::Opus);
2020-01-25 20:45:43 +01:00
out.WriteByte(0); // video codecs count (decode)
out.WriteByte(0); // video codecs count (encode)
}
else
{
out.WriteByte(1);
2020-03-21 15:08:03 +01:00
out.WriteInt32(Codec::Opus);
2020-01-25 20:45:43 +01:00
vector<uint32_t> decoders = config.enableVideoReceive ? video::VideoRenderer::GetAvailableDecoders() : vector<uint32_t>();
vector<uint32_t> encoders = config.enableVideoSend ? video::VideoSource::GetAvailableEncoders() : vector<uint32_t>();
out.WriteByte((unsigned char)decoders.size());
for (uint32_t id : decoders)
{
out.WriteInt32(id);
}
if (connectionMaxLayer >= 92)
out.WriteByte((unsigned char)video::VideoRenderer::GetMaximumResolution());
else
out.WriteByte(0);
}
SendOrEnqueuePacket(PendingOutgoingPacket{
/*.seq=*/initSeq,
/*.type=*/PKT_INIT,
/*.len=*/out.GetLength(),
/*.data=*/Buffer(move(out)),
/*.endpoint=*/e.id});
}
if (state == STATE_WAIT_INIT)
SetState(STATE_WAIT_INIT_ACK);
messageThread.Post(
[this] {
if (state == STATE_WAIT_INIT_ACK)
{
SendInit();
}
},
0.5);
}
void VoIPController::InitUDPProxy()
{
if (realUdpSocket != udpSocket)
{
udpSocket->Close();
udpSocket = realUdpSocket;
}
char sbuf[128];
snprintf(sbuf, sizeof(sbuf), "%s:%u", proxyAddress.c_str(), proxyPort);
string proxyHostPort(sbuf);
if (proxyHostPort == lastTestedProxyServer && !proxySupportsUDP)
{
LOGI("Proxy does not support UDP - using UDP directly instead");
messageThread.Post(bind(&VoIPController::ResetUdpAvailability, this));
return;
}
std::shared_ptr<NetworkSocket> tcp = NetworkSocket::Create(NetworkProtocol::TCP);
tcp->Connect(resolvedProxyAddress, proxyPort);
vector<std::shared_ptr<NetworkSocket>> writeSockets;
vector<std::shared_ptr<NetworkSocket>> readSockets;
vector<std::shared_ptr<NetworkSocket>> errorSockets;
while (!tcp->IsFailed() && !tcp->IsReadyToSend())
{
writeSockets.push_back(tcp);
if (!NetworkSocket::Select(readSockets, writeSockets, errorSockets, selectCanceller))
{
LOGW("Select canceled while waiting for proxy control socket to connect");
return;
}
}
LOGV("UDP proxy control socket ready to send");
std::shared_ptr<NetworkSocketSOCKS5Proxy> udpProxy = std::make_shared<NetworkSocketSOCKS5Proxy>(tcp, realUdpSocket, proxyUsername, proxyPassword);
udpProxy->OnReadyToSend();
writeSockets.clear();
while (!udpProxy->IsFailed() && !tcp->IsFailed() && !udpProxy->IsReadyToSend())
{
readSockets.clear();
errorSockets.clear();
readSockets.push_back(tcp);
errorSockets.push_back(tcp);
if (!NetworkSocket::Select(readSockets, writeSockets, errorSockets, selectCanceller))
{
LOGW("Select canceled while waiting for UDP proxy to initialize");
return;
}
if (!readSockets.empty())
udpProxy->OnReadyToReceive();
}
LOGV("UDP proxy initialized");
if (udpProxy->IsFailed())
{
udpProxy->Close();
proxySupportsUDP = false;
}
else
{
udpSocket = udpProxy;
}
messageThread.Post(bind(&VoIPController::ResetUdpAvailability, this));
}
2020-01-26 21:06:16 +01:00
void VoIPController::TrySendOutgoingPackets()
2020-01-25 20:45:43 +01:00
{
ENFORCE_MSG_THREAD;
for (vector<PendingOutgoingPacket>::iterator opkt = sendQueue.begin(); opkt != sendQueue.end();)
{
Endpoint *endpoint = GetEndpointForPacket(*opkt);
if (!endpoint)
{
opkt = sendQueue.erase(opkt);
LOGE("SendQueue contained packet for nonexistent endpoint");
continue;
}
bool canSend;
if (endpoint->type != Endpoint::Type::TCP_RELAY)
canSend = realUdpSocket->IsReadyToSend();
else
canSend = endpoint->socket && endpoint->socket->IsReadyToSend();
if (canSend)
{
LOGI("Sending queued packet");
SendOrEnqueuePacket(move(*opkt), false);
opkt = sendQueue.erase(opkt);
}
else
{
++opkt;
}
}
}
2020-01-25 18:11:15 +01:00
void VoIPController::SendRelayPings()
{
ENFORCE_MSG_THREAD;
if ((state == STATE_ESTABLISHED || state == STATE_RECONNECTING) && endpoints.size() > 1)
{
Endpoint *_preferredRelay = &endpoints.at(preferredRelay);
Endpoint *_currentEndpoint = &endpoints.at(currentEndpoint);
Endpoint *minPingRelay = _preferredRelay;
double minPing = _preferredRelay->averageRTT * (_preferredRelay->type == Endpoint::Type::TCP_RELAY ? 2 : 1);
if (minPing == 0.0) // force the switch to an available relay, if any
minPing = DBL_MAX;
for (pair<const int64_t, Endpoint> &_endpoint : endpoints)
{
Endpoint &endpoint = _endpoint.second;
if (endpoint.type == Endpoint::Type::TCP_RELAY && !useTCP)
continue;
if (endpoint.type == Endpoint::Type::UDP_RELAY && !useUDP)
continue;
if (GetCurrentTime() - endpoint.lastPingTime >= 10)
{
LOGV("Sending ping to %s", endpoint.GetAddress().ToString().c_str());
SendOrEnqueuePacket(PendingOutgoingPacket{
2020-01-28 23:45:47 +01:00
/*.seq=*/(endpoint.lastPingSeq = packetManager.nextLocalSeq()),
2020-01-25 18:11:15 +01:00
/*.type=*/PKT_PING,
/*.len=*/0,
/*.data=*/Buffer(),
/*.endpoint=*/endpoint.id});
endpoint.lastPingTime = GetCurrentTime();
}
if ((useUDP && endpoint.type == Endpoint::Type::UDP_RELAY) || (useTCP && endpoint.type == Endpoint::Type::TCP_RELAY))
{
double k = endpoint.type == Endpoint::Type::UDP_RELAY ? 1 : 2;
if (endpoint.averageRTT > 0 && endpoint.averageRTT * k < minPing * relaySwitchThreshold)
{
minPing = endpoint.averageRTT * k;
minPingRelay = &endpoint;
}
}
}
if (minPingRelay->id != preferredRelay)
{
preferredRelay = minPingRelay->id;
_preferredRelay = minPingRelay;
LOGV("set preferred relay to %s", _preferredRelay->address.ToString().c_str());
if (_currentEndpoint->IsReflector())
{
currentEndpoint = preferredRelay;
_currentEndpoint = _preferredRelay;
}
}
if (_currentEndpoint->type == Endpoint::Type::UDP_RELAY && useUDP)
{
constexpr int64_t p2pID = static_cast<int64_t>(FOURCC('P', '2', 'P', '4')) << 32;
constexpr int64_t lanID = static_cast<int64_t>(FOURCC('L', 'A', 'N', '4')) << 32;
if (endpoints.find(p2pID) != endpoints.end())
{
Endpoint &p2p = endpoints[p2pID];
if (endpoints.find(lanID) != endpoints.end() && endpoints[lanID].averageRTT > 0 && endpoints[lanID].averageRTT < minPing * relayToP2pSwitchThreshold)
{
currentEndpoint = lanID;
LOGI("Switching to p2p (LAN)");
}
else
{
if (p2p.averageRTT > 0 && p2p.averageRTT < minPing * relayToP2pSwitchThreshold)
{
currentEndpoint = p2pID;
LOGI("Switching to p2p (Inet)");
}
}
}
}
else
{
if (minPing > 0 && minPing < _currentEndpoint->averageRTT * p2pToRelaySwitchThreshold)
{
LOGI("Switching to relay");
currentEndpoint = preferredRelay;
}
}
}
}
2020-01-29 15:52:43 +01:00
/**
 * Sends an empty packet on behalf of the given packet manager.
 * Does nothing unless the call is in STATE_ESTABLISHED.
 *
 * @param pm packet manager whose stream id selects the packet sender; a stream
 *           id of 0xFF means there is no associated sender.
 */
void VoIPController::SendNopPacket(PacketManager &pm)
{
    if (state != STATE_ESTABLISHED)
        return;
    PacketSender *source = pm.getstreamId() == 0xFF ? nullptr : outgoingStreams[pm.getstreamId()]->packetSender.get();
    // `source` must go in the third parameter. Passed second, the pointer is
    // implicitly converted to the `bool enqueue` argument and the sender is lost.
    // NOTE(review): `true` is assumed to match the declared default for `enqueue` -
    // confirm against the header.
    SendOrEnqueuePacket(PendingOutgoingPacket(Packet(), 0), true, source);
    // Previous packet layout, kept for reference. Line comments are used because
    // block comments do not nest and the inner /*.field=*/ markers ended the
    // surrounding comment early.
    // {
    //     .seq      = (firstSentPing = pm.nextLocalSeq()),
    //     .type     = PKT_NOP,
    //     .len      = 0,
    //     .data     = Buffer(),
    //     .endpoint = 0
    // },
    // source);
}
void VoIPController::SendPublicEndpointsRequest()
{
ENFORCE_MSG_THREAD;
if (!allowP2p)
return;
LOGI("Sending public endpoints request");
for (pair<const int64_t, Endpoint> &e : endpoints)
{
if (e.second.type == Endpoint::Type::UDP_RELAY && !e.second.IsIPv6Only())
{
SendPublicEndpointsRequest(e.second);
}
}
publicEndpointsReqCount++;
if (publicEndpointsReqCount < 10)
{
messageThread.Post(
[this] {
if (waitingForRelayPeerInfo)
{
LOGW("Resending peer relay info request");
SendPublicEndpointsRequest();
}
},
5.0);
}
else
{
publicEndpointsReqCount = 0;
}
}
void VoIPController::SendPublicEndpointsRequest(const Endpoint &relay)
{
if (!useUDP)
return;
LOGD("Sending public endpoints request to %s:%d", relay.address.ToString().c_str(), relay.port);
publicEndpointsReqTime = GetCurrentTime();
waitingForRelayPeerInfo = true;
Buffer buf(32);
memcpy(*buf, relay.peerTag, 16);
memset(*buf + 16, 0xFF, 16);
udpSocket->Send(NetworkPacket{
std::move(buf),
relay.address,
relay.port,
NetworkProtocol::UDP});
}
/**
 * Returns an endpoint of the requested type.
 *
 * For UDP_RELAY with a non-zero preferredRelay, the preferred relay is
 * returned (endpoints.at() throws std::out_of_range if that id is unknown).
 * Otherwise the first endpoint of that type in iteration order is returned.
 *
 * @param type endpoint type to look for
 * @throws std::out_of_range when no endpoint of that type exists
 */
Endpoint &VoIPController::GetEndpointByType(const Endpoint::Type type)
{
    if (preferredRelay != 0 && type == Endpoint::Type::UDP_RELAY)
        return endpoints.at(preferredRelay);

    for (auto it = endpoints.begin(); it != endpoints.end(); ++it)
    {
        Endpoint &candidate = it->second;
        if (candidate.type == type)
            return candidate;
    }
    throw out_of_range("no endpoint");
}
2020-03-22 20:09:44 +01:00
void VoIPController::SendDataSavingMode()
{
ENFORCE_MSG_THREAD;
auto s = std::make_shared<ExtraNetworkChanged>();
s->flags |= dataSavingMode ? ExtraInit::Flags::DataSavingEnabled : 0;
SendExtra(s);
}
void VoIPController::SendExtra(std::shared_ptr<Extra> &&_d)
{
SendExtra(Wrapped<Extra>(_d));
}
void VoIPController::SendExtra(std::shared_ptr<Extra> &_d)
{
SendExtra(Wrapped<Extra>(_d));
}
2020-03-21 21:33:51 +01:00
/**
 * Registers an extra to be sent until acknowledged.
 *
 * If currentExtras already holds an entry with the same ID, that entry's
 * payload is replaced and its firstContainingSeq is reset to 0; otherwise a
 * new entry is appended.
 *
 * @param extra extra to send; moved into currentExtras
 */
void VoIPController::SendExtra(Wrapped<Extra> &&extra)
{
    ENFORCE_MSG_THREAD;
    auto type = extra.getID();
    LOGV("Sending extra type %hhu", type);
    // The loop variable must not reuse the name `extra`: shadowing the parameter
    // turns the assignment below into a self-move and discards the new payload.
    for (auto &pending : currentExtras)
    {
        if (pending.data.getID() == type)
        {
            pending.firstContainingSeq = 0;
            pending.data = std::move(extra);
            return;
        }
    }
    currentExtras.push_back(UnacknowledgedExtraData(std::move(extra)));
}
/**
 * Sends a UDP ping to a UDP relay and records when it was sent.
 * Endpoints of any other type are ignored.
 *
 * The ping consists of the 16-byte peer tag, three 32-bit -1 words, one 32-bit
 * -2 word and a random 64-bit id. The send time is stored in
 * endpoint.udpPingTimes under that id.
 *
 * @param endpoint relay to ping; its ping bookkeeping is updated
 */
void VoIPController::SendUdpPing(Endpoint &endpoint)
{
    if (endpoint.type != Endpoint::Type::UDP_RELAY)
        return;

    BufferOutputStream ping(1024);
    ping.WriteBytes(endpoint.peerTag, 16);
    for (int i = 0; i < 3; ++i)
        ping.WriteInt32(-1);
    ping.WriteInt32(-2);

    int64_t pingId;
    crypto.rand_bytes(reinterpret_cast<uint8_t *>(&pingId), sizeof(pingId));
    ping.WriteInt64(pingId);

    endpoint.udpPingTimes[pingId] = GetCurrentTime();
    udpSocket->Send(NetworkPacket{
        Buffer(std::move(ping)),
        endpoint.GetAddress(),
        endpoint.port,
        NetworkProtocol::UDP});
    ++endpoint.totalUdpPings;
    LOGV("Sending UDP ping to %s:%d, id %" PRId64, endpoint.GetAddress().ToString().c_str(), endpoint.port, pingId);
}
void VoIPController::ResetUdpAvailability()
{
ENFORCE_MSG_THREAD;
LOGI("Resetting UDP availability");
if (udpPingTimeoutID != MessageThread::INVALID_ID)
{
messageThread.Cancel(udpPingTimeoutID);
}
{
for (pair<const int64_t, Endpoint> &e : endpoints)
{
e.second.udpPongCount = 0;
e.second.udpPingTimes.clear();
}
}
udpPingCount = 0;
udpConnectivityState = UDP_PING_PENDING;
udpPingTimeoutID = messageThread.Post(std::bind(&VoIPController::SendUdpPings, this), 0.0, 0.5);
}
void VoIPController::ResetEndpointPingStats()
{
ENFORCE_MSG_THREAD;
for (pair<const int64_t, Endpoint> &e : endpoints)
{
e.second.averageRTT = 0.0;
e.second.rtts.Reset();
}
}