# EQStream - Complete Technical Documentation

## Table of Contents

1. [Overview](#overview)
2. [Core Components](#core-components)
3. [Protocol Architecture](#protocol-architecture)
4. [Packet Types and Structures](#packet-types-and-structures)
5. [Encryption System](#encryption-system)
6. [Compression](#compression)
7. [Packet Sequencing and Reliability](#packet-sequencing-and-reliability)
8. [Fragmentation and Reassembly](#fragmentation-and-reassembly)
9. [Connection Management](#connection-management)
10. [Flow Control](#flow-control)
11. [Implementation Details](#implementation-details)
12. [Utilities and Helper Functions](#utilities-and-helper-functions)
13. [Error Recovery](#error-recovery)
14. [Performance Considerations](#performance-considerations)
15. [UDP Implementation Details](#udp-implementation-details)
16. [Conclusion](#conclusion)

## Overview

EQStream is a custom network protocol implementation that sits on top of UDP, providing:

- **Reliable delivery** with acknowledgments and retransmission
- **Ordered delivery** with sequence numbers
- **Encryption** using RC4 stream cipher
- **Compression** using zlib
- **Fragmentation** for large packets
- **Flow control** to prevent congestion
- **Session management** with keepalives

### Design Goals

- Low latency for real-time gameplay
- Reliable delivery of critical packets
- Efficient bandwidth usage
- Security through encryption
- Compatibility with EverQuest II client

## Core Components

### EQStream Class

The main class that manages a single connection:

```cpp
class EQStream {
private:
    // Connection state
    enum State {
        CLOSED,          // No connection
        WAIT_CONNECT,    // Awaiting session response
        CONNECTED,       // Active connection
        DISCONNECTING,   // Graceful shutdown
        DISCONNECT       // Connection terminated
    };

    // Core components
    RC4_KEY* encrypt_key;          // Encryption key
    RC4_KEY* decrypt_key;          // Decryption key
    z_stream* stream_compress;     // Compression stream
    z_stream* stream_decompress;   // Decompression stream

    // Sequence management
    uint16_t next_in_sequence;     // Expected incoming sequence
    uint16_t next_out_sequence;    // Next outgoing sequence
    uint32_t session_id;           // Unique session identifier
    uint32_t key;                  // RC4 key seed

    // Buffers and queues
    std::queue send_queue;                                 // Outgoing packets
    std::queue receive_queue;                              // Incoming packets
    std::map<uint16_t, EQProtocolPacket*> future_packets;  // Out-of-order
    std::map fragment_buffer;                              // Fragmented packets

    // Configuration
    uint32_t max_packet_size;      // Maximum packet size
    uint8_t crc_length;            // CRC bytes (usually 2)
    bool compressed_mode;          // Compression enabled
    bool encoded_mode;             // Encryption enabled
};
```

### EQStreamFactory Class

Factory pattern for creating and managing multiple streams:

```cpp
class EQStreamFactory {
private:
    std::map stream_map;               // Active streams
    std::queue<EQStream*> new_streams; // Pending connections
    int listen_socket;                 // Listening socket
    uint16_t port;                     // Server port

public:
    bool Open(uint16_t port);          // Start listening
    EQStream* Pop();                   // Get new connection
    void CheckTimeout();               // Timeout inactive streams
    void Close();                      // Shutdown factory
};
```

## Protocol Architecture

### Layer Model

```
┌─────────────────────────────────────┐
│          Application Data           │
├─────────────────────────────────────┤
│         EQApplicationPacket         │  Game packets
├─────────────────────────────────────┤
│          EQProtocolPacket           │  Protocol control
├─────────────────────────────────────┤
│     Compression (optional zlib)     │  Data compression
├─────────────────────────────────────┤
│       Encryption (RC4 cipher)       │  Stream encryption
├─────────────────────────────────────┤
│            CRC Checksum             │  Integrity check
├─────────────────────────────────────┤
│            UDP Transport            │  Unreliable datagrams
└─────────────────────────────────────┘
```

### Protocol Flow

```
Client                                  Server
  │                                      │
  ├──────── SessionRequest ─────────────►│
  │         (unencrypted)                │
  │                                      │
  │◄──────── SessionResponse ────────────┤
  │         (contains RC4 key)           │
  │                                      │
  ├─ [All subsequent packets encrypted] ─┤
  │                                      │
  ├──────── OP_Packet(data) ────────────►│
  │                                      │
  │◄────────── OP_Ack ───────────────────┤
  │                                      │
```

## Packet Types and Structures

### Protocol Control Packets

```cpp
enum EQStreamOp : uint16_t {
    // Session Management
    OP_SessionRequest      = 0x0001,  // Client initiates connection
    OP_SessionResponse     = 0x0002,  // Server accepts connection
    OP_SessionDisconnect   = 0x0005,  // Graceful disconnect

    // Connection Maintenance
    OP_KeepAlive           = 0x0006,  // Heartbeat packet
    OP_SessionStatRequest  = 0x0007,  // Request statistics
    OP_SessionStatResponse = 0x0008,  // Statistics response

    // Data Transfer
    OP_Packet              = 0x0009,  // Application data
    OP_Combined            = 0x0003,  // Multiple packets bundled
    OP_Fragment            = 0x000d,  // Part of fragmented packet
    OP_OutOfOrderAck       = 0x0011,  // Acknowledge out-of-order

    // Acknowledgments
    OP_Ack                 = 0x0015,  // Standard acknowledgment
    OP_AckFuture           = 0x0016,  // Acknowledge future packet
    OP_AckPast             = 0x0017,  // Acknowledge past packet

    // Error Handling
    OP_Error               = 0x0019,  // Error notification
    OP_Reset               = 0x001d   // Reset connection
};
```

### Packet Structure Definitions

#### Base Protocol Packet

```cpp
struct EQProtocolPacket {
    uint16_t opcode;         // Operation code
    uint8_t data[0];         // Variable length data

    // Methods
    uint32_t Size() const;   // Total packet size
    void SetOpcode(uint16_t op);
    uint16_t GetOpcode() const;
};
```

#### Session Request

```cpp
struct SessionRequest {
    uint16_t opcode;         // OP_SessionRequest (0x0001)
    uint32_t unknown;        // Protocol version (usually 0x0003)
    uint32_t session_id;     // Client's session ID
    uint32_t max_length;     // Maximum packet size client can handle
};
```

#### Session Response

```cpp
struct SessionResponse {
    uint16_t opcode;           // OP_SessionResponse (0x0002)
    uint32_t session_id;       // Server's session ID
    uint32_t key;              // RC4 encryption key seed
    uint8_t crc_length;        // CRC bytes (2 for CRC16, 4 for CRC32)
    uint8_t compressed_mode;   // 0x01 if compression enabled
    uint8_t encoded_mode;      // 0x01 if encryption enabled
    uint32_t unknown;          // Reserved/padding
    uint32_t max_length;       // Maximum packet size server can handle
};
```

#### Standard Data Packet

```cpp
struct DataPacket {
    uint16_t opcode;         // OP_Packet (0x0009)
    uint16_t sequence;       // Sequence number
    uint8_t app_data[0];     // Application layer data
};
```

#### Combined Packet

```cpp
struct CombinedPacket {
    uint16_t opcode;         // OP_Combined (0x0003)
    uint8_t count;           // Number of sub-packets
    struct SubPacket {
        uint8_t size;        // Size of this sub-packet
        uint8_t data[0];     // Sub-packet data
    } packets[0];
};
```

#### Fragment Packet

```cpp
struct FragmentPacket {
    uint16_t opcode;           // OP_Fragment (0x000d)
    uint16_t sequence;         // Sequence number
    uint32_t total_size;       // Total size when reassembled
    uint16_t fragment_number;  // Fragment index (0-based)
    uint16_t fragment_count;   // Total number of fragments
    uint8_t data[0];           // Fragment data
};
```

#### Acknowledgment Packet

```cpp
struct AckPacket {
    uint16_t opcode;         // OP_Ack (0x0015)
    uint16_t sequence;       // Sequence being acknowledged
};
```

## Encryption System

### RC4 Implementation

#### Key Initialization

```cpp
void EQStream::SetupEncryption(uint32_t key_seed) {
    // Generate key bytes from seed (host byte order)
    uint8_t key_bytes[4];
    memcpy(key_bytes, &key_seed, 4);

    // Initialize encryption key
    encrypt_key = new RC4_KEY;
    RC4_set_key(encrypt_key, 4, key_bytes);

    // Initialize decryption key (separate state)
    decrypt_key = new RC4_KEY;
    RC4_set_key(decrypt_key, 4, key_bytes);

    encoded_mode = true;
}
```

#### Packet Encryption

```cpp
void EQStream::EncryptPacket(EQProtocolPacket* packet) {
    if (!encoded_mode || !encrypt_key) return;

    // Get packet data (skip opcode)
    uint8_t* data = packet->data;
    uint32_t length = packet->Size() - 2;  // Exclude opcode

    // Apply RC4 encryption in place
    RC4(encrypt_key, length, data, data);
}
```

#### Packet Decryption

```cpp
void EQStream::DecryptPacket(uint8_t* data, uint32_t length) {
    if (!encoded_mode || !decrypt_key) return;

    // Skip first 2 bytes (opcode is not encrypted)
    if (length > 2) {
        RC4(decrypt_key, length - 2, data + 2, data + 2);
    }
}
```

### Security Considerations

1. **Key Exchange**: The key seed is transmitted in the clear during session setup
2. **Stream Cipher**: RC4 maintains state, so packets must be processed in order
3. **No IV**: The RC4 state is derived solely from the 32-bit key seed, and both directions start from the same initial state
4. **CRC Protection**: CRC is calculated on encrypted data

## Compression

### zlib Integration

#### Compression Setup

```cpp
void EQStream::SetupCompression() {
    // Initialize compression stream
    stream_compress = new z_stream;
    memset(stream_compress, 0, sizeof(z_stream));
    deflateInit(stream_compress, Z_DEFAULT_COMPRESSION);

    // Initialize decompression stream
    stream_decompress = new z_stream;
    memset(stream_decompress, 0, sizeof(z_stream));
    inflateInit(stream_decompress);

    compressed_mode = true;
}
```

#### Packet Compression

```cpp
EQProtocolPacket* EQStream::CompressPacket(EQProtocolPacket* packet) {
    if (!compressed_mode || packet->Size() < COMPRESS_THRESHOLD) {
        return packet;
    }

    // Prepare compression buffer. Only the payload is compressed:
    // Size() includes the 2-byte opcode, packet->data does not.
    uint32_t source_size = packet->Size() - 2;
    uint32_t dest_size = source_size + 128;  // Extra space
    uint8_t* dest_buffer = new uint8_t[dest_size];

    // Setup zlib stream
    stream_compress->next_in = (Bytef*)packet->data;
    stream_compress->avail_in = source_size;
    stream_compress->next_out = dest_buffer;
    stream_compress->avail_out = dest_size;

    // Compress; accept the result only if all input was consumed
    int result = deflate(stream_compress, Z_SYNC_FLUSH);

    if (result == Z_OK && stream_compress->avail_in == 0) {
        uint32_t compressed_size = dest_size - stream_compress->avail_out;

        // Create compressed packet (the constructor is assumed to copy the buffer)
        EQProtocolPacket* compressed = new EQProtocolPacket(
            OP_Compressed, dest_buffer, compressed_size);
        compressed->SetFlag(FLAG_COMPRESSED);

        delete[] dest_buffer;
        delete packet;
        return compressed;
    }

    // Compression failed, return original
    delete[] dest_buffer;
    return packet;
}
```

#### Packet Decompression

```cpp
bool EQStream::DecompressPacket(uint8_t* data, uint32_t length,
                                uint8_t* dest, uint32_t dest_length) {
    if (!compressed_mode) return false;

    // Setup decompression
    stream_decompress->next_in = data;
    stream_decompress->avail_in = length;
    stream_decompress->next_out = dest;
    stream_decompress->avail_out = dest_length;

    // Decompress
    int result = inflate(stream_decompress, Z_SYNC_FLUSH);

    return (result == Z_OK || result == Z_STREAM_END);
}
```

## Packet Sequencing and Reliability

### Sequence Number Management

```cpp
class SequenceManager {
private:
    uint16_t next_out_sequence;    // Next sequence to send
    uint16_t next_in_sequence;     // Expected incoming sequence
    uint16_t last_acked_sequence;  // Last acknowledged by peer

    // Retransmission queue
    struct RetransmitEntry {
        uint16_t sequence;
        EQProtocolPacket* packet;
        uint32_t timestamp;
        uint32_t retransmit_count;
    };
    std::map<uint16_t, RetransmitEntry> retransmit_queue;

public:
    uint16_t GetNextSequence() {
        return next_out_sequence++;
    }

    bool IsValidSequence(uint16_t seq) {
        // Handle sequence wrap-around
        int16_t diff = seq - next_in_sequence;
        return (diff >= 0 && diff < MAX_SEQUENCE_DIFF);
    }

    void AcknowledgeSequence(uint16_t seq) {
        // Remove from retransmit queue
        auto it = retransmit_queue.find(seq);
        if (it != retransmit_queue.end()) {
            delete it->second.packet;
            retransmit_queue.erase(it);
        }

        // Update last acknowledged
        if (CompareSequence(seq, last_acked_sequence) > 0) {
            last_acked_sequence = seq;
        }
    }
};
```

### Retransmission Logic

```cpp
void EQStream::CheckRetransmit() {
    uint32_t current_time = Timer::GetCurrentTime();

    for (auto& [seq, entry] : retransmit_queue) {
        uint32_t elapsed = current_time - entry.timestamp;

        // Retransmit if timeout exceeded
        if (elapsed > RETRANSMIT_TIMEOUT) {
            if (entry.retransmit_count < MAX_RETRANSMIT) {
                // Resend packet
                SendPacketInternal(entry.packet->Copy());
                entry.timestamp = current_time;
                entry.retransmit_count++;

                LogWrite(NET__DEBUG, "Retransmitting seq %d (attempt %d)",
                         seq, entry.retransmit_count);
            } else {
                // Max retransmits exceeded, connection lost
                LogWrite(NET__ERROR, "Max retransmits for seq %d", seq);
                Disconnect();
                return;  // Stop iterating: Disconnect() may clear the queue
            }
        }
    }
}
```

### Out-of-Order Handling

```cpp
void EQStream::ProcessOutOfOrder(uint16_t sequence, EQProtocolPacket* packet) {
    // Check if this is a future packet
    if (CompareSequence(sequence, next_in_sequence) > 0) {
        // Store for later processing
        future_packets[sequence] = packet;

        // Send future acknowledgment
        SendAckFuture(sequence);

        LogWrite(NET__DEBUG, "Stored future packet seq %d", sequence);
    }
    // Check if this is a duplicate
    else if (CompareSequence(sequence, next_in_sequence) < 0) {
        // Already processed, send past acknowledgment
        SendAckPast(sequence);
        delete packet;

        LogWrite(NET__DEBUG, "Duplicate packet seq %d", sequence);
    }
}

void EQStream::ProcessFuturePackets() {
    // Check if any future packets can now be processed
    while (!future_packets.empty()) {
        auto it = future_packets.find(next_in_sequence);
        if (it != future_packets.end()) {
            // Process this packet
            ProcessPacket(it->second);
            future_packets.erase(it);
            next_in_sequence++;
        } else {
            break;  // Gap in sequence
        }
    }
}
```

## Fragmentation and Reassembly

### Fragment Management

```cpp
class FragmentManager {
private:
    struct FragmentSet {
        uint32_t total_size;
        uint16_t fragment_count;
        uint16_t received_count;
        std::map<uint16_t, uint8_t*> fragments;  // Fragment index -> data
        uint32_t timestamp;
    };

    std::map<uint16_t, FragmentSet> fragment_sets;

public:
    void AddFragment(uint16_t sequence, uint16_t fragment_num,
                     uint16_t fragment_count, uint32_t total_size,
                     uint8_t* data, uint32_t data_length);

    EQProtocolPacket* CheckComplete(uint16_t sequence);

    void CleanupOldFragments(uint32_t timeout);
};
```

### Fragmentation Process

```cpp
std::vector<EQProtocolPacket*> EQStream::FragmentPacket(
    EQProtocolPacket* packet, uint16_t max_fragment_size) {

    std::vector<EQProtocolPacket*> fragments;

    // Fragment the payload only: Size() includes the 2-byte opcode,
    // packet->data does not.
    uint32_t total_size = packet->Size() - 2;
    uint16_t fragment_count = (total_size + max_fragment_size - 1) / max_fragment_size;

    uint8_t* data_ptr = packet->data;
    uint32_t remaining = total_size;

    for (uint16_t i = 0; i < fragment_count; i++) {
        uint32_t fragment_size = std::min(remaining, (uint32_t)max_fragment_size);

        // Create fragment header
        FragmentHeader header;
        header.opcode = OP_Fragment;
        header.sequence = GetNextSequence();
        header.total_size = total_size;
        header.fragment_number = i;
        header.fragment_count = fragment_count;

        // Create fragment packet
        uint32_t packet_size = sizeof(FragmentHeader) + fragment_size;
        EQProtocolPacket* fragment = new EQProtocolPacket(
            OP_Fragment, nullptr, packet_size);

        // Copy header and data
        memcpy(fragment->data, &header, sizeof(FragmentHeader));
        memcpy(fragment->data + sizeof(FragmentHeader), data_ptr, fragment_size);

        fragments.push_back(fragment);

        data_ptr += fragment_size;
        remaining -= fragment_size;
    }

    delete packet;  // Original no longer needed
    return fragments;
}
```

### Reassembly Process

```cpp
EQProtocolPacket* FragmentManager::Reassemble(uint16_t sequence) {
    auto it = fragment_sets.find(sequence);
    if (it == fragment_sets.end()) {
        return nullptr;
    }

    FragmentSet& set = it->second;

    // Check if all fragments received
    if (set.received_count != set.fragment_count) {
        return nullptr;
    }

    // Allocate buffer for complete packet
    uint8_t* complete_data = new uint8_t[set.total_size];
    uint32_t offset = 0;

    // Assemble fragments in order
    for (uint16_t i = 0; i < set.fragment_count; i++) {
        auto frag_it = set.fragments.find(i);
        if (frag_it == set.fragments.end()) {
            // Missing fragment - shouldn't happen
            delete[] complete_data;
            return nullptr;
        }

        // Copy fragment data (every fragment except the last is full-sized)
        uint32_t frag_size = (i == set.fragment_count - 1) ?
            (set.total_size - offset) : MAX_FRAGMENT_SIZE;

        memcpy(complete_data + offset, frag_it->second, frag_size);
        offset += frag_size;

        // Clean up fragment
        delete[] frag_it->second;
    }

    // Create reassembled packet (the constructor is assumed to copy the buffer)
    EQProtocolPacket* packet = new EQProtocolPacket(
        OP_Packet, complete_data, set.total_size);

    // Clean up fragment set
    fragment_sets.erase(it);
    delete[] complete_data;

    return packet;
}
```

## Connection Management

### State Machine

```cpp
class ConnectionStateMachine {
public:
    enum State {
        CLOSED,          // No connection
        CONNECTING,      // Session request sent
        CONNECTED,       // Active connection
        DISCONNECTING,   // Graceful shutdown initiated
        ERROR            // Error state
    };

private:
    State current_state;
    uint32_t state_timestamp;

    // State transition table
    struct Transition {
        State from;
        Event event;
        State to;
        std::function<void()> action;
    };

    std::vector<Transition> transitions = {
        {CLOSED, EVENT_CONNECT, CONNECTING,
            [this]() { SendSessionRequest(); }},
        {CONNECTING, EVENT_SESSION_RESPONSE, CONNECTED,
            [this]() { OnConnected(); }},
        {CONNECTED, EVENT_DISCONNECT, DISCONNECTING,
            [this]() { SendDisconnect(); }},
        {DISCONNECTING, EVENT_DISCONNECT_ACK, CLOSED,
            [this]() { OnClosed(); }},
        // ... more transitions
    };

public:
    void ProcessEvent(Event event) {
        for (const auto& trans : transitions) {
            if (trans.from == current_state && trans.event == event) {
                current_state = trans.to;
                state_timestamp = Timer::GetCurrentTime();
                if (trans.action) trans.action();
                break;
            }
        }
    }
};
```

### Keep-Alive Mechanism

```cpp
class KeepAliveManager {
private:
    uint32_t keepalive_interval;   // Milliseconds between keepalives
    uint32_t last_sent;            // Last keepalive sent
    uint32_t last_received;        // Last packet received
    uint32_t timeout_threshold;    // Connection timeout

public:
    void Process() {
        uint32_t current_time = Timer::GetCurrentTime();

        // Check if we need to send keepalive
        if (current_time - last_sent > keepalive_interval) {
            SendKeepAlive();
            last_sent = current_time;
        }

        // Check for connection timeout
        if (current_time - last_received > timeout_threshold) {
            LogWrite(NET__ERROR, "Connection timeout - no response");
            Disconnect();
        }
    }

    void OnPacketReceived() {
        last_received = Timer::GetCurrentTime();
    }
};
```

## Flow Control

### Congestion Window

```cpp
class CongestionControl {
private:
    // Assumed segment size in bytes; the source does not specify one
    static constexpr uint32_t MSS = 512;

    uint32_t cwnd;                // Congestion window size (bytes)
    uint32_t ssthresh;            // Slow start threshold (bytes)
    uint32_t outstanding_bytes;   // Bytes in flight
    uint32_t rtt_estimate;        // Round trip time estimate

    enum State {
        SLOW_START,
        CONGESTION_AVOIDANCE,
        FAST_RECOVERY
    } state;

public:
    bool CanSend(uint32_t packet_size) {
        return outstanding_bytes + packet_size <= cwnd;
    }

    void OnAck(uint32_t acked_bytes) {
        // Clamp so a duplicate ACK cannot underflow the counter
        outstanding_bytes -= std::min(acked_bytes, outstanding_bytes);

        switch (state) {
            case SLOW_START:
                cwnd += acked_bytes;
                if (cwnd >= ssthresh) {
                    state = CONGESTION_AVOIDANCE;
                }
                break;

            case CONGESTION_AVOIDANCE:
                cwnd += (acked_bytes * acked_bytes) / cwnd;
                break;

            case FAST_RECOVERY:
                cwnd = ssthresh;
                state = CONGESTION_AVOIDANCE;
                break;
        }
    }

    void OnLoss() {
        ssthresh = std::max(cwnd / 2, 2 * MSS);
        cwnd = MSS;  // One segment, not one byte: CanSend() compares byte counts
        state = SLOW_START;
    }
};
```

### Rate Limiting

```cpp
class RateLimiter {
private:
    uint32_t max_bytes_per_second;
    uint32_t bytes_sent_this_second;
    uint32_t current_second;

    // Token bucket algorithm
    double tokens;
    double max_tokens;
    double tokens_per_ms;
    uint32_t last_update;

public:
    bool AllowSend(uint32_t bytes) {
        UpdateTokens();

        if (tokens >= bytes) {
            tokens -= bytes;
            return true;
        }
        return false;
    }

    void UpdateTokens() {
        uint32_t current_time = Timer::GetCurrentTime();
        uint32_t elapsed = current_time - last_update;

        tokens = std::min(max_tokens, tokens + (elapsed * tokens_per_ms));
        last_update = current_time;
    }
};
```

## Implementation Details

### Packet Processing Pipeline

```cpp
void EQStream::ProcessIncomingData(uint8_t* data, uint32_t length) {
    // Reject datagrams too short to hold an opcode
    if (length < sizeof(uint16_t)) return;

    // Stage 1: CRC Verification
    if (!VerifyCRC(data, length)) {
        LogWrite(NET__ERROR, "CRC check failed");
        return;
    }

    // Stage 2: Decryption
    if (encoded_mode) {
        DecryptPacket(data, length);
    }

    // Stage 3: Decompression
    if (IsCompressed(data)) {
        data = DecompressData(data, length);
    }

    // Stage 4: Protocol Processing
    // memcpy avoids an unaligned read; the value is taken in host byte order
    uint16_t opcode;
    memcpy(&opcode, data, sizeof(opcode));

    switch (opcode) {
        case OP_SessionRequest:
            HandleSessionRequest(data, length);
            break;

        case OP_SessionResponse:
            HandleSessionResponse(data, length);
            break;

        case OP_Packet:
            HandleDataPacket(data, length);
            break;

        case OP_Fragment:
            HandleFragment(data, length);
            break;

        case OP_Ack:
            HandleAck(data, length);
            break;

        case OP_Combined:
            HandleCombined(data, length);
            break;

        case OP_KeepAlive:
            HandleKeepAlive();
            break;

        default:
            LogWrite(NET__ERROR, "Unknown opcode 0x%04x", opcode);
    }
}
```

### Outgoing Packet Pipeline

```cpp
void EQStream::SendPacket(EQApplicationPacket* app) {
    // Stage 1: Convert to protocol packet
    EQProtocolPacket* packet = WrapPacket(app);

    // Stage 2: Check fragmentation
    if (packet->Size() > max_packet_size) {
        auto fragments = FragmentPacket(packet, max_packet_size);
        for (auto frag : fragments) {
            SendPacketInternal(frag);
        }
        return;
    }

    // Stage 3: Compression
    if (compressed_mode && packet->Size() > COMPRESS_THRESHOLD) {
        packet = CompressPacket(packet);
    }

    // Stage 4: Add to send queue
    SendPacketInternal(packet);
}

void
EQStream::SendPacketInternal(EQProtocolPacket* packet) {
    // Add sequence number
    packet->sequence = GetNextSequence();

    // Store for retransmission
    StoreForRetransmit(packet);

    // Apply encryption
    if (encoded_mode) {
        EncryptPacket(packet);
    }

    // Add CRC
    AddCRC(packet);

    // Send over network
    SendRaw(packet->GetBuffer(), packet->GetSize());
}
```

## Utilities and Helper Functions

### CRC Calculation

```cpp
class CRCUtil {
private:
    static uint32_t crc_table[256];
    static bool table_initialized;

    static void InitTable() {
        for (uint32_t i = 0; i < 256; i++) {
            uint32_t c = i;
            for (int j = 0; j < 8; j++) {
                c = (c & 1) ? (0xEDB88320 ^ (c >> 1)) : (c >> 1);
            }
            crc_table[i] = c;
        }
        table_initialized = true;
    }

public:
    static uint32_t Calculate(const uint8_t* data, uint32_t length) {
        if (!table_initialized) InitTable();

        uint32_t crc = 0xFFFFFFFF;
        for (uint32_t i = 0; i < length; i++) {
            crc = crc_table[(crc ^ data[i]) & 0xFF] ^ (crc >> 8);
        }
        return crc ^ 0xFFFFFFFF;
    }

    static bool Verify(const uint8_t* data, uint32_t length, uint32_t expected_crc) {
        // length includes the 4-byte trailing CRC, which is excluded from the sum
        if (length < 4) return false;
        return Calculate(data, length - 4) == expected_crc;
    }
};
```

### Packet Combining

```cpp
class PacketCombiner {
private:
    std::vector<EQProtocolPacket*> pending_packets;
    uint32_t combined_size;
    uint32_t max_combined_size;

public:
    // Returns the packet produced when adding this one forced a flush
    // (the caller must send it), otherwise nullptr.
    EQProtocolPacket* AddPacket(EQProtocolPacket* packet) {
        EQProtocolPacket* flushed = nullptr;
        if (combined_size + packet->Size() > max_combined_size) {
            flushed = Flush();
        }

        pending_packets.push_back(packet);
        combined_size += packet->Size();
        return flushed;
    }

    EQProtocolPacket* Flush() {
        if (pending_packets.empty()) return nullptr;

        if (pending_packets.size() == 1) {
            auto packet = pending_packets[0];
            pending_packets.clear();
            combined_size = 0;
            return packet;
        }

        // Create combined packet
        uint32_t total_size = sizeof(uint16_t) + sizeof(uint8_t);
        for (auto p : pending_packets) {
            total_size += sizeof(uint8_t) + p->Size();
        }

        EQProtocolPacket* combined = new EQProtocolPacket(
            OP_Combined, nullptr, total_size);

        uint8_t* ptr = combined->data;
        *ptr++ = static_cast<uint8_t>(pending_packets.size());  // Packet count

        for (auto p : pending_packets) {
            *ptr++ = static_cast<uint8_t>(p->Size());  // Sub-packet size (must fit in one byte)
            memcpy(ptr, p->GetBuffer(), p->Size());
            ptr += p->Size();
            delete p;
        }

        pending_packets.clear();
        combined_size = 0;

        return combined;
    }
};
```

### Sequence Number Utilities

```cpp
class SequenceUtil {
public:
    // Compare sequence numbers with wrap-around.
    // Returns < 0 if a is before b, 0 if equal, > 0 if a is after b.
    static int CompareSequence(uint16_t a, uint16_t b) {
        // The 16-bit difference reinterpreted as signed already accounts for
        // the 0xFFFF -> 0x0000 wrap, so no extra range checks are needed.
        return static_cast<int16_t>(static_cast<uint16_t>(a - b));
    }

    // Check if sequence is in valid window
    static bool InWindow(uint16_t seq, uint16_t expected, uint16_t window_size) {
        int diff = CompareSequence(seq, expected);
        return diff >= 0 && diff < window_size;
    }

    // Calculate distance between sequences
    static uint16_t Distance(uint16_t from, uint16_t to) {
        if (to >= from) {
            return to - from;
        } else {
            return (0xFFFF - from) + to + 1;
        }
    }
};
```

### Buffer Management

```cpp
class BufferPool {
private:
    struct Buffer {
        uint8_t* data;
        uint32_t size;
        bool in_use;
    };

    std::vector<Buffer> buffers;
    std::mutex mutex;

public:
    uint8_t* Acquire(uint32_t size) {
        std::lock_guard<std::mutex> lock(mutex);

        // Find existing buffer
        for (auto& buf : buffers) {
            if (!buf.in_use && buf.size >= size) {
                buf.in_use = true;
                return buf.data;
            }
        }

        // Allocate new buffer
        Buffer new_buf;
        new_buf.data = new uint8_t[size];
        new_buf.size = size;
        new_buf.in_use = true;
        buffers.push_back(new_buf);

        return new_buf.data;
    }

    void Release(uint8_t* data) {
        std::lock_guard<std::mutex> lock(mutex);

        for (auto& buf : buffers) {
            if (buf.data == data) {
                buf.in_use = false;
                break;
            }
        }
    }
};
```

### Statistics Tracking

```cpp
class StreamStatistics {
public:
    // Packet counters
    std::atomic<uint64_t> packets_sent{0};
    std::atomic<uint64_t> packets_received{0};
    std::atomic<uint64_t> bytes_sent{0};
    std::atomic<uint64_t> bytes_received{0};

    // Error counters
    std::atomic<uint64_t> crc_errors{0};
    std::atomic<uint64_t> sequence_errors{0};
    std::atomic<uint64_t> timeout_errors{0};
    std::atomic<uint64_t> retransmissions{0};

    // Performance metrics
    std::atomic<uint32_t> avg_rtt{0};
    std::atomic<uint32_t> min_rtt{UINT32_MAX};
    std::atomic<uint32_t> max_rtt{0};
    std::atomic packet_loss_rate{0};

    // Compression statistics
    std::atomic<uint64_t> bytes_before_compression{0};
    std::atomic<uint64_t> bytes_after_compression{0};

    void UpdateRTT(uint32_t rtt) {
        // Exponential moving average
        avg_rtt = (avg_rtt * 7 + rtt) / 8;

        // Update min/max
        uint32_t current_min = min_rtt.load();
        while (rtt < current_min &&
               !min_rtt.compare_exchange_weak(current_min, rtt));

        uint32_t current_max = max_rtt.load();
        while (rtt > current_max &&
               !max_rtt.compare_exchange_weak(current_max, rtt));
    }

    double GetCompressionRatio() const {
        if (bytes_before_compression == 0) return 1.0;
        return (double)bytes_after_compression / bytes_before_compression;
    }

    void DumpStats() const {
        // Counters are cast to unsigned long long to match the %llu format
        LogWrite(NET__INFO, "=== Stream Statistics ===");
        LogWrite(NET__INFO, "Packets: sent=%llu, received=%llu",
                 (unsigned long long)packets_sent.load(),
                 (unsigned long long)packets_received.load());
        LogWrite(NET__INFO, "Bytes: sent=%llu, received=%llu",
                 (unsigned long long)bytes_sent.load(),
                 (unsigned long long)bytes_received.load());
        LogWrite(NET__INFO, "RTT: avg=%u, min=%u, max=%u",
                 avg_rtt.load(), min_rtt.load(), max_rtt.load());
        LogWrite(NET__INFO, "Errors: crc=%llu, seq=%llu, timeout=%llu",
                 (unsigned long long)crc_errors.load(),
                 (unsigned long long)sequence_errors.load(),
                 (unsigned long long)timeout_errors.load());
        LogWrite(NET__INFO, "Compression ratio: %.2f%%",
                 GetCompressionRatio() * 100);
    }
};
```

### Debug Utilities

```cpp
class PacketDebugger {
public:
    static void DumpPacket(const EQProtocolPacket* packet) {
        LogWrite(NET__DEBUG, "Packet: opcode=0x%04x, size=%u, seq=%u",
                 packet->GetOpcode(), packet->Size(), packet->sequence);

        HexDump(packet->GetBuffer(), packet->Size());
    }

    static void HexDump(const uint8_t* data, uint32_t length) {
        if (length == 0) return;  // Nothing to print; line would be uninitialized

        char line[80];
        char ascii[17];

        for (uint32_t i = 0; i < length; i++) {
            if (i % 16 == 0) {
                if (i > 0) {
                    LogWrite(NET__DEBUG, "%s %s", line, ascii);
                }
                snprintf(line, sizeof(line), "%04x: ", i);
                memset(ascii, 0, 17);
            }

            size_t used = strlen(line);
            snprintf(line + used, sizeof(line) - used, "%02x ", data[i]);
            ascii[i % 16] = isprint(data[i]) ?
                data[i] : '.';
        }

        // Print last line, padding a short row so the ASCII column lines up
        if (length % 16 != 0) {
            for (uint32_t i = length % 16; i < 16; i++) {
                strcat(line, "   ");  // Three spaces per missing "%02x " cell
            }
        }
        LogWrite(NET__DEBUG, "%s %s", line, ascii);
    }

    static void TracePacketFlow(const char* stage, const EQProtocolPacket* packet) {
        LogWrite(NET__TRACE, "[%s] opcode=0x%04x seq=%u size=%u",
                 stage, packet->GetOpcode(), packet->sequence, packet->Size());
    }
};
```

## Error Recovery

### Connection Reset

```cpp
void EQStream::Reset() {
    LogWrite(NET__INFO, "Resetting connection");

    // Clear all queues
    ClearSendQueue();
    ClearReceiveQueue();
    ClearRetransmitQueue();
    ClearFragmentBuffer();

    // Reset sequence numbers
    next_in_sequence = 0;
    next_out_sequence = 0;
    last_acked_sequence = 0;

    // Reset encryption
    if (encrypt_key) {
        delete encrypt_key;
        encrypt_key = nullptr;
    }
    if (decrypt_key) {
        delete decrypt_key;
        decrypt_key = nullptr;
    }

    // Reset compression
    if (stream_compress) {
        deflateEnd(stream_compress);
        delete stream_compress;
        stream_compress = nullptr;
    }
    if (stream_decompress) {
        inflateEnd(stream_decompress);
        delete stream_decompress;
        stream_decompress = nullptr;
    }

    // Send reset packet
    SendResetPacket();

    // Reinitialize connection
    state = WAIT_CONNECT;
    SendSessionRequest();
}
```

## Performance Considerations

### Optimization Strategies

1. **Packet Batching**: Combine multiple small packets
2. **Compression Threshold**: Only compress packets larger than 100 bytes
3. **Buffer Pooling**: Reuse buffers to reduce allocations
4. **Lock-Free Queues**: Use atomic operations where possible
5. **Zero-Copy**: Minimize data copying in pipeline

### Benchmarking

```cpp
class PerformanceBenchmark {
    struct Metric {
        std::string name;
        uint64_t total_time;
        uint64_t call_count;

        double Average() const {
            return call_count ?
                (double)total_time / call_count : 0;
        }
    };

    std::map<std::string, Metric> metrics;

public:
    class Timer {
        PerformanceBenchmark* benchmark;
        std::string metric_name;
        uint64_t start_time;

    public:
        Timer(PerformanceBenchmark* b, const std::string& name)
            : benchmark(b), metric_name(name) {
            start_time = GetHighResTime();
        }

        ~Timer() {
            uint64_t elapsed = GetHighResTime() - start_time;
            benchmark->AddSample(metric_name, elapsed);
        }
    };

    void AddSample(const std::string& name, uint64_t time) {
        metrics[name].total_time += time;
        metrics[name].call_count++;
        metrics[name].name = name;
    }

    void Report() {
        LogWrite(NET__INFO, "=== Performance Report ===");
        for (const auto& [name, metric] : metrics) {
            LogWrite(NET__INFO, "%s: avg=%.2fus, calls=%llu",
                     name.c_str(), metric.Average() / 1000.0,
                     (unsigned long long)metric.call_count);
        }
    }
};
```

## UDP Implementation Details

### Why UDP Instead of TCP?

EverQuest II uses UDP for game client connections for several critical reasons:

1. **Lower Latency**: No TCP handshake or connection establishment overhead
2. **No Head-of-Line Blocking**: Lost packets don't block subsequent packets
3. **Custom Reliability**: Only retransmit what's necessary for gameplay
4. **Better for Real-Time**: Position updates don't need 100% reliability
5. **Multicast Support**: UDP permits broadcast and multicast delivery on networks that support it

### UDP Socket Implementation

```cpp
class UDPSocket {
private:
    int sock_fd;
    struct sockaddr_in bind_addr;

public:
    bool Bind(uint16_t port) {
        // Create UDP socket
        sock_fd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
        if (sock_fd < 0) return false;

        // Set socket options
        int reuse = 1;
        setsockopt(sock_fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));

        // Increase buffer sizes for high throughput
        int rcvbuf = 256 * 1024;  // 256KB receive buffer
        int sndbuf = 256 * 1024;  // 256KB send buffer
        setsockopt(sock_fd, SOL_SOCKET, SO_RCVBUF, &rcvbuf, sizeof(rcvbuf));
        setsockopt(sock_fd, SOL_SOCKET, SO_SNDBUF, &sndbuf, sizeof(sndbuf));

        // Bind to port (zero the struct first so padding fields are defined)
        memset(&bind_addr, 0, sizeof(bind_addr));
        bind_addr.sin_family = AF_INET;
        bind_addr.sin_port = htons(port);
        bind_addr.sin_addr.s_addr = htonl(INADDR_ANY);

        return bind(sock_fd, (struct sockaddr*)&bind_addr, sizeof(bind_addr)) == 0;
    }

    ssize_t SendTo(const void* data, size_t len, const sockaddr_in& dest) {
        return sendto(sock_fd, data, len, 0,
                      (const struct sockaddr*)&dest, sizeof(dest));
    }

    ssize_t RecvFrom(void* buffer, size_t len, sockaddr_in& source) {
        socklen_t addr_len = sizeof(source);
        return recvfrom(sock_fd, buffer, len, 0,
                        (struct sockaddr*)&source, &addr_len);
    }
};
```

### Client Identification

Since UDP is connectionless, the server must track clients by their endpoint:

```cpp
struct ClientEndpoint {
    uint32_t ip;
    uint16_t port;

    bool operator<(const ClientEndpoint& other) const {
        return (ip < other.ip) || (ip == other.ip && port < other.port);
    }
};

std::map<ClientEndpoint, EQStream*> client_streams;

EQStream* GetStreamForPacket(const sockaddr_in& source) {
    ClientEndpoint endpoint{source.sin_addr.s_addr, ntohs(source.sin_port)};

    auto it = client_streams.find(endpoint);
    if (it != client_streams.end()) {
        return it->second;
    }

    // New client - check if this is a session request
    // If so, create new stream
    return nullptr;
}
```

### Reliability Layer Over UDP

EQStream implements TCP-like features over UDP:

```cpp
class ReliabilityLayer {
    // Sliding window for flow control
    uint16_t send_window_start;
    uint16_t send_window_size;
    uint16_t recv_window_start;
    uint16_t recv_window_size;

    // Retransmission timers
    struct RetransmitEntry {
        EQProtocolPacket* packet;
        uint32_t send_time;
        uint32_t retransmit_count;
        uint32_t rto;  // Retransmission timeout
    };

    // Round-trip time estimation (Jacobson/Karels algorithm)
    uint32_t srtt;     // Smoothed RTT
    uint32_t rttvar;   // RTT variance
    uint32_t rto;      // Retransmission timeout

    void UpdateRTT(uint32_t measured_rtt) {
        if (srtt == 0) {
            // First measurement
            srtt = measured_rtt;
            rttvar = measured_rtt / 2;
        } else {
            // Update estimates (rttvar first, using the previous srtt)
            uint32_t delta = abs((int)measured_rtt - (int)srtt);
            rttvar = (3 * rttvar + delta) / 4;
            srtt = (7 * srtt + measured_rtt) / 8;
        }

        // Calculate RTO (with minimum and maximum bounds)
        rto = srtt + 4 * rttvar;
        rto = std::max(rto, 200u);   // Min 200ms
        rto = std::min(rto, 3000u);  // Max 3 seconds
    }
};
```

## Conclusion

EQStream provides a robust, feature-rich networking layer that handles the complexities of real-time game communication over UDP. Key features include:

- **Reliability** through sequence numbers and retransmission (TCP-like over UDP)
- **Security** through RC4 encryption
- **Efficiency** through compression and packet combining
- **Low Latency** by using UDP instead of TCP
- **Custom Flow Control** optimized for game traffic
- **Scalability** through proper resource management
- **Maintainability** through clear separation of concerns

The protocol is specifically designed to handle the demands of MMO gameplay over UDP while maintaining compatibility with the EverQuest II client protocol specifications. By implementing reliability at the application layer rather than using TCP, the protocol achieves better performance for real-time gaming scenarios.
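
The reliability features summarized above all depend on comparing 16-bit sequence numbers correctly across the `0xFFFF` to `0x0000` wrap. The following is a minimal sketch of that comparison, assuming the receive window never exceeds 32768 sequence numbers. The names `seq_compare` and `seq_in_window` are hypothetical and used only for illustration; they are not part of EQStream.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helpers for illustration; not part of the EQStream source.

// Signed distance from b to a on the 16-bit sequence circle.
// Negative: a precedes b. Zero: equal. Positive: a follows b.
constexpr int seq_compare(std::uint16_t a, std::uint16_t b) {
    const std::uint16_t diff = static_cast<std::uint16_t>(a - b);
    // Map the upper half of the range to negative values explicitly,
    // so the result does not rely on narrowing-conversion behavior.
    return diff < 0x8000 ? static_cast<int>(diff)
                         : static_cast<int>(diff) - 0x10000;
}

// True when seq lies in [expected, expected + window) modulo 65536.
constexpr bool seq_in_window(std::uint16_t seq, std::uint16_t expected,
                             std::uint16_t window) {
    return static_cast<std::uint16_t>(seq - expected) < window;
}

// Compile-time checks of the wrap-around cases.
static_assert(seq_compare(5, 3) == 2, "plain ordering");
static_assert(seq_compare(0x0001, 0xFFFF) == 2, "1 follows 0xFFFF across the wrap");
static_assert(seq_compare(0xFFFF, 0x0001) == -2, "0xFFFF precedes 1 across the wrap");
static_assert(seq_in_window(0x0002, 0xFFFE, 8), "window spans the wrap");
static_assert(!seq_in_window(0xFFFD, 0xFFFE, 8), "already-seen sequence is outside");
```

Doing the subtraction in unsigned 16-bit arithmetic first is the design choice that makes the wrap case fall out naturally: no special branch is needed for sequences on either side of the boundary.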