#include "sockeye.hpp"

#include <arpa/inet.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <unistd.h>

#include <atomic>
#include <cerrno>
#include <chrono>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

using namespace std;

/// Load-test harness: starts a sockeye::Socket server on port 8080, drives it
/// with many concurrent blocking HTTP clients over loopback, and prints
/// throughput and memory statistics.
class SocketTest {
public:
    /// Runs the complete scenario: start server, run workload, stop server,
    /// print the report.
    void run() {
        cout << "Starting socket tests with HTTP workload...\n";
        start_server();
        // Give the server thread a moment to enter its run loop before
        // clients start connecting.
        this_thread::sleep_for(chrono::milliseconds(100));
        test_http_workload();
        server_.stop();
        if (server_thread_.joinable()) {
            server_thread_.join();
        }
        print_results();
    }

private:
    /// Memory sample, in kilobytes.
    struct MemoryInfo {
        size_t rss_kb = 0;
        size_t peak_rss_kb = 0;
    };

    /// Samples the process memory footprint via getrusage(RUSAGE_SELF).
    ///
    /// Note: ru_maxrss is the *maximum* resident set size reached so far, so
    /// both fields are a high-water mark rather than an instantaneous reading.
    /// Returns a zeroed MemoryInfo if getrusage fails.
    MemoryInfo get_memory_info() {
        MemoryInfo info;
        struct rusage usage;
        if (getrusage(RUSAGE_SELF, &usage) == 0) {
#ifdef __APPLE__
            // macOS reports in bytes, convert to KB
            info.rss_kb = static_cast<size_t>(usage.ru_maxrss) / 1024;
#else
            // Linux reports in KB
            info.rss_kb = static_cast<size_t>(usage.ru_maxrss);
#endif
            info.peak_rss_kb = info.rss_kb;
        }
        return info;
    }

    sockeye::Socket server_{8080};
    thread server_thread_;

    atomic<int> peak_connections_{0};
    atomic<int> current_connections_{0};
    atomic<long long> requests_processed_{0};
    atomic<size_t> bytes_received_{0};
    chrono::steady_clock::time_point start_time_;

    // Per-client request buffering (keyed by connection fd), guarded by
    // request_buffer_mutex_.
    mutex request_buffer_mutex_;
    unordered_map<int, string> request_buffers_;

    // Memory tracking. peak_memory_ is written by the monitor thread while it
    // runs and only read by the main thread after that thread is joined.
    MemoryInfo baseline_memory_;
    MemoryInfo peak_memory_;

    /// Registers the connection/data/disconnect callbacks, starts the server
    /// and launches its run loop on server_thread_. Exits the process with
    /// status 1 if the server cannot be started.
    void start_server() {
        server_.on_connection([this](int /*fd*/) {
            const int current = ++current_connections_;
            // Raise the recorded peak with a CAS loop so concurrent updates
            // (or a reset from the test thread) cannot lose a larger value.
            int observed = peak_connections_.load();
            while (current > observed &&
                   !peak_connections_.compare_exchange_weak(observed, current)) {
            }
        });

        server_.on_data([this](int fd, const char* data, size_t len) {
            bytes_received_ += len;

            // Send a standard HTTP response (built once per callback, not per request)
            string response =
                "HTTP/1.1 200 OK\r\nContent-Length: 13\r\nConnection: "
                "keep-alive\r\n\r\nHello, World!";

            lock_guard<mutex> lock(request_buffer_mutex_);
            auto& buffer = request_buffers_[fd];
            buffer.append(data, len);

            // Process all complete requests in the buffer; a request is
            // considered complete at the blank line ending its headers.
            while (true) {
                const size_t pos = buffer.find("\r\n\r\n");
                if (pos == string::npos) {
                    break;  // No complete request found
                }
                requests_processed_++;
                // Remove the processed request from the buffer
                buffer.erase(0, pos + 4);
                server_.send(fd, response);
            }
        });

        server_.on_disconnect([this](int fd) {
            current_connections_--;
            lock_guard<mutex> lock(request_buffer_mutex_);
            request_buffers_.erase(fd);
        });

        if (!server_.start()) {
            cerr << "Failed to start server\n";
            exit(1);
        }

        server_thread_ = thread([this]() { server_.run(); });
    }

    /// Runs the workload: num_clients threads, each issuing
    /// requests_per_client sequential requests, while a background thread
    /// samples memory every 10 ms. After the clients finish, waits up to
    /// 10 seconds for the server-side request counter to reach the expected
    /// total.
    void test_http_workload() {
        constexpr int num_clients = 100;
        constexpr int requests_per_client = 1000;
        // Widen before multiplying so larger configurations cannot overflow int.
        constexpr long long expected_requests =
            static_cast<long long>(num_clients) * requests_per_client;

        cout << "Testing HTTP workload: " << num_clients << " clients, "
             << requests_per_client << " req/client (" << expected_requests
             << " total)...\n";

        baseline_memory_ = get_memory_info();
        peak_memory_ = baseline_memory_;
        requests_processed_ = 0;
        bytes_received_ = 0;
        current_connections_ = 0;
        peak_connections_ = 0;
        start_time_ = chrono::steady_clock::now();

        vector<thread> clients;
        clients.reserve(num_clients);

        atomic<bool> monitoring{true};
        thread memory_monitor([this, &monitoring]() {
            while (monitoring) {
                const auto current = get_memory_info();
                if (current.rss_kb > peak_memory_.rss_kb) {
                    peak_memory_ = current;
                }
                this_thread::sleep_for(chrono::milliseconds(10));
            }
        });

        for (int i = 0; i < num_clients; ++i) {
            clients.emplace_back([this, requests_per_client]() {
                http_client_worker(requests_per_client);
            });
        }
        for (auto& t : clients) {
            t.join();
        }

        // Wait for server to process all requests
        const auto deadline = chrono::steady_clock::now() + chrono::seconds(10);
        while (requests_processed_.load() < expected_requests &&
               chrono::steady_clock::now() < deadline) {
            this_thread::sleep_for(chrono::milliseconds(50));
        }

        monitoring = false;
        memory_monitor.join();

        cout << "Expected: " << expected_requests
             << ", Processed: " << requests_processed_.load() << "\n";
    }

