have sockeye handle clients internally, rewrite test to be more http
related
This commit is contained in:
parent
2e1793998b
commit
988872babd
180
README.md
180
README.md
@ -1,6 +1,6 @@
|
|||||||
# Sockeye
|
# Sockeye
|
||||||
|
|
||||||
An easy-to-use, fast socket C++ library! Uses epoll on Linux, kqueue on BSD/macOS.
|
An easy-to-use, fast, and robust C++ epoll socket library.
|
||||||
|
|
||||||
## API Reference
|
## API Reference
|
||||||
|
|
||||||
@ -9,29 +9,35 @@ An easy-to-use, fast socket C++ library! Uses epoll on Linux, kqueue on BSD/macO
|
|||||||
Main server class for handling TCP connections.
|
Main server class for handling TCP connections.
|
||||||
|
|
||||||
#### Constructor
|
#### Constructor
|
||||||
|
|
||||||
```cpp
|
```cpp
|
||||||
explicit Socket(uint16_t port = 8080)
|
explicit Socket(uint16_t port = 8080, int timeout_ms = 5000)
|
||||||
```
|
```
|
||||||
|
|
||||||
|
Constructs a server instance. `timeout_ms` is the duration of inactivity in milliseconds before a client connection is automatically closed.
|
||||||
|
|
||||||
#### Methods
|
#### Methods
|
||||||
|
|
||||||
**`bool start()`**
|
**`bool start()`**
|
||||||
Initialize the server socket and event system. Returns `true` on success.
|
Initialize the server socket and event system. Returns `true` on success.
|
||||||
|
|
||||||
**`void run()`**
|
**`void run()`**
|
||||||
Start the event loop. Blocks until `stop()` is called.
|
Start the event loop. This is a blocking call that runs until `stop()` is called.
|
||||||
|
|
||||||
**`void stop()`**
|
**`void stop()`**
|
||||||
Stop the server and exit the event loop.
|
Stops the server and causes the `run()` loop to exit.
|
||||||
|
|
||||||
|
**`bool send(int client_fd, const std::string& data)`**
|
||||||
|
Sends data to a connected client. This method handles partial sends and ensures all data is written. Returns `true` on success.
|
||||||
|
|
||||||
**`void on_connection(ConnectionHandler handler)`**
|
**`void on_connection(ConnectionHandler handler)`**
|
||||||
Set callback for new client connections.
|
Set a callback to be executed when a new client connects.
|
||||||
|
|
||||||
**`void on_data(DataHandler handler)`**
|
**`void on_data(DataHandler handler)`**
|
||||||
Set callback for incoming data from clients.
|
Set a callback to be executed when data is received from a client.
|
||||||
|
|
||||||
**`void on_disconnect(DisconnectHandler handler)`**
|
**`void on_disconnect(DisconnectHandler handler)`**
|
||||||
Set callback for client disconnections.
|
Set a callback to be executed when a client disconnects for any reason (including timeout).
|
||||||
|
|
||||||
#### Handler Types
|
#### Handler Types
|
||||||
|
|
||||||
@ -45,38 +51,7 @@ using DisconnectHandler = std::function<void(int client_fd)>;
|
|||||||
|
|
||||||
### Basic Echo Server
|
### Basic Echo Server
|
||||||
|
|
||||||
```cpp
|
This example demonstrates how to echo all received data back to the client. It captures the `server` object to use the integrated `send` method.
|
||||||
#include "sockeye.hpp"
|
|
||||||
#include <iostream>
|
|
||||||
#include <cstring>
|
|
||||||
|
|
||||||
int main() {
|
|
||||||
sockeye::Socket server(8080);
|
|
||||||
|
|
||||||
server.on_connection([](int client_fd) {
|
|
||||||
std::cout << "Client connected: " << client_fd << std::endl;
|
|
||||||
});
|
|
||||||
|
|
||||||
server.on_data([](int client_fd, const char* data, size_t len) {
|
|
||||||
// Echo data back to client
|
|
||||||
send(client_fd, data, len, 0);
|
|
||||||
});
|
|
||||||
|
|
||||||
server.on_disconnect([](int client_fd) {
|
|
||||||
std::cout << "Client disconnected: " << client_fd << std::endl;
|
|
||||||
});
|
|
||||||
|
|
||||||
if (!server.start()) {
|
|
||||||
std::cerr << "Failed to start server" << std::endl;
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
server.run();
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
### HTTP-like Server
|
|
||||||
|
|
||||||
```cpp
|
```cpp
|
||||||
#include "sockeye.hpp"
|
#include "sockeye.hpp"
|
||||||
@ -84,61 +59,124 @@ int main() {
|
|||||||
#include <string>
|
#include <string>
|
||||||
|
|
||||||
int main() {
|
int main() {
|
||||||
sockeye::Socket server(8080);
|
sockeye::Socket server(8080);
|
||||||
|
|
||||||
server.on_data([](int client_fd, const char* data, size_t len) {
|
server.on_connection([](int client_fd) {
|
||||||
std::string request(data, len);
|
std::cout << "Client connected: " << client_fd << std::endl;
|
||||||
|
});
|
||||||
|
|
||||||
// Simple HTTP response
|
// Capture server to use its send method
|
||||||
std::string response =
|
server.on_data([&server](int client_fd, const char* data, size_t len) {
|
||||||
"HTTP/1.1 200 OK\r\n"
|
// Echo data back to client using the server's send method
|
||||||
"Content-Length: 13\r\n"
|
server.send(client_fd, std::string(data, len));
|
||||||
"Connection: close\r\n\r\n"
|
});
|
||||||
"Hello, World!";
|
|
||||||
|
|
||||||
send(client_fd, response.c_str(), response.length(), 0);
|
server.on_disconnect([](int client_fd) {
|
||||||
close(client_fd);
|
std::cout << "Client disconnected: " << client_fd << std::endl;
|
||||||
});
|
});
|
||||||
|
|
||||||
if (!server.start()) {
|
if (!server.start()) {
|
||||||
std::cerr << "Failed to start server" << std::endl;
|
std::cerr << "Failed to start server" << std::endl;
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
std::cout << "Server listening on port 8080" << std::endl;
|
server.run();
|
||||||
server.run();
|
return 0;
|
||||||
return 0;
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### HTTP Server with Keep-Alive
|
||||||
|
|
||||||
|
This example shows a simple HTTP server that properly handles keep-alive connections. The library automatically manages client timeouts.
|
||||||
|
|
||||||
|
```cpp
|
||||||
|
#include "sockeye.hpp"
|
||||||
|
#include <iostream>
|
||||||
|
#include <string>
|
||||||
|
#include <unordered_map>
|
||||||
|
#include <mutex>
|
||||||
|
|
||||||
|
int main() {
|
||||||
|
sockeye::Socket server(8080);
|
||||||
|
|
||||||
|
// Buffers for accumulating request data per client
|
||||||
|
std::unordered_map<int, std::string> request_buffers;
|
||||||
|
std::mutex buffer_mutex;
|
||||||
|
|
||||||
|
server.on_data([&](int client_fd, const char* data, size_t len) {
|
||||||
|
std::string request_chunk(data, len);
|
||||||
|
|
||||||
|
std::lock_guard<std::mutex> lock(buffer_mutex);
|
||||||
|
request_buffers[client_fd] += request_chunk;
|
||||||
|
|
||||||
|
// Check if we have a full HTTP request
|
||||||
|
if (request_buffers[client_fd].find("\r\n\r\n") != std::string::npos) {
|
||||||
|
std::string response =
|
||||||
|
"HTTP/1.1 200 OK\r\n"
|
||||||
|
"Content-Length: 13\r\n"
|
||||||
|
"Connection: keep-alive\r\n\r\n"
|
||||||
|
"Hello, World!";
|
||||||
|
|
||||||
|
server.send(client_fd, response);
|
||||||
|
|
||||||
|
// Clear buffer for this client for the next request
|
||||||
|
request_buffers[client_fd].clear();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
server.on_disconnect([&](int client_fd) {
|
||||||
|
std::cout << "Client disconnected: " << client_fd << std::endl;
|
||||||
|
std::lock_guard<std::mutex> lock(buffer_mutex);
|
||||||
|
request_buffers.erase(client_fd);
|
||||||
|
});
|
||||||
|
|
||||||
|
if (!server.start()) {
|
||||||
|
std::cerr << "Failed to start server" << std::endl;
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::cout << "Server listening on port 8080" << std::endl;
|
||||||
|
server.run();
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
### Graceful Shutdown
|
### Graceful Shutdown
|
||||||
|
|
||||||
|
This example shows how to catch `SIGINT` (Ctrl+C) and `SIGTERM` signals to shut down the server gracefully.
|
||||||
|
|
||||||
```cpp
|
```cpp
|
||||||
#include "sockeye.hpp"
|
#include "sockeye.hpp"
|
||||||
#include <signal.h>
|
#include <signal.h>
|
||||||
|
#include <iostream>
|
||||||
|
|
||||||
sockeye::Socket* server_ptr = nullptr;
|
sockeye::Socket* server_ptr = nullptr;
|
||||||
|
|
||||||
void signal_handler(int signal) {
|
void signal_handler(int signal) {
|
||||||
if (server_ptr) {
|
if (server_ptr) {
|
||||||
server_ptr->stop();
|
std::cout << "\nCaught signal " << signal << ", stopping server..." << std::endl;
|
||||||
}
|
server_ptr->stop();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int main() {
|
int main() {
|
||||||
sockeye::Socket server(8080);
|
sockeye::Socket server(8080);
|
||||||
server_ptr = &server;
|
server_ptr = &server;
|
||||||
|
|
||||||
signal(SIGINT, signal_handler);
|
signal(SIGINT, signal_handler);
|
||||||
signal(SIGTERM, signal_handler);
|
signal(SIGTERM, signal_handler);
|
||||||
|
|
||||||
// Set up handlers...
|
// Set up handlers...
|
||||||
|
server.on_connection([](int fd){ /* ... */ });
|
||||||
|
|
||||||
if (!server.start()) {
|
if (!server.start()) {
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
server.run();
|
std::cout << "Server started. Press Ctrl+C to exit." << std::endl;
|
||||||
return 0;
|
server.run();
|
||||||
|
std::cout << "Server stopped." << std::endl;
|
||||||
|
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
90
sockeye.hpp
90
sockeye.hpp
@ -9,6 +9,10 @@
|
|||||||
#include <errno.h>
|
#include <errno.h>
|
||||||
#include <array>
|
#include <array>
|
||||||
#include <functional>
|
#include <functional>
|
||||||
|
#include <chrono>
|
||||||
|
#include <unordered_map>
|
||||||
|
#include <vector>
|
||||||
|
#include <string> // Added for std::string
|
||||||
|
|
||||||
namespace sockeye {
|
namespace sockeye {
|
||||||
|
|
||||||
@ -19,7 +23,8 @@ public:
|
|||||||
using DataHandler = std::function<void(int client_fd, const char* data, size_t len)>;
|
using DataHandler = std::function<void(int client_fd, const char* data, size_t len)>;
|
||||||
using DisconnectHandler = std::function<void(int client_fd)>;
|
using DisconnectHandler = std::function<void(int client_fd)>;
|
||||||
|
|
||||||
explicit Socket(uint16_t port = 8080) : port_(port) {}
|
explicit Socket(uint16_t port = 8080, int timeout_ms = 5000)
|
||||||
|
: port_(port), timeout_ms_(timeout_ms) {}
|
||||||
|
|
||||||
~Socket()
|
~Socket()
|
||||||
{
|
{
|
||||||
@ -52,24 +57,32 @@ public:
|
|||||||
handle_client_data(events[i].data.fd);
|
handle_client_data(events[i].data.fd);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
check_timeouts();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void stop() { running_ = false; }
|
void stop() { running_ = false; }
|
||||||
|
|
||||||
bool add_client(int client_fd)
|
// New: Send data to a client
|
||||||
{
|
bool send(int client_fd, const std::string& data) {
|
||||||
epoll_event event{};
|
ssize_t total_sent = 0;
|
||||||
event.events = EPOLLIN | EPOLLET;
|
const char* p_data = data.c_str();
|
||||||
event.data.fd = client_fd;
|
size_t len = data.length();
|
||||||
return epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, client_fd, &event) != -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
void remove_client(int client_fd)
|
while (total_sent < static_cast<ssize_t>(len)) {
|
||||||
{
|
ssize_t sent = ::send(client_fd, p_data + total_sent, len - total_sent, MSG_NOSIGNAL);
|
||||||
epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, client_fd, nullptr);
|
if (sent == -1) {
|
||||||
close(client_fd);
|
if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
||||||
if (on_disconnect_) on_disconnect_(client_fd);
|
// Can't send right now, handle as an error for simplicity in this context
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
// Other error
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
total_sent += sent;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
void on_connection(ConnectionHandler handler) { on_connection_ = std::move(handler); }
|
void on_connection(ConnectionHandler handler) { on_connection_ = std::move(handler); }
|
||||||
@ -77,14 +90,22 @@ public:
|
|||||||
void on_disconnect(DisconnectHandler handler) { on_disconnect_ = std::move(handler); }
|
void on_disconnect(DisconnectHandler handler) { on_disconnect_ = std::move(handler); }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
struct Client {
|
||||||
|
int fd;
|
||||||
|
std::chrono::steady_clock::time_point last_activity;
|
||||||
|
};
|
||||||
|
|
||||||
static constexpr int MAX_EVENTS = 1024;
|
static constexpr int MAX_EVENTS = 1024;
|
||||||
static constexpr int BUFFER_SIZE = 8192;
|
static constexpr int BUFFER_SIZE = 8192;
|
||||||
|
|
||||||
uint16_t port_;
|
uint16_t port_;
|
||||||
|
int timeout_ms_;
|
||||||
int server_fd_ = -1;
|
int server_fd_ = -1;
|
||||||
int epoll_fd_ = -1;
|
int epoll_fd_ = -1;
|
||||||
bool running_ = true;
|
bool running_ = true;
|
||||||
|
|
||||||
|
std::unordered_map<int, Client> clients_;
|
||||||
|
|
||||||
ConnectionHandler on_connection_;
|
ConnectionHandler on_connection_;
|
||||||
DataHandler on_data_;
|
DataHandler on_data_;
|
||||||
DisconnectHandler on_disconnect_;
|
DisconnectHandler on_disconnect_;
|
||||||
@ -124,6 +145,26 @@ private:
|
|||||||
return epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, server_fd_, &event) != -1;
|
return epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, server_fd_, &event) != -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool add_client(int client_fd)
|
||||||
|
{
|
||||||
|
epoll_event event{};
|
||||||
|
event.events = EPOLLIN | EPOLLET;
|
||||||
|
event.data.fd = client_fd;
|
||||||
|
if (epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, client_fd, &event) == -1) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
clients_[client_fd] = {client_fd, std::chrono::steady_clock::now()};
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
void remove_client(int client_fd)
|
||||||
|
{
|
||||||
|
epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, client_fd, nullptr);
|
||||||
|
close(client_fd);
|
||||||
|
clients_.erase(client_fd);
|
||||||
|
if (on_disconnect_) on_disconnect_(client_fd);
|
||||||
|
}
|
||||||
|
|
||||||
inline void accept_connections()
|
inline void accept_connections()
|
||||||
{
|
{
|
||||||
while (true) {
|
while (true) {
|
||||||
@ -153,6 +194,11 @@ private:
|
|||||||
|
|
||||||
inline void handle_client_data(int client_fd)
|
inline void handle_client_data(int client_fd)
|
||||||
{
|
{
|
||||||
|
auto it = clients_.find(client_fd);
|
||||||
|
if (it != clients_.end()) {
|
||||||
|
it->second.last_activity = std::chrono::steady_clock::now();
|
||||||
|
}
|
||||||
|
|
||||||
if (!on_data_) return;
|
if (!on_data_) return;
|
||||||
|
|
||||||
char buffer[BUFFER_SIZE];
|
char buffer[BUFFER_SIZE];
|
||||||
@ -170,6 +216,24 @@ private:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void check_timeouts() {
|
||||||
|
if (timeout_ms_ <= 0) return;
|
||||||
|
|
||||||
|
auto now = std::chrono::steady_clock::now();
|
||||||
|
std::vector<int> timed_out_clients;
|
||||||
|
|
||||||
|
for (const auto& pair : clients_) {
|
||||||
|
auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(now - pair.second.last_activity);
|
||||||
|
if (duration.count() > timeout_ms_) {
|
||||||
|
timed_out_clients.push_back(pair.first);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int client_fd : timed_out_clients) {
|
||||||
|
remove_client(client_fd);
|
||||||
|
}
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace sockeye
|
} // namespace sockeye
|
||||||
|
282
test.cpp
282
test.cpp
@ -1,4 +1,5 @@
|
|||||||
#include "sockeye.hpp"
|
#include "sockeye.hpp"
|
||||||
|
#include <cstring>
|
||||||
#include <thread>
|
#include <thread>
|
||||||
#include <chrono>
|
#include <chrono>
|
||||||
#include <atomic>
|
#include <atomic>
|
||||||
@ -9,19 +10,21 @@
|
|||||||
#include <arpa/inet.h>
|
#include <arpa/inet.h>
|
||||||
#include <mutex>
|
#include <mutex>
|
||||||
#include <sys/resource.h>
|
#include <sys/resource.h>
|
||||||
|
#include <unordered_map>
|
||||||
|
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
|
||||||
class SocketTest {
|
class SocketTest
|
||||||
|
{
|
||||||
public:
|
public:
|
||||||
void run() {
|
void run()
|
||||||
cout << "Starting socket tests...\n";
|
{
|
||||||
|
cout << "Starting socket tests with HTTP workload...\n";
|
||||||
|
|
||||||
start_server();
|
start_server();
|
||||||
this_thread::sleep_for(chrono::milliseconds(100));
|
this_thread::sleep_for(chrono::milliseconds(100));
|
||||||
|
|
||||||
test_basic_functionality();
|
test_http_workload();
|
||||||
test_throughput();
|
|
||||||
|
|
||||||
server_.stop();
|
server_.stop();
|
||||||
if (server_thread_.joinable()) {
|
if (server_thread_.joinable()) {
|
||||||
@ -35,24 +38,22 @@ private:
|
|||||||
struct MemoryInfo {
|
struct MemoryInfo {
|
||||||
size_t rss_kb = 0;
|
size_t rss_kb = 0;
|
||||||
size_t peak_rss_kb = 0;
|
size_t peak_rss_kb = 0;
|
||||||
size_t vsize_kb = 0;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
MemoryInfo get_memory_info() {
|
MemoryInfo get_memory_info()
|
||||||
|
{
|
||||||
MemoryInfo info;
|
MemoryInfo info;
|
||||||
|
|
||||||
struct rusage usage;
|
struct rusage usage;
|
||||||
if (getrusage(RUSAGE_SELF, &usage) == 0) {
|
if (getrusage(RUSAGE_SELF, &usage) == 0) {
|
||||||
info.rss_kb = usage.ru_maxrss;
|
|
||||||
info.peak_rss_kb = usage.ru_maxrss;
|
|
||||||
|
|
||||||
#ifdef __APPLE__
|
#ifdef __APPLE__
|
||||||
// macOS reports in bytes, convert to KB
|
// macOS reports in bytes, convert to KB
|
||||||
info.rss_kb /= 1024;
|
info.rss_kb = usage.ru_maxrss / 1024;
|
||||||
info.peak_rss_kb /= 1024;
|
#else
|
||||||
|
// Linux reports in KB
|
||||||
|
info.rss_kb = usage.ru_maxrss;
|
||||||
#endif
|
#endif
|
||||||
|
info.peak_rss_kb = info.rss_kb;
|
||||||
}
|
}
|
||||||
|
|
||||||
return info;
|
return info;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -60,38 +61,55 @@ private:
|
|||||||
thread server_thread_;
|
thread server_thread_;
|
||||||
atomic<int> peak_connections_{0};
|
atomic<int> peak_connections_{0};
|
||||||
atomic<int> current_connections_{0};
|
atomic<int> current_connections_{0};
|
||||||
atomic<int> messages_received_{0};
|
atomic<long long> requests_processed_{0};
|
||||||
atomic<size_t> bytes_received_{0};
|
atomic<size_t> bytes_received_{0};
|
||||||
chrono::steady_clock::time_point start_time_;
|
chrono::steady_clock::time_point start_time_;
|
||||||
|
|
||||||
|
// Per-client request buffering
|
||||||
|
mutex request_buffer_mutex_;
|
||||||
|
unordered_map<int, string> request_buffers_;
|
||||||
|
|
||||||
// Memory tracking
|
// Memory tracking
|
||||||
MemoryInfo baseline_memory_;
|
MemoryInfo baseline_memory_;
|
||||||
MemoryInfo peak_memory_;
|
MemoryInfo peak_memory_;
|
||||||
|
|
||||||
// Synchronization for reliable testing
|
void start_server()
|
||||||
atomic<int> clients_connected_{0};
|
{
|
||||||
atomic<int> clients_finished_sending_{0};
|
|
||||||
mutex stats_mutex_;
|
|
||||||
|
|
||||||
void start_server() {
|
|
||||||
server_.on_connection([this](int fd) {
|
server_.on_connection([this](int fd) {
|
||||||
int current = ++current_connections_;
|
int current = ++current_connections_;
|
||||||
peak_connections_.store(max(peak_connections_.load(), current));
|
peak_connections_.store(max(peak_connections_.load(), current));
|
||||||
clients_connected_++;
|
|
||||||
});
|
});
|
||||||
|
|
||||||
server_.on_data([this](int fd, const char* data, size_t len) {
|
server_.on_data([this](int fd, const char* data, size_t len) {
|
||||||
// Count messages by looking for our delimiter
|
|
||||||
for (size_t i = 0; i < len; ++i) {
|
|
||||||
if (data[i] == '\n') {
|
|
||||||
messages_received_++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
bytes_received_ += len;
|
bytes_received_ += len;
|
||||||
|
string request_chunk(data, len);
|
||||||
|
|
||||||
|
lock_guard<mutex> lock(request_buffer_mutex_);
|
||||||
|
request_buffers_[fd] += request_chunk;
|
||||||
|
|
||||||
|
// Process all complete requests in the buffer
|
||||||
|
while (true) {
|
||||||
|
auto& buffer = request_buffers_[fd];
|
||||||
|
size_t pos = buffer.find("\r\n\r\n");
|
||||||
|
if (pos == string::npos) {
|
||||||
|
break; // No complete request found
|
||||||
|
}
|
||||||
|
|
||||||
|
requests_processed_++;
|
||||||
|
|
||||||
|
// Remove the processed request from the buffer
|
||||||
|
buffer.erase(0, pos + 4);
|
||||||
|
|
||||||
|
// Send a standard HTTP response
|
||||||
|
string response = "HTTP/1.1 200 OK\r\nContent-Length: 13\r\nConnection: keep-alive\r\n\r\nHello, World!";
|
||||||
|
server_.send(fd, response);
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
server_.on_disconnect([this](int fd) {
|
server_.on_disconnect([this](int fd) {
|
||||||
current_connections_--;
|
current_connections_--;
|
||||||
|
lock_guard<mutex> lock(request_buffer_mutex_);
|
||||||
|
request_buffers_.erase(fd);
|
||||||
});
|
});
|
||||||
|
|
||||||
if (!server_.start()) {
|
if (!server_.start()) {
|
||||||
@ -104,48 +122,28 @@ private:
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
void test_basic_functionality() {
|
void test_http_workload()
|
||||||
cout << "Testing basic functionality...\n";
|
{
|
||||||
|
|
||||||
int sock = create_client_socket();
|
|
||||||
if (sock == -1) {
|
|
||||||
cerr << "Failed to create client socket\n";
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
string msg = "Hello, server!\n";
|
|
||||||
send(sock, msg.c_str(), msg.length(), 0);
|
|
||||||
|
|
||||||
this_thread::sleep_for(chrono::milliseconds(50));
|
|
||||||
close(sock);
|
|
||||||
|
|
||||||
cout << "Basic test completed\n";
|
|
||||||
}
|
|
||||||
|
|
||||||
void test_throughput() {
|
|
||||||
constexpr int num_clients = 100;
|
constexpr int num_clients = 100;
|
||||||
constexpr int messages_per_client = 500;
|
constexpr int requests_per_client = 1000;
|
||||||
constexpr int message_size = 1024;
|
constexpr long long expected_requests = num_clients * requests_per_client;
|
||||||
constexpr int expected_messages = num_clients * messages_per_client;
|
|
||||||
|
|
||||||
cout << "Testing throughput: " << expected_messages << " messages...\n";
|
cout << "Testing HTTP workload: " << num_clients << " clients, "
|
||||||
|
<< requests_per_client << " req/client (" << expected_requests << " total)...\n";
|
||||||
|
|
||||||
// Capture baseline memory
|
|
||||||
baseline_memory_ = get_memory_info();
|
baseline_memory_ = get_memory_info();
|
||||||
peak_memory_ = baseline_memory_;
|
peak_memory_ = baseline_memory_;
|
||||||
|
|
||||||
// Reset counters
|
requests_processed_ = 0;
|
||||||
messages_received_ = 0;
|
|
||||||
bytes_received_ = 0;
|
bytes_received_ = 0;
|
||||||
clients_connected_ = 0;
|
current_connections_ = 0;
|
||||||
clients_finished_sending_ = 0;
|
peak_connections_ = 0;
|
||||||
|
|
||||||
start_time_ = chrono::steady_clock::now();
|
start_time_ = chrono::steady_clock::now();
|
||||||
|
|
||||||
vector<thread> clients;
|
vector<thread> clients;
|
||||||
clients.reserve(num_clients);
|
clients.reserve(num_clients);
|
||||||
|
|
||||||
// Start memory monitoring thread
|
|
||||||
atomic<bool> monitoring{true};
|
atomic<bool> monitoring{true};
|
||||||
thread memory_monitor([this, &monitoring]() {
|
thread memory_monitor([this, &monitoring]() {
|
||||||
while (monitoring) {
|
while (monitoring) {
|
||||||
@ -158,149 +156,74 @@ private:
|
|||||||
});
|
});
|
||||||
|
|
||||||
for (int i = 0; i < num_clients; ++i) {
|
for (int i = 0; i < num_clients; ++i) {
|
||||||
clients.emplace_back([this, i, messages_per_client, message_size]() {
|
clients.emplace_back([this, requests_per_client]() {
|
||||||
reliable_client_worker(i, messages_per_client, message_size);
|
http_client_worker(requests_per_client);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait for all clients to connect
|
|
||||||
while (clients_connected_.load() < num_clients) {
|
|
||||||
this_thread::sleep_for(chrono::milliseconds(10));
|
|
||||||
}
|
|
||||||
cout << "All clients connected\n";
|
|
||||||
|
|
||||||
// Wait for all clients to finish
|
|
||||||
for (auto& t : clients) {
|
for (auto& t : clients) {
|
||||||
t.join();
|
t.join();
|
||||||
}
|
}
|
||||||
cout << "All clients finished sending\n";
|
|
||||||
|
|
||||||
// Wait for server to process all data with timeout
|
// Wait for server to process all requests
|
||||||
auto deadline = chrono::steady_clock::now() + chrono::seconds(5);
|
auto deadline = chrono::steady_clock::now() + chrono::seconds(10);
|
||||||
auto last_count = messages_received_.load();
|
while (requests_processed_.load() < expected_requests && chrono::steady_clock::now() < deadline) {
|
||||||
int stable_iterations = 0;
|
|
||||||
|
|
||||||
while (chrono::steady_clock::now() < deadline) {
|
|
||||||
this_thread::sleep_for(chrono::milliseconds(50));
|
this_thread::sleep_for(chrono::milliseconds(50));
|
||||||
auto current_count = messages_received_.load();
|
|
||||||
|
|
||||||
if (current_count == last_count) {
|
|
||||||
stable_iterations++;
|
|
||||||
if (stable_iterations >= 5) break; // 250ms of stability
|
|
||||||
} else {
|
|
||||||
stable_iterations = 0;
|
|
||||||
last_count = current_count;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
monitoring = false;
|
monitoring = false;
|
||||||
memory_monitor.join();
|
memory_monitor.join();
|
||||||
|
|
||||||
cout << "Expected: " << expected_messages << ", Received: " << messages_received_.load() << "\n";
|
cout << "Expected: " << expected_requests << ", Processed: " << requests_processed_.load() << "\n";
|
||||||
}
|
}
|
||||||
|
|
||||||
void reliable_client_worker(int client_id, int message_count, int message_size) {
|
void http_client_worker(int request_count)
|
||||||
int sock = -1;
|
{
|
||||||
int retry_count = 0;
|
int sock = create_client_socket();
|
||||||
constexpr int max_retries = 10;
|
if (sock == -1) return;
|
||||||
|
|
||||||
// Retry connection with exponential backoff
|
string request = "GET / HTTP/1.1\r\nHost: localhost\r\n\r\n";
|
||||||
while (sock == -1 && retry_count < max_retries) {
|
char response_buf[1024];
|
||||||
sock = create_client_socket();
|
|
||||||
if (sock == -1) {
|
for (int i = 0; i < request_count; ++i) {
|
||||||
retry_count++;
|
if (::send(sock, request.c_str(), request.length(), 0) < 0) {
|
||||||
this_thread::sleep_for(chrono::milliseconds(10 << retry_count));
|
break; // Send failed
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if (sock == -1) {
|
// Read response (basic implementation for testing)
|
||||||
cout << "Client " << client_id << " failed to connect\n";
|
ssize_t bytes_read = 0;
|
||||||
return;
|
bool response_complete = false;
|
||||||
}
|
while (!response_complete) {
|
||||||
|
ssize_t result = ::recv(sock, response_buf, sizeof(response_buf) - 1, 0);
|
||||||
// Create message with delimiter
|
if (result > 0) {
|
||||||
string base_msg(message_size - 1, 'A' + (client_id % 26));
|
bytes_read += result;
|
||||||
string msg = base_msg + "\n"; // Add delimiter
|
response_buf[result] = '\0';
|
||||||
|
// Simple check for end of our known response
|
||||||
int sent_count = 0;
|
if (strstr(response_buf, "Hello, World!")) {
|
||||||
|
response_complete = true;
|
||||||
// Send messages with proper flow control
|
|
||||||
for (int i = 0; i < message_count; ++i) {
|
|
||||||
int attempts = 0;
|
|
||||||
bool sent = false;
|
|
||||||
|
|
||||||
while (!sent && attempts < 5) {
|
|
||||||
ssize_t result = send(sock, msg.c_str(), msg.length(), MSG_NOSIGNAL);
|
|
||||||
|
|
||||||
if (result == static_cast<ssize_t>(msg.length())) {
|
|
||||||
sent = true;
|
|
||||||
sent_count++;
|
|
||||||
} else if (result == -1) {
|
|
||||||
if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
|
||||||
// Socket buffer full, wait longer
|
|
||||||
this_thread::sleep_for(chrono::milliseconds(1 << attempts));
|
|
||||||
attempts++;
|
|
||||||
} else {
|
|
||||||
// Connection error
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
} else if (result > 0) {
|
|
||||||
// Partial send - handle remaining data
|
|
||||||
size_t remaining = msg.length() - result;
|
|
||||||
const char* remaining_data = msg.c_str() + result;
|
|
||||||
|
|
||||||
while (remaining > 0) {
|
|
||||||
ssize_t more = send(sock, remaining_data, remaining, MSG_NOSIGNAL);
|
|
||||||
if (more <= 0) break;
|
|
||||||
remaining -= more;
|
|
||||||
remaining_data += more;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (remaining == 0) {
|
|
||||||
sent = true;
|
|
||||||
sent_count++;
|
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
break; // Connection closed or error
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (!response_complete) break;
|
||||||
if (!sent) break;
|
|
||||||
|
|
||||||
// Flow control: small delay every batch
|
|
||||||
if ((i + 1) % 100 == 0) {
|
|
||||||
this_thread::sleep_for(chrono::microseconds(500));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ensure all data is sent before closing
|
|
||||||
shutdown(sock, SHUT_WR);
|
|
||||||
|
|
||||||
// Give server time to process
|
|
||||||
this_thread::sleep_for(chrono::milliseconds(50));
|
|
||||||
|
|
||||||
close(sock);
|
close(sock);
|
||||||
clients_finished_sending_++;
|
|
||||||
|
|
||||||
if (sent_count != message_count) {
|
|
||||||
cout << "Client " << client_id << " sent " << sent_count << "/" << message_count << " messages\n";
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int create_client_socket() {
|
int create_client_socket()
|
||||||
|
{
|
||||||
int sock = socket(AF_INET, SOCK_STREAM, 0);
|
int sock = socket(AF_INET, SOCK_STREAM, 0);
|
||||||
if (sock == -1) return -1;
|
if (sock == -1) return -1;
|
||||||
|
|
||||||
// Optimize socket
|
|
||||||
int opt = 1;
|
int opt = 1;
|
||||||
setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt));
|
setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt));
|
||||||
|
|
||||||
// Larger send buffer
|
|
||||||
int buf_size = 1048576; // 1MB
|
|
||||||
setsockopt(sock, SOL_SOCKET, SO_SNDBUF, &buf_size, sizeof(buf_size));
|
|
||||||
|
|
||||||
// Set send timeout
|
|
||||||
struct timeval timeout;
|
struct timeval timeout;
|
||||||
timeout.tv_sec = 5;
|
timeout.tv_sec = 5;
|
||||||
timeout.tv_usec = 0;
|
timeout.tv_usec = 0;
|
||||||
|
setsockopt(sock, SOL_SOCKET, SO_RCVTIMEO, &timeout, sizeof(timeout));
|
||||||
setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, &timeout, sizeof(timeout));
|
setsockopt(sock, SOL_SOCKET, SO_SNDTIMEO, &timeout, sizeof(timeout));
|
||||||
|
|
||||||
sockaddr_in addr{};
|
sockaddr_in addr{};
|
||||||
@ -316,33 +239,36 @@ private:
|
|||||||
return sock;
|
return sock;
|
||||||
}
|
}
|
||||||
|
|
||||||
void print_results() {
|
void print_results()
|
||||||
|
{
|
||||||
auto end_time = chrono::steady_clock::now();
|
auto end_time = chrono::steady_clock::now();
|
||||||
auto duration = chrono::duration_cast<chrono::milliseconds>(end_time - start_time_);
|
auto duration_ms = chrono::duration_cast<chrono::milliseconds>(end_time - start_time_).count();
|
||||||
|
if (duration_ms == 0) duration_ms = 1;
|
||||||
|
|
||||||
double throughput_mps = static_cast<double>(messages_received_) / duration.count() * 1000;
|
long long final_requests = requests_processed_.load();
|
||||||
double throughput_mbps = static_cast<double>(bytes_received_) / (1024 * 1024) / duration.count() * 1000;
|
double duration_s = static_cast<double>(duration_ms) / 1000.0;
|
||||||
|
double rps = static_cast<double>(final_requests) / duration_s;
|
||||||
|
double throughput_mbps = static_cast<double>(bytes_received_) / (1024 * 1024) / duration_s;
|
||||||
|
|
||||||
cout << "\n=== Test Results ===\n";
|
cout << "\n=== Test Results ===\n";
|
||||||
cout << "Duration: " << duration.count() << "ms\n";
|
cout << "Duration: " << duration_ms << " ms\n";
|
||||||
cout << "Messages received: " << messages_received_.load() << "\n";
|
cout << "Requests processed: " << final_requests << "\n";
|
||||||
cout << "Bytes received: " << bytes_received_.load() << "\n";
|
cout << "Throughput: " << static_cast<int>(rps) << " req/sec\n";
|
||||||
cout << "Throughput: " << static_cast<int>(throughput_mps) << " msg/sec\n";
|
cout << "Data Rate (RX): " << throughput_mbps << " MB/sec\n";
|
||||||
cout << "Throughput: " << throughput_mbps << " MB/sec\n";
|
|
||||||
cout << "Peak connections: " << peak_connections_.load() << "\n";
|
cout << "Peak connections: " << peak_connections_.load() << "\n";
|
||||||
|
|
||||||
cout << "\n=== Memory Usage ===\n";
|
cout << "\n=== Memory Usage ===\n";
|
||||||
cout << "Baseline RSS: " << baseline_memory_.rss_kb << " KB\n";
|
cout << "Baseline RSS: " << baseline_memory_.rss_kb << " KB\n";
|
||||||
cout << "Peak RSS: " << peak_memory_.rss_kb << " KB\n";
|
cout << "Peak RSS: " << peak_memory_.rss_kb << " KB\n";
|
||||||
cout << "Memory increase: " << (peak_memory_.rss_kb - baseline_memory_.rss_kb) << " KB\n";
|
cout << "Memory increase: " << (peak_memory_.rss_kb - baseline_memory_.rss_kb) << " KB\n";
|
||||||
if (peak_memory_.vsize_kb > 0) {
|
if (peak_connections_.load() > 0) {
|
||||||
cout << "Virtual memory: " << peak_memory_.vsize_kb << " KB\n";
|
cout << "Memory per connection: " << static_cast<double>(peak_memory_.rss_kb - baseline_memory_.rss_kb) / peak_connections_.load() << " KB\n";
|
||||||
}
|
}
|
||||||
cout << "Memory per connection: " << (peak_memory_.rss_kb - baseline_memory_.rss_kb) / peak_connections_.load() << " KB\n";
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
int main() {
|
int main()
|
||||||
|
{
|
||||||
SocketTest test;
|
SocketTest test;
|
||||||
test.run();
|
test.run();
|
||||||
return 0;
|
return 0;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user