Reactor/tests/test_tcp_server.cpp
2025-06-27 18:16:01 -05:00

407 lines
9.8 KiB
C++

#include "../lib/TcpServer.hpp"
#include "../lib/TcpConnection.hpp"
#include <cassert>
#include <iostream>
#include <set>
#include <thread>
#include <chrono>
#include <atomic>
#include <memory>
class TestClient
{
private:
reactor::Socket socket_;
public:
TestClient() : socket_(reactor::Socket::createTcp()) {}
bool connect(const reactor::InetAddress& addr)
{
int result = socket_.connect(addr);
if (result == 0 || errno == EINPROGRESS) {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
return true;
}
return false;
}
bool send(const std::string& data)
{
ssize_t sent = socket_.write(data.data(), data.size());
return sent == static_cast<ssize_t>(data.size());
}
std::string receive(size_t max_size = 1024)
{
char buffer[1024];
ssize_t received = socket_.read(buffer, std::min(max_size, sizeof(buffer)));
if (received > 0) {
return std::string(buffer, received);
}
return "";
}
void close()
{
socket_.shutdownWrite();
}
};
void test_tcp_server_basic()
{
std::cout << "Testing basic TCP server...\n";
reactor::EventLoop loop;
reactor::InetAddress listen_addr(0);
reactor::TcpServer server(&loop, listen_addr, "TestServer");
std::atomic<bool> server_started{false};
std::atomic<bool> connection_received{false};
std::atomic<bool> message_received{false};
server.setConnectionCallback([&](const reactor::TcpConnectionPtr& conn) {
if (conn->connected()) {
connection_received = true;
std::cout << "New connection: " << conn->name() << "\n";
} else {
std::cout << "Connection closed: " << conn->name() << "\n";
}
});
server.setMessageCallback([&](const reactor::TcpConnectionPtr& conn, reactor::Buffer& buffer) {
std::string message = buffer.readAll();
std::cout << "Received: " << message << "\n";
message_received = true;
conn->send("Echo: " + message);
});
server.start();
std::thread server_thread([&loop, &server_started]() {
server_started = true;
loop.loop();
});
while (!server_started) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
TestClient client;
bool connected = client.connect(reactor::InetAddress("127.0.0.1", listen_addr.port()));
assert(connected);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
assert(connection_received);
assert(client.send("Hello Server"));
std::this_thread::sleep_for(std::chrono::milliseconds(10));
assert(message_received);
std::string response = client.receive();
assert(response == "Echo: Hello Server");
client.close();
loop.quit();
server_thread.join();
std::cout << "✓ Basic TCP server passed\n";
}
void test_multiple_connections()
{
std::cout << "Testing multiple connections...\n";
reactor::EventLoop loop;
reactor::InetAddress listen_addr(0);
reactor::TcpServer server(&loop, listen_addr, "MultiServer");
std::atomic<int> connection_count{0};
std::atomic<int> message_count{0};
server.setConnectionCallback([&](const reactor::TcpConnectionPtr& conn) {
if (conn->connected()) {
connection_count++;
} else {
connection_count--;
}
});
server.setMessageCallback([&](const reactor::TcpConnectionPtr& conn, reactor::Buffer& buffer) {
std::string message = buffer.readAll();
message_count++;
conn->send("Response: " + message);
});
server.start();
std::thread server_thread([&loop]() {
loop.loop();
});
std::this_thread::sleep_for(std::chrono::milliseconds(10));
constexpr int num_clients = 5;
std::vector<std::unique_ptr<TestClient>> clients;
for (int i = 0; i < num_clients; ++i) {
auto client = std::make_unique<TestClient>();
bool connected = client->connect(reactor::InetAddress("127.0.0.1", listen_addr.port()));
assert(connected);
clients.push_back(std::move(client));
}
std::this_thread::sleep_for(std::chrono::milliseconds(20));
assert(connection_count == num_clients);
for (int i = 0; i < num_clients; ++i) {
std::string message = "Message " + std::to_string(i);
assert(clients[i]->send(message));
}
std::this_thread::sleep_for(std::chrono::milliseconds(50));
assert(message_count == num_clients);
for (auto& client : clients) {
client->close();
}
clients.clear();
std::this_thread::sleep_for(std::chrono::milliseconds(50));
assert(connection_count == 0);
loop.quit();
server_thread.join();
std::cout << "✓ Multiple connections passed\n";
}
void test_server_with_thread_pool()
{
std::cout << "Testing server with thread pool...\n";
reactor::EventLoop loop;
reactor::InetAddress listen_addr(0);
reactor::TcpServer server(&loop, listen_addr, "ThreadPoolServer");
server.setThreadNum(2);
std::atomic<int> message_count{0};
std::vector<std::thread::id> thread_ids;
std::mutex thread_ids_mutex;
server.setMessageCallback([&](const reactor::TcpConnectionPtr& conn, reactor::Buffer& buffer) {
{
std::lock_guard<std::mutex> lock(thread_ids_mutex);
thread_ids.push_back(std::this_thread::get_id());
}
std::string message = buffer.readAll();
message_count++;
std::this_thread::sleep_for(std::chrono::milliseconds(10));
conn->send("Processed: " + message);
});
server.start();
std::thread server_thread([&loop]() {
loop.loop();
});
std::this_thread::sleep_for(std::chrono::milliseconds(20));
constexpr int num_clients = 4;
std::vector<std::thread> client_threads;
for (int i = 0; i < num_clients; ++i) {
client_threads.emplace_back([&listen_addr, i]() {
TestClient client;
bool connected = client.connect(reactor::InetAddress("127.0.0.1", listen_addr.port()));
assert(connected);
std::string message = "Client" + std::to_string(i);
assert(client.send(message));
std::string response = client.receive();
assert(response == "Processed: " + message);
client.close();
});
}
for (auto& thread : client_threads) {
thread.join();
}
std::this_thread::sleep_for(std::chrono::milliseconds(50));
assert(message_count == num_clients);
{
std::lock_guard<std::mutex> lock(thread_ids_mutex);
std::set<std::thread::id> unique_threads(thread_ids.begin(), thread_ids.end());
assert(unique_threads.size() >= 2);
}
loop.quit();
server_thread.join();
std::cout << "✓ Server with thread pool passed\n";
}
void test_connection_lifecycle()
{
std::cout << "Testing connection lifecycle...\n";
reactor::EventLoop loop;
reactor::InetAddress listen_addr(0);
reactor::TcpServer server(&loop, listen_addr, "LifecycleServer");
std::atomic<bool> connected{false};
std::atomic<bool> disconnected{false};
server.setConnectionCallback([&](const reactor::TcpConnectionPtr& conn) {
if (conn->connected()) {
connected = true;
conn->send("Welcome");
} else {
disconnected = true;
}
});
server.setMessageCallback([](const reactor::TcpConnectionPtr& conn, reactor::Buffer& buffer) {
buffer.readAll();
conn->shutdown();
});
server.start();
std::thread server_thread([&loop]() {
loop.loop();
});
std::this_thread::sleep_for(std::chrono::milliseconds(10));
TestClient client;
bool conn_result = client.connect(reactor::InetAddress("127.0.0.1", listen_addr.port()));
assert(conn_result);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
assert(connected);
std::string welcome = client.receive();
assert(welcome == "Welcome");
assert(client.send("Goodbye"));
std::this_thread::sleep_for(std::chrono::milliseconds(20));
assert(disconnected);
loop.quit();
server_thread.join();
std::cout << "✓ Connection lifecycle passed\n";
}
void test_large_message_handling()
{
std::cout << "Testing large message handling...\n";
reactor::EventLoop loop;
reactor::InetAddress listen_addr(0);
reactor::TcpServer server(&loop, listen_addr, "LargeMessageServer");
std::atomic<bool> large_message_received{false};
server.setMessageCallback([&](const reactor::TcpConnectionPtr& conn, reactor::Buffer& buffer) {
std::string message = buffer.readAll();
if (message.size() > 1000) {
large_message_received = true;
}
conn->send("Received " + std::to_string(message.size()) + " bytes");
});
server.start();
std::thread server_thread([&loop]() {
loop.loop();
});
std::this_thread::sleep_for(std::chrono::milliseconds(10));
TestClient client;
bool connected = client.connect(reactor::InetAddress("127.0.0.1", listen_addr.port()));
assert(connected);
std::string large_message(5000, 'X');
assert(client.send(large_message));
std::this_thread::sleep_for(std::chrono::milliseconds(50));
assert(large_message_received);
std::string response = client.receive();
assert(response == "Received 5000 bytes");
client.close();
loop.quit();
server_thread.join();
std::cout << "✓ Large message handling passed\n";
}
void test_server_stats()
{
std::cout << "Testing server statistics...\n";
reactor::EventLoop loop;
reactor::InetAddress listen_addr(0);
reactor::TcpServer server(&loop, listen_addr, "StatsServer");
server.start();
std::thread server_thread([&loop]() {
loop.loop();
});
std::this_thread::sleep_for(std::chrono::milliseconds(10));
assert(server.numConnections() == 0);
TestClient client1, client2;
assert(client1.connect(reactor::InetAddress("127.0.0.1", listen_addr.port())));
assert(client2.connect(reactor::InetAddress("127.0.0.1", listen_addr.port())));
std::this_thread::sleep_for(std::chrono::milliseconds(20));
assert(server.numConnections() == 2);
auto connections = server.getConnections();
assert(connections.size() == 2);
client1.close();
client2.close();
std::this_thread::sleep_for(std::chrono::milliseconds(20));
assert(server.numConnections() == 0);
loop.quit();
server_thread.join();
std::cout << "✓ Server statistics passed\n";
}
int main()
{
std::cout << "=== TCP Server Tests ===\n";
test_tcp_server_basic();
test_multiple_connections();
test_server_with_thread_pool();
test_connection_lifecycle();
test_large_message_handling();
test_server_stats();
std::cout << "All TCP server tests passed! ✓\n";
return 0;
}