From 4ef162f9b54bb2016aec7dacc4722a954f2f23c9 Mon Sep 17 00:00:00 2001 From: Sky Johnson Date: Mon, 16 Jun 2025 11:26:19 -0500 Subject: [PATCH] Upload single-header library and test program --- .gitignore | 2 +- README.md | 143 ++++++++++++++++++++- sockeye.hpp | 363 ++++++++++++++++++++++++++++++++++++++++++++++++++++ test.cpp | 349 ++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 855 insertions(+), 2 deletions(-) create mode 100644 sockeye.hpp create mode 100644 test.cpp diff --git a/.gitignore b/.gitignore index e257658..61f36cf 100644 --- a/.gitignore +++ b/.gitignore @@ -31,4 +31,4 @@ *.exe *.out *.app - +test diff --git a/README.md b/README.md index 3982aaa..bc46e50 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,144 @@ # Sockeye -Easy-to-use C++ library for utilizing sockets. \ No newline at end of file +An easy-to-use, fast socket C++ library! Uses epoll on Linux, kqueue on BSD/macOS. + +## API Reference + +### sockeye::Socket + +Main server class for handling TCP connections. + +#### Constructor +```cpp +explicit Socket(uint16_t port = 8080) +``` + +#### Methods + +**`bool start()`** +Initialize the server socket and event system. Returns `true` on success. + +**`void run()`** +Start the event loop. Blocks until `stop()` is called. + +**`void stop()`** +Stop the server and exit the event loop. + +**`void on_connection(ConnectionHandler handler)`** +Set callback for new client connections. + +**`void on_data(DataHandler handler)`** +Set callback for incoming data from clients. + +**`void on_disconnect(DisconnectHandler handler)`** +Set callback for client disconnections. + +#### Handler Types + +```cpp +using ConnectionHandler = std::function; +using DataHandler = std::function; +using DisconnectHandler = std::function; +``` + +## Examples + +### Basic Echo Server + +```cpp +#include "sockeye.hpp" +#include +#include + +int main() { + sockeye::Socket server(8080); + + server.on_connection([](int client_fd) { + std::cout << "Client connected: " << client_fd << std::endl; + }); + + server.on_data([](int client_fd, const char* data, size_t len) { + // Echo data back to client + send(client_fd, data, len, 0); + }); + + server.on_disconnect([](int client_fd) { + std::cout << "Client disconnected: " << client_fd << std::endl; + }); + + if (!server.start()) { + std::cerr << "Failed to start server" << std::endl; + return 1; + } + + server.run(); + return 0; +} +``` + +### HTTP-like Server + +```cpp +#include "sockeye.hpp" +#include +#include + +int main() { + sockeye::Socket server(8080); + + server.on_data([](int client_fd, const char* data, size_t len) { + std::string request(data, len); + + // Simple HTTP response + std::string response = + "HTTP/1.1 200 OK\r\n" + "Content-Length: 13\r\n" + "Connection: close\r\n\r\n" + "Hello, World!"; + + send(client_fd, response.c_str(), response.length(), 0); + close(client_fd); + }); + + if (!server.start()) { + std::cerr << "Failed to start server" << std::endl; + return 1; + } + + std::cout << "Server listening on port 8080" << std::endl; + server.run(); + return 0; +} +``` + +### Graceful Shutdown + +```cpp +#include "sockeye.hpp" +#include + +sockeye::Socket* server_ptr = nullptr; + +void signal_handler(int signal) { + if (server_ptr) { + server_ptr->stop(); + } +} + +int main() { + sockeye::Socket server(8080); + server_ptr = &server; + + signal(SIGINT, signal_handler); + signal(SIGTERM, signal_handler); + + // Set up handlers... + + if (!server.start()) { + return 1; + } + + server.run(); + return 0; +} +``` diff --git a/sockeye.hpp b/sockeye.hpp new file mode 100644 index 0000000..0e2955b --- /dev/null +++ b/sockeye.hpp @@ -0,0 +1,363 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#ifdef __linux__ + #include +#elif defined(__APPLE__) || defined(__FreeBSD__) || defined(__OpenBSD__) || defined(__NetBSD__) + #include +#else + #error "Unsupported platform - requires Linux or BSD/macOS" +#endif + +namespace sockeye { + +class BufferPool { +public: + static constexpr size_t BUFFER_SIZE = 4096; + + explicit BufferPool(size_t pool_size = 1024) { + buffers_.reserve(pool_size); + for (size_t i = 0; i < pool_size; ++i) { + buffers_.emplace_back(BUFFER_SIZE); + } + } + + std::vector* acquire() { + if (buffers_.empty()) { + return new std::vector(BUFFER_SIZE); + } + auto* buffer = new std::vector(std::move(buffers_.back())); + buffers_.pop_back(); + return buffer; + } + + void release(std::vector* buffer) { + if (!buffer) return; + if (buffers_.size() < buffers_.capacity()) { + buffer->clear(); + buffer->resize(BUFFER_SIZE); + buffers_.emplace_back(std::move(*buffer)); + delete buffer; + } else { + delete buffer; + } + } + +private: + std::vector> buffers_; +}; + +struct ClientBuffer { + std::vector* read_buffer = nullptr; + size_t bytes_read = 0; +}; + +class SocketBase { +public: + using ConnectionHandler = std::function; + using DataHandler = std::function; + using DisconnectHandler = std::function; + + explicit SocketBase(uint16_t port = 8080) : port_(port) {} + + virtual ~SocketBase() { + cleanup_resources(); + } + + virtual bool start() = 0; + virtual void run() = 0; + void stop() { running_ = false; } + + void on_connection(ConnectionHandler handler) { on_connection_ = std::move(handler); } + void on_data(DataHandler handler) { on_data_ = std::move(handler); } + void on_disconnect(DisconnectHandler handler) { on_disconnect_ = std::move(handler); } + +protected: + static constexpr int LISTEN_BACKLOG = SOMAXCONN; + + uint16_t port_; + int server_fd_ = -1; + bool running_ = true; + + BufferPool buffer_pool_; + std::unordered_map client_buffers_; + + ConnectionHandler on_connection_; + DataHandler on_data_; + DisconnectHandler on_disconnect_; + + void cleanup_resources() { + for (auto& [fd, client_buffer] : client_buffers_) { + buffer_pool_.release(client_buffer.read_buffer); + close(fd); + } + client_buffers_.clear(); + + if (server_fd_ != -1) close(server_fd_); + } + + bool create_server_socket() { + server_fd_ = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0); + if (server_fd_ == -1) return false; + + int opt = 1; + if (setsockopt(server_fd_, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) == -1 || + setsockopt(server_fd_, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) == -1 || + setsockopt(server_fd_, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt)) == -1) { + return false; + } + + sockaddr_in addr{}; + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = INADDR_ANY; + addr.sin_port = htons(port_); + + return bind(server_fd_, reinterpret_cast(&addr), sizeof(addr)) != -1 && + listen(server_fd_, LISTEN_BACKLOG) != -1; + } + + bool add_client_buffer(int client_fd) { + client_buffers_[client_fd] = {buffer_pool_.acquire(), 0}; + return true; + } + + void remove_client_common(int client_fd) { + auto it = client_buffers_.find(client_fd); + if (it != client_buffers_.end()) { + buffer_pool_.release(it->second.read_buffer); + client_buffers_.erase(it); + } + + close(client_fd); + if (on_disconnect_) on_disconnect_(client_fd); + } + + void accept_connections() { + while (true) { + sockaddr_in client_addr{}; + socklen_t client_len = sizeof(client_addr); + + int client_fd = accept4(server_fd_, + reinterpret_cast(&client_addr), + &client_len, + SOCK_NONBLOCK | SOCK_CLOEXEC); + + if (client_fd == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) break; + continue; + } + + int opt = 1; + setsockopt(client_fd, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt)); + + if (add_client_impl(client_fd)) { + if (on_connection_) on_connection_(client_fd); + } else { + close(client_fd); + } + } + } + + void handle_client_data(int client_fd) { + auto it = client_buffers_.find(client_fd); + if (it == client_buffers_.end()) { + remove_client_impl(client_fd); + return; + } + + auto& client_buffer = it->second; + while (true) { + ssize_t bytes = recv(client_fd, + client_buffer.read_buffer->data() + client_buffer.bytes_read, + BufferPool::BUFFER_SIZE - client_buffer.bytes_read, 0); + + if (bytes <= 0) { + if (bytes == 0 || (errno != EAGAIN && errno != EWOULDBLOCK)) { + remove_client_impl(client_fd); + } + break; + } + + client_buffer.bytes_read += bytes; + + if (on_data_) { + on_data_(client_fd, client_buffer.read_buffer->data(), client_buffer.bytes_read); + } + + client_buffer.bytes_read = 0; + } + } + + virtual bool add_client_impl(int client_fd) = 0; + virtual void remove_client_impl(int client_fd) = 0; +}; + +#ifdef __linux__ + +class EpollSocket : public SocketBase { +public: + explicit EpollSocket(uint16_t port = 8080) : SocketBase(port) {} + + ~EpollSocket() { + if (epoll_fd_ != -1) close(epoll_fd_); + } + + bool start() override { + return create_server_socket() && + create_epoll() && + add_server_to_epoll(); + } + + void run() override { + std::array events; + + while (running_) { + int num_events = epoll_wait(epoll_fd_, events.data(), MAX_EVENTS, 1000); + if (num_events == -1) { + if (errno == EINTR) continue; + break; + } + + for (int i = 0; i < num_events; ++i) { + if (events[i].data.fd == server_fd_) { + accept_connections(); + } else { + handle_client_data(events[i].data.fd); + } + } + } + } + + bool add_client(int client_fd) { + epoll_event event{}; + event.events = EPOLLIN | EPOLLET; + event.data.fd = client_fd; + + return epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, client_fd, &event) != -1 && + add_client_buffer(client_fd); + } + + void remove_client(int client_fd) { + epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, client_fd, nullptr); + remove_client_common(client_fd); + } + +private: + static constexpr int MAX_EVENTS = 1024; + int epoll_fd_ = -1; + + bool create_epoll() { + epoll_fd_ = epoll_create1(EPOLL_CLOEXEC); + return epoll_fd_ != -1; + } + + bool add_server_to_epoll() { + epoll_event event{}; + event.events = EPOLLIN | EPOLLET; + event.data.fd = server_fd_; + return epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, server_fd_, &event) != -1; + } + + bool add_client_impl(int client_fd) override { + return add_client(client_fd); + } + + void remove_client_impl(int client_fd) override { + remove_client(client_fd); + } +}; + +using Socket = EpollSocket; + +#else // BSD/macOS + +class KqueueSocket : public SocketBase { +public: + explicit KqueueSocket(uint16_t port = 8080) : SocketBase(port) {} + + ~KqueueSocket() { + if (kqueue_fd_ != -1) close(kqueue_fd_); + } + + bool start() override { + return create_server_socket() && + create_kqueue() && + add_server_to_kqueue(); + } + + void run() override { + std::array events; + + while (running_) { + struct timespec timeout{1, 0}; + int num_events = kevent(kqueue_fd_, nullptr, 0, events.data(), MAX_EVENTS, &timeout); + if (num_events == -1) { + if (errno == EINTR) continue; + break; + } + + for (int i = 0; i < num_events; ++i) { + int fd = static_cast(events[i].ident); + if (fd == server_fd_) { + accept_connections(); + } else { + handle_client_data(fd); + } + } + } + } + + bool add_client(int client_fd) { + struct kevent event; + EV_SET(&event, client_fd, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, nullptr); + + return kevent(kqueue_fd_, &event, 1, nullptr, 0, nullptr) != -1 && + add_client_buffer(client_fd); + } + + void remove_client(int client_fd) { + struct kevent event; + EV_SET(&event, client_fd, EVFILT_READ, EV_DELETE, 0, 0, nullptr); + kevent(kqueue_fd_, &event, 1, nullptr, 0, nullptr); + remove_client_common(client_fd); + } + +private: + static constexpr int MAX_EVENTS = 1024; + int kqueue_fd_ = -1; + + bool create_kqueue() { + kqueue_fd_ = kqueue(); + return kqueue_fd_ != -1; + } + + bool add_server_to_kqueue() { + struct kevent event; + EV_SET(&event, server_fd_, EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, nullptr); + return kevent(kqueue_fd_, &event, 1, nullptr, 0, nullptr) != -1; + } + + bool add_client_impl(int client_fd) override { + return add_client(client_fd); + } + + void remove_client_impl(int client_fd) override { + remove_client(client_fd); + } +}; + +using Socket = KqueueSocket; + +#endif + +} // namespace sockeye diff --git a/test.cpp b/test.cpp new file mode 100644 index 0000000..5ee3430 --- /dev/null +++ b/test.cpp @@ -0,0 +1,349 @@ +#include "sockeye.hpp" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace std; + +class SocketTest { +public: + void run() { + cout << "Starting socket tests...\n"; + + start_server(); + this_thread::sleep_for(chrono::milliseconds(100)); + + test_basic_functionality(); + test_throughput(); + + server_.stop(); + if (server_thread_.joinable()) { + server_thread_.join(); + } + + print_results(); + } + +private: + struct MemoryInfo { + size_t rss_kb = 0; + size_t peak_rss_kb = 0; + size_t vsize_kb = 0; + }; + + MemoryInfo get_memory_info() { + MemoryInfo info; + + struct rusage usage; + if (getrusage(RUSAGE_SELF, &usage) == 0) { + info.rss_kb = usage.ru_maxrss; + info.peak_rss_kb = usage.ru_maxrss; + + #ifdef __APPLE__ + // macOS reports in bytes, convert to KB + info.rss_kb /= 1024; + info.peak_rss_kb /= 1024; + #endif + } + + return info; + } + + sockeye::Socket server_{8080}; + thread server_thread_; + atomic peak_connections_{0}; + atomic current_connections_{0}; + atomic messages_received_{0}; + atomic bytes_received_{0}; + chrono::steady_clock::time_point start_time_; + + // Memory tracking + MemoryInfo baseline_memory_; + MemoryInfo peak_memory_; + + // Synchronization for reliable testing + atomic clients_connected_{0}; + atomic clients_finished_sending_{0}; + mutex stats_mutex_; + + void start_server() { + server_.on_connection([this](int fd) { + int current = ++current_connections_; + peak_connections_.store(max(peak_connections_.load(), current)); + clients_connected_++; + }); + + server_.on_data([this](int fd, const char* data, size_t len) { + // Count messages by looking for our delimiter + for (size_t i = 0; i < len; ++i) { + if (data[i] == '\n') { + messages_received_++; + } + } + bytes_received_ += len; + }); + + server_.on_disconnect([this](int fd) { + current_connections_--; + }); + + if (!server_.start()) { + cerr << "Failed to start server\n"; + exit(1); + } + + server_thread_ = thread([this]() { + server_.run(); + }); + } + + void test_basic_functionality() { + cout << "Testing basic functionality...\n"; + + int sock = create_client_socket(); + if (sock == -1) { + cerr << "Failed to create client socket\n"; + return; + } + + string msg = "Hello, server!\n"; + send(sock, msg.c_str(), msg.length(), 0); + + this_thread::sleep_for(chrono::milliseconds(50)); + close(sock); + + cout << "Basic test completed\n"; + } + + void test_throughput() { + constexpr int num_clients = 100; + constexpr int messages_per_client = 500; + constexpr int message_size = 1024; + constexpr int expected_messages = num_clients * messages_per_client; + + cout << "Testing throughput: " << expected_messages << " messages...\n"; + + // Capture baseline memory + baseline_memory_ = get_memory_info(); + peak_memory_ = baseline_memory_; + + // Reset counters + messages_received_ = 0; + bytes_received_ = 0; + clients_connected_ = 0; + clients_finished_sending_ = 0; + + start_time_ = chrono::steady_clock::now(); + + vector clients; + clients.reserve(num_clients); + + // Start memory monitoring thread + atomic monitoring{true}; + thread memory_monitor([this, &monitoring]() { + while (monitoring) { + auto current = get_memory_info(); + if (current.rss_kb > peak_memory_.rss_kb) { + peak_memory_ = current; + } + this_thread::sleep_for(chrono::milliseconds(10)); + } + }); + + for (int i = 0; i < num_clients; ++i) { + clients.emplace_back([this, i, messages_per_client, message_size]() { + reliable_client_worker(i, messages_per_client, message_size); + }); + } + + // Wait for all clients to connect + while (clients_connected_.load() < num_clients) { + this_thread::sleep_for(chrono::milliseconds(10)); + } + cout << "All clients connected\n"; + + // Wait for all clients to finish + for (auto& t : clients) { + t.join(); + } + cout << "All clients finished sending\n"; + + // Wait for server to process all data with timeout + auto deadline = chrono::steady_clock::now() + chrono::seconds(5); + auto last_count = messages_received_.load(); + int stable_iterations = 0; + + while (chrono::steady_clock::now() < deadline) { + this_thread::sleep_for(chrono::milliseconds(50)); + auto current_count = messages_received_.load(); + + if (current_count == last_count) { + stable_iterations++; + if (stable_iterations >= 5) break; // 250ms of stability + } else { + stable_iterations = 0; + last_count = current_count; + } + } + + monitoring = false; + memory_monitor.join(); + + cout << "Expected: " << expected_messages << ", Received: " << messages_received_.load() << "\n"; + } + + void reliable_client_worker(int client_id, int message_count, int message_size) { + int sock = -1; + int retry_count = 0; + constexpr int max_retries = 10; + + // Retry connection with exponential backoff + while (sock == -1 && retry_count < max_retries) { + sock = create_client_socket(); + if (sock == -1) { + retry_count++; + this_thread::sleep_for(chrono::milliseconds(10 << retry_count)); + } + } + + if (sock == -1) { + cout << "Client " << client_id << " failed to connect\n"; + return; + } + + // Create message with delimiter + string base_msg(message_size - 1, 'A' + (client_id % 26)); + string msg = base_msg + "\n"; // Add delimiter + + int sent_count = 0; + + // Send messages with proper flow control + for (int i = 0; i < message_count; ++i) { + int attempts = 0; + bool sent = false; + + while (!sent && attempts < 5) { + ssize_t result = send(sock, msg.c_str(), msg.length(), MSG_NOSIGNAL); + + if (result == static_cast(msg.length())) { + sent = true; + sent_count++; + } else if (result == -1) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + // Socket buffer full, wait longer + this_thread::sleep_for(chrono::milliseconds(1 << attempts)); + attempts++; + } else { + // Connection error + break; + } + } else if (result > 0) { + // Partial send - handle remaining data + size_t remaining = msg.length() - result; + const char* remaining_data = msg.c_str() + result; + + while (remaining > 0) { + ssize_t more = send(sock, remaining_data, remaining, MSG_NOSIGNAL); + if (more <= 0) break; + remaining -= more; + remaining_data += more; + } + + if (remaining == 0) { + sent = true; + sent_count++; + } + } + } + + if (!sent) break; + + // Flow control: small delay every batch + if ((i + 1) % 100 == 0) { + this_thread::sleep_for(chrono::microseconds(500)); + } + } + + // Ensure all data is sent before closing + shutdown(sock, SHUT_WR); + + // Give server time to process + this_thread::sleep_for(chrono::milliseconds(50)); + + close(sock); + clients_finished_sending_++; + + if (sent_count != message_count) { + cout << "Client " << client_id << " sent " << sent_count << "/" << message_count << " messages\n"; + } + } + + int create_client_socket() { + int sock = socket(AF_INET, SOCK_STREAM, 0); + if (sock == -1) return -1; + + // Optimize socket + int opt = 1; + setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt)); + + // Larger send buffer + int buf_size = 1048576; // 1MB + setsockopt(sock, SOL_SOCKET, SO_SNDBUF, &buf_size, sizeof(buf_size)); + + // Set send timeout + struct timeval timeout; + timeout.tv_sec = 5; + timeout.tv_usec = 0; + setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, &timeout, sizeof(timeout)); + + sockaddr_in addr{}; + addr.sin_family = AF_INET; + addr.sin_port = htons(8080); + inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr); + + if (connect(sock, reinterpret_cast(&addr), sizeof(addr)) == -1) { + close(sock); + return -1; + } + + return sock; + } + + void print_results() { + auto end_time = chrono::steady_clock::now(); + auto duration = chrono::duration_cast(end_time - start_time_); + + double throughput_mps = static_cast(messages_received_) / duration.count() * 1000; + double throughput_mbps = static_cast(bytes_received_) / (1024 * 1024) / duration.count() * 1000; + + cout << "\n=== Test Results ===\n"; + cout << "Duration: " << duration.count() << "ms\n"; + cout << "Messages received: " << messages_received_.load() << "\n"; + cout << "Bytes received: " << bytes_received_.load() << "\n"; + cout << "Throughput: " << static_cast(throughput_mps) << " msg/sec\n"; + cout << "Throughput: " << throughput_mbps << " MB/sec\n"; + cout << "Peak connections: " << peak_connections_.load() << "\n"; + + cout << "\n=== Memory Usage ===\n"; + cout << "Baseline RSS: " << baseline_memory_.rss_kb << " KB\n"; + cout << "Peak RSS: " << peak_memory_.rss_kb << " KB\n"; + cout << "Memory increase: " << (peak_memory_.rss_kb - baseline_memory_.rss_kb) << " KB\n"; + if (peak_memory_.vsize_kb > 0) { + cout << "Virtual memory: " << peak_memory_.vsize_kb << " KB\n"; + } + cout << "Memory per connection: " << (peak_memory_.rss_kb - baseline_memory_.rss_kb) / peak_connections_.load() << " KB\n"; + } +}; + +int main() { + SocketTest test; + test.run(); + return 0; +}