#include "sockeye.hpp"

#include <arpa/inet.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <unistd.h>

#include <atomic>
#include <cerrno>
#include <chrono>
#include <cstddef>
#include <cstdlib>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

using namespace std;

/// Stress-test driver for a sockeye::Socket server.
///
/// run() starts a server on kPort in a background thread, performs a single
/// "hello" round, then a multi-client throughput round, stops the server and
/// prints message/byte counts together with getrusage-based memory figures.
class SocketTest {
public:
    /// Runs every test phase in order and prints the summary.
    void run() {
        cout << "Starting socket tests...\n";
        start_server();
        // Short pause so the server thread can enter its loop before clients connect.
        this_thread::sleep_for(chrono::milliseconds(100));
        test_basic_functionality();
        test_throughput();
        server_.stop();
        if (server_thread_.joinable()) {
            server_thread_.join();
        }
        print_results();
    }

private:
    /// TCP port shared by the server under test and every client socket.
    static constexpr int kPort = 8080;

    // Flags passed to every send(): suppress SIGPIPE where the platform offers
    // MSG_NOSIGNAL; elsewhere create_client_socket() sets SO_NOSIGPIPE if available.
#ifdef MSG_NOSIGNAL
    static constexpr int kSendFlags = MSG_NOSIGNAL;
#else
    static constexpr int kSendFlags = 0;
#endif

    /// Memory snapshot in kilobytes. vsize_kb is never filled in by
    /// get_memory_info() and therefore stays 0.
    struct MemoryInfo {
        size_t rss_kb = 0;
        size_t peak_rss_kb = 0;
        size_t vsize_kb = 0;
    };

    /// Reads this process's memory usage through getrusage().
    ///
    /// Both rss_kb and peak_rss_kb are taken from ru_maxrss, so both carry the
    /// same value. Returns a zeroed struct when getrusage() fails.
    MemoryInfo get_memory_info() {
        MemoryInfo info;
        struct rusage usage;
        if (getrusage(RUSAGE_SELF, &usage) == 0) {
            info.rss_kb = usage.ru_maxrss;
            info.peak_rss_kb = usage.ru_maxrss;
#ifdef __APPLE__
            // macOS reports in bytes, convert to KB
            info.rss_kb /= 1024;
            info.peak_rss_kb /= 1024;
#endif
        }
        return info;
    }

    sockeye::Socket server_{kPort};
    thread server_thread_;

    atomic<int> peak_connections_{0};
    atomic<int> current_connections_{0};
    atomic<size_t> messages_received_{0};
    atomic<size_t> bytes_received_{0};
    chrono::steady_clock::time_point start_time_;

    // Memory tracking
    MemoryInfo baseline_memory_;
    MemoryInfo peak_memory_;

    // Synchronization for reliable testing
    atomic<int> clients_connected_{0};
    atomic<int> clients_failed_{0};  // workers that gave up connecting
    atomic<int> clients_finished_sending_{0};
    mutex stats_mutex_;

    /// Registers the counting callbacks, starts the server and launches its
    /// run loop on server_thread_. Terminates the process if start() fails.
    void start_server() {
        server_.on_connection([this](int /*fd*/) {
            const int current = ++current_connections_;
            // Raise the recorded peak atomically; a plain load/max/store could
            // lose an update if callbacks ever ran concurrently.
            int seen = peak_connections_.load();
            while (current > seen &&
                   !peak_connections_.compare_exchange_weak(seen, current)) {
            }
            clients_connected_++;
        });

        server_.on_data([this](int /*fd*/, const char* data, size_t len) {
            // Count messages by looking for our delimiter
            for (size_t i = 0; i < len; ++i) {
                if (data[i] == '\n') {
                    messages_received_++;
                }
            }
            bytes_received_ += len;
        });

        server_.on_disconnect([this](int /*fd*/) { current_connections_--; });

        if (!server_.start()) {
            cerr << "Failed to start server\n";
            exit(1);
        }

        server_thread_ = thread([this]() { server_.run(); });
    }

    /// Sends the whole buffer, resuming after partial writes.
    ///
    /// EINTR is retried immediately; EAGAIN/EWOULDBLOCK is retried up to five
    /// times per call with a 1, 2, 4, 8, 16 ms back-off. Any other error ends
    /// the attempt.
    ///
    /// @return true once every byte has been accepted by send(); false
    ///         otherwise (part of the buffer may already have been written).
    static bool send_all(int sock, const char* data, size_t len) {
        constexpr int max_stalls = 5;
        int stalls = 0;
        size_t offset = 0;
        while (offset < len) {
            const ssize_t n = send(sock, data + offset, len - offset, kSendFlags);
            if (n > 0) {
                offset += static_cast<size_t>(n);
                continue;
            }
            if (n == -1 && errno == EINTR) {
                continue;
            }
            const bool would_block =
                n == -1 && (errno == EAGAIN || errno == EWOULDBLOCK);
            if (would_block && stalls < max_stalls) {
                // Socket buffer full, wait longer
                this_thread::sleep_for(chrono::milliseconds(1 << stalls));
                ++stalls;
                continue;
            }
            return false;  // connection error or stall budget exhausted
        }
        return true;
    }

