// Source listing metadata: 350 lines, 8.9 KiB, C++
#include "sockeye.hpp"

#include <arpa/inet.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <unistd.h>

#include <algorithm>
#include <atomic>
#include <cerrno>
#include <chrono>
#include <cstddef>
#include <cstdlib>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
|
|
|
|
// File-scope using-directive: the unqualified standard-library names used below
// (cout, cerr, thread, atomic, mutex, string, vector, chrono, ...) rely on it.
using namespace std;
|
|
|
|
class SocketTest {
|
|
public:
|
|
void run() {
|
|
cout << "Starting socket tests...\n";
|
|
|
|
start_server();
|
|
this_thread::sleep_for(chrono::milliseconds(100));
|
|
|
|
test_basic_functionality();
|
|
test_throughput();
|
|
|
|
server_.stop();
|
|
if (server_thread_.joinable()) {
|
|
server_thread_.join();
|
|
}
|
|
|
|
print_results();
|
|
}
|
|
|
|
private:
|
|
struct MemoryInfo {
|
|
size_t rss_kb = 0;
|
|
size_t peak_rss_kb = 0;
|
|
size_t vsize_kb = 0;
|
|
};
|
|
|
|
MemoryInfo get_memory_info() {
|
|
MemoryInfo info;
|
|
|
|
struct rusage usage;
|
|
if (getrusage(RUSAGE_SELF, &usage) == 0) {
|
|
info.rss_kb = usage.ru_maxrss;
|
|
info.peak_rss_kb = usage.ru_maxrss;
|
|
|
|
#ifdef __APPLE__
|
|
// macOS reports in bytes, convert to KB
|
|
info.rss_kb /= 1024;
|
|
info.peak_rss_kb /= 1024;
|
|
#endif
|
|
}
|
|
|
|
return info;
|
|
}
|
|
|
|
sockeye::Socket server_{8080};
|
|
thread server_thread_;
|
|
atomic<int> peak_connections_{0};
|
|
atomic<int> current_connections_{0};
|
|
atomic<int> messages_received_{0};
|
|
atomic<size_t> bytes_received_{0};
|
|
chrono::steady_clock::time_point start_time_;
|
|
|
|
// Memory tracking
|
|
MemoryInfo baseline_memory_;
|
|
MemoryInfo peak_memory_;
|
|
|
|
// Synchronization for reliable testing
|
|
atomic<int> clients_connected_{0};
|
|
atomic<int> clients_finished_sending_{0};
|
|
mutex stats_mutex_;
|
|
|
|
void start_server() {
|
|
server_.on_connection([this](int fd) {
|
|
int current = ++current_connections_;
|
|
peak_connections_.store(max(peak_connections_.load(), current));
|
|
clients_connected_++;
|
|
});
|
|
|
|
server_.on_data([this](int fd, const char* data, size_t len) {
|
|
// Count messages by looking for our delimiter
|
|
for (size_t i = 0; i < len; ++i) {
|
|
if (data[i] == '\n') {
|
|
messages_received_++;
|
|
}
|
|
}
|
|
bytes_received_ += len;
|
|
});
|
|
|
|
server_.on_disconnect([this](int fd) {
|
|
current_connections_--;
|
|
});
|
|
|
|
if (!server_.start()) {
|
|
cerr << "Failed to start server\n";
|
|
exit(1);
|
|
}
|
|
|
|
server_thread_ = thread([this]() {
|
|
server_.run();
|
|
});
|
|
}
|
|
|
|
void test_basic_functionality() {
|
|
cout << "Testing basic functionality...\n";
|
|
|
|
int sock = create_client_socket();
|
|
if (sock == -1) {
|
|
cerr << "Failed to create client socket\n";
|
|
return;
|
|
}
|
|
|
|
string msg = "Hello, server!\n";
|
|
send(sock, msg.c_str(), msg.length(), 0);
|
|
|
|
this_thread::sleep_for(chrono::milliseconds(50));
|
|
close(sock);
|
|
|
|
cout << "Basic test completed\n";
|
|
}
|
|
|
|
void test_throughput() {
|
|
constexpr int num_clients = 100;
|
|
constexpr int messages_per_client = 500;
|
|
constexpr int message_size = 1024;
|
|
constexpr int expected_messages = num_clients * messages_per_client;
|
|
|
|
cout << "Testing throughput: " << expected_messages << " messages...\n";
|
|
|
|
// Capture baseline memory
|
|
baseline_memory_ = get_memory_info();
|
|
peak_memory_ = baseline_memory_;
|
|
|
|
// Reset counters
|
|
messages_received_ = 0;
|
|
bytes_received_ = 0;
|
|
clients_connected_ = 0;
|
|
clients_finished_sending_ = 0;
|
|
|
|
start_time_ = chrono::steady_clock::now();
|
|
|
|
vector<thread> clients;
|
|
clients.reserve(num_clients);
|
|
|
|
// Start memory monitoring thread
|
|
atomic<bool> monitoring{true};
|
|
thread memory_monitor([this, &monitoring]() {
|
|
while (monitoring) {
|
|
auto current = get_memory_info();
|
|
if (current.rss_kb > peak_memory_.rss_kb) {
|
|
peak_memory_ = current;
|
|
}
|
|
this_thread::sleep_for(chrono::milliseconds(10));
|
|
}
|
|
});
|
|
|
|
for (int i = 0; i < num_clients; ++i) {
|
|
clients.emplace_back([this, i, messages_per_client, message_size]() {
|
|
reliable_client_worker(i, messages_per_client, message_size);
|
|
});
|
|
}
|
|
|
|
// Wait for all clients to connect
|
|
while (clients_connected_.load() < num_clients) {
|
|
this_thread::sleep_for(chrono::milliseconds(10));
|
|
}
|
|
cout << "All clients connected\n";
|
|
|
|
// Wait for all clients to finish
|
|
for (auto& t : clients) {
|
|
t.join();
|
|
}
|
|
cout << "All clients finished sending\n";
|
|
|
|
// Wait for server to process all data with timeout
|
|
auto deadline = chrono::steady_clock::now() + chrono::seconds(5);
|
|
auto last_count = messages_received_.load();
|
|
int stable_iterations = 0;
|
|
|
|
while (chrono::steady_clock::now() < deadline) {
|
|
this_thread::sleep_for(chrono::milliseconds(50));
|
|
auto current_count = messages_received_.load();
|
|
|
|
if (current_count == last_count) {
|
|
stable_iterations++;
|
|
if (stable_iterations >= 5) break; // 250ms of stability
|
|
} else {
|
|
stable_iterations = 0;
|
|
last_count = current_count;
|
|
}
|
|
}
|
|
|
|
monitoring = false;
|
|
memory_monitor.join();
|
|
|
|
cout << "Expected: " << expected_messages << ", Received: " << messages_received_.load() << "\n";
|
|
}
|
|
|
|
void reliable_client_worker(int client_id, int message_count, int message_size) {
|
|
int sock = -1;
|
|
int retry_count = 0;
|
|
constexpr int max_retries = 10;
|
|
|
|
// Retry connection with exponential backoff
|
|
while (sock == -1 && retry_count < max_retries) {
|
|
sock = create_client_socket();
|
|
if (sock == -1) {
|
|
retry_count++;
|
|
this_thread::sleep_for(chrono::milliseconds(10 << retry_count));
|
|
}
|
|
}
|
|
|
|
if (sock == -1) {
|
|
cout << "Client " << client_id << " failed to connect\n";
|
|
return;
|
|
}
|
|
|
|
// Create message with delimiter
|
|
string base_msg(message_size - 1, 'A' + (client_id % 26));
|
|
string msg = base_msg + "\n"; // Add delimiter
|
|
|
|
int sent_count = 0;
|
|
|
|
// Send messages with proper flow control
|
|
for (int i = 0; i < message_count; ++i) {
|
|
int attempts = 0;
|
|
bool sent = false;
|
|
|
|
while (!sent && attempts < 5) {
|
|
ssize_t result = send(sock, msg.c_str(), msg.length(), MSG_NOSIGNAL);
|
|
|
|
if (result == static_cast<ssize_t>(msg.length())) {
|
|
sent = true;
|
|
sent_count++;
|
|
} else if (result == -1) {
|
|
if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
|
// Socket buffer full, wait longer
|
|
this_thread::sleep_for(chrono::milliseconds(1 << attempts));
|
|
attempts++;
|
|
} else {
|
|
// Connection error
|
|
break;
|
|
}
|
|
} else if (result > 0) {
|
|
// Partial send - handle remaining data
|
|
size_t remaining = msg.length() - result;
|
|
const char* remaining_data = msg.c_str() + result;
|
|
|
|
while (remaining > 0) {
|
|
ssize_t more = send(sock, remaining_data, remaining, MSG_NOSIGNAL);
|
|
if (more <= 0) break;
|
|
remaining -= more;
|
|
remaining_data += more;
|
|
}
|
|
|
|
if (remaining == 0) {
|
|
sent = true;
|
|
sent_count++;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (!sent) break;
|
|
|
|
// Flow control: small delay every batch
|
|
if ((i + 1) % 100 == 0) {
|
|
this_thread::sleep_for(chrono::microseconds(500));
|
|
}
|
|
}
|
|
|
|
// Ensure all data is sent before closing
|
|
shutdown(sock, SHUT_WR);
|
|
|
|
// Give server time to process
|
|
this_thread::sleep_for(chrono::milliseconds(50));
|
|
|
|
close(sock);
|
|
clients_finished_sending_++;
|
|
|
|
if (sent_count != message_count) {
|
|
cout << "Client " << client_id << " sent " << sent_count << "/" << message_count << " messages\n";
|
|
}
|
|
}
|
|
|
|
int create_client_socket() {
|
|
int sock = socket(AF_INET, SOCK_STREAM, 0);
|
|
if (sock == -1) return -1;
|
|
|
|
// Optimize socket
|
|
int opt = 1;
|
|
setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt));
|
|
|
|
// Larger send buffer
|
|
int buf_size = 1048576; // 1MB
|
|
setsockopt(sock, SOL_SOCKET, SO_SNDBUF, &buf_size, sizeof(buf_size));
|
|
|
|
// Set send timeout
|
|
struct timeval timeout;
|
|
timeout.tv_sec = 5;
|
|
timeout.tv_usec = 0;
|
|
setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, &timeout, sizeof(timeout));
|
|
|
|
sockaddr_in addr{};
|
|
addr.sin_family = AF_INET;
|
|
addr.sin_port = htons(8080);
|
|
inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);
|
|
|
|
if (connect(sock, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) == -1) {
|
|
close(sock);
|
|
return -1;
|
|
}
|
|
|
|
return sock;
|
|
}
|
|
|
|
void print_results() {
|
|
auto end_time = chrono::steady_clock::now();
|
|
auto duration = chrono::duration_cast<chrono::milliseconds>(end_time - start_time_);
|
|
|
|
double throughput_mps = static_cast<double>(messages_received_) / duration.count() * 1000;
|
|
double throughput_mbps = static_cast<double>(bytes_received_) / (1024 * 1024) / duration.count() * 1000;
|
|
|
|
cout << "\n=== Test Results ===\n";
|
|
cout << "Duration: " << duration.count() << "ms\n";
|
|
cout << "Messages received: " << messages_received_.load() << "\n";
|
|
cout << "Bytes received: " << bytes_received_.load() << "\n";
|
|
cout << "Throughput: " << static_cast<int>(throughput_mps) << " msg/sec\n";
|
|
cout << "Throughput: " << throughput_mbps << " MB/sec\n";
|
|
cout << "Peak connections: " << peak_connections_.load() << "\n";
|
|
|
|
cout << "\n=== Memory Usage ===\n";
|
|
cout << "Baseline RSS: " << baseline_memory_.rss_kb << " KB\n";
|
|
cout << "Peak RSS: " << peak_memory_.rss_kb << " KB\n";
|
|
cout << "Memory increase: " << (peak_memory_.rss_kb - baseline_memory_.rss_kb) << " KB\n";
|
|
if (peak_memory_.vsize_kb > 0) {
|
|
cout << "Virtual memory: " << peak_memory_.vsize_kb << " KB\n";
|
|
}
|
|
cout << "Memory per connection: " << (peak_memory_.rss_kb - baseline_memory_.rss_kb) / peak_connections_.load() << " KB\n";
|
|
}
|
|
};
|
|
|
|
int main() {
|
|
SocketTest test;
|
|
test.run();
|
|
return 0;
|
|
}
|