#include "../lib/EventLoopThread.hpp" #include #include #include #include #include #include #include #include #include /* * Tests basic creation and task execution in an EventLoopThread. */ void test_event_loop_thread_basic() { std::cout << "Testing basic EventLoopThread...\n"; reactor::EventLoopThread loop_thread("TestThread"); auto loop = loop_thread.getLoop(); assert(loop != nullptr); assert(loop_thread.name() == "TestThread"); std::atomic task_executed{false}; loop->runInLoop([&task_executed]() { task_executed = true; }); std::this_thread::sleep_for(std::chrono::milliseconds(50)); assert(task_executed); std::cout << "✓ Basic EventLoopThread passed\n"; } /* * Tests timer functionality within an EventLoopThread. */ void test_event_loop_thread_timer() { std::cout << "Testing EventLoopThread timer...\n"; reactor::EventLoopThread loop_thread("TimerThread"); auto loop = loop_thread.getLoop(); std::atomic timer_count{0}; auto timer_id = loop->runEvery(reactor::Duration(20), [&timer_count]() { timer_count++; }); std::this_thread::sleep_for(std::chrono::milliseconds(85)); loop->cancel(timer_id); int final_count = timer_count; assert(final_count >= 3 && final_count <= 5); std::cout << "✓ EventLoopThread timer passed (count: " << final_count << ")\n"; } /* * Tests the creation and interaction of multiple EventLoopThreads. */ void test_multiple_event_loop_threads() { std::cout << "Testing multiple EventLoopThreads...\n"; constexpr int num_threads = 3; std::vector> threads; std::vector>> counters; for (int i = 0; i < num_threads; ++i) { std::string name = "Thread-" + std::to_string(i); threads.push_back(std::make_unique(name)); counters.push_back(std::make_unique>(0)); } for (int i = 0; i < num_threads; ++i) { auto loop = threads[i]->getLoop(); auto* counter = counters[i].get(); for (int j = 0; j < 10; ++j) { loop->queueInLoop([counter]() { (*counter)++; }); } } std::this_thread::sleep_for(std::chrono::milliseconds(100)); for (int i = 0; i < num_threads; ++i) { assert(*counters[i] == 10); } std::cout << "✓ Multiple EventLoopThreads passed\n"; } /* * Tests the basic creation of an EventLoopThreadPool. */ void test_event_loop_thread_pool_basic() { std::cout << "Testing basic EventLoopThreadPool...\n"; reactor::EventLoopThreadPool pool(3, "PoolThread"); assert(pool.size() == 3); assert(pool.getBaseName() == "PoolThread"); auto loops = pool.getAllLoops(); assert(loops.size() == 3); for (auto loop : loops) { assert(loop != nullptr); } std::cout << "✓ Basic EventLoopThreadPool passed\n"; } /* * Tests the round-robin distribution of loops from the pool. */ void test_thread_pool_round_robin() { std::cout << "Testing thread pool round robin...\n"; reactor::EventLoopThreadPool pool(3, "RoundRobin"); std::vector selected_loops; for (int i = 0; i < 9; ++i) { selected_loops.push_back(pool.getNextLoop()); } // With 3 threads, loop i, i+3, and i+6 should be the same. for (int i = 0; i < 3; ++i) { assert(selected_loops[i] == selected_loops[i + 3]); assert(selected_loops[i] == selected_loops[i + 6]); } std::cout << "✓ Thread pool round robin passed\n"; } /* * Tests that tasks are distributed among threads in the pool. */ void test_thread_pool_task_distribution() { std::cout << "Testing thread pool task distribution...\n"; reactor::EventLoopThreadPool pool(3, "TaskDist"); std::vector>> counters; for (int i = 0; i < 3; ++i) { counters.push_back(std::make_unique>(0)); } std::map loop_to_index; auto loops = pool.getAllLoops(); for (int i = 0; i < 3; ++i) { loop_to_index[loops[i]] = i; } constexpr int tasks_per_loop = 10; for (int i = 0; i < 3 * tasks_per_loop; ++i) { auto loop = pool.getNextLoop(); int index = loop_to_index.at(loop); loop->queueInLoop([counter = counters[index].get()]() { (*counter)++; }); } std::this_thread::sleep_for(std::chrono::milliseconds(100)); for (int i = 0; i < 3; ++i) { assert(*counters[i] == tasks_per_loop); } std::cout << "✓ Thread pool task distribution passed\n"; } /* * Tests the behavior of an empty thread pool. */ void test_empty_thread_pool() { std::cout << "Testing empty thread pool...\n"; reactor::EventLoopThreadPool pool(0, "EmptyPool"); assert(pool.size() == 0); assert(pool.getNextLoop() == nullptr); assert(pool.getAllLoops().empty()); std::cout << "✓ Empty thread pool passed\n"; } /* * Tests concurrent access to the thread pool's getNextLoop method. */ void test_thread_pool_concurrent_access() { std::cout << "Testing thread pool concurrent access...\n"; reactor::EventLoopThreadPool pool(4, "ConcurrentAccess"); std::atomic total_tasks{0}; std::vector client_threads; for (int t = 0; t < 8; ++t) { client_threads.emplace_back([&pool, &total_tasks]() { for (int i = 0; i < 25; ++i) { auto loop = pool.getNextLoop(); assert(loop != nullptr); loop->queueInLoop([&total_tasks]() { total_tasks++; }); } }); } for (auto& thread : client_threads) { thread.join(); } std::this_thread::sleep_for(std::chrono::milliseconds(200)); assert(total_tasks == 8 * 25); std::cout << "✓ Thread pool concurrent access passed\n"; } /* * Tests timer functionality across all threads in a pool. */ void test_thread_pool_with_timers() { std::cout << "Testing thread pool with timers...\n"; reactor::EventLoopThreadPool pool(2, "TimerPool"); std::atomic timer_count{0}; std::vector timer_ids; auto loops = pool.getAllLoops(); for (auto loop : loops) { auto timer_id = loop->runEvery(reactor::Duration(30), [&timer_count]() { timer_count++; }); timer_ids.push_back(timer_id); } std::this_thread::sleep_for(std::chrono::milliseconds(100)); for (size_t i = 0; i < loops.size(); ++i) { loops[i]->cancel(timer_ids[i]); } int final_count = timer_count; assert(final_count >= 4 && final_count <= 8); std::cout << "✓ Thread pool with timers passed (count: " << final_count << ")\n"; } /* * Tests task synchronization between the main thread and a loop thread. */ void test_thread_synchronization() { std::cout << "Testing thread synchronization...\n"; reactor::EventLoopThread loop_thread("SyncThread"); auto loop = loop_thread.getLoop(); std::vector results; std::mutex results_mutex; std::vector> futures; for (int i = 0; i < 10; ++i) { // Create a shared_ptr to the promise to make the lambda copyable auto promise_ptr = std::make_shared>(); futures.push_back(promise_ptr->get_future()); loop->queueInLoop([i, &results, &results_mutex, p = promise_ptr]() { { std::lock_guard lock(results_mutex); results.push_back(i); } p->set_value(); }); } for (auto& future : futures) { future.wait(); } assert(results.size() == 10); std::cout << "✓ Thread synchronization passed\n"; } /* * Tests that tasks are completed before a thread pool is destructed. */ void test_thread_pool_destruction() { std::cout << "Testing thread pool destruction...\n"; std::atomic task_count{0}; { reactor::EventLoopThreadPool pool(2, "DestroyTest"); auto loops = pool.getAllLoops(); for (auto loop : loops) { loop->queueInLoop([&task_count]() { std::this_thread::sleep_for(std::chrono::milliseconds(20)); task_count++; }); } } // pool is destroyed here, blocking until threads finish assert(task_count == 2); std::cout << "✓ Thread pool destruction passed\n"; } /* * Main entry point for running all threading tests. */ int main() { std::cout << "=== Threading Tests ===\n"; test_event_loop_thread_basic(); test_event_loop_thread_timer(); test_multiple_event_loop_threads(); test_event_loop_thread_pool_basic(); test_thread_pool_round_robin(); test_thread_pool_task_distribution(); test_empty_thread_pool(); test_thread_pool_concurrent_access(); test_thread_pool_with_timers(); test_thread_synchronization(); test_thread_pool_destruction(); std::cout << "\nAll threading tests passed! ✓\n"; return 0; }