/* This file is part of TON Blockchain Library. TON Blockchain Library is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public License as published by the Free Software Foundation, either version 2 of the License, or (at your option) any later version. TON Blockchain Library is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. You should have received a copy of the GNU Lesser General Public License along with TON Blockchain Library. If not, see . Copyright 2017-2019 Telegram Systems LLP */ #include "td/actor/core/ActorLocker.h" #include "td/actor/actor.h" #include "td/actor/PromiseFuture.h" #include "td/utils/format.h" #include "td/utils/logging.h" #include "td/utils/port/thread.h" #include "td/utils/Random.h" #include "td/utils/Slice.h" #include "td/utils/StringBuilder.h" #include "td/utils/tests.h" #include "td/utils/Time.h" #include #include #include #include TEST(Actor2, signals) { using td::actor::core::ActorSignals; ActorSignals signals; signals.add_signal(ActorSignals::Wakeup); signals.add_signal(ActorSignals::Cpu); signals.add_signal(ActorSignals::Kill); signals.clear_signal(ActorSignals::Cpu); bool was_kill = false; bool was_wakeup = false; while (!signals.empty()) { auto s = signals.first_signal(); if (s == ActorSignals::Kill) { was_kill = true; } else if (s == ActorSignals::Wakeup) { was_wakeup = true; } else { UNREACHABLE(); } signals.clear_signal(s); } CHECK(was_kill && was_wakeup); } TEST(Actors2, flags) { using namespace td::actor::core; ActorState::Flags flags; CHECK(!flags.is_locked()); flags.set_locked(true); CHECK(flags.is_locked()); flags.set_locked(false); CHECK(!flags.is_locked()); flags.set_scheduler_id(SchedulerId{123}); auto signals = flags.get_signals(); CHECK(signals.empty()); signals.add_signal(ActorSignals::Cpu); signals.add_signal(ActorSignals::Kill); CHECK(signals.has_signal(ActorSignals::Cpu)); CHECK(signals.has_signal(ActorSignals::Kill)); flags.set_signals(signals); LOG_CHECK(flags.get_signals().raw() == signals.raw()) << flags.get_signals().raw() << " " << signals.raw(); auto wakeup = ActorSignals{}; wakeup.add_signal(ActorSignals::Wakeup); flags.add_signals(wakeup); signals.add_signal(ActorSignals::Wakeup); CHECK(flags.get_signals().raw() == signals.raw()); flags.clear_signals(); CHECK(flags.get_signals().empty()); flags.add_signals(ActorSignals::one(ActorSignals::Pause)); CHECK(flags.get_scheduler_id().value() == 123); CHECK(flags.get_signals().has_signal(ActorSignals::Pause)); } TEST(Actor2, locker) { using namespace td::actor::core; ActorState state; ActorSignals kill_signal; kill_signal.add_signal(ActorSignals::Kill); ActorSignals wakeup_signal; kill_signal.add_signal(ActorSignals::Wakeup); ActorSignals cpu_signal; kill_signal.add_signal(ActorSignals::Cpu); { ActorLocker lockerA(&state); ActorLocker lockerB(&state); ActorLocker lockerC(&state); CHECK(lockerA.try_lock()); CHECK(lockerA.own_lock()); auto flagsA = lockerA.flags(); CHECK(lockerA.try_unlock(flagsA)); CHECK(!lockerA.own_lock()); CHECK(lockerA.try_lock()); CHECK(!lockerB.try_lock()); CHECK(!lockerC.try_lock()); CHECK(lockerB.try_add_signals(kill_signal)); CHECK(!lockerC.try_add_signals(wakeup_signal)); CHECK(lockerC.try_add_signals(wakeup_signal)); CHECK(!lockerC.add_signals(cpu_signal)); CHECK(!lockerA.flags().has_signals()); CHECK(!lockerA.try_unlock(lockerA.flags())); { auto flags = lockerA.flags(); auto signals = flags.get_signals(); bool was_kill = false; bool was_wakeup = false; bool was_cpu = false; while (!signals.empty()) { auto s = signals.first_signal(); if (s == ActorSignals::Kill) { was_kill = true; } else if (s == ActorSignals::Wakeup) { was_wakeup = true; } else if (s == ActorSignals::Cpu) { was_cpu = true; } else { UNREACHABLE(); } signals.clear_signal(s); } CHECK(was_kill && was_wakeup && was_cpu); flags.clear_signals(); CHECK(lockerA.try_unlock(flags)); } } { ActorLocker lockerB(&state); CHECK(lockerB.try_lock()); CHECK(lockerB.try_unlock(lockerB.flags())); CHECK(lockerB.add_signals(kill_signal)); CHECK(lockerB.flags().get_signals().has_signal(ActorSignals::Kill)); auto flags = lockerB.flags(); flags.clear_signals(); ActorLocker lockerA(&state); CHECK(!lockerA.add_signals(kill_signal)); CHECK(!lockerB.try_unlock(flags)); CHECK(!lockerA.add_signals(kill_signal)); // do not loose this signal! CHECK(!lockerB.try_unlock(flags)); CHECK(lockerB.flags().get_signals().has_signal(ActorSignals::Kill)); CHECK(lockerB.try_unlock(flags)); } { ActorLocker lockerA(&state); CHECK(lockerA.try_lock()); auto flags = lockerA.flags(); flags.add_signals(ActorSignals::one(ActorSignals::Pause)); CHECK(lockerA.try_unlock(flags)); //We have to lock, though we can't execute. CHECK(lockerA.add_signals(wakeup_signal)); } } #if !TD_THREAD_UNSUPPORTED TEST(Actor2, locker_stress) { using namespace td::actor::core; ActorState state; constexpr size_t threads_n = 5; auto stage = [&](std::atomic &value, int need) { value.fetch_add(1, std::memory_order_release); while (value.load(std::memory_order_acquire) < need) { td::this_thread::yield(); } }; struct Node { std::atomic request{0}; td::uint32 response = 0; char pad[64]; }; std::array nodes; auto do_work = [&]() { for (auto &node : nodes) { auto query = node.request.load(std::memory_order_acquire); if (query) { node.response = query * query; node.request.store(0, std::memory_order_relaxed); } } }; std::atomic begin{0}; std::atomic ready{0}; std::atomic check{0}; std::vector threads; for (size_t i = 0; i < threads_n; i++) { threads.push_back(td::thread([&, id = i] { for (size_t i = 1; i < 1000000; i++) { ActorLocker locker(&state); auto need = static_cast(threads_n * i); auto query = static_cast(id + need); stage(begin, need); nodes[id].request = 0; nodes[id].response = 0; stage(ready, need); if (locker.try_lock()) { nodes[id].response = query * query; } else { auto cpu = ActorSignals::one(ActorSignals::Cpu); nodes[id].request.store(query, std::memory_order_release); locker.add_signals(cpu); } while (locker.own_lock()) { auto flags = locker.flags(); auto signals = flags.get_signals(); if (!signals.empty()) { do_work(); } flags.clear_signals(); locker.try_unlock(flags); } stage(check, need); if (id == 0) { CHECK(locker.add_signals(ActorSignals{})); CHECK(!locker.flags().has_signals()); CHECK(locker.try_unlock(locker.flags())); for (size_t thread_id = 0; thread_id < threads_n; thread_id++) { LOG_CHECK(nodes[thread_id].response == static_cast(thread_id + need) * static_cast(thread_id + need)) << td::tag("thread", thread_id) << " " << nodes[thread_id].response << " " << nodes[thread_id].request.load(); } } } })); } for (auto &thread : threads) { thread.join(); } } namespace { const size_t BUF_SIZE = 1024 * 1024; char buf[BUF_SIZE]; td::StringBuilder sb(td::MutableSlice(buf, BUF_SIZE - 1)); } // namespace TEST(Actor2, executor_simple) { using namespace td::actor::core; using namespace td::actor; using td::actor::detail::ActorMessageCreator; struct Dispatcher : public SchedulerDispatcher { void add_to_queue(ActorInfoPtr ptr, SchedulerId scheduler_id, bool need_poll) override { queue.push_back(std::move(ptr)); } void set_alarm_timestamp(const ActorInfoPtr &actor_info_ptr) override { UNREACHABLE(); } SchedulerId get_scheduler_id() const override { return SchedulerId{0}; } std::deque queue; }; Dispatcher dispatcher; class TestActor : public Actor { public: void close() { stop(); } private: void start_up() override { sb << "StartUp"; } void tear_down() override { sb << "TearDown"; } }; { ActorInfoCreator actor_info_creator; auto actor = actor_info_creator.create( std::make_unique(), ActorInfoCreator::Options().on_scheduler(SchedulerId{0}).with_name("TestActor")); dispatcher.add_to_queue(actor, SchedulerId{0}, false); { ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options()); CHECK(!executor.is_closed()); CHECK(executor.can_send_immediate()); LOG_CHECK(sb.as_cslice() == "StartUp") << sb.as_cslice(); sb.clear(); executor.send(ActorMessageCreator::lambda([&] { sb << "A"; })); LOG_CHECK(sb.as_cslice() == "A") << sb.as_cslice(); sb.clear(); auto big_message = ActorMessageCreator::lambda([&] { sb << "big"; }); big_message.set_big(); executor.send(std::move(big_message)); LOG_CHECK(sb.as_cslice() == "") << sb.as_cslice(); executor.send(ActorMessageCreator::lambda([&] { sb << "B"; })); LOG_CHECK(sb.as_cslice() == "") << sb.as_cslice(); } CHECK(dispatcher.queue.size() == 1); { ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options().with_from_queue()); } CHECK(dispatcher.queue.size() == 1); dispatcher.queue.clear(); LOG_CHECK(sb.as_cslice() == "bigB") << sb.as_cslice(); sb.clear(); { ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options()); executor.send( ActorMessageCreator::lambda([&] { static_cast(ActorExecuteContext::get()->actor()).close(); })); } LOG_CHECK(sb.as_cslice() == "TearDown") << sb.as_cslice(); sb.clear(); CHECK(!actor->has_actor()); { ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options()); executor.send( ActorMessageCreator::lambda([&] { static_cast(ActorExecuteContext::get()->actor()).close(); })); } CHECK(dispatcher.queue.empty()); CHECK(sb.as_cslice() == ""); } { ActorInfoCreator actor_info_creator; auto actor = actor_info_creator.create( std::make_unique(), ActorInfoCreator::Options().on_scheduler(SchedulerId{0}).with_name("TestActor")); dispatcher.add_to_queue(actor, SchedulerId{0}, false); { ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options()); CHECK(!executor.is_closed()); CHECK(executor.can_send_immediate()); LOG_CHECK(sb.as_cslice() == "StartUp") << sb.as_cslice(); sb.clear(); auto a_msg = ActorMessageCreator::lambda([&] { sb << "big pause"; ActorExecuteContext::get()->set_pause(); }); a_msg.set_big(); executor.send(std::move(a_msg)); executor.send(ActorMessageCreator::lambda([&] { sb << "A"; })); LOG_CHECK(sb.as_cslice() == "") << sb.as_cslice(); } { CHECK(dispatcher.queue.size() == 1); dispatcher.queue.clear(); ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options().with_from_queue()); CHECK(!executor.is_closed()); CHECK(!executor.can_send_immediate()); LOG_CHECK(sb.as_cslice() == "big pause") << sb.as_cslice(); sb.clear(); } { CHECK(dispatcher.queue.size() == 1); dispatcher.queue.clear(); ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options().with_from_queue()); CHECK(!executor.is_closed()); CHECK(executor.can_send_immediate()); LOG_CHECK(sb.as_cslice() == "A") << sb.as_cslice(); sb.clear(); } { ActorExecutor executor(*actor, dispatcher, ActorExecutor::Options()); executor.send( ActorMessageCreator::lambda([&] { static_cast(ActorExecuteContext::get()->actor()).close(); })); } LOG_CHECK(sb.as_cslice() == "TearDown") << sb.as_cslice(); sb.clear(); dispatcher.queue.clear(); } } using namespace td::actor; using td::uint32; static std::atomic global_cnt; class Worker : public Actor { public: void query(uint32 x, core::ActorInfoPtr master); void close() { stop(); } }; class Master : public Actor { public: void on_result(uint32 x, uint32 y) { loop(); } private: uint32 l = 0; uint32 r = 100000; core::ActorInfoPtr worker; void start_up() override { worker = detail::create_actor(ActorOptions().with_name("Master")); loop(); } void loop() override { l++; if (l == r) { if (!--global_cnt) { SchedulerContext::get()->stop(); } detail::send_closure(*worker, &Worker::close); stop(); return; } detail::send_lambda(*worker, [x = l, self = get_actor_info_ptr()] { detail::current_actor().query(x, self); }); } }; void Worker::query(uint32 x, core::ActorInfoPtr master) { auto y = x; for (int i = 0; i < 100; i++) { y = y * y; } detail::send_lambda(*master, [result = y, x] { detail::current_actor().on_result(x, result); }); } TEST(Actor2, scheduler_simple) { auto group_info = std::make_shared(1); core::Scheduler scheduler{group_info, SchedulerId{0}, 2}; scheduler.start(); scheduler.run_in_context([] { global_cnt = 10; for (int i = 0; i < 10; i++) { detail::create_actor(ActorOptions().with_name("Master")); } }); while (scheduler.run(1000)) { } core::Scheduler::close_scheduler_group(*group_info); } TEST(Actor2, actor_id_simple) { auto group_info = std::make_shared(1); core::Scheduler scheduler{group_info, SchedulerId{0}, 2}; sb.clear(); scheduler.start(); scheduler.run_in_context([] { class A : public Actor { public: A(int value) : value_(value) { sb << "A" << value_; } void hello() { sb << "hello"; } ~A() { sb << "~A"; if (--global_cnt <= 0) { SchedulerContext::get()->stop(); } } private: int value_; }; global_cnt = 1; auto id = create_actor("A", 123); CHECK(sb.as_cslice() == "A123"); sb.clear(); send_closure(id, &A::hello); }); while (scheduler.run(1000)) { } CHECK(sb.as_cslice() == "hello~A"); core::Scheduler::close_scheduler_group(*group_info); sb.clear(); } TEST(Actor2, actor_creation) { auto group_info = std::make_shared(1); core::Scheduler scheduler{group_info, SchedulerId{0}, 1}; scheduler.start(); scheduler.run_in_context([] { class B; class A : public Actor { public: void f() { check(); stop(); } private: void start_up() override { check(); create_actor("Simple", actor_id(this)).release(); } void check() { auto &context = *SchedulerContext::get(); CHECK(context.has_poll()); context.get_poll(); } void tear_down() override { if (--global_cnt <= 0) { SchedulerContext::get()->stop(); } } }; class B : public Actor { public: B(ActorId a) : a_(a) { } private: void start_up() override { auto &context = *SchedulerContext::get(); CHECK(!context.has_poll()); send_closure(a_, &A::f); stop(); } void tear_down() override { if (--global_cnt <= 0) { SchedulerContext::get()->stop(); } } ActorId a_; }; global_cnt = 2; create_actor(ActorOptions().with_name("Poll").with_poll()).release(); }); while (scheduler.run(1000)) { } scheduler.stop(); core::Scheduler::close_scheduler_group(*group_info); } TEST(Actor2, actor_timeout_simple) { auto group_info = std::make_shared(1); core::Scheduler scheduler{group_info, SchedulerId{0}, 2}; sb.clear(); scheduler.start(); auto watcher = td::create_shared_destructor([] { SchedulerContext::get()->stop(); }); scheduler.run_in_context([watcher = std::move(watcher)] { class A : public Actor { public: A(std::shared_ptr watcher) : watcher_(std::move(watcher)) { } void start_up() override { set_timeout(); } void alarm() override { double diff = td::Time::now() - expected_timeout_; LOG_CHECK(-0.001 < diff && diff < 0.1) << diff; if (cnt_-- > 0) { set_timeout(); } else { stop(); } } private: std::shared_ptr watcher_; double expected_timeout_; int cnt_ = 5; void set_timeout() { auto wakeup_timestamp = td::Timestamp::in(0.1); expected_timeout_ = wakeup_timestamp.at(); alarm_timestamp() = wakeup_timestamp; } }; create_actor(core::ActorInfoCreator::Options().with_name("A").with_poll(), watcher).release(); create_actor(core::ActorInfoCreator::Options().with_name("B"), watcher).release(); }); watcher.reset(); while (scheduler.run(1000)) { } core::Scheduler::close_scheduler_group(*group_info); sb.clear(); } TEST(Actor2, actor_timeout_simple2) { auto group_info = std::make_shared(1); core::Scheduler scheduler{group_info, SchedulerId{0}, 2}; sb.clear(); scheduler.start(); auto watcher = td::create_shared_destructor([] { SchedulerContext::get()->stop(); }); scheduler.run_in_context([watcher = std::move(watcher)] { class A : public Actor { public: A(std::shared_ptr watcher) : watcher_(std::move(watcher)) { } void start_up() override { set_timeout(); } void alarm() override { set_timeout(); } private: std::shared_ptr watcher_; void set_timeout() { auto wakeup_timestamp = td::Timestamp::in(0.001); alarm_timestamp() = wakeup_timestamp; } }; class B : public Actor { public: B(std::shared_ptr watcher, ActorOwn<> actor_own) : watcher_(std::move(watcher)), actor_own_(std::move(actor_own)) { } void start_up() override { set_timeout(); } void alarm() override { stop(); } private: std::shared_ptr watcher_; ActorOwn<> actor_own_; void set_timeout() { auto wakeup_timestamp = td::Timestamp::in(0.005); alarm_timestamp() = wakeup_timestamp; } }; auto actor_own = create_actor(core::ActorInfoCreator::Options().with_name("A").with_poll(), watcher); create_actor(core::ActorInfoCreator::Options().with_name("B"), watcher, std::move(actor_own)).release(); }); watcher.reset(); while (scheduler.run(1000)) { } core::Scheduler::close_scheduler_group(*group_info); sb.clear(); } TEST(Actor2, actor_function_result) { auto group_info = std::make_shared(1); core::Scheduler scheduler{group_info, SchedulerId{0}, 2}; sb.clear(); scheduler.start(); auto watcher = td::create_shared_destructor([] { SchedulerContext::get()->stop(); }); scheduler.run_in_context([watcher = std::move(watcher)] { class B : public Actor { public: uint32 query(uint32 x) { return x * x; } void query_async(uint32 x, td::Promise promise) { promise(x * x); } }; class A : public Actor { public: A(std::shared_ptr watcher) : watcher_(std::move(watcher)) { } void on_result(uint32 x, uint32 y) { LOG_CHECK(x * x == y) << x << " " << y; if (--cnt_ == 0) { stop(); } } void start_up() { b_ = create_actor(ActorOptions().with_name("B")); cnt_ = 3; send_closure(b_, &B::query, 3, [a = std::make_unique(), self = actor_id(this)](td::Result y) { LOG_IF(ERROR, y.is_error()) << y.error(); send_closure(self, &A::on_result, 3, y.ok()); }); send_closure(b_, &B::query_async, 2, [self = actor_id(this)](uint32 y) { CHECK(!self.empty()); send_closure(self, &A::on_result, 2, y); }); send_closure_later(b_, &B::query_async, 5, [self = actor_id(this)](uint32 y) { CHECK(!self.empty()); send_closure(self, &A::on_result, 5, y); }); } private: int cnt_{0}; std::shared_ptr watcher_; td::actor::ActorOwn b_; }; create_actor(core::ActorInfoCreator::Options().with_name("A").with_poll(), watcher).release(); create_actor(core::ActorInfoCreator::Options().with_name("B"), watcher).release(); }); watcher.reset(); while (scheduler.run(1000)) { } core::Scheduler::close_scheduler_group(*group_info); sb.clear(); } TEST(Actor2, actor_ping_pong) { auto group_info = std::make_shared(1); core::Scheduler scheduler{group_info, SchedulerId{0}, 3}; sb.clear(); scheduler.start(); auto watcher = td::create_shared_destructor([] { SchedulerContext::get()->stop(); }); for (int i = 0; i < 2000; i++) { scheduler.run_in_context([watcher] { class PingPong : public Actor { public: PingPong(std::shared_ptr watcher) : watcher_(std::move(watcher)) { } void query(td::int32 left, ActorOwn<> data) { if (td::Random::fast(0, 4) == 0) { alarm_timestamp() = td::Timestamp::in(0.01 * td::Random::fast(0, 10)); } if (left <= 0) { return; } auto dest = td::Random::fast(0, (int)next_.size() - 1); if (td::Random::fast(0, 1) == 0 && 0) { send_closure(next_[dest], &PingPong::query, left - 1, std::move(data)); } else { send_closure_later(next_[dest], &PingPong::query, left - 1, std::move(data)); } } void add_next(ActorId p) { next_.push_back(std::move(p)); } void start_up() override { } void store_data(ActorOwn<> data) { data_.push_back(std::move(data)); } private: std::vector> next_; std::vector> data_; std::shared_ptr watcher_; }; int N = td::Random::fast(2, 10); //N = 2; std::vector> actors; for (int i = 0; i < N; i++) { actors.push_back( create_actor(core::ActorInfoCreator::Options().with_name(PSLICE() << "Worker#" << i), watcher)); } for (int i = 0; i < N; i++) { for (int j = 0; j < N; j++) { send_closure(actors[i], &PingPong::add_next, actors[j].get()); } } int nn = td::Random::fast(1, N); //nn = 2; auto first = actors[0].get(); for (int i = 0; i < N; i++) { auto to = actors[i].get(); if (i < nn) { send_closure(to, &PingPong::query, td::Random::fast(10, 1000), std::move(actors[i])); } else { send_closure(first, &PingPong::store_data, std::move(actors[i])); } } }); } watcher.reset(); while (scheduler.run(1000)) { } core::Scheduler::close_scheduler_group(*group_info); sb.clear(); } TEST(Actor2, Schedulers) { for (auto mode : {Scheduler::Running, Scheduler::Paused}) { for (auto start_count : {0, 1, 2}) { for (auto run_count : {0, 1, 2}) { for (auto stop_count : {0, 1, 2}) { for (size_t threads : {0, 1}) { Scheduler scheduler({threads}, mode); for (int i = 0; i < start_count; i++) { scheduler.start(); } for (int i = 0; i < run_count; i++) { scheduler.run(0); } for (int i = 0; i < stop_count; i++) { scheduler.stop(); } } } } } } } TEST(Actor2, SchedulerZeroCpuThreads) { Scheduler scheduler({0}); scheduler.run_in_context([] { class A : public Actor { void start_up() override { SchedulerContext::get()->stop(); } }; create_actor(ActorOptions().with_name("A").with_poll(false)).release(); }); scheduler.run(); } TEST(Actor2, SchedulerTwo) { Scheduler scheduler({0, 0}); scheduler.run_in_context([] { class B : public Actor { public: void start_up() override { CHECK(SchedulerContext::get()->get_scheduler_id() == SchedulerId{1}); } void close() { CHECK(SchedulerContext::get()->get_scheduler_id() == SchedulerId{1}); SchedulerContext::get()->stop(); } }; class A : public Actor { void start_up() override { CHECK(SchedulerContext::get()->get_scheduler_id() == SchedulerId{0}); auto id = create_actor(ActorOptions().with_name("B").with_poll(false).on_scheduler(SchedulerId{1})).release(); send_closure(id, &B::close); } }; create_actor(ActorOptions().with_name("A").with_poll(false).on_scheduler(SchedulerId{0})).release(); }); scheduler.run(); } TEST(Actor2, ActorIdDynamicCast) { Scheduler scheduler({0}); scheduler.run_in_context([] { class A : public Actor { public: void close() { CHECK(actor_id().actor_info_ptr() == get_actor_info_ptr()); SchedulerContext::get()->stop(); } }; auto actor_own_a = create_actor(ActorOptions().with_name("A").with_poll(false)); auto actor = &actor_own_a.get_actor_unsafe(); ActorOwn<> actor_own = actor_dynamic_cast(std::move(actor_own_a)); CHECK(actor_own_a.empty()); actor_own_a = actor_dynamic_cast(std::move(actor_own)); CHECK(actor_own.empty()); CHECK(&actor_own_a.get_actor_unsafe() == actor); auto actor_id_a = actor_own_a.release(); ActorId<> actor_id = actor_dynamic_cast(actor_id_a); actor_id_a = actor_dynamic_cast(actor_id); CHECK(&actor_id_a.get_actor_unsafe() == actor); auto actor_shared_a = ActorShared(actor_id_a, 123); ActorShared<> actor_shared = actor_dynamic_cast(std::move(actor_shared_a)); CHECK(actor_shared_a.empty()); CHECK(actor_shared.token() == 123); actor_shared_a = actor_dynamic_cast(std::move(actor_shared)); CHECK(actor_shared.empty()); CHECK(&actor_shared_a.get_actor_unsafe() == actor); CHECK(actor_shared_a.token() == 123); send_closure(actor_shared_a, &A::close); }); scheduler.run(); } TEST(Actor2, send_vs_close) { for (int it = 0; it < 100; it++) { Scheduler scheduler({8}); auto watcher = td::create_shared_destructor([] { SchedulerContext::get()->stop(); }); scheduler.run_in_context([watcher = std::move(watcher)] { class To : public Actor { public: class Callback { public: virtual ~Callback() { } virtual void on_closed(ActorId to) = 0; }; To(int cnt, std::shared_ptr watcher) : cnt_(cnt), watcher_(std::move(watcher)) { } void on_event() { if (--cnt_ <= 0) { stop(); } } void add_callback(std::unique_ptr callback) { callbacks_.push_back(std::move(callback)); } void start_up() override { alarm_timestamp() = td::Timestamp::in(td::Random::fast(0, 4) * 0.001); } void tear_down() override { if (td::Random::fast(0, 4) == 0) { send_closure(actor_id(this), &To::self_ref, actor_id(this)); } for (auto &callback : callbacks_) { callback->on_closed(actor_id(this)); } } void self_ref(ActorId) { } void alarm() override { stop(); } private: int cnt_; std::shared_ptr watcher_; std::vector> callbacks_; }; class From : public Actor { public: From(std::vector> to, std::shared_ptr watcher) : to_(std::move(to)), watcher_(std::move(watcher)) { } void start_up() override { yield(); } void on_closed(ActorId) { } void loop() override { while (!to_.empty()) { if (td::Random::fast(0, 3) == 0) { break; } auto id = to_.back(); to_.pop_back(); if (td::Random::fast(0, 4) == 0) { class Callback : public To::Callback { public: Callback(ActorId from) : from_(std::move(from)) { } void on_closed(ActorId id) override { send_closure(from_, &From::on_closed, std::move(id)); } private: ActorId from_; }; send_closure(id, &To::add_callback, std::make_unique(actor_id(this))); } send_closure(id, &To::on_event); } if (to_.empty()) { stop(); } else { yield(); } } private: std::vector> to_; std::shared_ptr watcher_; }; class Master : public Actor { public: Master(std::shared_ptr watcher) : watcher_(std::move(watcher)) { } private: std::shared_ptr watcher_; int cnt_ = 10; void loop() override { if (cnt_-- < 0) { return stop(); } int from_n = 5; int to_n = 5; std::vector>> from(from_n); for (int i = 0; i < to_n; i++) { int cnt = td::Random::fast(1, 10); int to_cnt = td::Random::fast(1, cnt); auto to = td::actor::create_actor( td::actor::ActorOptions().with_name(PSLICE() << "To#" << i).with_poll(td::Random::fast(0, 4) == 0), to_cnt, watcher_) .release(); for (int j = 0; j < cnt; j++) { auto from_i = td::Random::fast(0, from_n - 1); from[from_i].push_back(to); } } for (int i = 0; i < from_n; i++) { td::actor::create_actor( td::actor::ActorOptions().with_name(PSLICE() << "From#" << i).with_poll(td::Random::fast(0, 4) == 0), std::move(from[i]), watcher_) .release(); } alarm_timestamp() = td::Timestamp::in(td::Random::fast(0, 10) * 0.01 / 30); } }; td::actor::create_actor("Master", watcher).release(); }); scheduler.run(); } } TEST(Actor2, send_vs_close2) { for (int it = 0; it < 100; it++) { Scheduler scheduler({8}); auto watcher = td::create_shared_destructor([] { SchedulerContext::get()->stop(); }); //std::shared_ptr watcher; scheduler.run_in_context([watcher = std::move(watcher)] { class To : public Actor { public: To(int cnt, std::shared_ptr watcher) : cnt_(cnt), watcher_(std::move(watcher)) { } void start_up() override { alarm_timestamp() = td::Timestamp::in(td::Random::fast(0, 4) * 0.001 / 30); } void alarm() override { stop(); } private: int cnt_; std::shared_ptr watcher_; }; class From : public Actor { public: From(std::vector> to, std::shared_ptr watcher) : to_(std::move(to)), watcher_(std::move(watcher)) { } void start_up() override { stop(); } private: std::vector> to_; std::shared_ptr watcher_; }; class Master : public Actor { public: Master(std::shared_ptr watcher) : watcher_(std::move(watcher)) { } private: std::shared_ptr watcher_; int cnt_ = 5; void loop() override { if (cnt_-- < 0) { return stop(); } int from_n = 2; int to_n = 2; std::vector>> from(from_n); for (int i = 0; i < to_n; i++) { int cnt = td::Random::fast(1, 2); int to_cnt = td::Random::fast(1, cnt); auto to = td::actor::create_actor( td::actor::ActorOptions().with_name(PSLICE() << "To#" << i).with_poll(td::Random::fast(0, 4) == 0), to_cnt, watcher_) .release(); for (int j = 0; j < cnt; j++) { auto from_i = td::Random::fast(0, from_n - 1); from[from_i].push_back(to); } } for (int i = 0; i < from_n; i++) { td::actor::create_actor( td::actor::ActorOptions().with_name(PSLICE() << "From#" << i).with_poll(td::Random::fast(0, 4) == 0), std::move(from[i]), watcher_) .release(); } alarm_timestamp() = td::Timestamp::in(td::Random::fast(0, 10) * 0.01 / 30); } }; td::actor::create_actor("Master", watcher).release(); }); scheduler.run(); } } #endif //!TD_THREAD_UNSUPPORTED