// Reactor/tests/test_threading.cpp
// 2025-06-28 17:32:57 -05:00
//
// 347 lines
// 8.2 KiB
// C++

#include "../lib/EventLoopThread.hpp"

#include <atomic>
#include <cassert>
#include <chrono>
#include <cstdint>
#include <future>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
/*
 * Tests basic creation and task execution in an EventLoopThread.
 *
 * Checks that the thread exposes a non-null loop and reports the name it was
 * constructed with, then hands one task to the loop via runInLoop() and
 * verifies that it runs.
 *
 * Completion is signalled through a promise/future pair rather than a fixed
 * sleep: the test finishes as soon as the task has run and only fails if the
 * task has not run within a generous timeout, so it is not sensitive to
 * scheduling jitter on a loaded machine.
 */
void test_event_loop_thread_basic()
{
    std::cout << "Testing basic EventLoopThread...\n";

    reactor::EventLoopThread loop_thread("TestThread");
    auto loop = loop_thread.getLoop();
    assert(loop != nullptr);
    assert(loop_thread.name() == "TestThread");

    // The promise is held through a shared_ptr so the lambda stays copyable
    // and the promise stays alive for as long as the queued task exists.
    auto task_done = std::make_shared<std::promise<void>>();
    auto task_done_future = task_done->get_future();
    loop->runInLoop([task_done]() {
        task_done->set_value();
    });

    // Bounded wait: returns immediately once the task has run.
    const auto status = task_done_future.wait_for(std::chrono::seconds(2));
    assert(status == std::future_status::ready);
    (void)status;  // silences "unused" warnings when asserts are compiled out

    std::cout << "✓ Basic EventLoopThread passed\n";
}
/*
* Tests timer functionality within an EventLoopThread.
*/
void test_event_loop_thread_timer()
{
std::cout << "Testing EventLoopThread timer...\n";
reactor::EventLoopThread loop_thread("TimerThread");
auto loop = loop_thread.getLoop();
std::atomic<int> timer_count{0};
auto timer_id = loop->runEvery(reactor::Duration(20), [&timer_count]() {
timer_count++;
});
std::this_thread::sleep_for(std::chrono::milliseconds(85));
loop->cancel(timer_id);
int final_count = timer_count;
assert(final_count >= 3 && final_count <= 5);
std::cout << "✓ EventLoopThread timer passed (count: " << final_count << ")\n";
}
/*
* Tests the creation and interaction of multiple EventLoopThreads.
*/
void test_multiple_event_loop_threads()
{
std::cout << "Testing multiple EventLoopThreads...\n";
constexpr int num_threads = 3;
std::vector<std::unique_ptr<reactor::EventLoopThread>> threads;
std::vector<std::unique_ptr<std::atomic<int>>> counters;
for (int i = 0; i < num_threads; ++i) {
std::string name = "Thread-" + std::to_string(i);
threads.push_back(std::make_unique<reactor::EventLoopThread>(name));
counters.push_back(std::make_unique<std::atomic<int>>(0));
}
for (int i = 0; i < num_threads; ++i) {
auto loop = threads[i]->getLoop();
auto* counter = counters[i].get();
for (int j = 0; j < 10; ++j) {
loop->queueInLoop([counter]() {
(*counter)++;
});
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
for (int i = 0; i < num_threads; ++i) {
assert(*counters[i] == 10);
}
std::cout << "✓ Multiple EventLoopThreads passed\n";
}
/*
 * Smoke test for EventLoopThreadPool construction.
 *
 * A pool built with 3 threads and base name "PoolThread" must report that
 * size and name, and getAllLoops() must yield three non-null loops.
 */
void test_event_loop_thread_pool_basic()
{
    std::cout << "Testing basic EventLoopThreadPool...\n";

    reactor::EventLoopThreadPool pool(3, "PoolThread");

    // Reported configuration matches the constructor arguments.
    assert(pool.size() == 3);
    assert(pool.getBaseName() == "PoolThread");

    // Every slot in the pool exposes a usable loop.
    auto pool_loops = pool.getAllLoops();
    assert(pool_loops.size() == 3);
    for (auto entry : pool_loops) {
        assert(entry != nullptr);
    }

    std::cout << "✓ Basic EventLoopThreadPool passed\n";
}
/*
* Tests the round-robin distribution of loops from the pool.
*/
void test_thread_pool_round_robin()
{
std::cout << "Testing thread pool round robin...\n";
reactor::EventLoopThreadPool pool(3, "RoundRobin");
std::vector<reactor::EventLoop*> selected_loops;
for (int i = 0; i < 9; ++i) {
selected_loops.push_back(pool.getNextLoop());
}
// With 3 threads, loop i, i+3, and i+6 should be the same.
for (int i = 0; i < 3; ++i) {
assert(selected_loops[i] == selected_loops[i + 3]);
assert(selected_loops[i] == selected_loops[i + 6]);
}
std::cout << "✓ Thread pool round robin passed\n";
}
/*
 * Verifies that tasks handed out via getNextLoop() are spread evenly.
 *
 * Thirty increment tasks are dispatched through a 3-thread pool. Each task
 * bumps the counter that belongs to the loop it was queued on; after a
 * 100 ms pause every counter must read exactly 10.
 */
void test_thread_pool_task_distribution()
{
    std::cout << "Testing thread pool task distribution...\n";

    reactor::EventLoopThreadPool pool(3, "TaskDist");

    // One counter per loop, plus a lookup from loop pointer to counter slot.
    std::vector<std::unique_ptr<std::atomic<int>>> counters;
    std::map<reactor::EventLoop*, int> loop_to_index;
    auto loops = pool.getAllLoops();
    for (int slot = 0; slot < 3; ++slot) {
        counters.push_back(std::make_unique<std::atomic<int>>(0));
        loop_to_index[loops[slot]] = slot;
    }

    constexpr int tasks_per_loop = 10;
    constexpr int total_dispatches = 3 * tasks_per_loop;
    for (int dispatched = 0; dispatched < total_dispatches; ++dispatched) {
        auto target = pool.getNextLoop();
        // at() throws if the pool returns a loop that getAllLoops() did not list.
        const int slot = loop_to_index.at(target);
        std::atomic<int>* hits = counters[slot].get();
        target->queueInLoop([hits]() {
            ++(*hits);
        });
    }

    // Allow the loops time to drain their queues before checking.
    std::this_thread::sleep_for(std::chrono::milliseconds(100));

    for (int slot = 0; slot < 3; ++slot) {
        assert(*counters[slot] == tasks_per_loop);
    }

    std::cout << "✓ Thread pool task distribution passed\n";
}
/*
* Tests the behavior of an empty thread pool.
*/
void test_empty_thread_pool()
{
std::cout << "Testing empty thread pool...\n";
reactor::EventLoopThreadPool pool(0, "EmptyPool");
assert(pool.size() == 0);
assert(pool.getNextLoop() == nullptr);
assert(pool.getAllLoops().empty());
std::cout << "✓ Empty thread pool passed\n";
}
/*
 * Tests concurrent access to the thread pool's getNextLoop method.
 *
 * Eight client threads each call getNextLoop() 25 times on a shared 4-thread
 * pool and queue one counting task on every loop they receive. The test
 * passes when all 8 * 25 tasks have run.
 *
 * Completion is awaited with a deadline-bounded poll instead of a fixed
 * sleep: the test returns as soon as the expected count is reached and only
 * fails if the tasks have not all run within the timeout, which keeps it
 * stable on slow or heavily loaded machines.
 */
void test_thread_pool_concurrent_access()
{
    std::cout << "Testing thread pool concurrent access...\n";

    constexpr int client_count = 8;
    constexpr int tasks_per_client = 25;
    constexpr int expected_total = client_count * tasks_per_client;

    // Declared before the pool so it is destroyed after the pool, in case
    // queued tasks are still pending when this function exits.
    std::atomic<int> total_tasks{0};
    reactor::EventLoopThreadPool pool(4, "ConcurrentAccess");

    std::vector<std::thread> client_threads;
    client_threads.reserve(client_count);
    for (int t = 0; t < client_count; ++t) {
        client_threads.emplace_back([&pool, &total_tasks]() {
            for (int i = 0; i < tasks_per_client; ++i) {
                auto loop = pool.getNextLoop();
                assert(loop != nullptr);
                loop->queueInLoop([&total_tasks]() {
                    total_tasks++;
                });
            }
        });
    }

    // All tasks are queued once every client has finished.
    for (auto& thread : client_threads) {
        thread.join();
    }

    // Wait for the loops to drain, but never longer than the deadline.
    const auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(2);
    while (total_tasks.load() < expected_total &&
           std::chrono::steady_clock::now() < deadline) {
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }

    assert(total_tasks == expected_total);

    std::cout << "✓ Thread pool concurrent access passed\n";
}
/*
 * Runs one repeating timer on every loop of a 2-thread pool.
 *
 * Each timer is registered with runEvery() using reactor::Duration(30) and
 * increments a shared counter. After a 100 ms pause all timers are cancelled
 * and the combined callback count must lie in the range [4, 8].
 */
void test_thread_pool_with_timers()
{
    std::cout << "Testing thread pool with timers...\n";

    reactor::EventLoopThreadPool pool(2, "TimerPool");
    std::atomic<int> ticks{0};

    auto pool_loops = pool.getAllLoops();

    // timer_ids[k] is the timer registered on pool_loops[k].
    std::vector<uint64_t> timer_ids;
    for (size_t k = 0; k < pool_loops.size(); ++k) {
        auto registered = pool_loops[k]->runEvery(reactor::Duration(30), [&ticks]() {
            ++ticks;
        });
        timer_ids.push_back(registered);
    }

    // Let the timers fire for a while, then stop each one on its own loop.
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
    for (size_t k = 0; k < pool_loops.size(); ++k) {
        pool_loops[k]->cancel(timer_ids[k]);
    }

    const int final_count = ticks.load();
    assert(4 <= final_count && final_count <= 8);

    std::cout << "✓ Thread pool with timers passed (count: " << final_count << ")\n";
}
/*
 * Tests task synchronization between the main thread and a loop thread.
 *
 * Ten tasks are queued on one loop. Each task appends its index to a
 * mutex-protected vector and then fulfils its own promise. The main thread
 * waits on every future and then checks that each index 0..9 was recorded
 * exactly once.
 *
 * The wait on each future is bounded, so a task that never runs fails the
 * test instead of hanging the whole test binary.
 */
void test_thread_synchronization()
{
    std::cout << "Testing thread synchronization...\n";

    constexpr int task_count = 10;

    // Declared before the loop thread so they are destroyed after it, in case
    // a queued task is still pending when this function exits.
    std::vector<int> results;
    std::mutex results_mutex;

    reactor::EventLoopThread loop_thread("SyncThread");
    auto loop = loop_thread.getLoop();

    std::vector<std::future<void>> futures;
    futures.reserve(task_count);
    for (int i = 0; i < task_count; ++i) {
        // Create a shared_ptr to the promise to make the lambda copyable
        auto promise_ptr = std::make_shared<std::promise<void>>();
        futures.push_back(promise_ptr->get_future());
        loop->queueInLoop([i, &results, &results_mutex, p = promise_ptr]() {
            {
                std::lock_guard<std::mutex> lock(results_mutex);
                results.push_back(i);
            }
            // Signal only after the result has been recorded and the lock released.
            p->set_value();
        });
    }

    for (auto& future : futures) {
        const auto status = future.wait_for(std::chrono::seconds(2));
        assert(status == std::future_status::ready);
        (void)status;  // silences "unused" warnings when asserts are compiled out
    }

    // Every task has signalled; the lock is taken anyway so the read is safe
    // even if an assertion above was compiled out.
    std::lock_guard<std::mutex> lock(results_mutex);
    assert(results.size() == static_cast<std::size_t>(task_count));

    // Each index must appear exactly once; order is not asserted.
    std::vector<int> seen(task_count, 0);
    for (int value : results) {
        assert(value >= 0 && value < task_count);
        ++seen[static_cast<std::size_t>(value)];
    }
    for (int occurrences : seen) {
        assert(occurrences == 1);
        (void)occurrences;
    }

    std::cout << "✓ Thread synchronization passed\n";
}
/*
 * Checks that tasks queued on a pool have run by the time the pool is gone.
 *
 * Inside a nested scope, one task is queued on each loop of a 2-thread pool;
 * every task sleeps 20 ms and then bumps a counter. Once the scope has closed
 * (destroying the pool) the counter must read 2, i.e. the test expects the
 * pool's destructor not to return before both tasks have completed.
 */
void test_thread_pool_destruction()
{
    std::cout << "Testing thread pool destruction...\n";

    // Lives outside the pool's scope so it can be inspected after destruction.
    std::atomic<int> completed{0};

    {
        reactor::EventLoopThreadPool pool(2, "DestroyTest");
        auto pool_loops = pool.getAllLoops();
        for (auto entry : pool_loops) {
            entry->queueInLoop([&completed]() {
                std::this_thread::sleep_for(std::chrono::milliseconds(20));
                ++completed;
            });
        }
    } // pool goes out of scope here

    assert(completed == 2);

    std::cout << "✓ Thread pool destruction passed\n";
}
/*
 * Test driver: prints a banner, runs every threading test in a fixed order,
 * and reports success. A failing assert aborts the process before the final
 * message is printed.
 */
int main()
{
    std::cout << "=== Threading Tests ===\n";

    // Tests run sequentially, in the order listed.
    using TestCase = void (*)();
    const TestCase test_cases[] = {
        test_event_loop_thread_basic,
        test_event_loop_thread_timer,
        test_multiple_event_loop_threads,
        test_event_loop_thread_pool_basic,
        test_thread_pool_round_robin,
        test_thread_pool_task_distribution,
        test_empty_thread_pool,
        test_thread_pool_concurrent_access,
        test_thread_pool_with_timers,
        test_thread_synchronization,
        test_thread_pool_destruction,
    };
    for (const TestCase run_case : test_cases) {
        run_case();
    }

    std::cout << "\nAll threading tests passed! ✓\n";
    return 0;
}