// // Created by Grishka on 17.06.2018. // #include #include #include #include #include #ifndef _WIN32 #include #endif #include "tools/MessageThread.h" #include "VoIPController.h" #include "tools/logging.h" using namespace tgvoip; MessageThread::MessageThread() : Thread(std::bind(&MessageThread::Run, this)) { running = true; SetName("MessageThread"); #ifdef _WIN32 #if !defined(WINAPI_FAMILY) || WINAPI_FAMILY != WINAPI_FAMILY_PHONE_APP event = CreateEvent(NULL, false, false, NULL); #else event = CreateEventEx(NULL, NULL, 0, EVENT_ALL_ACCESS); #endif #else pthread_cond_init(&cond, NULL); #endif } MessageThread::~MessageThread() { Stop(); #ifdef _WIN32 CloseHandle(event); #else pthread_cond_destroy(&cond); #endif } void MessageThread::Stop() { if (running) { running = false; #ifdef _WIN32 SetEvent(event); #else pthread_cond_signal(&cond); #endif Join(); } } void MessageThread::Run() { queueMutex.Lock(); while (running) { double currentTime = VoIPController::GetCurrentTime(); double waitTimeout = queue.empty() ? DBL_MAX : (queue[0].deliverAt - currentTime); //LOGW("MessageThread wait timeout %f", waitTimeout); if (waitTimeout > 0.0) { #ifdef _WIN32 queueMutex.Unlock(); DWORD actualWaitTimeout = waitTimeout == DBL_MAX ? INFINITE : ((DWORD)round(waitTimeout * 1000.0)); #if !defined(WINAPI_FAMILY) || WINAPI_FAMILY != WINAPI_FAMILY_PHONE_APP WaitForSingleObject(event, actualWaitTimeout); #else WaitForSingleObjectEx(event, actualWaitTimeout, false); #endif // we don't really care if a context switch happens here and anything gets added to the queue by another thread // since any new no-delay messages will get delivered on this iteration anyway queueMutex.Lock(); #else if (waitTimeout != DBL_MAX) { struct timeval now; struct timespec timeout; gettimeofday(&now, NULL); waitTimeout += now.tv_sec; waitTimeout += (now.tv_usec / 1000000.0); timeout.tv_sec = (time_t)(floor(waitTimeout)); timeout.tv_nsec = (long)((waitTimeout - floor(waitTimeout)) * 1000000000.0); pthread_cond_timedwait(&cond, queueMutex.NativeHandle(), &timeout); } else { pthread_cond_wait(&cond, queueMutex.NativeHandle()); } #endif } if (!running) { queueMutex.Unlock(); return; } currentTime = VoIPController::GetCurrentTime(); std::vector msgsToDeliverNow; std::vector newQueue; std::partition_copy( std::make_move_iterator(queue.begin()), std::make_move_iterator(queue.end()), std::back_inserter(msgsToDeliverNow), std::back_inserter(newQueue), [¤tTime](const Message &message) { // Deliver now if return message.deliverAt == 0.0 || currentTime >= message.deliverAt; } ); queue = std::move(newQueue); for (Message &m : msgsToDeliverNow) { //LOGI("MessageThread delivering %u", m.msg); cancelCurrent = false; if (m.deliverAt == 0.0) m.deliverAt = VoIPController::GetCurrentTime(); if (m.func != nullptr) { m.func(); } if (!cancelCurrent && m.interval > 0.0) { m.deliverAt += m.interval; InsertMessageInternal(m); } } } queueMutex.Unlock(); } uint32_t MessageThread::Post(std::function func, double delay, double interval) { assert(delay >= 0); //LOGI("MessageThread post [function] delay %f", delay); if (!IsCurrent()) { queueMutex.Lock(); } double currentTime = VoIPController::GetCurrentTime(); Message m{lastMessageID++, delay == 0.0 ? 0.0 : (currentTime + delay), interval, func}; InsertMessageInternal(m); if (!IsCurrent()) { #ifdef _WIN32 SetEvent(event); #else pthread_cond_signal(&cond); #endif queueMutex.Unlock(); } return m.id; } void MessageThread::InsertMessageInternal(MessageThread::Message &m) { if (queue.empty()) { queue.push_back(m); } else { if (queue[0].deliverAt > m.deliverAt) { queue.insert(queue.begin(), m); } else { for (auto insertAfter = queue.begin(); insertAfter != queue.end(); ++insertAfter) { std::vector::iterator next = std::next(insertAfter); if (next == queue.end() || (next->deliverAt > m.deliverAt && insertAfter->deliverAt <= m.deliverAt)) { queue.insert(next, m); break; } } } } } void MessageThread::Cancel(uint32_t id) { if (!IsCurrent()) { queueMutex.Lock(); } queue.erase( std::remove_if( queue.begin(), queue.end(), [&id](const Message &message) { return message.id == id; } ), queue.end() ); if (!IsCurrent()) { queueMutex.Unlock(); } } void MessageThread::CancelSelf() { assert(IsCurrent()); cancelCurrent = true; }