1
0
mirror of https://github.com/danog/libtgvoip.git synced 2024-12-12 17:17:24 +01:00
libtgvoip/tools/MessageThread.cpp

211 lines
4.6 KiB
C++
Executable File

//
// Created by Grishka on 17.06.2018.
//
#include <assert.h>
#include <time.h>
#include <math.h>
#include <float.h>
#include <stdint.h>
#ifndef _WIN32
#include <sys/time.h>
#endif
#include "tools/MessageThread.h"
#include "VoIPController.h"
#include "tools/logging.h"
using namespace tgvoip;
MessageThread::MessageThread() : Thread(std::bind(&MessageThread::Run, this)), running(true)
{
SetName("MessageThread");
#ifdef _WIN32
#if !defined(WINAPI_FAMILY) || WINAPI_FAMILY != WINAPI_FAMILY_PHONE_APP
event = CreateEvent(NULL, false, false, NULL);
#else
event = CreateEventEx(NULL, NULL, 0, EVENT_ALL_ACCESS);
#endif
#else
pthread_cond_init(&cond, NULL);
#endif
}
MessageThread::~MessageThread()
{
Stop();
#ifdef _WIN32
CloseHandle(event);
#else
pthread_cond_destroy(&cond);
#endif
}
void MessageThread::Stop()
{
if (running)
{
running = false;
#ifdef _WIN32
SetEvent(event);
#else
pthread_cond_signal(&cond);
#endif
Join();
}
}
void MessageThread::Run()
{
loopMutex.Lock();
while (running)
{
double currentTime = VoIPController::GetCurrentTime();
double waitTimeout;
{
MutexGuard _m(queueAccessMutex);
waitTimeout = queue.empty() ? DBL_MAX : (queue[0].deliverAt - currentTime);
};
//LOGW("MessageThread wait timeout %f", waitTimeout);
if (waitTimeout > 0.0)
{
#ifdef _WIN32
loopMutex.Unlock();
DWORD actualWaitTimeout = waitTimeout == DBL_MAX ? INFINITE : ((DWORD)round(waitTimeout * 1000.0));
#if !defined(WINAPI_FAMILY) || WINAPI_FAMILY != WINAPI_FAMILY_PHONE_APP
WaitForSingleObject(event, actualWaitTimeout);
#else
WaitForSingleObjectEx(event, actualWaitTimeout, false);
#endif
// we don't really care if a context switch happens here and anything gets added to the queue by another thread
// since any new no-delay messages will get delivered on this iteration anyway
loopMutex.Lock();
#else
if (waitTimeout != DBL_MAX)
{
struct timeval now;
struct timespec timeout;
gettimeofday(&now, NULL);
waitTimeout += now.tv_sec;
waitTimeout += (now.tv_usec / 1000000.0);
timeout.tv_sec = (time_t)(floor(waitTimeout));
timeout.tv_nsec = (long)((waitTimeout - floor(waitTimeout)) * 1000000000.0);
pthread_cond_timedwait(&cond, loopMutex.NativeHandle(), &timeout);
}
else
{
pthread_cond_wait(&cond, loopMutex.NativeHandle());
}
#endif
}
if (!running)
{
loopMutex.Unlock();
return;
}
currentTime = VoIPController::GetCurrentTime();
std::vector<Message> msgsToDeliverNow;
{
MutexGuard _m(queueAccessMutex);
std::vector<Message> newQueue;
std::partition_copy(
std::make_move_iterator(queue.begin()),
std::make_move_iterator(queue.end()),
std::back_inserter(msgsToDeliverNow),
std::back_inserter(newQueue),
[&currentTime](const Message &message) { // Deliver now if
return message.deliverAt == 0.0 || currentTime >= message.deliverAt;
});
queue = std::move(newQueue);
}
for (Message &m : msgsToDeliverNow)
{
//LOGI("MessageThread delivering %u", m.msg);
cancelCurrent = false;
if (m.deliverAt == 0.0)
m.deliverAt = VoIPController::GetCurrentTime();
if (m.func != nullptr)
{
m.func();
}
if (!cancelCurrent && m.interval > 0.0)
{
m.deliverAt += m.interval;
InsertMessageInternal(m);
}
}
}
loopMutex.Unlock();
}
uint32_t MessageThread::Post(std::function<void()> func, double delay, double interval)
{
assert(delay >= 0);
//LOGI("MessageThread post [function] delay %f", delay);
double currentTime = VoIPController::GetCurrentTime();
Message m{lastMessageID++, delay == 0.0 ? 0.0 : (currentTime + delay), interval, func};
InsertMessageInternal(m);
if (!IsCurrent())
{
#ifdef _WIN32
SetEvent(event);
#else
pthread_cond_signal(&cond);
#endif
}
return m.id;
}
void MessageThread::InsertMessageInternal(MessageThread::Message &m)
{
MutexGuard _m(queueAccessMutex);
if (queue.empty())
{
queue.push_back(m);
}
else
{
if (queue[0].deliverAt > m.deliverAt)
{
queue.insert(queue.begin(), m);
}
else
{
for (auto insertAfter = queue.begin(); insertAfter != queue.end(); ++insertAfter)
{
std::vector<Message>::iterator next = std::next(insertAfter);
if (next == queue.end() || (next->deliverAt > m.deliverAt && insertAfter->deliverAt <= m.deliverAt))
{
queue.insert(next, m);
break;
}
}
}
}
}
void MessageThread::Cancel(uint32_t id)
{
MutexGuard _m(queueAccessMutex);
queue.erase(
std::remove_if(
queue.begin(),
queue.end(),
[&id](const Message &message) {
return message.id == id;
}),
queue.end());
}
void MessageThread::CancelSelf()
{
assert(IsCurrent());
cancelCurrent = true;
}