    /// Connects one client, sends a single greeting line and closes.
    void test_basic_functionality() {
        cout << "Testing basic functionality...\n";

        const int sock = create_client_socket();
        if (sock == -1) {
            cerr << "Failed to create client socket\n";
            return;
        }

        const string msg = "Hello, server!\n";
        if (!send_all(sock, msg.c_str(), msg.length())) {
            cerr << "Failed to send greeting\n";
        }

        this_thread::sleep_for(chrono::milliseconds(50));
        close(sock);

        cout << "Basic test completed\n";
    }

    /// Runs num_clients worker threads, each sending messages_per_client
    /// newline-terminated messages, while a monitor thread polls memory usage.
    /// Afterwards waits (at most 5 s) for the received-message counter to stop
    /// changing and prints expected vs. received counts.
    void test_throughput() {
        constexpr int num_clients = 100;
        constexpr int messages_per_client = 500;
        constexpr int message_size = 1024;
        constexpr int expected_messages = num_clients * messages_per_client;

        cout << "Testing throughput: " << expected_messages << " messages...\n";

        // Capture baseline memory
        baseline_memory_ = get_memory_info();
        peak_memory_ = baseline_memory_;

        // Reset counters
        messages_received_ = 0;
        bytes_received_ = 0;
        clients_connected_ = 0;
        clients_failed_ = 0;
        clients_finished_sending_ = 0;

        start_time_ = chrono::steady_clock::now();

        vector<thread> clients;
        clients.reserve(num_clients);

        // Start memory monitoring thread
        atomic<bool> monitoring{true};
        thread memory_monitor([this, &monitoring]() {
            while (monitoring.load()) {
                const auto current = get_memory_info();
                if (current.rss_kb > peak_memory_.rss_kb) {
                    peak_memory_ = current;
                }
                this_thread::sleep_for(chrono::milliseconds(10));
            }
        });

        for (int i = 0; i < num_clients; ++i) {
            clients.emplace_back([this, i]() {
                reliable_client_worker(i, messages_per_client, message_size);
            });
        }

        // Wait for all clients to connect. Workers that exhaust their connect
        // retries are counted too, and a deadline bounds the wait, so a refused
        // connection can no longer stall the test forever.
        const auto connect_deadline =
            chrono::steady_clock::now() + chrono::seconds(30);
        while (clients_connected_.load() + clients_failed_.load() < num_clients &&
               chrono::steady_clock::now() < connect_deadline) {
            this_thread::sleep_for(chrono::milliseconds(10));
        }
        if (clients_connected_.load() >= num_clients) {
            cout << "All clients connected\n";
        } else {
            cout << "Only " << clients_connected_.load() << "/" << num_clients
                 << " clients connected\n";
        }

        // Wait for all clients to finish
        for (auto& t : clients) {
            t.join();
        }
        cout << "All clients finished sending\n";

        // Wait for server to process all data with timeout
        const auto deadline = chrono::steady_clock::now() + chrono::seconds(5);
        auto last_count = messages_received_.load();
        int stable_iterations = 0;

        while (chrono::steady_clock::now() < deadline) {
            this_thread::sleep_for(chrono::milliseconds(50));
            const auto current_count = messages_received_.load();

            if (current_count == last_count) {
                stable_iterations++;
                if (stable_iterations >= 5) break;  // 250ms of stability
            } else {
                stable_iterations = 0;
                last_count = current_count;
            }
        }

        monitoring = false;
        memory_monitor.join();

        cout << "Expected: " << expected_messages
             << ", Received: " << messages_received_.load() << "\n";
    }

