// Listing header as captured: 276 lines, 7.0 KiB, C++
#include "sockeye.hpp"

#include <arpa/inet.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

using namespace std;

class SocketTest
|
|
{
|
|
public:
|
|
void run()
|
|
{
|
|
cout << "Starting socket tests with HTTP workload...\n";
|
|
|
|
start_server();
|
|
this_thread::sleep_for(chrono::milliseconds(100));
|
|
|
|
test_http_workload();
|
|
|
|
server_.stop();
|
|
if (server_thread_.joinable()) {
|
|
server_thread_.join();
|
|
}
|
|
|
|
print_results();
|
|
}
|
|
|
|
private:
|
|
struct MemoryInfo {
|
|
size_t rss_kb = 0;
|
|
size_t peak_rss_kb = 0;
|
|
};
|
|
|
|
MemoryInfo get_memory_info()
|
|
{
|
|
MemoryInfo info;
|
|
struct rusage usage;
|
|
if (getrusage(RUSAGE_SELF, &usage) == 0) {
|
|
#ifdef __APPLE__
|
|
// macOS reports in bytes, convert to KB
|
|
info.rss_kb = usage.ru_maxrss / 1024;
|
|
#else
|
|
// Linux reports in KB
|
|
info.rss_kb = usage.ru_maxrss;
|
|
#endif
|
|
info.peak_rss_kb = info.rss_kb;
|
|
}
|
|
return info;
|
|
}
|
|
|
|
sockeye::Socket server_{8080};
|
|
thread server_thread_;
|
|
atomic<int> peak_connections_{0};
|
|
atomic<int> current_connections_{0};
|
|
atomic<long long> requests_processed_{0};
|
|
atomic<size_t> bytes_received_{0};
|
|
chrono::steady_clock::time_point start_time_;
|
|
|
|
// Per-client request buffering
|
|
mutex request_buffer_mutex_;
|
|
unordered_map<int, string> request_buffers_;
|
|
|
|
// Memory tracking
|
|
MemoryInfo baseline_memory_;
|
|
MemoryInfo peak_memory_;
|
|
|
|
void start_server()
|
|
{
|
|
server_.on_connection([this](int fd) {
|
|
int current = ++current_connections_;
|
|
peak_connections_.store(max(peak_connections_.load(), current));
|
|
});
|
|
|
|
server_.on_data([this](int fd, const char* data, size_t len) {
|
|
bytes_received_ += len;
|
|
string request_chunk(data, len);
|
|
|
|
lock_guard<mutex> lock(request_buffer_mutex_);
|
|
request_buffers_[fd] += request_chunk;
|
|
|
|
// Process all complete requests in the buffer
|
|
while (true) {
|
|
auto& buffer = request_buffers_[fd];
|
|
size_t pos = buffer.find("\r\n\r\n");
|
|
if (pos == string::npos) {
|
|
break; // No complete request found
|
|
}
|
|
|
|
requests_processed_++;
|
|
|
|
// Remove the processed request from the buffer
|
|
buffer.erase(0, pos + 4);
|
|
|
|
// Send a standard HTTP response
|
|
string response = "HTTP/1.1 200 OK\r\nContent-Length: 13\r\nConnection: keep-alive\r\n\r\nHello, World!";
|
|
server_.send(fd, response);
|
|
}
|
|
});
|
|
|
|
server_.on_disconnect([this](int fd) {
|
|
current_connections_--;
|
|
lock_guard<mutex> lock(request_buffer_mutex_);
|
|
request_buffers_.erase(fd);
|
|
});
|
|
|
|
if (!server_.start()) {
|
|
cerr << "Failed to start server\n";
|
|
exit(1);
|
|
}
|
|
|
|
server_thread_ = thread([this]() {
|
|
server_.run();
|
|
});
|
|
}
|
|
|
|
void test_http_workload()
|
|
{
|
|
constexpr int num_clients = 100;
|
|
constexpr int requests_per_client = 1000;
|
|
constexpr long long expected_requests = num_clients * requests_per_client;
|
|
|
|
cout << "Testing HTTP workload: " << num_clients << " clients, "
|
|
<< requests_per_client << " req/client (" << expected_requests << " total)...\n";
|
|
|
|
baseline_memory_ = get_memory_info();
|
|
peak_memory_ = baseline_memory_;
|
|
|
|
requests_processed_ = 0;
|
|
bytes_received_ = 0;
|
|
current_connections_ = 0;
|
|
peak_connections_ = 0;
|
|
|
|
start_time_ = chrono::steady_clock::now();
|
|
|
|
vector<thread> clients;
|
|
clients.reserve(num_clients);
|
|
|
|
atomic<bool> monitoring{true};
|
|
thread memory_monitor([this, &monitoring]() {
|
|
while (monitoring) {
|
|
auto current = get_memory_info();
|
|
if (current.rss_kb > peak_memory_.rss_kb) {
|
|
peak_memory_ = current;
|
|
}
|
|
this_thread::sleep_for(chrono::milliseconds(10));
|
|
}
|
|
});
|
|
|
|
for (int i = 0; i < num_clients; ++i) {
|
|
clients.emplace_back([this, requests_per_client]() {
|
|
http_client_worker(requests_per_client);
|
|
});
|
|
}
|
|
|
|
for (auto& t : clients) {
|
|
t.join();
|
|
}
|
|
|
|
// Wait for server to process all requests
|
|
auto deadline = chrono::steady_clock::now() + chrono::seconds(10);
|
|
while (requests_processed_.load() < expected_requests && chrono::steady_clock::now() < deadline) {
|
|
this_thread::sleep_for(chrono::milliseconds(50));
|
|
}
|
|
|
|
monitoring = false;
|
|
memory_monitor.join();
|
|
|
|
cout << "Expected: " << expected_requests << ", Processed: " << requests_processed_.load() << "\n";
|
|
}
|
|
|
|
void http_client_worker(int request_count)
|
|
{
|
|
int sock = create_client_socket();
|
|
if (sock == -1) return;
|
|
|
|
string request = "GET / HTTP/1.1\r\nHost: localhost\r\n\r\n";
|
|
char response_buf[1024];
|
|
|
|
for (int i = 0; i < request_count; ++i) {
|
|
if (::send(sock, request.c_str(), request.length(), 0) < 0) {
|
|
break; // Send failed
|
|
}
|
|
|
|
// Read response (basic implementation for testing)
|
|
ssize_t bytes_read = 0;
|
|
bool response_complete = false;
|
|
while (!response_complete) {
|
|
ssize_t result = ::recv(sock, response_buf, sizeof(response_buf) - 1, 0);
|
|
if (result > 0) {
|
|
bytes_read += result;
|
|
response_buf[result] = '\0';
|
|
// Simple check for end of our known response
|
|
if (strstr(response_buf, "Hello, World!")) {
|
|
response_complete = true;
|
|
}
|
|
} else {
|
|
break; // Connection closed or error
|
|
}
|
|
}
|
|
if (!response_complete) break;
|
|
}
|
|
|
|
close(sock);
|
|
}
|
|
|
|
int create_client_socket()
|
|
{
|
|
int sock = socket(AF_INET, SOCK_STREAM, 0);
|
|
if (sock == -1) return -1;
|
|
|
|
int opt = 1;
|
|
setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt));
|
|
|
|
struct timeval timeout;
|
|
timeout.tv_sec = 5;
|
|
timeout.tv_usec = 0;
|
|
setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout));
|
|
setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, &timeout, sizeof(timeout));
|
|
|
|
sockaddr_in addr{};
|
|
addr.sin_family = AF_INET;
|
|
addr.sin_port = htons(8080);
|
|
inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);
|
|
|
|
if (connect(sock, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) == -1) {
|
|
close(sock);
|
|
return -1;
|
|
}
|
|
|
|
return sock;
|
|
}
|
|
|
|
void print_results()
|
|
{
|
|
auto end_time = chrono::steady_clock::now();
|
|
auto duration_ms = chrono::duration_cast<chrono::milliseconds>(end_time - start_time_).count();
|
|
if (duration_ms == 0) duration_ms = 1;
|
|
|
|
long long final_requests = requests_processed_.load();
|
|
double duration_s = static_cast<double>(duration_ms) / 1000.0;
|
|
double rps = static_cast<double>(final_requests) / duration_s;
|
|
double throughput_mbps = static_cast<double>(bytes_received_) / (1024 * 1024) / duration_s;
|
|
|
|
cout << "\n=== Test Results ===\n";
|
|
cout << "Duration: " << duration_ms << " ms\n";
|
|
cout << "Requests processed: " << final_requests << "\n";
|
|
cout << "Throughput: " << static_cast<int>(rps) << " req/sec\n";
|
|
cout << "Data Rate (RX): " << throughput_mbps << " MB/sec\n";
|
|
cout << "Peak connections: " << peak_connections_.load() << "\n";
|
|
|
|
cout << "\n=== Memory Usage ===\n";
|
|
cout << "Baseline RSS: " << baseline_memory_.rss_kb << " KB\n";
|
|
cout << "Peak RSS: " << peak_memory_.rss_kb << " KB\n";
|
|
cout << "Memory increase: " << (peak_memory_.rss_kb - baseline_memory_.rss_kb) << " KB\n";
|
|
if (peak_connections_.load() > 0) {
|
|
cout << "Memory per connection: " << static_cast<double>(peak_memory_.rss_kb - baseline_memory_.rss_kb) / peak_connections_.load() << " KB\n";
|
|
}
|
|
}
|
|
};
|
|
|
|
int main()
|
|
{
|
|
SocketTest test;
|
|
test.run();
|
|
return 0;
|
|
}
|