diff --git a/sockeye.hpp b/sockeye.hpp index b7b0a7a..fdf0dfb 100644 --- a/sockeye.hpp +++ b/sockeye.hpp @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -7,218 +8,34 @@ #include #include #include -#include -#include #include -#ifdef __linux__ - #include -#elif defined(__APPLE__) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__) - #include -#else - #error "Unsupported platform - requires Linux or BSD/macOS" -#endif - namespace sockeye { -class BufferPool { -public: - static constexpr size_t BUFFER_SIZE = 24576; - - explicit BufferPool(size_t pool_size = 1024) { - buffers_.reserve(pool_size); - for (size_t i = 0; i < pool_size; ++i) { - buffers_.emplace_back(BUFFER_SIZE); - } - } - - std::vector* acquire() { - if (buffers_.empty()) { - return new std::vector(BUFFER_SIZE); - } - auto* buffer = new std::vector(std::move(buffers_.back())); - buffers_.pop_back(); - return buffer; - } - - void release(std::vector* buffer) { - if (!buffer) return; - if (buffers_.size() < buffers_.capacity()) { - buffer->clear(); - buffer->resize(BUFFER_SIZE); - buffers_.emplace_back(std::move(*buffer)); - delete buffer; - } else { - delete buffer; - } - } - -private: - std::vector> buffers_; -}; - -struct ClientBuffer { - std::vector* read_buffer = nullptr; - size_t bytes_read = 0; -}; - -class SocketBase { +class Socket +{ public: using ConnectionHandler = std::function; using DataHandler = std::function; using DisconnectHandler = std::function; - explicit SocketBase(uint16_t port = 8080) : port_(port) {} - - virtual ~SocketBase() { - cleanup_resources(); - } - - virtual bool start() = 0; - virtual void run() = 0; - void stop() { running_ = false; } - - void on_connection(ConnectionHandler handler) { on_connection_ = std::move(handler); } - void on_data(DataHandler handler) { on_data_ = std::move(handler); } - void on_disconnect(DisconnectHandler handler) { on_disconnect_ = std::move(handler); } - -protected: - static constexpr int LISTEN_BACKLOG = SOMAXCONN; - - uint16_t port_; - int server_fd_ = -1; - bool running_ = true; - - BufferPool buffer_pool_; - std::unordered_map client_buffers_; - - ConnectionHandler on_connection_; - DataHandler on_data_; - DisconnectHandler on_disconnect_; - - void cleanup_resources() { - for (auto& [fd, client_buffer] : client_buffers_) { - buffer_pool_.release(client_buffer.read_buffer); - close(fd); - } - client_buffers_.clear(); + explicit Socket(uint16_t port = 8080) : port_(port) {} + ~Socket() + { if (server_fd_ != -1) close(server_fd_); - } - - bool create_server_socket() { - server_fd_ = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0); - if (server_fd_ == -1) return false; - - int opt = 1; - if (setsockopt(server_fd_, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) == -1 || - setsockopt(server_fd_, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) == -1 || - setsockopt(server_fd_, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt)) == -1) { - return false; - } - - sockaddr_in addr{}; - addr.sin_family = AF_INET; - addr.sin_addr.s_addr = INADDR_ANY; - addr.sin_port = htons(port_); - - return bind(server_fd_, reinterpret_cast(&addr), sizeof(addr)) != -1 && - listen(server_fd_, LISTEN_BACKLOG) != -1; - } - - bool add_client_buffer(int client_fd) { - client_buffers_[client_fd] = {buffer_pool_.acquire(), 0}; - return true; - } - - void remove_client_common(int client_fd) { - auto it = client_buffers_.find(client_fd); - if (it != client_buffers_.end()) { - buffer_pool_.release(it->second.read_buffer); - client_buffers_.erase(it); - } - - close(client_fd); - if (on_disconnect_) on_disconnect_(client_fd); - } - - void accept_connections() { - while (true) { - sockaddr_in client_addr{}; - socklen_t client_len = sizeof(client_addr); - - int client_fd = accept4(server_fd_, - reinterpret_cast(&client_addr), - &client_len, - SOCK_NONBLOCK | SOCK_CLOEXEC); - - if (client_fd == -1) { - if (errno == EAGAIN || errno == EWOULDBLOCK) break; - continue; - } - - int opt = 1; - setsockopt(client_fd, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt)); - - if (add_client_impl(client_fd)) { - if (on_connection_) on_connection_(client_fd); - } else { - close(client_fd); - } - } - } - - void handle_client_data(int client_fd) { - auto it = client_buffers_.find(client_fd); - if (it == client_buffers_.end()) { - remove_client_impl(client_fd); - return; - } - - auto& client_buffer = it->second; - while (true) { - ssize_t bytes = recv(client_fd, - client_buffer.read_buffer->data() + client_buffer.bytes_read, - BufferPool::BUFFER_SIZE - client_buffer.bytes_read, 0); - - if (bytes <= 0) { - if (bytes == 0 || (errno != EAGAIN && errno != EWOULDBLOCK)) { - remove_client_impl(client_fd); - } - break; - } - - client_buffer.bytes_read += bytes; - - if (on_data_) { - on_data_(client_fd, client_buffer.read_buffer->data(), client_buffer.bytes_read); - } - - client_buffer.bytes_read = 0; - } - } - - virtual bool add_client_impl(int client_fd) = 0; - virtual void remove_client_impl(int client_fd) = 0; -}; - -#ifdef __linux__ - -class EpollSocket : public SocketBase { -public: - explicit EpollSocket(uint16_t port = 8080) : SocketBase(port) {} - - ~EpollSocket() { if (epoll_fd_ != -1) close(epoll_fd_); } - bool start() override { + bool start() + { return create_server_socket() && create_epoll() && add_server_to_epoll(); } - void run() override { + void run() + { std::array events; while (running_) { @@ -238,126 +55,121 @@ public: } } - bool add_client(int client_fd) { + void stop() { running_ = false; } + + bool add_client(int client_fd) + { epoll_event event{}; event.events = EPOLLIN | EPOLLET; event.data.fd = client_fd; - - return epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, client_fd, &event) != -1 && - add_client_buffer(client_fd); + return epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, client_fd, &event) != -1; } - void remove_client(int client_fd) { + void remove_client(int client_fd) + { epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, client_fd, nullptr); - remove_client_common(client_fd); + close(client_fd); + if (on_disconnect_) on_disconnect_(client_fd); } + void on_connection(ConnectionHandler handler) { on_connection_ = std::move(handler); } + void on_data(DataHandler handler) { on_data_ = std::move(handler); } + void on_disconnect(DisconnectHandler handler) { on_disconnect_ = std::move(handler); } + private: static constexpr int MAX_EVENTS = 1024; - int epoll_fd_ = -1; + static constexpr int BUFFER_SIZE = 8192; - bool create_epoll() { + uint16_t port_; + int server_fd_ = -1; + int epoll_fd_ = -1; + bool running_ = true; + + ConnectionHandler on_connection_; + DataHandler on_data_; + DisconnectHandler on_disconnect_; + + bool create_server_socket() + { + server_fd_ = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0); + if (server_fd_ == -1) return false; + + int opt = 1; + if (setsockopt(server_fd_, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) == -1 || + setsockopt(server_fd_, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) == -1 || + setsockopt(server_fd_, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt)) == -1) { + return false; + } + + sockaddr_in addr{}; + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = INADDR_ANY; + addr.sin_port = htons(port_); + + return bind(server_fd_, reinterpret_cast(&addr), sizeof(addr)) != -1 && + listen(server_fd_, SOMAXCONN) != -1; + } + + bool create_epoll() + { epoll_fd_ = epoll_create1(EPOLL_CLOEXEC); return epoll_fd_ != -1; } - bool add_server_to_epoll() { + bool add_server_to_epoll() + { epoll_event event{}; event.events = EPOLLIN | EPOLLET; event.data.fd = server_fd_; return epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, server_fd_, &event) != -1; } - bool add_client_impl(int client_fd) override { - return add_client(client_fd); - } + inline void accept_connections() + { + while (true) { + sockaddr_in client_addr{}; + socklen_t client_len = sizeof(client_addr); - void remove_client_impl(int client_fd) override { - remove_client(client_fd); - } -}; + int client_fd = accept4(server_fd_, + reinterpret_cast(&client_addr), + &client_len, + SOCK_NONBLOCK | SOCK_CLOEXEC); -using Socket = EpollSocket; - -#else // BSD/macOS - -class KqueueSocket : public SocketBase { -public: - explicit KqueueSocket(uint16_t port = 8080) : SocketBase(port) {} - - ~KqueueSocket() { - if (kqueue_fd_ != -1) close(kqueue_fd_); - } - - bool start() override { - return create_server_socket() && - create_kqueue() && - add_server_to_kqueue(); - } - - void run() override { - std::array events; - - while (running_) { - struct timespec timeout{1, 0}; - int num_events = kevent(kqueue_fd_, nullptr, 0, events.data(), MAX_EVENTS, &timeout); - if (num_events == -1) { - if (errno == EINTR) continue; - break; + if (client_fd == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) break; + continue; } - for (int i = 0; i < num_events; ++i) { - int fd = static_cast(events[i].ident); - if (fd == server_fd_) { - accept_connections(); - } else { - handle_client_data(fd); - } + int opt = 1; + setsockopt(client_fd, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt)); + + if (add_client(client_fd)) { + if (on_connection_) on_connection_(client_fd); + } else { + close(client_fd); } } } - bool add_client(int client_fd) { - struct kevent event; - EV_SET(&event, client_fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, nullptr); + inline void handle_client_data(int client_fd) + { + if (!on_data_) return; - return kevent(kqueue_fd_, &event, 1, nullptr, 0, nullptr) != -1 && - add_client_buffer(client_fd); - } - - void remove_client(int client_fd) { - struct kevent event; - EV_SET(&event, client_fd, EVFILT_READ, EV_DELETE, 0, 0, nullptr); - kevent(kqueue_fd_, &event, 1, nullptr, 0, nullptr); - remove_client_common(client_fd); - } - -private: - static constexpr int MAX_EVENTS = 1024; - int kqueue_fd_ = -1; - - bool create_kqueue() { - kqueue_fd_ = kqueue(); - return kqueue_fd_ != -1; - } - - bool add_server_to_kqueue() { - struct kevent event; - EV_SET(&event, server_fd_, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, nullptr); - return kevent(kqueue_fd_, &event, 1, nullptr, 0, nullptr) != -1; - } - - bool add_client_impl(int client_fd) override { - return add_client(client_fd); - } - - void remove_client_impl(int client_fd) override { - remove_client(client_fd); + char buffer[BUFFER_SIZE]; + while (true) { + ssize_t bytes = recv(client_fd, buffer, BUFFER_SIZE, 0); + if (bytes > 0) { + on_data_(client_fd, buffer, bytes); + } else if (bytes == 0) { + remove_client(client_fd); + break; + } else { + if (errno == EAGAIN || errno == EWOULDBLOCK) break; + remove_client(client_fd); + break; + } + } } }; -using Socket = KqueueSocket; - -#endif - } // namespace sockeye