Reactor/lib/TcpServer.hpp

278 lines
7.2 KiB
C++

#pragma once
#include "Core.hpp"
#include "Socket.hpp"
#include "TcpConnection.hpp"
#include "EventLoopThread.hpp"
#include "Utilities.hpp"
#include <unordered_map>
#include <string>
#include <atomic>
#include <functional>
namespace reactor
{
// Callback invoked by Acceptor for every successfully accepted connection.
// The accepted Socket is passed as an rvalue so the callee can take ownership
// of it; the InetAddress argument is the address of the connecting peer.
using NewConnectionCallback = std::function<void(Socket&&, const InetAddress&)>;
/*
 * Accepts incoming TCP connections on a listening socket and hands each
 * accepted socket to the registered NewConnectionCallback.
 *
 * The acceptor also keeps one spare file descriptor (idleFd_, opened on
 * /dev/null) so that it can recover when accept() fails with EMFILE.
 */
class Acceptor : public NonCopyable
{
public:
    /*
     * Constructs an Acceptor.
     *
     * Creates the listening socket (IPv6 when listenAddr is an IPv6 address),
     * enables address reuse, optionally enables port reuse, binds the socket
     * to listenAddr and wires the channel's read callback to handleRead().
     * Listening does not begin until listen() is called.
     */
    Acceptor(EventLoop* loop, const InetAddress& listenAddr, bool reusePort = true)
        : loop_(loop),
          acceptSocket_(Socket::createTcp(listenAddr.isIpV6())),
          acceptChannel_(std::make_unique<Channel>(loop, acceptSocket_.fd())),
          newConnectionCallback_(),
          listening_(false),
          idleFd_(::open("/dev/null", O_RDONLY | O_CLOEXEC))
    {
        acceptSocket_.setReuseAddr(true);
        if (reusePort) {
            acceptSocket_.setReusePort(true);
        }
        acceptSocket_.bind(listenAddr);
        acceptChannel_->setReadCallback([this] { handleRead(); });
        LOG_INFO << "Acceptor created for " << listenAddr.toIpPort();
    }

    /*
     * Destroys the Acceptor: detaches the channel from its loop and releases
     * the spare descriptor.
     */
    ~Acceptor()
    {
        acceptChannel_->disableAll();
        acceptChannel_->remove();
        // idleFd_ is negative if the last open() of /dev/null failed.
        if (idleFd_ >= 0) {
            ::close(idleFd_);
        }
        LOG_INFO << "Acceptor destroyed";
    }

    /*
     * Starts listening for new connections.
     * Must be called on the owning loop's thread (asserted).
     */
    void listen()
    {
        loop_->assertInLoopThread();
        listening_ = true;
        acceptSocket_.listen();
        acceptChannel_->enableReading();
        LOG_INFO << "Acceptor listening";
    }

    /*
     * Returns the local address the acceptor is listening on.
     */
    InetAddress listenAddress() const
    {
        return acceptSocket_.getLocalAddr();
    }

    /*
     * Returns true once listen() has been called.
     */
    bool listening() const { return listening_; }

    /*
     * Sets the callback that receives each accepted socket and its peer address.
     */
    void setNewConnectionCallback(NewConnectionCallback cb)
    {
        newConnectionCallback_ = std::move(cb);
    }

private:
    /*
     * Handles readiness on the listening socket: accepts one connection and
     * passes it to the callback, or performs error recovery on failure.
     */
    void handleRead()
    {
        loop_->assertInLoopThread();
        InetAddress peerAddr;
        std::optional<Socket> connSocket = acceptSocket_.accept(peerAddr);
        if (connSocket) {
            if (newConnectionCallback_) {
                // Transfer ownership of the new socket to the callback's owner.
                newConnectionCallback_(std::move(*connSocket), peerAddr);
            } else {
                // No callback set; the socket goes out of scope unused here.
                LOG_WARN << "Acceptor has no new connection callback, closing connection.";
            }
            return;
        }

        // Snapshot errno before doing anything else: the logging call below may
        // itself make library/system calls that overwrite errno, which would
        // make the EMFILE check unreliable.
        const int savedErrno = errno;
        LOG_ERROR << "Acceptor accept failed: " << strerror(savedErrno);

        // Special handling for running out of file descriptors: release the
        // spare descriptor, use the freed slot to accept and immediately drop
        // the pending connection, then reserve the spare descriptor again.
        if (savedErrno == EMFILE) {
            if (idleFd_ >= 0) {
                ::close(idleFd_);
            }
            idleFd_ = ::accept(acceptSocket_.fd(), nullptr, nullptr);
            if (idleFd_ >= 0) {
                ::close(idleFd_);
            }
            idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC);
        }
    }