    /// Writes the whole payload to sock, looping over short writes and
    /// retrying when interrupted by a signal. Returns false on any other
    /// failure (including a send timeout).
    static bool send_all(int sock, const string& payload) {
        size_t sent = 0;
        while (sent < payload.size()) {
            const ssize_t n =
                ::send(sock, payload.data() + sent, payload.size() - sent, 0);
            if (n > 0) {
                sent += static_cast<size_t>(n);
            } else if (n < 0 && errno == EINTR) {
                continue;
            } else {
                return false;
            }
        }
        return true;
    }

    /// One client: connects, then issues request_count requests strictly one
    /// at a time, waiting for each response before sending the next. Stops
    /// early on any send/receive failure, timeout, or peer close.
    void http_client_worker(int request_count) {
        const int sock = create_client_socket();
        if (sock == -1) return;

        const string request = "GET / HTTP/1.1\r\nHost: localhost\r\n\r\n";
        // The known response body; its presence marks a complete response.
        const char* const response_marker = "Hello, World!";

        char chunk[1024];
        string response;

        for (int i = 0; i < request_count; ++i) {
            if (!send_all(sock, request)) {
                break;  // Send failed
            }

            // Accumulate the response across recv() calls: TCP may deliver it
            // in several pieces, and the marker itself can straddle two reads.
            response.clear();
            bool response_complete = false;
            while (!response_complete) {
                const ssize_t received = ::recv(sock, chunk, sizeof(chunk), 0);
                if (received > 0) {
                    response.append(chunk, static_cast<size_t>(received));
                    response_complete =
                        response.find(response_marker) != string::npos;
                } else if (received < 0 && errno == EINTR) {
                    continue;  // Interrupted by a signal; retry
                } else {
                    break;  // Connection closed, timeout, or error
                }
            }
            if (!response_complete) break;
        }

        close(sock);
    }

    /// Opens a TCP connection to 127.0.0.1:8080 with TCP_NODELAY and 5 second
    /// send/receive timeouts. Returns the connected descriptor, or -1 on
    /// failure (the descriptor is closed in that case).
    int create_client_socket() {
        const int sock = socket(AF_INET, SOCK_STREAM, 0);
        if (sock == -1) return -1;

        // Socket options are best-effort: a failure here only affects latency
        // or timeout behavior, so the results are intentionally not checked.
        int opt = 1;
        setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt));

        struct timeval timeout;
        timeout.tv_sec = 5;
        timeout.tv_usec = 0;
        setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout));
        setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, &timeout, sizeof(timeout));

        sockaddr_in addr{};
        addr.sin_family = AF_INET;
        addr.sin_port = htons(8080);

        if (inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr) != 1 ||
            connect(sock, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) == -1) {
            close(sock);
            return -1;
        }
        return sock;
    }

    /// Prints the throughput and memory report. The duration runs from the
    /// start of the workload until this function is called (run() calls it
    /// after stopping the server and joining its thread).
    void print_results() {
        const auto end_time = chrono::steady_clock::now();
        auto duration_ms =
            chrono::duration_cast<chrono::milliseconds>(end_time - start_time_)
                .count();
        if (duration_ms == 0) duration_ms = 1;  // Avoid division by zero

        const long long final_requests = requests_processed_.load();
        const double duration_s = static_cast<double>(duration_ms) / 1000.0;
        const double rps = static_cast<double>(final_requests) / duration_s;
        const double throughput_mbps =
            static_cast<double>(bytes_received_.load()) / (1024 * 1024) /
            duration_s;

        cout << "\n=== Test Results ===\n";
        cout << "Duration: " << duration_ms << " ms\n";
        cout << "Requests processed: " << final_requests << "\n";
        cout << "Throughput: " << static_cast<long long>(rps) << " req/sec\n";
        cout << "Data Rate (RX): " << throughput_mbps << " MB/sec\n";
        cout << "Peak connections: " << peak_connections_.load() << "\n";

        cout << "\n=== Memory Usage ===\n";
        cout << "Baseline RSS: " << baseline_memory_.rss_kb << " KB\n";
        cout << "Peak RSS: " << peak_memory_.rss_kb << " KB\n";
        // peak_memory_ starts at the baseline and is only ever raised, so
        // this unsigned subtraction cannot wrap.
        cout << "Memory increase: "
             << (peak_memory_.rss_kb - baseline_memory_.rss_kb) << " KB\n";
        if (peak_connections_.load() > 0) {
            cout << "Memory per connection: "
                 << static_cast<double>(peak_memory_.rss_kb -
                                        baseline_memory_.rss_kb) /
                        peak_connections_.load()
                 << " KB\n";
        }
    }
};

int main() {
    SocketTest test;
    test.run();
    return 0;
}