    /// Body of one client thread.
    ///
    /// Connects (up to 10 retries with exponential back-off), sends
    /// message_count messages of message_size bytes each -- a run of one
    /// letter chosen from client_id followed by '\n' -- then half-closes,
    /// waits 50 ms and closes the socket. Stops early at the first message
    /// that cannot be sent completely.
    void reliable_client_worker(int client_id, int message_count, int message_size) {
        int sock = -1;
        int retry_count = 0;
        constexpr int max_retries = 10;

        // Retry connection with exponential backoff
        while (sock == -1 && retry_count < max_retries) {
            sock = create_client_socket();
            if (sock == -1) {
                retry_count++;
                this_thread::sleep_for(chrono::milliseconds(10 << retry_count));
            }
        }

        if (sock == -1) {
            clients_failed_++;
            cout << "Client " << client_id << " failed to connect\n";
            return;
        }

        // Create message with delimiter. The payload length is clamped so a
        // message_size below 1 cannot wrap around to a huge size_t.
        const size_t payload_len =
            message_size > 1 ? static_cast<size_t>(message_size - 1) : 0;
        const string msg =
            string(payload_len, static_cast<char>('A' + (client_id % 26))) + "\n";

        int sent_count = 0;

        for (int i = 0; i < message_count; ++i) {
            // A message that cannot be completed leaves a partial line on the
            // wire; continuing would corrupt the newline framing, so stop.
            if (!send_all(sock, msg.c_str(), msg.length())) break;
            sent_count++;

            // Flow control: small delay every batch
            if ((i + 1) % 100 == 0) {
                this_thread::sleep_for(chrono::microseconds(500));
            }
        }

        // Ensure all data is sent before closing
        shutdown(sock, SHUT_WR);

        // Give server time to process
        this_thread::sleep_for(chrono::milliseconds(50));

        close(sock);
        clients_finished_sending_++;

        if (sent_count != message_count) {
            cout << "Client " << client_id << " sent " << sent_count << "/"
                 << message_count << " messages\n";
        }
    }

    /// Creates a TCP socket connected to 127.0.0.1:kPort.
    ///
    /// Socket options (TCP_NODELAY, 1 MB send buffer, 5 s send timeout) are
    /// applied best-effort; their results are intentionally not checked.
    ///
    /// @return the connected descriptor, or -1 if socket() or connect() fails.
    int create_client_socket() {
        const int sock = socket(AF_INET, SOCK_STREAM, 0);
        if (sock == -1) return -1;

        // Optimize socket
        int opt = 1;
        setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt));
#ifdef SO_NOSIGPIPE
        // Per-socket SIGPIPE suppression for platforms lacking MSG_NOSIGNAL.
        setsockopt(sock, SOL_SOCKET, SO_NOSIGPIPE, &opt, sizeof(opt));
#endif

        // Larger send buffer
        int buf_size = 1048576;  // 1MB
        setsockopt(sock, SOL_SOCKET, SO_SNDBUF, &buf_size, sizeof(buf_size));

        // Set send timeout
        struct timeval timeout;
        timeout.tv_sec = 5;
        timeout.tv_usec = 0;
        setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, &timeout, sizeof(timeout));

        sockaddr_in addr{};
        addr.sin_family = AF_INET;
        addr.sin_port = htons(kPort);
        inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);

        if (connect(sock, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) == -1) {
            close(sock);
            return -1;
        }

        return sock;
    }

    /// Prints throughput and memory statistics gathered by test_throughput().
    /// The duration runs from start_time_ to the moment this function is called.
    void print_results() {
        const auto end_time = chrono::steady_clock::now();
        const auto duration =
            chrono::duration_cast<chrono::milliseconds>(end_time - start_time_);
        const auto elapsed_ms = duration.count();

        // Guard against a zero-length interval: dividing would yield infinity,
        // and converting infinity to an integer is undefined.
        double throughput_mps = 0.0;
        double throughput_mbps = 0.0;
        if (elapsed_ms > 0) {
            throughput_mps =
                static_cast<double>(messages_received_.load()) / elapsed_ms * 1000;
            throughput_mbps = static_cast<double>(bytes_received_.load()) /
                              (1024 * 1024) / elapsed_ms * 1000;
        }

        cout << "\n=== Test Results ===\n";
        cout << "Duration: " << elapsed_ms << "ms\n";
        cout << "Messages received: " << messages_received_.load() << "\n";
        cout << "Bytes received: " << bytes_received_.load() << "\n";
        cout << "Throughput: " << static_cast<long long>(throughput_mps)
             << " msg/sec\n";
        cout << "Throughput: " << throughput_mbps << " MB/sec\n";
        cout << "Peak connections: " << peak_connections_.load() << "\n";

        const size_t memory_increase_kb =
            peak_memory_.rss_kb - baseline_memory_.rss_kb;

        cout << "\n=== Memory Usage ===\n";
        cout << "Baseline RSS: " << baseline_memory_.rss_kb << " KB\n";
        cout << "Peak RSS: " << peak_memory_.rss_kb << " KB\n";
        cout << "Memory increase: " << memory_increase_kb << " KB\n";
        if (peak_memory_.vsize_kb > 0) {
            cout << "Virtual memory: " << peak_memory_.vsize_kb << " KB\n";
        }

        // Integer division by zero is undefined; skip the ratio when the
        // server never saw a connection.
        const int peak = peak_connections_.load();
        cout << "Memory per connection: ";
        if (peak > 0) {
            cout << memory_increase_kb / static_cast<size_t>(peak) << " KB\n";
        } else {
            cout << "n/a\n";
        }
    }
};

int main() {
    SocketTest test;
    test.run();
    return 0;
}