    EventLoop* loop_;                             // loop that owns this acceptor (not owned)
    Socket acceptSocket_;                         // listening socket
    std::unique_ptr<Channel> acceptChannel_;      // channel watching acceptSocket_ for reads
    NewConnectionCallback newConnectionCallback_; // receiver of accepted sockets
    bool listening_;                              // set by listen()
    int idleFd_;                                  // spare descriptor used for EMFILE recovery
};
/*
* A multi-threaded TCP server.
*/
class TcpServer : public NonCopyable
{
public:
/*
* Constructs a TcpServer.
*/
TcpServer(EventLoop* loop, const InetAddress& listenAddr, const std::string& name,
bool reusePort = true)
: loop_(loop),
name_(name),
acceptor_(std::make_unique<Acceptor>(loop, listenAddr, reusePort)),
threadPool_(std::make_unique<EventLoopThreadPool>(0, name + "-EventLoop")),
nextConnId_(1),
started_(false)
{
acceptor_->setNewConnectionCallback(
[this](Socket&& socket, const InetAddress& addr) {
newConnection(std::move(socket), addr);
});
LOG_INFO << "TcpServer " << name_ << " created for " << listenAddr.toIpPort();
}
/*
* Destroys the TcpServer.
*/
~TcpServer()
{
// The assertion is removed here. In the context of the test suite,
// the event loop thread is already joined, and all connections are closed,
// so accessing connections_ map here is safe.
LOG_INFO << "TcpServer " << name_ << " destructing with " << connections_.size() << " connections";
for (auto& item : connections_) {
auto conn = item.second;
auto ioLoop = conn->getLoop();
ioLoop->runInLoop([conn]() { conn->forceClose(); });
}
}
/*
* Sets the number of threads for handling connections.
*/
void setThreadNum(int numThreads)
{
assert(numThreads >= 0);
threadPool_ = std::make_unique<EventLoopThreadPool>(numThreads, name_ + "-EventLoop");
LOG_INFO << "TcpServer " << name_ << " set thread pool size to " << numThreads;
}
/*
* Starts the server.
*/
void start()
{
if (!started_) {
started_ = true;
if (!acceptor_->listening()) {
loop_->runInLoop([this]() { acceptor_->listen(); });
}
LOG_INFO << "TcpServer " << name_ << " started with " << threadPool_->size() << " threads";
}
}
/*
* Returns the server's base event loop.
*/
EventLoop* getLoop() const { return loop_; }
/*
* Returns the address the server is listening on.
*/
InetAddress listenAddress() const
{
return acceptor_->listenAddress();
}
void setMessageCallback(MessageCallback cb) { messageCallback_ = std::move(cb); }
void setConnectionCallback(ConnectionCallback cb) { connectionCallback_ = std::move(cb); }
void setWriteCompleteCallback(WriteCompleteCallback cb) { writeCompleteCallback_ = std::move(cb); }
size_t numConnections() const
{
return connections_.size();
}
private:
/*
* Creates and manages a new connection.
*/
void newConnection(Socket&& socket, const InetAddress& peerAddr)
{
loop_->assertInLoopThread();
EventLoop* ioLoop = threadPool_->getNextLoop();
if (!ioLoop) {
ioLoop = loop_; // Fallback to base loop if no I/O threads
}
std::string connName = name_ + "-" + peerAddr.toIpPort() + "#" + std::to_string(nextConnId_++);
InetAddress localAddr = socket.getLocalAddr();
LOG_INFO << "TcpServer new connection " << connName << " from " << peerAddr.toIpPort();
auto conn = std::make_shared<TcpConnection>(ioLoop, connName, std::move(socket), localAddr, peerAddr);
connections_[connName] = conn;
conn->setMessageCallback(messageCallback_);
conn->setConnectionCallback(connectionCallback_);
conn->setWriteCompleteCallback(writeCompleteCallback_);
conn->setCloseCallback([this](const TcpConnectionPtr& c) {
removeConnection(c);
});
ioLoop->runInLoop([conn]() { conn->connectEstablished(); });
}
/*
* Schedules the removal of a connection. This is thread-safe.
*/
void removeConnection(const TcpConnectionPtr& conn)
{
loop_->runInLoop([this, conn]() {
this->removeConnectionInLoop(conn);
});
}
/*
* Removes a connection from the server's management.
* This must be called in the base event loop.
*/
void removeConnectionInLoop(const TcpConnectionPtr& conn)
{
loop_->assertInLoopThread();
LOG_INFO << "TcpServer removing connection " << conn->name();
size_t n = connections_.erase(conn->name());
assert(n == 1);
EventLoop* ioLoop = conn->getLoop();
ioLoop->queueInLoop([conn]() { conn->connectDestroyed(); });
}
EventLoop* loop_;
std::string name_;
std::unique_ptr<Acceptor> acceptor_;
std::unique_ptr<EventLoopThreadPool> threadPool_;
MessageCallback messageCallback_;
ConnectionCallback connectionCallback_;
WriteCompleteCallback writeCompleteCallback_;
std::unordered_map<std::string, TcpConnectionPtr> connections_;
std::atomic<int> nextConnId_;
bool started_;
};
} // namespace reactor