// Reactor/lib/EventLoopThread.hpp
// 2025-06-28 17:32:57 -05:00
//
// 158 lines
// 3.3 KiB
// C++

#pragma once
#include "Core.hpp"
#include "Utilities.hpp"
#include <thread>
#include <string>
#include <vector>
#include <memory>
#include <atomic>
#include <latch>
namespace reactor
{
class EventLoopThread : public NonCopyable
{
private:
std::jthread thread_;
EventLoop* loop_;
std::latch readyLatch_;
std::string name_;
/*
* The function executed by the internal thread.
* It creates the EventLoop, signals readiness, and runs the loop.
*/
void threadFunc()
{
EventLoop loop;
loop_ = &loop;
readyLatch_.count_down();
loop.loop();
}
public:
/*
* Constructs an EventLoopThread.
* Starts a new thread and creates an EventLoop within it,
* waiting until the loop is fully initialized.
*/
explicit EventLoopThread(const std::string& name = "EventLoopThread")
: loop_(nullptr), readyLatch_(1), name_(name)
{
thread_ = std::jthread(&EventLoopThread::threadFunc, this);
readyLatch_.wait();
LOG_INFO << "EventLoopThread '" << name_ << "' initialized";
}
/*
* Destructs the EventLoopThread.
* Quits the event loop. The underlying jthread automatically joins.
*/
~EventLoopThread()
{
if (loop_) {
loop_->quit();
}
LOG_INFO << "EventLoopThread '" << name_ << "' destroyed";
}
/*
* Returns the EventLoop associated with this thread.
* The pointer is valid for the lifetime of the thread.
*/
EventLoop* getLoop()
{
return loop_;
}
/*
* Returns the name of the thread.
*/
const std::string& name() const
{
return name_;
}
};
class EventLoopThreadPool : public NonCopyable
{
private:
std::vector<std::unique_ptr<EventLoopThread>> threads_;
std::vector<EventLoop*> loops_;
std::atomic<size_t> next_;
std::string baseThreadName_;
public:
/*
* Constructs an EventLoopThreadPool.
* Creates a pool of threads, each running an EventLoop.
*/
explicit EventLoopThreadPool(size_t numThreads, const std::string& baseName = "EventLoopThread")
: next_(0), baseThreadName_(baseName)
{
LOG_INFO << "Creating EventLoopThreadPool with " << numThreads << " threads";
for (size_t i = 0; i < numThreads; ++i) {
std::string threadName = baseThreadName_ + "-" + std::to_string(i);
auto thread = std::make_unique<EventLoopThread>(threadName);
loops_.push_back(thread->getLoop());
threads_.push_back(std::move(thread));
}
LOG_INFO << "EventLoopThreadPool created with " << numThreads << " threads";
}
/*
* Destructs the thread pool.
* The threads will be quit and joined automatically via their destructors.
*/
~EventLoopThreadPool()
{
LOG_INFO << "EventLoopThreadPool destroying " << threads_.size() << " threads";
}
/*
* Gets the next EventLoop from the pool in a round-robin fashion.
* This method is thread-safe.
*/
EventLoop* getNextLoop()
{
if (loops_.empty()) {
LOG_WARN << "EventLoopThreadPool has no loops available";
return nullptr;
}
size_t index = next_.fetch_add(1, std::memory_order_relaxed) % loops_.size();
LOG_TRACE << "EventLoopThreadPool returning loop " << index;
return loops_[index];
}
/*
* Returns pointers to all EventLoops in the pool.
*/
std::vector<EventLoop*> getAllLoops() const
{
return loops_;
}
/*
* Returns the number of threads in the pool.
*/
size_t size() const
{
return loops_.size();
}
/*
* Returns the base name for threads in the pool.
*/
const std::string& getBaseName() const
{
return baseThreadName_;
}
};
}