// eq2go/old/common/tcp.hpp
// 2025-08-06 19:00:30 -05:00
//
// 1631 lines
// 43 KiB
// C++

// EQ2Emulator: Everquest II Server Emulator - Copyright (C) 2007 EQ2EMulator Development Team - GPL v3
#pragma once
#include <iostream>
#include <string>
#include <cstring>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <memory>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <unistd.h>
#include <errno.h>
#include <fcntl.h>
#include "queue.hpp"
#include "timer.hpp"
#include "types.hpp"
#include "servertalk.hpp"
#include "linked_list.hpp"
#include "misc_functions.hpp"
// Forward declaration: TCPConnection stores a TCPServer* before TCPServer is defined.
class TCPServer;
// Size passed to snprintf for every errbuf written in this file; caller-supplied
// error buffers must therefore be at least this many bytes.
#define TCPConnection_ErrorBufferSize 1024
// Upper bound for the receive buffer and for any single declared packet size.
#define MaxTCPReceiveBufferSize 524288
// NOTE(review): the two granularity values are not used in the visible part of
// this file; presumably loop sleep intervals for the worker threads - confirm.
#define LOOP_GRANULARITY 3
#define SERVER_LOOP_GRANULARITY 3
// Connection states stored in TCPConnection::pState.
#define TCPS_Ready 0
#define TCPS_Connecting 1
#define TCPS_Connected 100
#define TCPS_Disconnecting 200
#define TCPS_Disconnected 201
#define TCPS_Closing 250
#define TCPS_Error 255
// Guarded so another header may define the same enum first.
#ifndef DEF_eConnectionType
#define DEF_eConnectionType
// Direction of a connection ("Incomming" spelling is part of the identifier).
enum eConnectionType { Incomming, Outgoing };
#endif
// How received bytes are interpreted: console lines, packets, or the
// intermediate state in which outgoing packets are parked in InModeQueue.
enum eTCPMode { modeConsole, modeTransition, modePacket };
// Thread entry points; both are started with a pointer to the owning object
// (TCPConnectionLoop is launched with `this` from Connect/AsyncConnect).
void* TCPServerLoop(void* tmp);
void* TCPConnectionLoop(void* tmp);
// Primary TCP connection class for handling client connections.
// One instance is either an outgoing connection, an incoming connection created
// for a TCPServer, or a relay connection whose packets are sent through another
// TCPConnection (RelayLink). All mutexes are non-recursive std::mutex objects,
// so a member function must not call another member that locks the same mutex
// while it is still holding it.
class TCPConnection
{
public:
// Header placed in front of every packet in packet mode. Packed to 1 byte so
// the struct has no padding between fields.
#pragma pack(1)
struct TCPNetPacket_Struct
{
int32 size; // Total length in bytes, header included (set by MakePacket)
struct {
int8
compressed : 1, // A 4-byte inflated size precedes the payload
destination : 1, // A 4-byte destination ID precedes the payload
flag3 : 1,
flag4 : 1,
flag5 : 1,
flag6 : 1,
flag7 : 1,
flag8 : 1;
} flags;
int16 opcode;
uchar buffer[0]; // Variable-length data following the fixed header
};
#pragma pack()
// Creates a network packet structure from ServerPacket.
// The result is allocated with new uchar[]; release it with delete[] on a uchar*.
static TCPNetPacket_Struct* MakePacket(ServerPacket* pack, int32 iDestination = 0);
// Constructor for incoming connections from server socket
TCPConnection(TCPServer* iServer, int iSock, int32 irIP, int16 irPort, bool iOldFormat = false);
// Constructor for outgoing connections
TCPConnection(bool iOldFormat = false, TCPServer* iRelayServer = 0, eTCPMode iMode = modePacket);
// Constructor for relay connections
TCPConnection(TCPServer* iServer, TCPConnection* iRelayLink, int32 iRemoteID, int32 irIP, int16 irPort);
virtual ~TCPConnection();
// Establishes connection to remote host by hostname (resolved with ResolveIP,
// then forwarded to the IP overload). errbuf, when given, must hold at least
// TCPConnection_ErrorBufferSize bytes.
bool Connect(char* irAddress, int16 irPort, char* errbuf = 0);
// Establishes connection to remote host by IP address.
// irIP is stored directly in sin_addr.s_addr; irPort is passed through htons().
bool Connect(int32 irIP, int16 irPort, char* errbuf = 0);
// Initiates asynchronous connection by hostname (the string is copied)
void AsyncConnect(char* irAddress, int16 irPort);
// Initiates asynchronous connection by IP address
void AsyncConnect(int32 irIP, int16 irPort);
// Disconnects the TCP connection
virtual void Disconnect(bool iSendRelayDisconnect = true);
// Sends a ServerPacket over the connection; returns false when not connected
// or not in packet/transition mode
virtual bool SendPacket(ServerPacket* pack, int32 iDestination = 0);
// Sends a pre-formatted network packet; the data is copied, the caller keeps tnps
virtual bool SendPacket(TCPNetPacket_Struct* tnps);
// Sends raw data over the connection (console mode only)
bool Send(const uchar* data, sint32 size);
// Retrieves next line from console mode queue.
// Returns 0 when the queue lock cannot be acquired immediately.
char* PopLine();
// Retrieves next packet from incoming queue.
// Returns 0 when the queue lock cannot be acquired immediately.
ServerPacket* PopPacket();
// Accessors for connection information
inline int32 GetrIP() { return rIP; }
inline int16 GetrPort() { return rPort; }
virtual int8 GetState();
eTCPMode GetMode() { return TCPMode; }
inline bool Connected() { return (GetState() == TCPS_Connected); }
inline bool ConnectReady() { return (bool)(GetState() == TCPS_Ready && ConnectionType == Outgoing); }
inline int32 GetID() { return id; }
inline bool IsRelayServer() { return RelayServer; }
inline int32 GetRemoteID() { return RemoteID; }
inline TCPConnection* GetRelayLink() { return RelayLink; }
// Marks connection as ready for cleanup: disconnects and sets pFree.
// Reports an error through ThrowError when called on an Outgoing connection.
void Free();
// Gets echo mode status for console connections
bool GetEcho();
// Sets echo mode for console connections
void SetEcho(bool iValue);
protected:
friend class TCPServer;
friend void* TCPConnectionLoop(void* tmp);
// Main processing loop for handling network I/O
virtual bool Process();
// Sets the connection state with thread safety (locks MState)
void SetState(int8 iState);
// Checks if connection object is marked for deletion
inline bool IsFree() { return pFree; }
// Verifies if connection is active for network operations
// (state is TCPS_Connected or TCPS_Disconnecting)
bool CheckNetActive();
int sock; // Socket file descriptor; only ever set to 0 in the visible code
// Main connection processing loop control
bool RunLoop();
std::mutex MLoopRunning; // Guards loop execution state
std::mutex MAsyncConnect; // Guards async connection state
bool GetAsyncConnect();
// Stores iValue and returns the previous flag value
bool SetAsyncConnect(bool iValue);
char* charAsyncConnect; // Hostname for async connection (new char[] copy, or 0)
// Adds packet to outgoing queue
void OutQueuePush(ServerPacket* pack);
// Removes relay connection from this server
void RemoveRelay(TCPConnection* relay, bool iSendRelayDisconnect);
private:
// Handles special network layer control packets
void ProcessNetworkLayerPacket(ServerPacket* pack);
// Sends error packet to remote connection
void SendNetErrorPacket(const char* reason = 0);
TCPServer* Server; // Parent server instance
TCPConnection* RelayLink; // Link to relay connection
int32 RemoteID; // Remote connection identifier
sint32 RelayCount; // Number of relay connections
bool pOldFormat; // Legacy packet format flag
// Core network I/O functions
bool SendData(char* errbuf = 0);
bool RecvData(char* errbuf = 0);
bool ProcessReceivedData(char* errbuf = 0);
bool ProcessReceivedDataAsPackets(char* errbuf = 0);
bool ProcessReceivedDataAsOldPackets(char* errbuf = 0);
// Clears all internal buffers and queues
void ClearBuffers();
bool pAsyncConnect; // Async connection in progress flag
eConnectionType ConnectionType; // Incoming or outgoing connection
eTCPMode TCPMode; // Current connection mode
bool RelayServer; // Server acting as relay
std::mutex MRunLoop; // Guards main loop execution
bool pRunLoop; // Main loop control flag
int connection_socket; // Active connection socket; 0 and -1 both mean "no socket"
int32 id; // Unique connection identifier
int32 rIP; // Remote IP address
int16 rPort; // Remote port (host byte order)
bool pFree; // Marked for deletion flag
std::mutex MState; // Guards connection state
int8 pState; // Current connection state
// Console mode line processing
void LineOutQueuePush(char* line);
MyQueue<char> LineOutQueue; // Outgoing console lines
MyQueue<ServerPacket> OutQueue; // Outgoing packets
std::mutex MOutQueueLock; // Guards output queues
Timer* keepalive_timer; // Connection keepalive timer
Timer* timeout_timer; // Connection timeout timer
// Network receive buffer management
uchar* recvbuf; // Receive buffer
sint32 recvbuf_size; // Total buffer size
sint32 recvbuf_used; // Used buffer space
sint32 recvbuf_echo; // Echo position for console mode
bool pEcho; // Echo mode enabled flag
std::mutex MEcho; // Guards echo state
// Packet mode queue for state transitions
void InModeQueuePush(TCPNetPacket_Struct* tnps);
MyQueue<TCPNetPacket_Struct> InModeQueue; // Transition mode packet queue
// Send buffer management
std::mutex MSendQueue; // Guards send operations
uchar* sendbuf; // Send buffer
sint32 sendbuf_size; // Total send buffer size
sint32 sendbuf_used; // Used send buffer space
// Send queue manipulation functions
// Pop hands the whole pending buffer to the caller (non-blocking).
bool ServerSendQueuePop(uchar** data, sint32* size);
// Copies `size` bytes to the end of the send buffer.
void ServerSendQueuePushEnd(const uchar* data, sint32 size);
// Takes ownership of *data (must come from new uchar[]).
void ServerSendQueuePushEnd(uchar** data, sint32 size);
// Copies `size` bytes in front of the data already queued.
void ServerSendQueuePushFront(uchar* data, sint32 size);
};
// TCP server class for accepting and managing client connections.
// NOTE(review): the member function bodies are outside the part of the file
// reviewed here; the per-member notes below repeat the declared intent and
// should be confirmed against the implementation.
class TCPServer
{
public:
// Creates TCP server instance on specified port
TCPServer(int16 iPort = 0, bool iOldFormat = false);
virtual ~TCPServer();
// Opens server socket for listening
bool Open(int16 iPort = 0, char* errbuf = 0);
// Closes server socket
void Close();
// Checks if server socket is open
bool IsOpen();
// Gets configured server port
inline int16 GetPort() { return pPort; }
// Retrieves new connection from queue
TCPConnection* NewQueuePop();
// Broadcasts packet to all connected clients
void SendPacket(ServerPacket* pack);
// Broadcasts pre-formatted packet to all clients.
// NOTE(review): the double pointer suggests ownership of *tnps is taken - confirm.
void SendPacket(TCPConnection::TCPNetPacket_Struct** tnps);
protected:
// TCPServerLoop and TCPConnection need access to the members below.
friend void* TCPServerLoop(void* tmp);
friend class TCPConnection;
// Main server processing loop
void Process();
// Controls main server loop execution
bool RunLoop();
std::mutex MLoopRunning; // Guards server loop state
// Generates unique connection ID by post-incrementing NextID.
// The increment itself takes no lock; TCPConnection constructors call this.
inline int32 GetNextID() { return NextID++; }
// Adds new connection to managed list
void AddConnection(TCPConnection* con);
// Finds connection by ID (used by TCPConnection to route relayed packets)
TCPConnection* GetConnection(int32 iID);
private:
// Accepts new incoming connections
void ListenNewConnections();
int32 NextID; // Next connection ID to assign
bool pOldFormat; // Legacy packet format support
std::mutex MRunLoop; // Guards main loop control
bool pRunLoop; // Main loop execution flag
std::mutex MSock; // Guards socket operations
int sock; // Server listening socket
int16 pPort; // Server listening port
// New connection queue management
std::mutex MNewQueue; // Guards new connection queue
MyQueue<TCPConnection> NewQueue; // Queue of new connections
// Broadcast packet queue management
void CheckInQueue();
std::mutex MInQueue; // Guards broadcast queue
TCPConnection::TCPNetPacket_Struct* InQueuePop();
MyQueue<TCPConnection::TCPNetPacket_Struct> InQueue; // Broadcast packet queue
LinkedList<TCPConnection*>* list; // List of active connections
};
//=============================================================================
// TCPConnection Implementation
//=============================================================================
// Builds the wire representation of a ServerPacket.
//
// Layout: TCPNetPacket_Struct header, then (if pack->compressed) the 4-byte
// inflated size, then (if iDestination != 0) the 4-byte destination ID, then
// pack->size bytes of payload.
//
// pack         - packet to serialize; must not be null.
// iDestination - routing ID; 0 means "no destination field".
// Returns a buffer allocated with new uchar[]; the caller owns it and must
// release it with delete[] on a uchar*.
TCPConnection::TCPNetPacket_Struct* TCPConnection::MakePacket(ServerPacket* pack, int32 iDestination)
{
    sint32 size = sizeof(TCPNetPacket_Struct) + pack->size;
    if (pack->compressed)
        size += 4;
    if (iDestination)
        size += 4;

    TCPNetPacket_Struct* tnps = (TCPNetPacket_Struct*)new uchar[size];
    tnps->size = size;
    tnps->opcode = pack->opcode;
    *((int8*)&tnps->flags) = 0; // clear all flag bits at once

    // The optional fields start right after a 7-byte packed header, so they are
    // not naturally aligned: write them with memcpy instead of a casted store.
    uchar* buffer = tnps->buffer;
    if (pack->compressed) {
        tnps->flags.compressed = 1;
        const sint32 inflated = pack->InflatedSize;
        memcpy(buffer, &inflated, sizeof(inflated));
        buffer += 4;
    }
    if (iDestination) {
        tnps->flags.destination = 1;
        const sint32 destination = iDestination;
        memcpy(buffer, &destination, sizeof(destination));
        buffer += 4;
    }
    // An empty packet may carry a null pBuffer; memcpy must not see it.
    if (pack->size > 0)
        memcpy(buffer, pack->pBuffer, pack->size);
    return tnps;
}
// Outgoing-connection constructor. The object starts in TCPS_Ready with no
// socket; passing a relay server marks the connection as a relay server link.
TCPConnection::TCPConnection(bool iOldFormat, TCPServer* iRelayServer, eTCPMode iMode)
{
    // Identity and relay wiring
    id = 0;
    Server = iRelayServer;
    RelayServer = (Server != 0);
    RelayLink = 0;
    RelayCount = 0;
    RemoteID = 0;

    // Mode and state
    ConnectionType = Outgoing;
    TCPMode = iMode;
    pOldFormat = iOldFormat;
    pState = TCPS_Ready;
    pFree = false;
    pEcho = false;
    pRunLoop = false;
    pAsyncConnect = false;
    charAsyncConnect = 0;

    // No socket or peer yet
    sock = 0;
    connection_socket = 0;
    rIP = 0;
    rPort = 0;

    // Timers
    keepalive_timer = new Timer(SERVER_TIMEOUT);
    timeout_timer = new Timer(SERVER_TIMEOUT * 2);

    // Buffers are allocated lazily on first use
    recvbuf = 0;
    recvbuf_size = 0;
    recvbuf_used = 0;
    recvbuf_echo = 0;
    sendbuf = 0;
    sendbuf_size = 0;
    sendbuf_used = 0;
}
// Incoming-connection constructor: wraps an already connected socket handed
// over by the server. The object starts in TCPS_Connected, packet mode, and
// takes its ID from the server.
TCPConnection::TCPConnection(TCPServer* iServer, int in_socket, int32 irIP, int16 irPort, bool iOldFormat)
{
    // Identity and relay wiring
    Server = iServer;
    id = Server->GetNextID();
    RelayServer = false;
    RelayLink = 0;
    RelayCount = 0;
    RemoteID = 0;

    // Mode and state
    ConnectionType = Incomming;
    TCPMode = modePacket;
    pOldFormat = iOldFormat;
    pState = TCPS_Connected;
    pFree = false;
    pEcho = false;
    pRunLoop = false;
    pAsyncConnect = false;
    charAsyncConnect = 0;

    // Socket and peer address
    sock = 0;
    connection_socket = in_socket;
    rIP = irIP;
    rPort = irPort;

    // Timers
    keepalive_timer = new Timer(SERVER_TIMEOUT);
    timeout_timer = new Timer(SERVER_TIMEOUT * 2);

    // Buffers are allocated lazily on first use
    recvbuf = 0;
    recvbuf_size = 0;
    recvbuf_used = 0;
    recvbuf_echo = 0;
    sendbuf = 0;
    sendbuf_size = 0;
    sendbuf_used = 0;
}
// Relay-connection constructor: the object has no socket or timers of its own
// and sends through iRelayLink. iRemoteID must be non-zero; a zero value is
// reported through ThrowError.
TCPConnection::TCPConnection(TCPServer* iServer, TCPConnection* iRelayLink, int32 iRemoteID, int32 irIP, int16 irPort)
{
    // Identity and relay wiring
    Server = iServer;
    RelayLink = iRelayLink;
    RelayServer = true;
    id = Server->GetNextID();
    RelayCount = 0;
    RemoteID = iRemoteID;
    if (RemoteID == 0)
        ThrowError("Error: TCPConnection: RemoteID == 0 on RelayLink constructor");

    // Mode and state
    ConnectionType = Incomming;
    TCPMode = modePacket;
    pOldFormat = false;
    pState = TCPS_Connected;
    pFree = false;
    pEcho = false;
    pRunLoop = false;
    pAsyncConnect = false;
    charAsyncConnect = 0;

    // No socket of its own; peer address as reported by the caller
    sock = 0;
    connection_socket = 0;
    rIP = irIP;
    rPort = irPort;

    // Relay connections run without timers
    keepalive_timer = 0;
    timeout_timer = 0;

    // Buffers are allocated lazily on first use
    recvbuf = 0;
    recvbuf_size = 0;
    recvbuf_used = 0;
    recvbuf_echo = 0;
    sendbuf = 0;
    sendbuf_size = 0;
    sendbuf_used = 0;
}
// Destructor: disconnects, empties all queues, clears the run-loop flag of an
// outgoing connection and releases timers, buffers and the async hostname.
TCPConnection::~TCPConnection()
{
    Disconnect();
    ClearBuffers();

    if (ConnectionType == Outgoing) {
        // Flag is written under MRunLoop, as in every other locked access
        MRunLoop.lock();
        pRunLoop = false;
        MRunLoop.unlock();
    }

    // Timers
    delete keepalive_timer;
    delete timeout_timer;

    // Array allocations
    delete[] recvbuf;
    delete[] sendbuf;
    delete[] charAsyncConnect;
}
// Stores a new connection state while holding MState.
void TCPConnection::SetState(int8 in_state)
{
    std::unique_lock<std::mutex> guard(MState);
    pState = in_state;
}
// Returns a snapshot of the connection state, read while holding MState.
int8 TCPConnection::GetState()
{
    std::unique_lock<std::mutex> guard(MState);
    const int8 snapshot = pState;
    return snapshot;
}
// Marks incoming connection as ready for cleanup by server
void TCPConnection::Free()
{
if (ConnectionType == Outgoing) {
ThrowError("TCPConnection::Free() called on an Outgoing connection");
}
Disconnect();
pFree = true;
}
// Serializes a ServerPacket and queues it for sending.
//
// pack         - packet to send; not modified, the caller keeps ownership.
// iDestination - optional routing ID written into the packet (0 = none).
// Returns false when the connection is not in TCPS_Connected or the mode is
// neither modePacket nor modeTransition. Relay connections (RemoteID != 0)
// forward the packet through RelayLink with RemoteID as destination.
bool TCPConnection::SendPacket(ServerPacket* pack, int32 iDestination)
{
    // No MState lock here: Connected() -> GetState() takes MState itself, and
    // MState is a non-recursive std::mutex, so locking it first would block
    // this thread on its own lock.
    if (!Connected())
        return false;

    const eTCPMode mode = GetMode();
    if (mode != modePacket && mode != modeTransition)
        return false;

    if (RemoteID)
        return RelayLink->SendPacket(pack, RemoteID);

    TCPNetPacket_Struct* tnps = MakePacket(pack, iDestination);
    if (mode == modeTransition) {
        // Parked until the switch to packet mode completes
        InModeQueuePush(tnps);
    }
    else {
        // Ownership of the buffer moves to the send queue
        const sint32 length = tnps->size;
        ServerSendQueuePushEnd((uchar**)&tnps, length);
    }
    return true;
}
// Queues an already serialized packet for sending.
//
// tnps - packet to send; its bytes are copied, the caller keeps ownership.
// Returns false for relay connections (RemoteID != 0), when the connection is
// not in TCPS_Connected, or when the mode is neither modePacket nor
// modeTransition.
bool TCPConnection::SendPacket(TCPNetPacket_Struct* tnps)
{
    // No MState lock here: Connected() -> GetState() takes MState itself, and
    // MState is a non-recursive std::mutex, so locking it first would block
    // this thread on its own lock.
    if (RemoteID)
        return false;
    if (!Connected())
        return false;

    const eTCPMode mode = GetMode();
    if (mode == modeTransition) {
        // The transition queue owns its entries, so park a private copy
        TCPNetPacket_Struct* copy = (TCPNetPacket_Struct*)new uchar[tnps->size];
        memcpy(copy, tnps, tnps->size);
        InModeQueuePush(copy);
        return true;
    }
    if (mode != modePacket)
        return false;

    ServerSendQueuePushEnd((const uchar*)tnps, tnps->size);
    return true;
}
// Queues raw bytes for sending. Only accepted while connected and in console
// mode; a zero-length request succeeds without queuing anything.
bool TCPConnection::Send(const uchar* data, sint32 size)
{
    const bool usable = Connected() && GetMode() == modeConsole;
    if (!usable)
        return false;

    if (size)
        ServerSendQueuePushEnd(data, size);
    return true;
}
// Appends a packet to the transition-mode queue while holding MSendQueue.
void TCPConnection::InModeQueuePush(TCPNetPacket_Struct* tnps)
{
    std::unique_lock<std::mutex> guard(MSendQueue);
    InModeQueue.push(tnps);
}
// Appends data to send buffer with automatic resizing
void TCPConnection::ServerSendQueuePushEnd(const uchar* data, sint32 size)
{
std::lock_guard<std::mutex> lock(MSendQueue);
if (sendbuf == 0) {
sendbuf = new uchar[size];
sendbuf_size = size;
sendbuf_used = 0;
}
else if (size > (sendbuf_size - sendbuf_used)) {
sendbuf_size += size + 1024;
uchar* tmp = new uchar[sendbuf_size];
memcpy(tmp, sendbuf, sendbuf_used);
delete[] sendbuf;
sendbuf = tmp;
}
memcpy(&sendbuf[sendbuf_used], data, size);
sendbuf_used += size;
}
// Appends a heap buffer to the send queue and takes ownership of it.
//
// data - address of a pointer to memory allocated with new uchar[]. On return
//        *data is always 0: the buffer was either adopted as the send buffer
//        (when the queue was empty) or copied and freed.
// size - number of bytes in *data.
void TCPConnection::ServerSendQueuePushEnd(uchar** data, sint32 size)
{
    std::lock_guard<std::mutex> lock(MSendQueue);
    if (sendbuf == 0) {
        // Empty queue: adopt the caller's buffer instead of copying it
        sendbuf = *data;
        sendbuf_size = size;
        sendbuf_used = size;
        *data = 0;
        return;
    }
    if (size > (sendbuf_size - sendbuf_used)) {
        sendbuf_size += size;
        uchar* tmp = new uchar[sendbuf_size];
        memcpy(tmp, sendbuf, sendbuf_used);
        delete[] sendbuf;
        sendbuf = tmp;
    }
    memcpy(&sendbuf[sendbuf_used], *data, size);
    sendbuf_used += size;
    // The buffer was allocated as uchar[], so it must be released as uchar[];
    // clear the caller's pointer so both paths leave it in the same state.
    delete[] *data;
    *data = 0;
}
// Copies `size` bytes from `data` in front of whatever is already queued, so
// they are sent first. Used to put back the unsent tail of a partial send.
//
// data - bytes to prepend; copied, the caller keeps ownership.
// size - number of bytes to prepend.
void TCPConnection::ServerSendQueuePushFront(uchar* data, sint32 size)
{
    std::lock_guard<std::mutex> lock(MSendQueue);
    if (sendbuf == 0) {
        sendbuf = new uchar[size];
        sendbuf_size = size;
        sendbuf_used = 0;
    }
    else if (size > (sendbuf_size - sendbuf_used)) {
        // Not enough room: grow and place the existing data after the gap
        sendbuf_size += size;
        uchar* tmp = new uchar[sendbuf_size];
        memcpy(&tmp[size], sendbuf, sendbuf_used);
        delete[] sendbuf;
        sendbuf = tmp;
    }
    else if (sendbuf_used > 0) {
        // Enough room: shift the queued bytes back so the prepended data does
        // not overwrite them (ranges overlap, hence memmove).
        memmove(&sendbuf[size], sendbuf, sendbuf_used);
    }
    memcpy(sendbuf, data, size);
    sendbuf_used += size;
}
// Hands the whole pending send buffer to the caller without blocking.
// Returns false when MSendQueue is busy or nothing is queued; on success
// *data / *size describe the buffer and the queue is left empty.
bool TCPConnection::ServerSendQueuePop(uchar** data, sint32* size)
{
    std::unique_lock<std::mutex> guard(MSendQueue, std::try_to_lock);
    if (!guard.owns_lock())
        return false;

    if (!sendbuf)
        return false;

    *data = sendbuf;
    *size = sendbuf_used;
    sendbuf = 0;
    return true;
}
// Pops the next packet from OutQueue without blocking.
// Returns 0 when MOutQueueLock is busy; otherwise whatever OutQueue.pop() yields.
ServerPacket* TCPConnection::PopPacket()
{
    std::unique_lock<std::mutex> guard(MOutQueueLock, std::try_to_lock);
    if (guard.owns_lock())
        return OutQueue.pop();
    return 0;
}
// Pops the next console line from LineOutQueue without blocking.
// Returns 0 when MOutQueueLock is busy; otherwise whatever LineOutQueue.pop() yields.
char* TCPConnection::PopLine()
{
    std::unique_lock<std::mutex> guard(MOutQueueLock, std::try_to_lock);
    if (guard.owns_lock())
        return (char*)LineOutQueue.pop();
    return 0;
}
// Appends a packet to OutQueue while holding MOutQueueLock.
void TCPConnection::OutQueuePush(ServerPacket* pack)
{
    std::unique_lock<std::mutex> guard(MOutQueueLock);
    OutQueue.push(pack);
}
// Processes console line input with special command handling
void TCPConnection::LineOutQueuePush(char* line)
{
if (strcmp(line, "**PACKETMODE**") == 0) {
std::lock_guard<std::mutex> lock(MSendQueue);
delete[] sendbuf;
sendbuf = 0;
if (TCPMode == modeConsole)
Send((const uchar*)"\0**PACKETMODE**\r", 16);
TCPMode = modePacket;
TCPNetPacket_Struct* tnps = 0;
while ((tnps = InModeQueue.pop())) {
SendPacket(tnps);
delete[] tnps;
}
delete[] line;
return;
}
std::lock_guard<std::mutex> lock(MOutQueueLock);
LineOutQueue.push(line);
}
// Closes the connection and returns the object to TCPS_Ready.
//
// When a socket is open, pending data is flushed once (if the state allows
// it), the socket is shut down and closed, the peer address is reset and all
// buffers are cleared. A relay connection additionally detaches itself from
// its RelayLink.
//
// iSendRelayDisconnect - forwarded to RelayLink->RemoveRelay().
void TCPConnection::Disconnect(bool iSendRelayDisconnect)
{
    if (connection_socket != -1 && connection_socket != 0) {
        // MState is a non-recursive std::mutex and ClearBuffers()/SetState()
        // lock it themselves, so it is held only for the state read here.
        bool flushPending = false;
        {
            std::lock_guard<std::mutex> lock(MState);
            flushPending = (pState == TCPS_Connected || pState == TCPS_Disconnecting || pState == TCPS_Disconnected);
        }
        if (flushPending)
            SendData();
        SetState(TCPS_Closing);
        shutdown(connection_socket, SHUT_WR);
        shutdown(connection_socket, SHUT_RD);
        close(connection_socket);
        connection_socket = 0;
        rIP = 0;
        rPort = 0;
        ClearBuffers();
    }
    SetState(TCPS_Ready);
    if (RelayLink) {
        RelayLink->RemoveRelay(this, iSendRelayDisconnect);
        RelayLink = 0;
    }
}
// Returns the async-connect flag, read while holding MAsyncConnect.
bool TCPConnection::GetAsyncConnect()
{
    std::unique_lock<std::mutex> guard(MAsyncConnect);
    const bool pending = pAsyncConnect;
    return pending;
}
// Replaces the async-connect flag while holding MAsyncConnect and returns the
// value it had before the call.
bool TCPConnection::SetAsyncConnect(bool iValue)
{
    std::unique_lock<std::mutex> guard(MAsyncConnect);
    const bool previous = pAsyncConnect;
    pAsyncConnect = iValue;
    return previous;
}
// Requests a background connection to a host name. The name is copied, the
// request flag is raised, and the connection thread is started if it is not
// already running. Ignored unless the connection is Outgoing, in TCPS_Ready
// and has no request pending.
void TCPConnection::AsyncConnect(char* irAddress, int16 irPort)
{
    if (ConnectionType != Outgoing) {
        ThrowError("TCPConnection::AsyncConnect() call on a Incomming connection object!");
        return;
    }
    if (GetState() != TCPS_Ready)
        return;

    std::unique_lock<std::mutex> guard(MAsyncConnect);
    if (pAsyncConnect)
        return;
    pAsyncConnect = true;

    // Keep a private copy of the host name, terminator included
    delete[] charAsyncConnect;
    const size_t bytes = strlen(irAddress) + 1;
    charAsyncConnect = new char[bytes];
    memcpy(charAsyncConnect, irAddress, bytes);
    rPort = irPort;

    if (pRunLoop)
        return;
    pRunLoop = true;
    std::thread(TCPConnectionLoop, this).detach();
}
// Requests a background connection to an IP address. Any previously stored
// host name is discarded, the request flag is raised, and the connection
// thread is started if it is not already running. Ignored unless the
// connection is Outgoing, in TCPS_Ready and has no request pending.
//
// irIP   - remote address, later stored directly in sin_addr.s_addr.
// irPort - remote port in host byte order.
void TCPConnection::AsyncConnect(int32 irIP, int16 irPort)
{
    if (ConnectionType != Outgoing) {
        ThrowError("TCPConnection::AsyncConnect() call on a Incomming connection object!");
        return;
    }
    if (GetState() != TCPS_Ready)
        return;
    std::lock_guard<std::mutex> lock(MAsyncConnect);
    if (pAsyncConnect) {
        return;
    }
    pAsyncConnect = true;
    // charAsyncConnect comes from new char[], so it must go through delete[]
    delete[] charAsyncConnect;
    charAsyncConnect = 0;
    rIP = irIP;
    rPort = irPort;
    if (!pRunLoop) {
        pRunLoop = true;
        std::thread thread(TCPConnectionLoop, this);
        thread.detach();
    }
}
// Establishes synchronous connection to remote host by hostname
bool TCPConnection::Connect(char* irAddress, int16 irPort, char* errbuf)
{
if (errbuf)
errbuf[0] = 0;
int32 tmpIP = ResolveIP(irAddress);
if (!tmpIP) {
if (errbuf) {
snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::Connect(): Couldn't resolve hostname. Error #%i: %s", errno, strerror(errno));
}
return false;
}
return Connect(tmpIP, irPort, errbuf);
}
// Connects synchronously to an IP address.
//
// in_ip   - remote address, stored directly in sin_addr.s_addr.
// in_port - remote port in host byte order (converted with htons).
// errbuf  - optional buffer of at least TCPConnection_ErrorBufferSize bytes
//           that receives a description when the call fails.
// Returns true once the socket is connected, switched to non-blocking mode
// and the state is TCPS_Connected. Returns false when the object is not an
// Outgoing connection, is not in TCPS_Ready, or socket()/connect() fails. The
// async-connect flag is cleared on every path after the Outgoing check.
bool TCPConnection::Connect(int32 in_ip, int16 in_port, char* errbuf)
{
    if (errbuf)
        errbuf[0] = 0;
    if (ConnectionType != Outgoing) {
        ThrowError("TCPConnection::Connect() call on a Incomming connection object!");
        return false;
    }
    // Move Ready -> Connecting atomically. MState is released right away
    // because SetState() and ClearBuffers() below lock it themselves and
    // std::mutex is not recursive.
    bool wasReady = false;
    {
        std::lock_guard<std::mutex> lock(MState);
        if (pState == TCPS_Ready) {
            pState = TCPS_Connecting;
            wasReady = true;
        }
    }
    if (!wasReady) {
        SetAsyncConnect(false);
        return false;
    }
    if (!pRunLoop) {
        pRunLoop = true;
        std::thread thread(TCPConnectionLoop, this);
        thread.detach();
    }
    connection_socket = -1;
    struct sockaddr_in server_sin;
    // Descriptor 0 is used as the "no socket" marker throughout this class,
    // so it is rejected here as well.
    if ((connection_socket = socket(AF_INET, SOCK_STREAM, 0)) == -1 || connection_socket == 0) {
        if (errbuf)
            snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::Connect(): Allocating socket failed. Error: %s", strerror(errno));
        SetState(TCPS_Ready);
        SetAsyncConnect(false);
        return false;
    }
    server_sin.sin_family = AF_INET;
    server_sin.sin_addr.s_addr = in_ip;
    server_sin.sin_port = htons(in_port);
    if (connect(connection_socket, (struct sockaddr*)&server_sin, sizeof(server_sin)) == -1) {
        if (errbuf)
            snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::Connect(): connect() failed. Error: %s", strerror(errno));
        close(connection_socket);
        connection_socket = 0;
        SetState(TCPS_Ready);
        SetAsyncConnect(false);
        return false;
    }
    int bufsize = 64 * 1024;
    setsockopt(connection_socket, SOL_SOCKET, SO_RCVBUF, (char*)&bufsize, sizeof(bufsize));
    fcntl(connection_socket, F_SETFL, O_NONBLOCK);
    SetEcho(false);
    // ClearBuffers() takes the queue locks itself; MSendQueue must not be
    // held by this thread when it is called.
    ClearBuffers();
    TCPMode = modePacket;
    rIP = in_ip;
    rPort = in_port;
    SetState(TCPS_Connected);
    SetAsyncConnect(false);
    return true;
}
// Clears all internal buffers and resets timers
void TCPConnection::ClearBuffers()
{
std::lock_guard<std::mutex> lock1(MSendQueue);
std::lock_guard<std::mutex> lock2(MOutQueueLock);
std::lock_guard<std::mutex> lock3(MRunLoop);
std::lock_guard<std::mutex> lock4(MState);
delete[] recvbuf;
recvbuf = 0;
delete[] sendbuf;
sendbuf = 0;
ServerPacket* pack = 0;
while ((pack = PopPacket()))
delete pack;
TCPNetPacket_Struct* tnps = 0;
while ((tnps = InModeQueue.pop()))
delete tnps;
char* line = 0;
while ((line = LineOutQueue.pop()))
delete[] line;
if (keepalive_timer)
keepalive_timer->Start();
if (timeout_timer)
timeout_timer->Start();
}
// Reports whether the state, read under MState, is TCPS_Connected or
// TCPS_Disconnecting.
bool TCPConnection::CheckNetActive()
{
    std::unique_lock<std::mutex> guard(MState);
    switch (pState) {
    case TCPS_Connected:
    case TCPS_Disconnecting:
        return true;
    default:
        return false;
    }
}
// Main processing function handling network I/O and state management
bool TCPConnection::Process()
{
char errbuf[TCPConnection_ErrorBufferSize];
if (!CheckNetActive()) {
if (ConnectionType == Outgoing) {
if (GetAsyncConnect()) {
if (charAsyncConnect)
rIP = ResolveIP(charAsyncConnect);
Connect(rIP, rPort);
}
}
if (GetState() == TCPS_Disconnected) {
Disconnect();
return false;
}
else if (GetState() == TCPS_Connecting)
return true;
else
return false;
}
if (!SendData(errbuf)) {
struct in_addr in;
in.s_addr = GetrIP();
std::cout << inet_ntoa(in) << ":" << GetrPort() << ": " << errbuf << std::endl;
return false;
}
if (!Connected())
return false;
if (!RecvData(errbuf)) {
struct in_addr in;
in.s_addr = GetrIP();
std::cout << inet_ntoa(in) << ":" << GetrPort() << ": " << errbuf << std::endl;
return false;
}
return true;
}
// Reads whatever is available on the socket into recvbuf and processes it.
//
// errbuf - optional buffer of at least TCPConnection_ErrorBufferSize bytes
//          that receives a description when the call fails.
// Returns false when the connection is not in TCPS_Connected, the receive
// buffer reaches MaxTCPReceiveBufferSize, the peer closed the connection,
// recv() fails with a real error, processing the data fails, or (packet and
// transition mode) the timeout timer has expired. "No data right now" on the
// non-blocking socket is not an error.
bool TCPConnection::RecvData(char* errbuf)
{
    if (errbuf)
        errbuf[0] = 0;
    if (!Connected()) {
        return false;
    }
    if (recvbuf == 0) {
        recvbuf = new uchar[5120];
        recvbuf_size = 5120;
        recvbuf_used = 0;
        recvbuf_echo = 0;
    }
    else if ((recvbuf_size - recvbuf_used) < 2048) {
        // Keep at least 2 KiB free before every read
        uchar* tmpbuf = new uchar[recvbuf_size + 5120];
        memcpy(tmpbuf, recvbuf, recvbuf_used);
        recvbuf_size += 5120;
        delete[] recvbuf;
        recvbuf = tmpbuf;
        if (recvbuf_size >= MaxTCPReceiveBufferSize) {
            if (errbuf)
                snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::RecvData(): recvbuf_size >= MaxTCPReceiveBufferSize");
            return false;
        }
    }
    const int status = recv(connection_socket, (char*)&recvbuf[recvbuf_used], (recvbuf_size - recvbuf_used), 0);
    if (status >= 1) {
        recvbuf_used += status;
        if (timeout_timer)
            timeout_timer->Start();
        if (!ProcessReceivedData(errbuf))
            return false;
    }
    else if (status == 0) {
        // The requested length is never 0 here, so 0 means the peer performed
        // an orderly shutdown.
        if (errbuf)
            snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::RecvData(): Connection closed by remote host");
        return false;
    }
    else if (errno != EWOULDBLOCK && errno != EAGAIN && errno != EINTR) {
        // EAGAIN may differ from EWOULDBLOCK on some systems; EINTR only means
        // the call was interrupted and can be retried on the next pass.
        if (errbuf)
            snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::RecvData(): Error: %s", strerror(errno));
        return false;
    }
    if ((TCPMode == modePacket || TCPMode == modeTransition) && timeout_timer && timeout_timer->Check()) {
        if (errbuf)
            snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::RecvData(): Connection timeout");
        return false;
    }
    return true;
}
// Returns the echo flag, read while holding MEcho.
bool TCPConnection::GetEcho()
{
    std::unique_lock<std::mutex> guard(MEcho);
    const bool enabled = pEcho;
    return enabled;
}
// Stores the echo flag while holding MEcho.
void TCPConnection::SetEcho(bool iValue)
{
    std::unique_lock<std::mutex> guard(MEcho);
    pEcho = iValue;
}
// Processes received data based on current connection mode
bool TCPConnection::ProcessReceivedData(char* errbuf)
{
if (errbuf)
errbuf[0] = 0;
if (!recvbuf)
return true;
if (TCPMode == modePacket) {
return ProcessReceivedDataAsPackets(errbuf);
}
else {
// Console mode processing with line parsing and echo support
for (int i = 0; i < recvbuf_used; i++) {
if (GetEcho() && i >= recvbuf_echo) {
Send(&recvbuf[i], 1);
recvbuf_echo = i + 1;
}
switch (recvbuf[i]) {
case 0: { // Clear buffer command
if (i == 0) {
recvbuf_used--;
recvbuf_echo--;
memcpy(recvbuf, &recvbuf[1], recvbuf_used);
i = -1;
} else {
if (i == recvbuf_used) {
delete[] recvbuf;
recvbuf = 0;
i = -1;
}
else {
uchar* tmpdel = recvbuf;
recvbuf = new uchar[recvbuf_size];
memcpy(recvbuf, &tmpdel[i + 1], recvbuf_used - i);
recvbuf_used -= i + 1;
recvbuf_echo -= i + 1;
delete tmpdel;
i = -1;
}
}
break;
}
case 10:
case 13: { // Newline processing
if (i == 0) {
recvbuf_used--;
recvbuf_echo--;
memcpy(recvbuf, &recvbuf[1], recvbuf_used);
i = -1;
} else {
char* line = new char[i + 1];
memset(line, 0, i + 1);
memcpy(line, recvbuf, i);
uchar* tmpdel = recvbuf;
recvbuf = new uchar[recvbuf_size];
recvbuf_used -= i + 1;
recvbuf_echo -= i + 1;
memcpy(recvbuf, &tmpdel[i + 1], recvbuf_used);
delete tmpdel;
if (strlen(line) > 0)
LineOutQueuePush(line);
else
delete[] line;
if (TCPMode == modePacket) {
return ProcessReceivedDataAsPackets(errbuf);
}
i = -1;
}
break;
}
case 8: { // Backspace processing
if (i == 0) {
recvbuf_used--;
recvbuf_echo--;
memcpy(recvbuf, &recvbuf[1], recvbuf_used);
i = -1;
} else {
uchar* tmpdel = recvbuf;
recvbuf = new uchar[recvbuf_size];
memcpy(recvbuf, tmpdel, i - 1);
memcpy(&recvbuf[i - 1], &tmpdel[i + 1], recvbuf_used - i);
recvbuf_used -= 2;
recvbuf_echo -= 2;
delete tmpdel;
i -= 2;
}
break;
}
}
}
if (recvbuf_used < 0) {
delete[] recvbuf;
recvbuf = 0;
}
}
return true;
}
// Processes received data as structured network packets
bool TCPConnection::ProcessReceivedDataAsPackets(char* errbuf)
{
if (errbuf)
errbuf[0] = 0;
sint32 base = 0;
sint32 size = 0;
uchar* buffer;
sint32 sizeReq = sizeof(TCPNetPacket_Struct);
ServerPacket* pack = 0;
while ((recvbuf_used - base) >= size) {
TCPNetPacket_Struct* tnps = (TCPNetPacket_Struct*)&recvbuf[base];
buffer = tnps->buffer;
size = tnps->size;
if (size < sizeReq || recvbuf_used < sizeReq || size >= MaxTCPReceiveBufferSize) {
if (errbuf)
snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::ProcessReceivedDataAsPackets(): size provided %i, recvbuf_used %i, checks failed: struct_size < %i || recvbuf_used < sizeReq || size >= MaxTCPReceiveBufferSize", size, recvbuf_used, sizeReq);
return false;
}
if ((recvbuf_used - base) >= size) {
delete pack;
pack = new ServerPacket;
pack->size = size - sizeof(TCPNetPacket_Struct);
pack->opcode = tnps->opcode;
// Process compression flag
if (tnps->flags.compressed) {
sizeReq += 4;
if (size < sizeReq || recvbuf_used < sizeReq) {
if (errbuf)
snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::ProcessReceivedDataAsPackets(Flags.Compressed): size provided %i, recvbuf_used %i, checks failed: struct_size < %i || recvbuf_used < sizeReq", size, recvbuf_used, sizeReq);
delete pack;
return false;
}
pack->compressed = true;
pack->InflatedSize = *((sint32*)buffer);
pack->size -= 4;
buffer += 4;
}
// Process destination flag for routing
if (tnps->flags.destination) {
sizeReq += 4;
if (size < sizeReq || recvbuf_used < sizeReq) {
if (errbuf)
snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::ProcessReceivedDataAsPackets(Flags.Destination): size provided %i, recvbuf_used %i, checks failed: struct_size < %i || recvbuf_used < sizeReq", size, recvbuf_used, sizeReq);
delete pack;
return false;
}
pack->destination = *((sint32*)buffer);
pack->size -= 4;
buffer += 4;
}
// Copy packet data
if (pack->size > 0) {
if (tnps->flags.compressed) {
pack->compressed = false;
if (pack->InflatedSize < MaxTCPReceiveBufferSize) {
pack->pBuffer = new uchar[pack->InflatedSize];
pack->size = InflatePacket(buffer, pack->size, pack->pBuffer, pack->InflatedSize);
if (!pack->size) {
if (errbuf)
snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::ProcessReceivedDataAsPackets(InflatePacket): size provided %i, recvbuf_used: %i, sizeReq: %i, could not inflate packet", size, recvbuf_used, sizeReq);
delete pack;
return false;
}
}
else {
std::cout << "Invalid inflated packet." << std::endl;
delete pack;
return false;
}
}
else {
pack->pBuffer = new uchar[pack->size];
memcpy(pack->pBuffer, buffer, pack->size);
}
}
// Handle different packet types
if (pack->opcode == 0) {
if (pack->size) {
ProcessNetworkLayerPacket(pack);
}
}
else {
if (RelayServer && Server && pack->destination) {
TCPConnection* con = Server->GetConnection(pack->destination);
if (!con) {
// Connection not found for relay
}
else {
con->OutQueuePush(pack);
pack = 0;
}
}
else {
OutQueuePush(pack);
pack = 0;
}
}
base += size;
size = 7;
}
}
delete pack;
// Clean up processed data from buffer
if (base != 0) {
if (base >= recvbuf_used) {
delete[] recvbuf;
recvbuf = 0;
}
else {
uchar* tmpbuf = new uchar[recvbuf_size - base];
memcpy(tmpbuf, &recvbuf[base], recvbuf_used - base);
delete[] recvbuf;
recvbuf = tmpbuf;
recvbuf_used -= base;
recvbuf_size -= base;
}
}
return true;
}
// Processes old format packets for backwards compatibility
bool TCPConnection::ProcessReceivedDataAsOldPackets(char* errbuf)
{
sint32 base = 0;
sint32 size = 4;
uchar* buffer;
ServerPacket* pack = 0;
while ((recvbuf_used - base) >= size) {
buffer = &recvbuf[base];
memcpy(&size, &buffer[2], 2);
if (size >= MaxTCPReceiveBufferSize) {
if (errbuf)
snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::ProcessReceivedDataAsPackets(): size >= MaxTCPReceiveBufferSize");
return false;
}
if ((recvbuf_used - base) >= size) {
pack = new ServerPacket;
memcpy(&pack->opcode, &buffer[0], 2);
pack->size = size - 4;
if (pack->size > 0) {
pack->pBuffer = new uchar[pack->size];
memcpy(pack->pBuffer, &buffer[4], pack->size);
}
if (pack->opcode == 0) {
delete pack;
}
else {
OutQueuePush(pack);
}
base += size;
size = 4;
}
}
// Clean up processed data
if (base != 0) {
if (base >= recvbuf_used) {
delete[] recvbuf;
recvbuf = 0;
}
else {
uchar* tmpbuf = new uchar[recvbuf_size - base];
memcpy(tmpbuf, &recvbuf[base], recvbuf_used - base);
delete[] recvbuf;
recvbuf = tmpbuf;
recvbuf_used -= base;
recvbuf_size -= base;
}
}
return true;
}
// Handles network layer control packets for connection management.
//
// The first payload byte is the control opcode. Relay capabilities are
// disabled for security, so only opcode 0 (keepalive) is tolerated; any
// opcode greater than 0 disconnects the connection.
//
// pack: control packet to inspect. A null packet or one without payload is
//       ignored.
void TCPConnection::ProcessNetworkLayerPacket(ServerPacket* pack)
{
    // There is no opcode byte to read without a payload.
    if (!pack || !pack->pBuffer || pack->size == 0)
        return;
    const int8 opcode = pack->pBuffer[0];
    if (opcode > 0) {
        Disconnect();
        return;
    }
    // Opcode 0 is a keepalive: receiving it requires no further action.
}
// Sends error packet to remote connection for debugging.
//
// The packet uses opcode 0 and a payload whose first byte is 255, followed by
// the NUL-terminated reason text when one is supplied.
//
// reason: optional message; may be null, in which case only the marker byte
//         is sent.
void TCPConnection::SendNetErrorPacket(const char* reason)
{
    ServerPacket* pack = new ServerPacket(0);
    pack->size = 1;
    if (reason)
        pack->size += strlen(reason) + 1;
    pack->pBuffer = new uchar[pack->size];
    memset(pack->pBuffer, 0, pack->size);
    pack->pBuffer[0] = 255;
    // Only copy when there is text: the buffer holds just the marker byte
    // otherwise, and strcpy() from a null pointer is undefined behaviour.
    if (reason)
        strcpy((char*)&pack->pBuffer[1], reason);
    SendPacket(pack);
    delete pack;
}
// Removes relay connection and optionally notifies remote endpoint.
//
// relay:                connection being removed; its remote ID is placed in
//                       the notification. When null, no notification is sent.
// iSendRelayDisconnect: when true, sends an opcode-0 packet whose payload is
//                       the byte 3 followed by the relay's remote ID.
// RelayCount is decremented in every case.
void TCPConnection::RemoveRelay(TCPConnection* relay, bool iSendRelayDisconnect)
{
    if (iSendRelayDisconnect && relay) {
        ServerPacket* pack = new ServerPacket(0, 5);
        pack->pBuffer[0] = 3;
        // The ID lives at byte offset 1, which is not suitably aligned for a
        // direct int32 store; memcpy writes the same bytes safely.
        const int32 remote_id = relay->GetRemoteID();
        memcpy(&pack->pBuffer[1], &remote_id, sizeof(remote_id));
        SendPacket(pack);
        delete pack;
    }
    RelayCount--;
}
// Sends queued data over socket with congestion handling.
//
// Pops one chunk from the server send queue and writes it with send(). Bytes
// the socket did not accept are pushed back onto the front of the queue. In
// packet mode a keepalive packet is queued whenever the keepalive timer fires.
//
// errbuf: optional buffer of TCPConnection_ErrorBufferSize bytes; cleared on
//         entry and filled with a description on a hard send() failure.
// Returns false on a hard send() failure (anything other than
// EWOULDBLOCK/EAGAIN), true otherwise.
bool TCPConnection::SendData(char* errbuf)
{
    if (errbuf)
        errbuf[0] = 0;
    uchar* data = 0;
    sint32 size = 0;
    if (ServerSendQueuePop(&data, &size)) {
        const int status = send(connection_socket, data, size, MSG_NOSIGNAL);
        // errno is only meaningful when send() reports failure. Capture it
        // right away so later calls cannot overwrite it, and never consult a
        // stale value after a successful send.
        const int send_errno = (status == -1) ? errno : 0;
        if (status >= 1) {
            if (keepalive_timer)
                keepalive_timer->Start();
            if (status < (signed)size) {
                // Network congestion - push remaining data back to queue
                ServerSendQueuePushFront(&data[status], size - status);
            }
            else if (status > (signed)size) {
                // Release the chunk before reporting; nothing frees it after this point.
                delete[] data;
                ThrowError("TCPConnection::SendData(): WTF! status > size");
                return false;
            }
        }
        else {
            // Nothing was written: keep the whole chunk queued.
            ServerSendQueuePushFront(data, size);
        }
        delete[] data;
        // A would-block result is transient; POSIX allows either code for it.
        if (status == -1 && send_errno != EWOULDBLOCK && send_errno != EAGAIN) {
            if (errbuf) {
                snprintf(errbuf, TCPConnection_ErrorBufferSize, "TCPConnection::SendData(): send(): Errorcode: %s", strerror(send_errno));
            }
            return false;
        }
    }
    // Send keepalive packet if timer expired
    if (TCPMode == modePacket && keepalive_timer && keepalive_timer->Check()) {
        ServerPacket* pack = new ServerPacket(0, 0);
        SendPacket(pack);
        delete pack;
    }
    return true;
}
// Main connection processing loop for outgoing connections
void* TCPConnectionLoop(void* tmp)
{
if (tmp == 0) {
ThrowError("TCPConnectionLoop(): tmp = 0!");
return nullptr;
}
TCPConnection* tcpc = (TCPConnection*)tmp;
std::lock_guard<std::mutex> lock(tcpc->MLoopRunning);
while (tcpc->RunLoop()) {
std::this_thread::sleep_for(std::chrono::milliseconds(LOOP_GRANULARITY));
if (tcpc->GetState() != TCPS_Ready) {
if (!tcpc->Process()) {
tcpc->Disconnect();
}
}
else if (tcpc->GetAsyncConnect()) {
if (tcpc->charAsyncConnect)
tcpc->Connect(tcpc->charAsyncConnect, tcpc->GetrPort());
else
tcpc->Connect(tcpc->GetrIP(), tcpc->GetrPort());
tcpc->SetAsyncConnect(false);
}
else
std::this_thread::sleep_for(std::chrono::milliseconds(10));
}
return nullptr;
}
// Reads the connection's run flag while holding MRunLoop.
// Returns the current value of pRunLoop.
bool TCPConnection::RunLoop()
{
    bool keep_running;
    {
        std::lock_guard<std::mutex> guard(MRunLoop);
        keep_running = pRunLoop;
    }
    return keep_running;
}
//=============================================================================
// TCPServer Implementation
//=============================================================================
// Sets up server state and launches the detached TCPServerLoop thread.
//
// in_port:    port remembered in pPort.
// iOldFormat: stored in pOldFormat and handed to accepted connections.
// The listening socket is not opened here; sock starts at 0.
TCPServer::TCPServer(int16 in_port, bool iOldFormat)
{
    // Configuration supplied by the caller.
    pPort = in_port;
    pOldFormat = iOldFormat;

    // Initial runtime state.
    NextID = 1;
    sock = 0;
    pRunLoop = true;
    list = new LinkedList<TCPConnection*>;

    // Start servicing on a background thread that is never joined.
    std::thread(TCPServerLoop, this).detach();
}
// Destructor that stops server loop and cleans up connections.
//
// Clears the run flag, waits for TCPServerLoop to release MLoopRunning (it
// holds that mutex for as long as it runs), then drains the new-connection
// queue and deletes the connection list.
// NOTE(review): blocks until the loop thread exits, so it must not be invoked
// from that thread itself.
TCPServer::~TCPServer()
{
    {
        // Hold MRunLoop only while flipping the flag; the loop thread takes
        // this same mutex in RunLoop() and must be able to see the change.
        std::lock_guard<std::mutex> run_lock(MRunLoop);
        pRunLoop = false;
    }
    {
        // Acquiring MLoopRunning succeeds only after the loop thread has
        // returned, so nothing below races with TCPServer::Process().
        std::lock_guard<std::mutex> loop_finished(MLoopRunning);
    }
    while (NewQueue.pop()); // Clear queue without deleting objects
    delete list;
}
// Reads the server's run flag while holding MRunLoop.
// Returns the current value of pRunLoop.
bool TCPServer::RunLoop()
{
    bool keep_running;
    {
        std::lock_guard<std::mutex> guard(MRunLoop);
        keep_running = pRunLoop;
    }
    return keep_running;
}
// Main server processing loop that handles connections and I/O
void* TCPServerLoop(void* tmp)
{
if (tmp == 0) {
ThrowError("TCPServerLoop(): tmp = 0!");
return nullptr;
}
TCPServer* tcps = (TCPServer*)tmp;
std::lock_guard<std::mutex> lock(tcps->MLoopRunning);
while (tcps->RunLoop()) {
std::this_thread::sleep_for(std::chrono::milliseconds(SERVER_LOOP_GRANULARITY));
tcps->Process();
}
return nullptr;
}
// Processes server operations including new connections and client I/O
void TCPServer::Process()
{
CheckInQueue();
ListenNewConnections();
LinkedListIterator<TCPConnection*> iterator(*list);
iterator.Reset();
while (iterator.MoreElements()) {
if (iterator.GetData()->IsFree() && (!iterator.GetData()->CheckNetActive())) {
iterator.RemoveCurrent();
}
else {
if (!iterator.GetData()->Process())
iterator.GetData()->Disconnect();
iterator.Advance();
}
}
}
// Accepts new incoming connections and creates TCPConnection objects.
//
// Holds MSock for the duration. Returns immediately when no listening socket
// is open (sock == 0). Otherwise accepts until accept() returns -1; each
// accepted socket is made non-blocking, given a 64 KiB receive buffer, wrapped
// in a TCPConnection and registered through AddConnection().
void TCPServer::ListenNewConnections()
{
    std::lock_guard<std::mutex> lock(MSock);
    if (!sock)
        return;
    // Accept pending connections
    for (;;) {
        struct sockaddr_in from;
        memset(&from, 0, sizeof(from));
        // The length is a value-result argument: accept() overwrites it with
        // the size of the returned address, so it is re-initialised for every
        // call and declared with the type the API specifies.
        socklen_t fromlen = sizeof(from);
        const int tmpsock = accept(sock, (struct sockaddr*)&from, &fromlen);
        if (tmpsock == -1)
            break;
        fcntl(tmpsock, F_SETFL, O_NONBLOCK);
        int bufsize = 64 * 1024;
        setsockopt(tmpsock, SOL_SOCKET, SO_RCVBUF, (char*)&bufsize, sizeof(bufsize));
        // The address is passed through as delivered by accept(); the port is
        // converted to host byte order.
        TCPConnection* con = new TCPConnection(this, tmpsock, from.sin_addr.s_addr, ntohs(from.sin_port), pOldFormat);
        AddConnection(con);
    }
}
// Opens server socket for listening on specified port.
//
// in_port: port to listen on; 0 keeps the port already stored in pPort.
// errbuf:  optional buffer of TCPConnection_ErrorBufferSize bytes; cleared on
//          entry and filled with a description on failure.
// Returns true when the socket is bound to INADDR_ANY and listening, false
// when a socket is already open or any setup step fails. On every failure
// path `sock` is left at 0, so IsOpen() and ListenNewConnections() keep
// treating the server as closed.
bool TCPServer::Open(int16 in_port, char* errbuf)
{
    if (errbuf)
        errbuf[0] = 0;
    std::lock_guard<std::mutex> lock(MSock);
    if (sock != 0) {
        if (errbuf)
            snprintf(errbuf, TCPConnection_ErrorBufferSize, "Listening socket already open");
        return false;
    }
    if (in_port != 0) {
        pPort = in_port;
    }
    struct sockaddr_in address;
    memset((char*)&address, 0, sizeof(address));
    address.sin_family = AF_INET;
    address.sin_port = htons(pPort);
    address.sin_addr.s_addr = htonl(INADDR_ANY);

    // Work on a local descriptor and publish it to `sock` only once it is
    // listening; a failed socket() must not leave -1 behind in the member.
    const int new_sock = socket(AF_INET, SOCK_STREAM, 0);
    if (new_sock == -1) {
        if (errbuf)
            snprintf(errbuf, TCPConnection_ErrorBufferSize, "socket(): INVALID_SOCKET");
        return false;
    }
    int reuse_addr = 1;
    setsockopt(new_sock, SOL_SOCKET, SO_REUSEADDR, (char*)&reuse_addr, sizeof(reuse_addr));
    if (::bind(new_sock, (struct sockaddr*)&address, sizeof(address)) < 0) {
        close(new_sock);
        if (errbuf)
            snprintf(errbuf, TCPConnection_ErrorBufferSize, "bind(): <0");
        return false;
    }
    int bufsize = 64 * 1024;
    setsockopt(new_sock, SOL_SOCKET, SO_RCVBUF, (char*)&bufsize, sizeof(bufsize));
    fcntl(new_sock, F_SETFL, O_NONBLOCK);
    if (listen(new_sock, SOMAXCONN) == -1) {
        // Save errno before close() has a chance to overwrite it.
        const int listen_errno = errno;
        close(new_sock);
        if (errbuf)
            snprintf(errbuf, TCPConnection_ErrorBufferSize, "listen() failed, Error: %s", strerror(listen_errno));
        return false;
    }
    sock = new_sock;
    return true;
}
// Closes server listening socket
void TCPServer::Close()
{
std::lock_guard<std::mutex> lock(MSock);
if (sock) {
close(sock);
}
sock = 0;
}
// Checks if server socket is currently open
bool TCPServer::IsOpen()
{
std::lock_guard<std::mutex> lock(MSock);
return (bool)(sock != 0);
}
// Takes the next entry off the new-connection queue under MNewQueue.
// Returns whatever NewQueue.pop() yields.
TCPConnection* TCPServer::NewQueuePop()
{
    std::lock_guard<std::mutex> guard(MNewQueue);
    TCPConnection* next = NewQueue.pop();
    return next;
}
// Registers a connection with the server.
//
// con: connection to track. It is appended to the managed list first and then
//      pushed onto the new-connection queue under MNewQueue.
void TCPServer::AddConnection(TCPConnection* con)
{
    // The list append happens before the queue mutex is taken.
    list->Append(con);

    std::lock_guard<std::mutex> queue_guard(MNewQueue);
    NewQueue.push(con);
}
// Looks up a managed connection by its ID.
//
// iID: identifier compared against each connection's GetID().
// Returns the first matching connection, or a null pointer when none matches.
TCPConnection* TCPServer::GetConnection(int32 iID)
{
    LinkedListIterator<TCPConnection*> it(*list);
    for (it.Reset(); it.MoreElements(); it.Advance()) {
        if (it.GetData()->GetID() != iID)
            continue;
        return it.GetData();
    }
    return nullptr;
}
// Queues a ServerPacket for broadcast.
//
// pack: packet to send; it is converted with TCPConnection::MakePacket() and
//       the result is handed to the pointer-to-pointer overload.
void TCPServer::SendPacket(ServerPacket* pack)
{
    TCPConnection::TCPNetPacket_Struct* wire_packet = TCPConnection::MakePacket(pack);
    // The overload takes the pointer and zeroes our local copy.
    SendPacket(&wire_packet);
}
// Queues an already-built network packet for broadcast.
//
// tnps: address of the caller's packet pointer. The pointed-to packet is
//       pushed onto InQueue under MInQueue and the caller's pointer is then
//       set to null.
void TCPServer::SendPacket(TCPConnection::TCPNetPacket_Struct** tnps)
{
    std::lock_guard<std::mutex> guard(MInQueue);
    TCPConnection::TCPNetPacket_Struct*& callers_ptr = *tnps;
    InQueue.push(callers_ptr);
    callers_ptr = nullptr;
}
// Processes broadcast packet queue and sends to all clients
void TCPServer::CheckInQueue()
{
LinkedListIterator<TCPConnection*> iterator(*list);
TCPConnection::TCPNetPacket_Struct* tnps = 0;
while ((tnps = InQueuePop())) {
iterator.Reset();
while (iterator.MoreElements()) {
if (iterator.GetData()->GetMode() != modeConsole && iterator.GetData()->GetRemoteID() == 0)
iterator.GetData()->SendPacket(tnps);
iterator.Advance();
}
delete tnps;
}
}
// Takes the next packet off the broadcast queue under MInQueue.
// Returns whatever InQueue.pop() yields.
TCPConnection::TCPNetPacket_Struct* TCPServer::InQueuePop()
{
    std::lock_guard<std::mutex> guard(MInQueue);
    TCPConnection::TCPNetPacket_Struct* next = InQueue.pop();
    return next;
}