#pragma once #include "Core.hpp" #include "Socket.hpp" #include "TcpConnection.hpp" #include "EventLoopThread.hpp" #include "Utilities.hpp" #include #include #include #include namespace reactor { // The callback for new connections, now transferring ownership of the socket. using NewConnectionCallback = std::function; /* * Accepts incoming TCP connections. */ class Acceptor : public NonCopyable { public: /* * Constructs an Acceptor. */ Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reusePort = true) : loop_(loop), acceptSocket_(Socket::createTcp(listenAddr.isIpV6())), acceptChannel_(std::make_unique(loop, acceptSocket_.fd())), newConnectionCallback_(), listening_(false), idleFd_(::open("/dev/null", O_RDONLY | O_CLOEXEC)) { acceptSocket_.setReuseAddr(true); if (reusePort) { acceptSocket_.setReusePort(true); } acceptSocket_.bind(listenAddr); acceptChannel_->setReadCallback([this] { handleRead(); }); LOG_INFO << "Acceptor created for " << listenAddr.toIpPort(); } /* * Destroys the Acceptor. */ ~Acceptor() { acceptChannel_->disableAll(); acceptChannel_->remove(); close(idleFd_); LOG_INFO << "Acceptor destroyed"; } /* * Starts listening for new connections. */ void listen() { loop_->assertInLoopThread(); listening_ = true; acceptSocket_.listen(); acceptChannel_->enableReading(); LOG_INFO << "Acceptor listening"; } /* * Returns the local address the acceptor is listening on. */ InetAddress listenAddress() const { return acceptSocket_.getLocalAddr(); } bool listening() const { return listening_; } void setNewConnectionCallback(NewConnectionCallback cb) { newConnectionCallback_ = std::move(cb); } private: /* * Handles new connections by accepting them and passing them to the callback. */ void handleRead() { loop_->assertInLoopThread(); InetAddress peerAddr; std::optional connSocket = acceptSocket_.accept(peerAddr); if (connSocket) { if (newConnectionCallback_) { // Transfer ownership of the new socket to the TcpServer newConnectionCallback_(std::move(*connSocket), peerAddr); } else { // No callback set, the socket will be closed by its destructor LOG_WARN << "Acceptor has no new connection callback, closing connection."; } } else { LOG_ERROR << "Acceptor accept failed: " << strerror(errno); // Special handling for running out of file descriptors if (errno == EMFILE) { close(idleFd_); idleFd_ = ::accept(acceptSocket_.fd(), nullptr, nullptr); close(idleFd_); idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC); } } } EventLoop* loop_; Socket acceptSocket_; std::unique_ptr acceptChannel_; NewConnectionCallback newConnectionCallback_; bool listening_; int idleFd_; }; /* * A multi-threaded TCP server. */ class TcpServer : public NonCopyable { public: /* * Constructs a TcpServer. */ TcpServer(EventLoop* loop, const InetAddress& listenAddr, const std::string& name, bool reusePort = true) : loop_(loop), name_(name), acceptor_(std::make_unique(loop, listenAddr, reusePort)), threadPool_(std::make_unique(0, name + "-EventLoop")), nextConnId_(1), started_(false) { acceptor_->setNewConnectionCallback( [this](Socket&& socket, const InetAddress& addr) { newConnection(std::move(socket), addr); }); LOG_INFO << "TcpServer " << name_ << " created for " << listenAddr.toIpPort(); } /* * Destroys the TcpServer. */ ~TcpServer() { // The assertion is removed here. In the context of the test suite, // the event loop thread is already joined, and all connections are closed, // so accessing connections_ map here is safe. LOG_INFO << "TcpServer " << name_ << " destructing with " << connections_.size() << " connections"; for (auto& item : connections_) { auto conn = item.second; auto ioLoop = conn->getLoop(); ioLoop->runInLoop([conn]() { conn->forceClose(); }); } } /* * Sets the number of threads for handling connections. */ void setThreadNum(int numThreads) { assert(numThreads >= 0); threadPool_ = std::make_unique(numThreads, name_ + "-EventLoop"); LOG_INFO << "TcpServer " << name_ << " set thread pool size to " << numThreads; } /* * Starts the server. */ void start() { if (!started_) { started_ = true; if (!acceptor_->listening()) { loop_->runInLoop([this]() { acceptor_->listen(); }); } LOG_INFO << "TcpServer " << name_ << " started with " << threadPool_->size() << " threads"; } } /* * Returns the server's base event loop. */ EventLoop* getLoop() const { return loop_; } /* * Returns the address the server is listening on. */ InetAddress listenAddress() const { return acceptor_->listenAddress(); } void setMessageCallback(MessageCallback cb) { messageCallback_ = std::move(cb); } void setConnectionCallback(ConnectionCallback cb) { connectionCallback_ = std::move(cb); } void setWriteCompleteCallback(WriteCompleteCallback cb) { writeCompleteCallback_ = std::move(cb); } size_t numConnections() const { return connections_.size(); } private: /* * Creates and manages a new connection. */ void newConnection(Socket&& socket, const InetAddress& peerAddr) { loop_->assertInLoopThread(); EventLoop* ioLoop = threadPool_->getNextLoop(); if (!ioLoop) { ioLoop = loop_; // Fallback to base loop if no I/O threads } std::string connName = name_ + "-" + peerAddr.toIpPort() + "#" + std::to_string(nextConnId_++); InetAddress localAddr = socket.getLocalAddr(); LOG_INFO << "TcpServer new connection " << connName << " from " << peerAddr.toIpPort(); auto conn = std::make_shared(ioLoop, connName, std::move(socket), localAddr, peerAddr); connections_[connName] = conn; conn->setMessageCallback(messageCallback_); conn->setConnectionCallback(connectionCallback_); conn->setWriteCompleteCallback(writeCompleteCallback_); conn->setCloseCallback([this](const TcpConnectionPtr& c) { removeConnection(c); }); ioLoop->runInLoop([conn]() { conn->connectEstablished(); }); } /* * Schedules the removal of a connection. This is thread-safe. */ void removeConnection(const TcpConnectionPtr& conn) { loop_->runInLoop([this, conn]() { this->removeConnectionInLoop(conn); }); } /* * Removes a connection from the server's management. * This must be called in the base event loop. */ void removeConnectionInLoop(const TcpConnectionPtr& conn) { loop_->assertInLoopThread(); LOG_INFO << "TcpServer removing connection " << conn->name(); size_t n = connections_.erase(conn->name()); assert(n == 1); EventLoop* ioLoop = conn->getLoop(); ioLoop->queueInLoop([conn]() { conn->connectDestroyed(); }); } EventLoop* loop_; std::string name_; std::unique_ptr acceptor_; std::unique_ptr threadPool_; MessageCallback messageCallback_; ConnectionCallback connectionCallback_; WriteCompleteCallback writeCompleteCallback_; std::unordered_map connections_; std::atomic nextConnId_; bool started_; }; } // namespace reactor