diff --git a/source/common/EQStream.cpp b/source/common/EQStream.cpp index fef52ad..f4cda97 100644 --- a/source/common/EQStream.cpp +++ b/source/common/EQStream.cpp @@ -1,44 +1,28 @@ -/* - EQ2Emulator: Everquest II Server Emulator - Copyright (C) 2007 EQ2EMulator Development Team (http://www.eq2emulator.net) - - This file is part of EQ2Emulator. - - EQ2Emulator is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - EQ2Emulator is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with EQ2Emulator. If not, see . -*/ -#ifdef WIN32 -#include - #include -#endif +// EQ2Emulator: Everquest II Server Emulator +// Copyright (C) 2007 EQ2EMulator Development Team +// Licensed under GPL v3 - see #include "debug.h" + +// Standard library headers #include #include #include #include #include #include -#ifdef WIN32 - #include -#else - #include - #include - #include - #include - #include - #include - #include -#endif +#include +#include + +// Unix networking headers +#include +#include +#include +#include +#include +#include +#include + +// Project headers #include "EQPacket.h" #include "EQStream.h" #include "EQStreamFactory.h" @@ -47,165 +31,236 @@ #include "op_codes.h" #include "CRC16.h" #include "packet_dump.h" -#ifdef LOGIN - #include "../LoginServer/login_structs.h" -#endif #include "EQ2_Common_Structs.h" #include "Log.h" +#ifdef LOGIN + #include "../LoginServer/login_structs.h" +#endif + //#define DEBUG_EMBEDDED_PACKETS 1 -uint16 EQStream::MaxWindowSize=2048; -void EQStream::init(bool resetSession) { - if (resetSession) - { +// Static configuration +uint16 EQStream::MaxWindowSize = 2048; + +/** + * Initialize the EQ stream with default values. + * + * @param resetSession - Whether to reset session-specific data + */ +void EQStream::init(bool resetSession) +{ + if (resetSession) { streamactive = false; sessionAttempts = 0; } timeout_delays = 0; + // Initialize usage tracking MInUse.lock(); active_users = 0; MInUse.unlock(); - Session=0; - Key=0; - MaxLen=0; - NextInSeq=0; - NextOutSeq=0; - CombinedAppPacket=NULL; + // Initialize session state + Session = 0; + Key = 0; + MaxLen = 0; + NextInSeq = 0; + NextOutSeq = 0; + CombinedAppPacket = nullptr; + // Initialize acknowledgment tracking MAcks.lock(); MaxAckReceived = -1; NextAckToSend = -1; LastAckSent = -1; MAcks.unlock(); - LastSeqSent=-1; - MaxSends=5; - LastPacket=Timer::GetCurrentTime2(); - oversize_buffer=NULL; - oversize_length=0; - oversize_offset=0; - Factory = NULL; + // Initialize sequence tracking + LastSeqSent = -1; + MaxSends = 5; + LastPacket = Timer::GetCurrentTime2(); + + // Initialize buffers + oversize_buffer = nullptr; + oversize_length = 0; + oversize_offset = 0; + Factory = nullptr; - rogue_buffer=NULL; - roguebuf_offset=0; - roguebuf_size=0; + rogue_buffer = nullptr; + roguebuf_offset = 0; + roguebuf_size = 0; + // Initialize rate limiting MRate.lock(); - RateThreshold=RATEBASE/250; - DecayRate=DECAYBASE/250; + RateThreshold = RATEBASE / 250; + DecayRate = DECAYBASE / 250; MRate.unlock(); - BytesWritten=0; + // Initialize flow control + BytesWritten = 0; SequencedBase = 0; AverageDelta = 500; + // Initialize encryption crypto->setRC4Key(0); + // Initialize retransmission retransmittimer = Timer::GetCurrentTime2(); retransmittimeout = 500 * RETRANSMIT_TIMEOUT_MULT; + // Initialize reconnection reconnectAttempt = 0; + + // Validate queue consistency if (uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) { - LogWrite(PACKET__DEBUG, 9, "Packet", "init Invalid Sequenced queue: BS %u + SQ %u != NOS %u", SequencedBase, SequencedQueue.size(), NextOutSeq); + LogWrite(PACKET__DEBUG, 9, "Packet", + "init Invalid Sequenced queue: BS %u + SQ %u != NOS %u", + SequencedBase, SequencedQueue.size(), NextOutSeq); } } -EQStream::EQStream(sockaddr_in addr){ +/** + * EQStream constructor with socket address. + * Initializes all stream components and sets up compression. + * + * @param addr - Socket address of remote endpoint + */ +EQStream::EQStream(sockaddr_in addr) +{ + // Initialize crypto and timers crypto = new Crypto(); resend_que_timer = new Timer(1000); - combine_timer = new Timer(250); //250 milliseconds + combine_timer = new Timer(250); // 250 milliseconds combine_timer->Start(); resend_que_timer->Start(); - init(); - remote_ip=addr.sin_addr.s_addr; - remote_port=addr.sin_port; - State=CLOSED; - StreamType=UnknownStream; - compressed=true; - encoded=false; - app_opcode_size=2; - #ifdef WIN32 - ZeroMemory(&stream, sizeof(z_stream)); - #else - bzero(&stream, sizeof(z_stream)); - #endif + + // Initialize base stream + init(); + + // Set remote endpoint + remote_ip = addr.sin_addr.s_addr; + remote_port = addr.sin_port; + + // Set initial state + State = CLOSED; + StreamType = UnknownStream; + compressed = true; + encoded = false; + app_opcode_size = 2; + + // Initialize compression stream (Unix only) + bzero(&stream, sizeof(z_stream)); stream.zalloc = (alloc_func)0; stream.zfree = (free_func)0; stream.opaque = (voidpf)0; deflateInit2(&stream, 9, Z_DEFLATED, 13, 9, Z_DEFAULT_STRATEGY); - //deflateInit(&stream, 5); + + // Initialize compression state compressed_offset = 0; client_version = 0; received_packets = 0; sent_packets = 0; #ifdef WRITE_PACKETS - write_packets = 0; + // Initialize packet logging + write_packets = nullptr; char write_packets_filename[64]; - snprintf(write_packets_filename, sizeof(write_packets_filename), "PacketLog%i.log", Timer::GetCurrentTime2()); + snprintf(write_packets_filename, sizeof(write_packets_filename), + "PacketLog%i.log", Timer::GetCurrentTime2()); write_packets = fopen(write_packets_filename, "w+"); #endif } -EQProtocolPacket* EQStream::ProcessEncryptedData(uchar* data, int32 size, int16 opcode){ - //cout << "B4:\n"; - //DumpPacket(data, size); - /*if(size >= 2 && data[0] == 0 && data[1] == 0){ - cout << "Attempting to fix packet!\n"; - //Have to fix bad packet from client or it will screw up encryption :P - size--; - data++; - }*/ - crypto->RC4Decrypt(data,size); +/** + * Process encrypted packet data and extract the embedded packet. + * Decrypts the data and extracts the opcode and payload. + * + * @param data - Encrypted data buffer + * @param size - Size of encrypted data + * @param opcode - Reference to opcode (will be modified) + * @return Newly created EQProtocolPacket with decrypted data + */ +EQProtocolPacket* EQStream::ProcessEncryptedData(uchar* data, int32 size, int16 opcode) +{ + // Decrypt the data using RC4 + crypto->RC4Decrypt(data, size); + int8 offset = 0; - if(data[0] == 0xFF && size > 2){ - offset = 3; - memcpy(&opcode, data+sizeof(int8), sizeof(int16)); - } - else{ + if (data[0] == 0xFF && size > 2) { + // Extended opcode format (2-byte opcode) + offset = 3; + memcpy(&opcode, data + sizeof(int8), sizeof(int16)); + } else { + // Standard opcode format (1-byte opcode) offset = 1; memcpy(&opcode, data, sizeof(int8)); } - //cout << "After:\n"; - //DumpPacket(data, size); - return new EQProtocolPacket(opcode, data+offset, size - offset); + + return new EQProtocolPacket(opcode, data + offset, size - offset); } -EQProtocolPacket* EQStream::ProcessEncryptedPacket(EQProtocolPacket *p){ - EQProtocolPacket* ret = NULL; - if(p->opcode == OP_Packet && p->size > 2) - ret = ProcessEncryptedData(p->pBuffer+2, p->size-2, p->opcode); - else +/** + * Process an encrypted protocol packet. + * Determines the correct offset and calls ProcessEncryptedData. + * + * @param p - Encrypted protocol packet to process + * @return Newly created EQProtocolPacket with decrypted data + */ +EQProtocolPacket* EQStream::ProcessEncryptedPacket(EQProtocolPacket* p) +{ + EQProtocolPacket* ret = nullptr; + + if (p->opcode == OP_Packet && p->size > 2) { + // Skip the first 2 bytes for OP_Packet + ret = ProcessEncryptedData(p->pBuffer + 2, p->size - 2, p->opcode); + } else { + // Process entire buffer for other opcodes ret = ProcessEncryptedData(p->pBuffer, p->size, p->opcode); + } + return ret; } -bool EQStream::ProcessEmbeddedPacket(uchar* pBuffer, int16 length,int8 opcode) { - if(!pBuffer || !crypto->isEncrypted()) +/** + * Process an embedded encrypted packet within another packet. + * Decrypts and queues the embedded packet for processing. + * + * @param pBuffer - Buffer containing embedded packet data + * @param length - Length of embedded packet + * @param opcode - Opcode for the embedded packet + * @return true if packet was successfully processed + */ +bool EQStream::ProcessEmbeddedPacket(uchar* pBuffer, int16 length, int8 opcode) +{ + if (!pBuffer || !crypto->isEncrypted()) { return false; + } + // Process encrypted data with thread safety MCombineQueueLock.lock(); EQProtocolPacket* newpacket = ProcessEncryptedData(pBuffer, length, opcode); MCombineQueueLock.unlock(); if (newpacket) { #ifdef DEBUG_EMBEDDED_PACKETS - printf("Opcode: %u\n", newpacket->opcode); + printf("Embedded Packet - Opcode: %u\n", newpacket->opcode); DumpPacket(newpacket->pBuffer, newpacket->size); #endif + // Convert to application packet and queue EQApplicationPacket* ap = newpacket->MakeApplicationPacket(2); - if (ap->version == 0) + if (ap->version == 0) { ap->version = client_version; + } InboundQueuePush(ap); + #ifdef WRITE_PACKETS WritePackets(ap->GetOpcodeName(), pBuffer, length, false); #endif + safe_delete(newpacket); return true; } @@ -213,300 +268,406 @@ bool EQStream::ProcessEmbeddedPacket(uchar* pBuffer, int16 length,int8 opcode) { return false; } -bool EQStream::HandleEmbeddedPacket(EQProtocolPacket *p, int16 offset, int16 length){ - if(!p) +/** + * Handle embedded packets within a protocol packet. + * Processes different types of embedded packet formats. + * + * @param p - Protocol packet containing embedded data + * @param offset - Offset to start of embedded data + * @param length - Length of embedded data (0 = auto-detect) + * @return true if embedded packet was successfully processed + */ +bool EQStream::HandleEmbeddedPacket(EQProtocolPacket* p, int16 offset, int16 length) +{ + if (!p) { return false; + } #ifdef DEBUG_EMBEDDED_PACKETS - // printf works better with DumpPacket - printf( "Start Packet with offset %u, length %u, p->size %u\n", offset, length, p->size); + printf("HandleEmbeddedPacket - offset: %u, length: %u, size: %u\n", + offset, length, p->size); #endif - if(p->size >= ((uint32)(offset+2))){ - if(p->pBuffer[offset] == 0 && p->pBuffer[offset+1] == 0x19){ - uint32 data_length = 0; - if(length == 0) { - // Ensure there are at least 2 bytes after offset. - if(p->size < offset + 2) { - return false; // Not enough data. - } - data_length = p->size - offset - 2; - } else { - // Ensure provided length is at least 2. - if(length < 2) { - return false; // Provided length too short. - } - data_length = length - 2; + // Ensure we have enough data for the offset + if (p->size < static_cast(offset + 2)) { + return false; + } + + // Check for OP_AppCombined embedded packet (0x00 0x19) + if (p->pBuffer[offset] == 0 && p->pBuffer[offset + 1] == 0x19) { + uint32 data_length = 0; + + if (length == 0) { + // Auto-detect length + data_length = p->size - offset - 2; + } else { + // Use provided length + if (length < 2) { + return false; // Length too short } + data_length = length - 2; + } + #ifdef DEBUG_EMBEDDED_PACKETS - printf( "Creating OP_AppCombined Packet with offset %u, length %u, p->size %u\n", offset, length, p->size); - DumpPacket(p->pBuffer, p->size); + printf("Processing OP_AppCombined - offset: %u, data_length: %u\n", + offset, data_length); + DumpPacket(p->pBuffer, p->size); #endif - // Verify that offset + 2 + data_length does not exceed p->size. - if(offset + 2 + data_length > p->size) { - return false; // Out-of-bounds. - } - EQProtocolPacket *subp = new EQProtocolPacket(OP_AppCombined, p->pBuffer + offset + 2, data_length); - subp->copyInfo(p); - ProcessPacket(subp, p); - safe_delete(subp); + + // Verify bounds + if (offset + 2 + data_length > p->size) { + return false; // Out of bounds + } + + // Create and process sub-packet + auto subp = new EQProtocolPacket(OP_AppCombined, + p->pBuffer + offset + 2, + data_length); + subp->copyInfo(p); + ProcessPacket(subp, p); + safe_delete(subp); + return true; + } + // Check for null opcode embedded packet (0x00 0x00) + else if (p->pBuffer[offset] == 0 && p->pBuffer[offset + 1] == 0) { + if (length == 0) { + length = p->size - 1 - offset; + } else { + length--; + } + +#ifdef DEBUG_EMBEDDED_PACKETS + printf("Processing null opcode embedded packet\n"); + DumpPacket(p->pBuffer + 1 + offset, length); +#endif + + uchar* buffer = (p->pBuffer + 1 + offset); + bool valid = ProcessEmbeddedPacket(buffer, length); + + if (valid) { return true; } - else if (p->pBuffer[offset] == 0 && p->pBuffer[offset + 1] == 0) { - if (length == 0) - length = p->size - 1 - offset; - else - length--; - + } + // Check for general embedded packet (not 0xffffffff) + else if (offset + 4 < p->size && ntohl(*(uint32*)(p->pBuffer + offset)) != 0xffffffff) { #ifdef DEBUG_EMBEDDED_PACKETS - printf( "Creating Opcode 0 Packet!"); - DumpPacket(p->pBuffer + 1 + offset, length); -#endif - uchar* buffer = (p->pBuffer + 1 + offset); - bool valid = ProcessEmbeddedPacket(buffer, length); - - if(valid) - return true; + uint16 seq = NextInSeq - 1; + sint8 check = 0; + + if (offset == 2) { + seq = ntohs(*(uint16*)(p->pBuffer)); + check = CompareSequence(NextInSeq, seq); } - else if(offset+4 < p->size && ntohl(*(uint32 *)(p->pBuffer+offset)) != 0xffffffff) { -#ifdef DEBUG_EMBEDDED_PACKETS - uint16 seq = NextInSeq-1; - sint8 check = 0; - - if(offset == 2) { - seq=ntohs(*(uint16 *)(p->pBuffer)); - check=CompareSequence(NextInSeq,seq); - } - printf( "Unhandled Packet with offset %u, length %u, p->size %u, check: %i, nextinseq: %u, seq: %u\n", offset, length, p->size, check, NextInSeq, seq); - DumpPacket(p->pBuffer, p->size); + printf("Processing general embedded packet - offset: %u, length: %u, size: %u, check: %i, seq: %u\n", + offset, length, p->size, check, seq); + DumpPacket(p->pBuffer, p->size); #endif - if(length == 0) - length = p->size - offset; - - - uchar* buffer = (p->pBuffer + offset); - - bool valid = ProcessEmbeddedPacket(buffer, length); - - if(valid) - return true; + if (length == 0) { + length = p->size - offset; } - else if(p->pBuffer[offset] != 0xff && p->pBuffer[offset+1] == 0xff && p->size >= offset + 3) { - // Read the first byte into a wider type to avoid underflow. - uint16 total_length = p->pBuffer[offset]; // promote to uint16 - // Check that there is enough data: we expect offset+2+total_length == p->size. - if(total_length + offset + 2 == p->size && total_length >= 2) { - uint32 data_length = total_length - 2; - // No additional bounds check needed because equality condition ensures it. - EQProtocolPacket *subp = new EQProtocolPacket(p->pBuffer + offset + 2, data_length, OP_Packet); - subp->copyInfo(p); - ProcessPacket(subp, p); - delete subp; - return true; - } + + uchar* buffer = (p->pBuffer + offset); + bool valid = ProcessEmbeddedPacket(buffer, length); + + if (valid) { + return true; } } + // Check for length-prefixed embedded packet (not 0xff, then 0xff) + else if (p->pBuffer[offset] != 0xff && + p->pBuffer[offset + 1] == 0xff && + p->size >= offset + 3) { + + // Get total length from first byte + uint16 total_length = p->pBuffer[offset]; + + // Verify packet size matches expected length + if (total_length + offset + 2 == p->size && total_length >= 2) { + uint32 data_length = total_length - 2; + + // Create and process sub-packet + auto subp = new EQProtocolPacket(p->pBuffer + offset + 2, + data_length, OP_Packet); + subp->copyInfo(p); + ProcessPacket(subp, p); + delete subp; + return true; + } + } + return false; } -void EQStream::ProcessPacket(EQProtocolPacket *p, EQProtocolPacket* lastp) +/** + * Process a protocol packet and handle various packet types. + * This is the main packet processing dispatcher for the stream. + * + * @param p - Protocol packet to process + * @param lastp - Previous packet in sequence (for context) + */ +void EQStream::ProcessPacket(EQProtocolPacket* p, EQProtocolPacket* lastp) { - uint32 processed=0,subpacket_length=0; + uint32 processed = 0; + uint32 subpacket_length = 0; - if (p) { + if (!p) { + return; + } - if (p->opcode!=OP_SessionRequest && p->opcode!=OP_SessionResponse && !Session) { + // Ensure session is initialized for non-session packets + if (p->opcode != OP_SessionRequest && + p->opcode != OP_SessionResponse && + !Session) { #ifdef EQN_DEBUG - LogWrite(PACKET__ERROR, 0, "Packet", "*** Session not initialized, packet ignored "); - //p->DumpRaw(); + LogWrite(PACKET__ERROR, 0, "Packet", + "Session not initialized, packet ignored"); #endif - return; - } + return; + } - //cout << "Received " << (int)p->opcode << ":\n"; - //DumpPacket(p->pBuffer, p->size); - switch (p->opcode) { - case OP_Combined: { - processed=0; - int8 offset = 0; - int count = 0; + + // Process packet based on opcode + switch (p->opcode) { + case OP_Combined: { + // Handle combined packets containing multiple sub-packets + processed = 0; + int8 offset = 0; + int count = 0; + #ifdef LE_DEBUG - printf( "OP_Combined:\n"); - DumpPacket(p); + printf("Processing OP_Combined packet\n"); + DumpPacket(p); #endif - while(processedsize) { - if ((subpacket_length=(unsigned char)*(p->pBuffer+processed))==0xff) { - subpacket_length = ntohs(*(uint16*)(p->pBuffer + processed + 1)); - //printf("OP_Combined subpacket_length %u\n",subpacket_length); - offset = 3; - } - else { - offset = 1; - } + + // Process all sub-packets within the combined packet + while (processed < p->size) { + // Determine sub-packet length and offset + subpacket_length = static_cast(*(p->pBuffer + processed)); + + if (subpacket_length == 0xff) { + // Extended length format (2-byte length) + subpacket_length = ntohs(*(uint16*)(p->pBuffer + processed + 1)); + offset = 3; + } else { + // Standard format (1-byte length) + offset = 1; + } + + count++; + +#ifdef LE_DEBUG + printf("Processing sub-packet %i: length=%u, offset=%u\n", + count, subpacket_length, processed); +#endif + + // Check if this is a valid protocol packet + bool isSubPacket = EQProtocolPacket::IsProtocolPacket( + p->pBuffer + processed + offset, subpacket_length, false); + + if (isSubPacket) { + // Create and process sub-packet + auto subp = new EQProtocolPacket( + p->pBuffer + processed + offset, subpacket_length); + subp->copyInfo(p); - //printf("OP_Combined processed %u p->size %u subpacket length %u count %i\n",processed, p->size, subpacket_length, count); - count++; #ifdef LE_DEBUG - printf( "OP_Combined Packet %i (%u) (%u):\n", count, subpacket_length, processed); + printf("Sub-packet opcode: %i\n", subp->opcode); + DumpPacket(subp); #endif - bool isSubPacket = EQProtocolPacket::IsProtocolPacket(p->pBuffer + processed + offset, subpacket_length, false); - if (isSubPacket) { - EQProtocolPacket* subp = new EQProtocolPacket(p->pBuffer + processed + offset, subpacket_length); + + ProcessPacket(subp, p); + delete subp; + } + else { + // Handle non-protocol packet data (encrypted) + offset = 1; + + // Check for potential garbage packet + uint16 header_check = ntohs(*reinterpret_cast( + p->pBuffer + processed + offset)); + + if (header_check <= 0x1e) { + // Process as potential OP_Packet + subpacket_length = static_cast( + *(p->pBuffer + processed)); + + LogWrite(PACKET__ERROR, 0, "Packet", + "Processing suspected garbage packet as OP_Packet"); + DumpPacket(p->pBuffer + processed + offset, subpacket_length); + + uchar* newbuf = p->pBuffer + processed + offset; + auto subp = new EQProtocolPacket(newbuf, subpacket_length); subp->copyInfo(p); -#ifdef LE_DEBUG - printf( "Opcode %i:\n", subp->opcode); - DumpPacket(subp); -#endif ProcessPacket(subp, p); -#ifdef LE_DEBUG - DumpPacket(subp); -#endif delete subp; - } - else { - offset = 1; // 0xFF in this case means it is actually 255 bytes of encrypted data after a 00 09 packet - //Garbage packet? - if(ntohs(*reinterpret_cast(p->pBuffer + processed + offset)) <= 0x1e) { - subpacket_length=(unsigned char)*(p->pBuffer+processed); - LogWrite(PACKET__ERROR, 0, "Packet", "!!!!!!!!!Garbage Packet Unknown Process as OP_Packet!!!!!!!!!!!!!\n"); + } else { + // Decrypt the data and process + crypto->RC4Decrypt(p->pBuffer + processed + offset, subpacket_length); + + LogWrite(PACKET__ERROR, 0, "Packet", + "Processing encrypted sub-packet: " + "processed=%u, offset=%u, count=%i, length=%u", + processed, offset, count, subpacket_length); + + if (p->pBuffer[processed + offset] == 0xff) { + // Skip 0xff marker and process data + uchar* newbuf = p->pBuffer + processed + offset + 1; + DumpPacket(p->pBuffer + processed + offset, subpacket_length); - uchar* newbuf = p->pBuffer; - newbuf += processed + offset; - EQProtocolPacket *subp=new EQProtocolPacket(newbuf,subpacket_length); + + auto subp = new EQProtocolPacket( + newbuf, subpacket_length, OP_Packet); subp->copyInfo(p); ProcessPacket(subp, p); delete subp; - } - else { - crypto->RC4Decrypt(p->pBuffer + processed + offset, subpacket_length); - LogWrite(PACKET__ERROR, 0, "Packet", "!!!!!!!!!Garbage Packet!!!!!!!!!!!!! processed: %u, offset: %u, count: %i, subpacket_length: %u, offset_pos_1: %u, oversized_buffer_present: %u, offset size: %u, offset length: %u\n", - processed, offset, count, subpacket_length, p->pBuffer[processed + offset], oversize_buffer ? 1 : 0, oversize_offset, oversize_length); - if(p->pBuffer[processed + offset] == 0xff) - { - uchar* newbuf = p->pBuffer; - newbuf += processed + offset + 1; - - DumpPacket(p->pBuffer + processed + offset, subpacket_length); - EQProtocolPacket *subp=new EQProtocolPacket(newbuf, subpacket_length, OP_Packet); - subp->copyInfo(p); - ProcessPacket(subp, p); - delete subp; - } - else - break; // bad packet + } else { + // Invalid packet format - stop processing + break; } } - processed+=subpacket_length+offset; - } - break; - } - case OP_AppCombined: { - processed=0; - EQProtocolPacket* newpacket = 0; - int8 offset = 0; -#ifdef DEBUG_EMBEDDED_PACKETS - printf( "OP_AppCombined: \n"); - DumpPacket(p); -#endif - int count = 0; - while(processedsize) { - count++; - if ((subpacket_length=(unsigned char)*(p->pBuffer+processed))==0xff) { - subpacket_length=ntohs(*(uint16 *)(p->pBuffer+processed+1)); - offset = 3; - } else - offset = 1; - - if(crypto->getRC4Key()==0 && p && subpacket_length > 8+offset){ - #ifdef DEBUG_EMBEDDED_PACKETS - DumpPacket(p->pBuffer, p->size); - #endif - p->pBuffer += offset; - processRSAKey(p, subpacket_length); - p->pBuffer -= offset; - } - else if(crypto->isEncrypted()){ -#ifdef DEBUG_EMBEDDED_PACKETS - printf( "OP_AppCombined Packet %i (%u) (%u): \n", count, subpacket_length, processed); - DumpPacket(p->pBuffer+processed+offset, subpacket_length); -#endif - if(!HandleEmbeddedPacket(p, processed + offset, subpacket_length)){ - uchar* buffer = (p->pBuffer + processed + offset); - if(!ProcessEmbeddedPacket(buffer, subpacket_length, OP_AppCombined)) { - LogWrite(PACKET__ERROR, 0, "Packet", "*** This is bad, ProcessEmbeddedPacket failed, report to Image!"); - } - } - } - processed+=subpacket_length+offset; } + + // Move to next sub-packet + processed += subpacket_length + offset; } break; - case OP_Packet: { - if (!p->pBuffer || (p->Size() < 4)) - { + } + case OP_AppCombined: { + // Handle application combined packets + processed = 0; + int8 offset = 0; + int count = 0; + +#ifdef DEBUG_EMBEDDED_PACKETS + printf("Processing OP_AppCombined packet\n"); + DumpPacket(p); +#endif + + while (processed < p->size) { + count++; + + // Determine sub-packet length and offset + subpacket_length = static_cast(*(p->pBuffer + processed)); + + if (subpacket_length == 0xff) { + // Extended length format + subpacket_length = ntohs(*(uint16*)(p->pBuffer + processed + 1)); + offset = 3; + } else { + // Standard length format + offset = 1; + } + + // Handle RSA key exchange if no encryption key is set + if (crypto->getRC4Key() == 0 && p && subpacket_length > 8 + offset) { +#ifdef DEBUG_EMBEDDED_PACKETS + DumpPacket(p->pBuffer, p->size); +#endif + p->pBuffer += offset; + processRSAKey(p, subpacket_length); + p->pBuffer -= offset; + } else if (crypto->isEncrypted()) { + // Process encrypted embedded packet +#ifdef DEBUG_EMBEDDED_PACKETS + printf("Processing encrypted sub-packet %i: length=%u, offset=%u\n", + count, subpacket_length, processed); + DumpPacket(p->pBuffer + processed + offset, subpacket_length); +#endif + + if (!HandleEmbeddedPacket(p, processed + offset, subpacket_length)) { + uchar* buffer = (p->pBuffer + processed + offset); + if (!ProcessEmbeddedPacket(buffer, subpacket_length, OP_AppCombined)) { + LogWrite(PACKET__ERROR, 0, "Packet", + "ProcessEmbeddedPacket failed for OP_AppCombined"); + } + } + } + + // Move to next sub-packet + processed += subpacket_length + offset; + } + break; + } + case OP_Packet: { + // Handle sequenced data packets + if (!p->pBuffer || (p->Size() < 4)) { + break; // Invalid packet + } + + // Extract sequence number and check ordering + uint16 seq = ntohs(*(uint16*)(p->pBuffer)); + sint8 check = CompareSequence(NextInSeq, seq); + + if (check == SeqFuture) { + // Future packet - store for later processing +#ifdef EQN_DEBUG + LogWrite(PACKET__DEBUG, 1, "Packet", + "Future packet: Expected=%i, Got=%i", NextInSeq, seq); + p->DumpRawHeader(seq); +#endif + + OutOfOrderpackets[seq] = p->Copy(); + // Note: Not sending out-of-order ACK to prevent loops + + } else if (check == SeqPast) { + // Duplicate/old packet - acknowledge but don't process +#ifdef EQN_DEBUG + LogWrite(PACKET__DEBUG, 1, "Packet", + "Duplicate packet: Expected=%i, Got=%i", NextInSeq, seq); + p->DumpRawHeader(seq); +#endif + + SendOutOfOrderAck(seq); + + } else { + // In-order packet - process normally + EQProtocolPacket* qp = RemoveQueue(seq); + if (qp) { + LogWrite(PACKET__DEBUG, 1, "Packet", + "Removing older queued packet with sequence %i", seq); + delete qp; + } + + // Update sequence tracking + SetNextAckToSend(seq); + NextInSeq++; + + // Try to handle as embedded packet first + if (HandleEmbeddedPacket(p)) { break; } - - uint16 seq=ntohs(*(uint16 *)(p->pBuffer)); - sint8 check=CompareSequence(NextInSeq,seq); - if (check == SeqFuture) { -#ifdef EQN_DEBUG - LogWrite(PACKET__DEBUG, 1, "Packet", "*** Future packet: Expecting Seq=%i, but got Seq=%i", NextInSeq, seq); - LogWrite(PACKET__DEBUG, 1, "Packet", "[Start]"); - p->DumpRawHeader(seq); - LogWrite(PACKET__DEBUG, 1, "Packet", "[End]"); -#endif - OutOfOrderpackets[seq] = p->Copy(); - - // Image (2020): Removed as this is bad contributes to infinite loop - //SendOutOfOrderAck(seq); - } else if (check == SeqPast) { -#ifdef EQN_DEBUG - LogWrite(PACKET__DEBUG, 1, "Packet", "*** Duplicate packet: Expecting Seq=%i, but got Seq=%i", NextInSeq, seq); - LogWrite(PACKET__DEBUG, 1, "Packet", "[Start]"); - p->DumpRawHeader(seq); - LogWrite(PACKET__DEBUG, 1, "Packet", "[End]"); + + // Handle RSA key exchange if no encryption key is set + if (crypto->getRC4Key() == 0 && p && p->size >= 69) { +#ifdef DEBUG_EMBEDDED_PACKETS + DumpPacket(p->pBuffer, p->size); #endif - // Image (2020): Removed as this is bad contributes to infinite loop - //OutOfOrderpackets[seq] = p->Copy(); - SendOutOfOrderAck(seq); - } else { - EQProtocolPacket* qp = RemoveQueue(seq); - if (qp) { - LogWrite(PACKET__DEBUG, 1, "Packet", "OP_Fragment: Removing older queued packet with sequence %i", seq); - delete qp; - } + processRSAKey(p); + } else if (crypto->isEncrypted() && p) { + // Process encrypted packet + MCombineQueueLock.lock(); + EQProtocolPacket* newpacket = ProcessEncryptedPacket(p); + MCombineQueueLock.unlock(); - SetNextAckToSend(seq); - NextInSeq++; - - if(HandleEmbeddedPacket(p)) - break; - if(crypto->getRC4Key()==0 && p && p->size >= 69){ - #ifdef DEBUG_EMBEDDED_PACKETS - DumpPacket(p->pBuffer, p->size); - #endif - processRSAKey(p); - } - else if(crypto->isEncrypted() && p){ - MCombineQueueLock.lock(); - EQProtocolPacket* newpacket = ProcessEncryptedPacket(p); - MCombineQueueLock.unlock(); - if(newpacket){ - EQApplicationPacket* ap = newpacket->MakeApplicationPacket(2); - if (ap->version == 0) - ap->version = client_version; -#ifdef WRITE_PACKETS - WritePackets(ap->GetOpcodeName(), p->pBuffer, p->size, false); -#endif - InboundQueuePush(ap); - safe_delete(newpacket); + if (newpacket) { + // Convert to application packet and queue + EQApplicationPacket* ap = newpacket->MakeApplicationPacket(2); + if (ap->version == 0) { + ap->version = client_version; } + +#ifdef WRITE_PACKETS + WritePackets(ap->GetOpcodeName(), p->pBuffer, p->size, false); +#endif + + InboundQueuePush(ap); + safe_delete(newpacket); } } } break; + } case OP_Fragment: { if (!p->pBuffer || (p->Size() < 4)) { @@ -639,285 +800,422 @@ void EQStream::ProcessPacket(EQProtocolPacket *p, EQProtocolPacket* lastp) SetState(ESTABLISHED); } break; - case OP_SessionResponse: { + /** + * OP_SessionResponse: Handle session establishment response from server + * Contains stream parameters, encryption keys, and stream type information + */ + case OP_SessionResponse: + { + // Validate packet size for SessionResponse structure if (p->Size() < sizeof(SessionResponse)) { break; } + + // Initialize stream and clear outbound queue init(); OutboundQueueClear(); SetActive(true); - SessionResponse *Response=(SessionResponse *)p->pBuffer; + + // Extract session response data + auto Response = reinterpret_cast(p->pBuffer); SetMaxLen(ntohl(Response->MaxLength)); - Key=ntohl(Response->Key); - NextInSeq=0; + Key = ntohl(Response->Key); + NextInSeq = 0; SetState(ESTABLISHED); + + // Set session ID if not already established if (!Session) - Session=ntohl(Response->Session); - compressed=(Response->Format&FLAG_COMPRESSED); - encoded=(Response->Format&FLAG_ENCODED); - - // Kinda kludgy, but trie for now - if (compressed) { - if (remote_port==9000 || (remote_port==0 && p->src_port==9000)) + { + Session = ntohl(Response->Session); + } + + // Extract compression and encoding flags + compressed = (Response->Format & FLAG_COMPRESSED); + encoded = (Response->Format & FLAG_ENCODED); + + // Determine stream type based on format flags and port + if (compressed) + { + if (remote_port == 9000 || (remote_port == 0 && p->src_port == 9000)) + { SetStreamType(WorldStream); + } else + { SetStreamType(ZoneStream); - } else if (encoded) + } + } + else if (encoded) + { SetStreamType(ChatOrMailStream); + } else + { SetStreamType(LoginStream); + } } break; - case OP_SessionDisconnect: { - //NextInSeq=0; + /** + * OP_SessionDisconnect: Handle graceful disconnect request + * Initiates proper connection termination sequence + */ + case OP_SessionDisconnect: + { + // Send disconnect acknowledgment to peer SendDisconnect(); - //SetState(CLOSED); } break; - case OP_OutOfOrderAck: { + /** + * OP_OutOfOrderAck: Handle out-of-order packet acknowledgment + * Marks specific packets as acknowledged and flags earlier packets for retransmission + */ + case OP_OutOfOrderAck: + { + // Validate packet structure and size if (!p->pBuffer || (p->Size() < 4)) { - LogWrite(PACKET__DEBUG, 9, "Packet", "Received OP_OutOfOrderAck that was of malformed size"); - break; - } - uint16 seq = ntohs(*(uint16*)(p->pBuffer)); - MOutboundQueue.lock(); - - if (uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) { - LogWrite(PACKET__DEBUG, 9, "Packet", "Pre-OOA Invalid Sequenced queue: BS %u + SQ %u != NOS %u", SequencedBase, SequencedQueue.size(), NextOutSeq); - } - - //if the packet they got out of order is between our last acked packet and the last sent packet, then its valid. - if (CompareSequence(SequencedBase, seq) != SeqPast && CompareSequence(NextOutSeq, seq) == SeqPast) { - uint16 sqsize = SequencedQueue.size(); - uint16 index = seq - SequencedBase; - LogWrite(PACKET__DEBUG, 9, "Packet", "OP_OutOfOrderAck marking packet acked in queue (queue index = %u, queue size = %u)", index, sqsize); - if (index < sqsize) { - SequencedQueue[index]->acked = true; - // flag packets for a resend - uint16 count = 0; - uint32 timeout = AverageDelta * 2 + 100; - for (auto sitr = SequencedQueue.begin(); sitr != SequencedQueue.end() && count < index; ++sitr, ++count) { - if (!(*sitr)->acked && (*sitr)->sent_time > 0 && (((*sitr)->sent_time + timeout) < Timer::GetCurrentTime2())) { - (*sitr)->sent_time = 0; - LogWrite(PACKET__DEBUG, 9, "Packet", "OP_OutOfOrderAck Flagging packet %u for retransmission", SequencedBase + count); - } - } - } - - if (RETRANSMIT_TIMEOUT_MULT) { - retransmittimer = Timer::GetCurrentTime2(); - } - } - else { - LogWrite(PACKET__DEBUG, 9, "Packet", "Received OP_OutOfOrderAck for out-of-window %u. Window (%u->%u)", seq, SequencedBase, NextOutSeq); - } - - if (uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) { - LogWrite(PACKET__DEBUG, 9, "Packet", "Post-OOA Invalid Sequenced queue: BS %u + SQ %u != NOS %u", SequencedBase, SequencedQueue.size(), NextOutSeq); - } - - MOutboundQueue.unlock(); - } - break; - case OP_ServerKeyRequest:{ - if (p->Size() < sizeof(ClientSessionStats)) - { - //_log(NET__ERROR, _L "Received OP_SessionStatRequest that was of malformed size" __L); + LogWrite(PACKET__DEBUG, 9, "Packet", "Received OP_OutOfOrderAck that was of malformed size"); break; } - ClientSessionStats* Stats = (ClientSessionStats*)p->pBuffer; + // Extract sequence number from packet + uint16 seq = ntohs(*reinterpret_cast(p->pBuffer)); + MOutboundQueue.lock(); + + // Validate sequenced queue integrity (pre-processing) + if (uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) + { + LogWrite(PACKET__DEBUG, 9, "Packet", "Pre-OOA Invalid Sequenced queue: BS %u + SQ %u != NOS %u", + SequencedBase, SequencedQueue.size(), NextOutSeq); + } + + // Verify that acknowledged packet is within valid window + if (CompareSequence(SequencedBase, seq) != SeqPast && CompareSequence(NextOutSeq, seq) == SeqPast) + { + uint16 sqsize = SequencedQueue.size(); + uint16 index = seq - SequencedBase; + + LogWrite(PACKET__DEBUG, 9, "Packet", "OP_OutOfOrderAck marking packet acked in queue (queue index = %u, queue size = %u)", + index, sqsize); + + // Mark the specific packet as acknowledged + if (index < sqsize) + { + SequencedQueue[index]->acked = true; + + // Flag earlier unacknowledged packets for retransmission + uint16 count = 0; + uint32 timeout = AverageDelta * 2 + 100; + + for (auto sitr = SequencedQueue.begin(); sitr != SequencedQueue.end() && count < index; ++sitr, ++count) + { + // Check if packet needs retransmission based on timeout + if (!(*sitr)->acked && (*sitr)->sent_time > 0 && + (((*sitr)->sent_time + timeout) < Timer::GetCurrentTime2())) + { + (*sitr)->sent_time = 0; // Mark for resend + LogWrite(PACKET__DEBUG, 9, "Packet", "OP_OutOfOrderAck Flagging packet %u for retransmission", + SequencedBase + count); + } + } + } + + // Update retransmit timer if enabled + if (RETRANSMIT_TIMEOUT_MULT) + { + retransmittimer = Timer::GetCurrentTime2(); + } + } + else + { + LogWrite(PACKET__DEBUG, 9, "Packet", "Received OP_OutOfOrderAck for out-of-window %u. Window (%u->%u)", + seq, SequencedBase, NextOutSeq); + } + + // Validate sequenced queue integrity (post-processing) + if (uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) + { + LogWrite(PACKET__DEBUG, 9, "Packet", "Post-OOA Invalid Sequenced queue: BS %u + SQ %u != NOS %u", + SequencedBase, SequencedQueue.size(), NextOutSeq); + } + + MOutboundQueue.unlock(); + } + break; + /** + * OP_ServerKeyRequest: Handle server key exchange request + * Processes client session statistics and initiates key exchange + */ + case OP_ServerKeyRequest: + { + // Validate packet size for ClientSessionStats structure + if (p->Size() < sizeof(ClientSessionStats)) + { + break; + } + + // Extract client session statistics + auto Stats = reinterpret_cast(p->pBuffer); int16 request_id = Stats->RequestID; + + // Adjust transmission rates based on client feedback AdjustRates(ntohl(Stats->average_delta)); - ServerSessionStats* stats=(ServerSessionStats*)p->pBuffer; - memset(stats, 0, sizeof(ServerSessionStats)); + + // Prepare server session statistics response + auto stats = reinterpret_cast(p->pBuffer); + memset(stats, 0, sizeof(ServerSessionStats)); stats->RequestID = request_id; stats->current_time = ntohl(Timer::GetCurrentTime2()); stats->sent_packets = ntohl(sent_packets); stats->sent_packets2 = ntohl(sent_packets); stats->received_packets = ntohl(received_packets); stats->received_packets2 = ntohl(received_packets); - NonSequencedPush(new EQProtocolPacket(OP_SessionStatResponse,p->pBuffer,p->size)); - if(!crypto->isEncrypted()) + + // Send session statistics response + NonSequencedPush(new EQProtocolPacket(OP_SessionStatResponse, p->pBuffer, p->size)); + + // Initiate appropriate response based on encryption state + if (!crypto->isEncrypted()) + { SendKeyRequest(); + } else + { SendSessionResponse(); + } } break; - case OP_SessionStatResponse: { + /** + * OP_SessionStatResponse: Handle session statistics response + * Currently just logs the receipt for debugging purposes + */ + case OP_SessionStatResponse: + { LogWrite(PACKET__INFO, 0, "Packet", "OP_SessionStatResponse"); } break; - case OP_OutOfSession: { + /** + * OP_OutOfSession: Handle out-of-session packet + * Indicates client is no longer in valid session state + */ + case OP_OutOfSession: + { LogWrite(PACKET__INFO, 0, "Packet", "OP_OutOfSession"); SendDisconnect(); SetState(CLOSED); } break; + /** + * Default case: Handle unknown/unprocessed packet types + * Attempts RSA key processing and packet decryption for debugging + */ default: - //EQApplicationPacket *ap = p->MakeApplicationPacket(app_opcode_size); - //InboundQueuePush(ap); - - cout << "Orig Packet: " << p->opcode << endl; + { + // Debug output for original packet + std::cout << "Orig Packet: " << p->opcode << std::endl; DumpPacket(p->pBuffer, p->size); - if(p && p->size >= 69){ + + // Process RSA key if packet is large enough + if (p && p->size >= 69) + { processRSAKey(p); } + + // Attempt to decrypt packet data MCombineQueueLock.lock(); EQProtocolPacket* p2 = ProcessEncryptedData(p->pBuffer, p->size, OP_Packet); MCombineQueueLock.unlock(); - cout << "Decrypted Packet: " << p2->opcode << endl; - DumpPacket(p2->pBuffer, p2->size); - safe_delete(p2); - /* if(p2) + // Debug output for decrypted packet + if (p2) { - EQApplicationPacket* ap = p2->MakeApplicationPacket(2); - if (ap->version == 0) - ap->version = client_version; - InboundQueuePush(ap); + std::cout << "Decrypted Packet: " << p2->opcode << std::endl; + DumpPacket(p2->pBuffer, p2->size); safe_delete(p2); - }*/ - - //EQProtocolPacket* puse = p2; - /* if (!rogue_buffer) { - roguebuf_size=puse->size; - rogue_buffer=new unsigned char[roguebuf_size]; - memcpy(rogue_buffer,puse->pBuffer,puse->size); - roguebuf_offset=puse->size; - cout << "RogueBuf is " << roguebuf_offset << "/" << roguebuf_size << " (" << (p->size-6) << ") NextInSeq=" << NextInSeq << endl; - } - else { - int32 new_size = roguebuf_size + puse->size; - uchar* tmp_buffer = new unsigned char[new_size]; - uchar* ptr = tmp_buffer; - - memcpy(ptr,rogue_buffer,roguebuf_size); - ptr += roguebuf_size; - memcpy(ptr,puse->pBuffer,puse->size); - roguebuf_offset=puse->size; - - safe_delete_array(rogue_buffer); - - rogue_buffer = tmp_buffer; - roguebuf_size = new_size; - roguebuf_offset = new_size; - cout << "RogueBuf is " << roguebuf_offset << "/" << roguebuf_size << " (" << (p->size-6) << ") NextInSeq=" << NextInSeq << endl; - }*/ + } + #ifdef WRITE_PACKETS - WritePackets(ap->GetOpcodeName(), p->pBuffer, p->size, false); + // Write packet data to file if enabled + WritePackets("Unknown", p->pBuffer, p->size, false); #endif - //InboundQueuePush(ap); - LogWrite(PACKET__INFO, 0, "Packet", "Received unknown packet type, not adding to inbound queue"); - //safe_delete(p2); - //SendDisconnect(); - break; + + // Log unknown packet type + LogWrite(PACKET__INFO, 0, "Packet", "Received unknown packet type, not adding to inbound queue"); + } + break; } } } -int8 EQStream::EQ2_Compress(EQ2Packet* app, int8 offset){ - +/** + * Compress EQ2 packet data using zlib deflate compression + * Compresses packet data starting from the specified offset and updates packet buffer + * + * @param app - EQ2Packet to compress (modified in-place) + * @param offset - Starting offset for compression data + * @return Compression flag offset, or 0 on failure + */ +int8 EQStream::EQ2_Compress(EQ2Packet* app, int8 offset) +{ #ifdef LE_DEBUG - printf( "Before Compress in %s, line %i:\n", __FUNCTION__, __LINE__); + std::printf("Before Compress in %s, line %i:\n", __FUNCTION__, __LINE__); DumpPacket(app); #endif - + // Set up compression parameters uchar* pDataPtr = app->pBuffer + offset; - int xpandSize = app->size * 2; - uchar* deflate_buff = new uchar[xpandSize]; + int xpandSize = app->size * 2; // Allocate double size for compression buffer + auto deflate_buff = std::make_unique(xpandSize); + + // Lock compression data and configure zlib stream MCompressData.lock(); stream.next_in = pDataPtr; stream.avail_in = app->size - offset; - stream.next_out = deflate_buff; + stream.next_out = deflate_buff.get(); stream.avail_out = xpandSize; + // Perform compression int ret = deflate(&stream, Z_SYNC_FLUSH); + // Check for compression errors if (ret != Z_OK) { - printf("ZLIB COMPRESSION RETFAIL: %i, %i (Ret: %i)\n", app->size, stream.avail_out, ret); + std::printf("ZLIB COMPRESSION RETFAIL: %i, %i (Ret: %i)\n", app->size, stream.avail_out, ret); MCompressData.unlock(); - safe_delete_array(deflate_buff); return 0; } + // Calculate new size and rebuild packet buffer int32 newsize = xpandSize - stream.avail_out; safe_delete_array(app->pBuffer); app->size = newsize + offset; app->pBuffer = new uchar[app->size]; - app->pBuffer[(offset - 1)] = 1; - memcpy(app->pBuffer + offset, deflate_buff, newsize); + + // Set compression flag and copy compressed data + app->pBuffer[offset - 1] = 1; // Compression flag + std::memcpy(app->pBuffer + offset, deflate_buff.get(), newsize); + MCompressData.unlock(); - safe_delete_array(deflate_buff); #ifdef LE_DEBUG - printf( "After Compress in %s, line %i:\n", __FUNCTION__, __LINE__); + std::printf("After Compress in %s, line %i:\n", __FUNCTION__, __LINE__); DumpPacket(app); #endif return offset - 1; } -int16 EQStream::processRSAKey(EQProtocolPacket *p, uint16 subpacket_length){ - /*int16 limit = 0; - int8 offset = 13; - int8 offset2 = 0; - if(p->pBuffer[2] == 0) - limit = p->pBuffer[9]; - else{ - limit = p->pBuffer[5]; - offset2 = 5; - offset-=1; - } - crypto->setRC4Key(Crypto::RSADecrypt(p->pBuffer + offset + (limit-8), 8)); - return (limit + offset +1) - offset2;*/ - if(subpacket_length) +/** + * Process RSA key from protocol packet and initialize RC4 encryption + * Extracts RSA-encrypted RC4 key from packet and configures stream encryption + * + * @param p - Protocol packet containing RSA key data + * @param subpacket_length - Length of subpacket, 0 to use full packet + * @return Always returns 0 (legacy return value) + */ +int16 EQStream::processRSAKey(EQProtocolPacket *p, uint16 subpacket_length) +{ + // Extract RSA key from appropriate location in packet + if (subpacket_length) + { + // Use subpacket-relative position for key extraction crypto->setRC4Key(Crypto::RSADecrypt(p->pBuffer + subpacket_length - 8, 8)); + } else + { + // Use full packet size for key extraction crypto->setRC4Key(Crypto::RSADecrypt(p->pBuffer + p->size - 8, 8)); + } return 0; } -void EQStream::SendKeyRequest(){ - int32 crypto_key_size = 60; - int16 size = sizeof(KeyGen_Struct) + sizeof(KeyGen_End_Struct) + crypto_key_size; - EQ2Packet *outapp=new EQ2Packet(OP_WSLoginRequestMsg,NULL,size); - memcpy(&outapp->pBuffer[0], &crypto_key_size, sizeof(int32)); - memset(&outapp->pBuffer[4], 0xFF, crypto_key_size); - memset(&outapp->pBuffer[size-5], 1, 1); - memset(&outapp->pBuffer[size-1], 1, 1); +/** + * Send encryption key request to establish secure communication + * Creates and sends a key generation packet to initiate encryption handshake + */ +void EQStream::SendKeyRequest() +{ + // Define key generation packet parameters + constexpr int32 crypto_key_size = 60; + const int16 size = sizeof(KeyGen_Struct) + sizeof(KeyGen_End_Struct) + crypto_key_size; + + // Create key request packet + auto outapp = new EQ2Packet(OP_WSLoginRequestMsg, nullptr, size); + + // Set crypto key size in packet header + std::memcpy(&outapp->pBuffer[0], &crypto_key_size, sizeof(int32)); + + // Fill crypto key area with placeholder data + std::memset(&outapp->pBuffer[4], 0xFF, crypto_key_size); + + // Set packet termination markers + std::memset(&outapp->pBuffer[size - 5], 1, 1); + std::memset(&outapp->pBuffer[size - 1], 1, 1); + + // Queue packet for transmission EQ2QueuePacket(outapp, true); } -void EQStream::EncryptPacket(EQ2Packet* app, int8 compress_offset, int8 offset){ - if(app->size>2 && crypto->isEncrypted()){ +/** + * Encrypt packet data using RC4 encryption + * Applies encryption to packet payload, skipping headers and unencrypted sections + * + * @param app - EQ2Packet to encrypt (modified in-place) + * @param compress_offset - Offset for compressed data encryption + * @param offset - Additional offset for uncompressed data + */ +void EQStream::EncryptPacket(EQ2Packet* app, int8 compress_offset, int8 offset) +{ + // Only encrypt packets that are large enough and encryption is enabled + if (app->size > 2 && crypto->isEncrypted()) + { app->packet_encrypted = true; uchar* crypt_buff = app->pBuffer; - if(app->eq2_compressed) + + // Choose encryption range based on compression status + if (app->eq2_compressed) + { + // Encrypt from compression offset to end of packet crypto->RC4Encrypt(crypt_buff + compress_offset, app->size - compress_offset); + } else + { + // Encrypt from header end plus offset to end of packet crypto->RC4Encrypt(crypt_buff + 2 + offset, app->size - 2 - offset); + } } } -void EQStream::EQ2QueuePacket(EQ2Packet* app, bool attempted_combine){ - if(CheckActive()){ - if(!attempted_combine){ +/** + * Queue EQ2 packet for transmission with optional combining + * Either adds packet to combine queue for batching or sends immediately + * + * @param app - EQ2Packet to queue for transmission + * @param attempted_combine - If true, send immediately; if false, queue for combining + */ +void EQStream::EQ2QueuePacket(EQ2Packet* app, bool attempted_combine) +{ + // Only process packets if stream is active + if (CheckActive()) + { + if (!attempted_combine) + { + // Add packet to combine queue for batching MCombineQueueLock.lock(); combine_queue.push_back(app); MCombineQueueLock.unlock(); } - else{ + else + { + // Prepare and send packet immediately MCombineQueueLock.lock(); PreparePacket(app); MCombineQueueLock.unlock(); + #ifdef LE_DEBUG - printf( "After B in %s, line %i:\n", __FUNCTION__, __LINE__); + std::printf("After B in %s, line %i:\n", __FUNCTION__, __LINE__); DumpPacket(app); #endif SendPacket(app); @@ -925,392 +1223,637 @@ void EQStream::EQ2QueuePacket(EQ2Packet* app, bool attempted_combine){ } } -void EQStream::UnPreparePacket(EQ2Packet* app){ - if(app->pBuffer[2] == 0 && app->pBuffer[3] == 19){ - uchar* new_buffer = new uchar[app->size-3]; - memcpy(new_buffer+2, app->pBuffer+5, app->size-3); +/** + * Remove packet preparation headers from EQ2 packet + * Strips protocol-specific headers that were added during packet preparation + * + * @param app - EQ2Packet to unprepare (modified in-place) + */ +void EQStream::UnPreparePacket(EQ2Packet* app) +{ + // Check for specific header pattern that indicates prepared packet + if (app->pBuffer[2] == 0 && app->pBuffer[3] == 19) + { + // Create new buffer without preparation headers + auto new_buffer = std::make_unique(app->size - 3); + std::memcpy(new_buffer.get() + 2, app->pBuffer + 5, app->size - 3); + + // Replace packet buffer with unprepared version delete[] app->pBuffer; - app->size-=3; - app->pBuffer = new_buffer; + app->size -= 3; + app->pBuffer = new_buffer.release(); } } #ifdef WRITE_PACKETS +/** + * Convert byte to printable character for packet dumps + * Returns '.' for non-printable characters + * + * @param in - Input byte to convert + * @return Printable character representation + */ char EQStream::GetChar(uchar in) { if (in < ' ' || in > '~') + { return '.'; - return (char)in; + } + return static_cast(in); } -void EQStream::WriteToFile(char* pFormat, ...) { +/** + * Write formatted output to packet dump file + * Provides printf-style formatting for packet logging + * + * @param pFormat - Printf-style format string + * @param ... - Variable arguments for formatting + */ +void EQStream::WriteToFile(char* pFormat, ...) +{ va_list args; va_start(args, pFormat); - vfprintf(write_packets, pFormat, args); + std::vfprintf(write_packets, pFormat, args); va_end(args); } -void EQStream::WritePackets(const char* opcodeName, uchar* data, int32 size, bool outgoing) { +/** + * Write packet data to debug file with hex dump format + * Creates formatted hex dump with ASCII representation for debugging + * + * @param opcodeName - Name of packet opcode for header + * @param data - Raw packet data to dump + * @param size - Size of packet data in bytes + * @param outgoing - True if outgoing packet, false if incoming + */ +void EQStream::WritePackets(const char* opcodeName, uchar* data, int32 size, bool outgoing) +{ MWritePackets.lock(); + + // Format connection information struct in_addr ip_addr; ip_addr.s_addr = remote_ip; char timebuffer[80]; time_t rawtime; struct tm* timeinfo; - time(&rawtime); - timeinfo = localtime(&rawtime); - strftime(timebuffer, 80, "%m/%d/%Y %H:%M:%S", timeinfo); + + std::time(&rawtime); + timeinfo = std::localtime(&rawtime); + std::strftime(timebuffer, 80, "%m/%d/%Y %H:%M:%S", timeinfo); + + // Write packet header if (outgoing) + { WriteToFile("-- %s --\n%s\nSERVER -> %s\n", opcodeName, timebuffer, inet_ntoa(ip_addr)); + } else + { WriteToFile("-- %s --\n%s\n%s -> SERVER\n", opcodeName, timebuffer, inet_ntoa(ip_addr)); - int i; + } + + // Calculate hex dump layout int nLines = size / 16; int nExtra = size % 16; uchar* pPtr = data; - for (i = 0; i < nLines; i++) + + // Write full 16-byte lines + for (int i = 0; i < nLines; i++) { - WriteToFile("%4.4X:\t%2.2X %2.2X %2.2X %2.2X %2.2X %2.2X %2.2X %2.2X %2.2X %2.2X %2.2X %2.2X %2.2X %2.2X %2.2X %2.2X %c%c%c%c%c%c%c%c%c%c%c%c%c%c%c%c\n", i * 16, pPtr[0], pPtr[1], pPtr[2], pPtr[3], pPtr[4], pPtr[5], pPtr[6], pPtr[7], pPtr[8], pPtr[9], pPtr[10], pPtr[11], pPtr[12], pPtr[13], pPtr[14], pPtr[15], GetChar(pPtr[0]), GetChar(pPtr[1]), GetChar(pPtr[2]), GetChar(pPtr[3]), GetChar(pPtr[4]), GetChar(pPtr[5]), GetChar(pPtr[6]), GetChar(pPtr[7]), GetChar(pPtr[8]), GetChar(pPtr[9]), GetChar(pPtr[10]), GetChar(pPtr[11]), GetChar(pPtr[12]), GetChar(pPtr[13]), GetChar(pPtr[14]), GetChar(pPtr[15])); + WriteToFile("%4.4X:\t%2.2X %2.2X %2.2X %2.2X %2.2X %2.2X %2.2X %2.2X %2.2X %2.2X %2.2X %2.2X %2.2X %2.2X %2.2X %2.2X %c%c%c%c%c%c%c%c%c%c%c%c%c%c%c%c\n", + i * 16, pPtr[0], pPtr[1], pPtr[2], pPtr[3], pPtr[4], pPtr[5], pPtr[6], pPtr[7], + pPtr[8], pPtr[9], pPtr[10], pPtr[11], pPtr[12], pPtr[13], pPtr[14], pPtr[15], + GetChar(pPtr[0]), GetChar(pPtr[1]), GetChar(pPtr[2]), GetChar(pPtr[3]), + GetChar(pPtr[4]), GetChar(pPtr[5]), GetChar(pPtr[6]), GetChar(pPtr[7]), + GetChar(pPtr[8]), GetChar(pPtr[9]), GetChar(pPtr[10]), GetChar(pPtr[11]), + GetChar(pPtr[12]), GetChar(pPtr[13]), GetChar(pPtr[14]), GetChar(pPtr[15])); pPtr += 16; } + + // Write partial line if remaining bytes if (nExtra) { WriteToFile("%4.4X\t", nLines * 16); - for (i = 0; i < nExtra; i++) + + // Write hex bytes + for (int i = 0; i < nExtra; i++) { WriteToFile("%2.2X ", pPtr[i]); } - for (i; i < 16; i++) + + // Pad with spaces + for (int i = nExtra; i < 16; i++) + { WriteToFile(" "); - for (i = 0; i < nExtra; i++) + } + + // Write ASCII representation + for (int i = 0; i < nExtra; i++) { WriteToFile("%c", GetChar(pPtr[i])); } WriteToFile("\n"); } + WriteToFile("\n\n"); - fflush(write_packets); + std::fflush(write_packets); MWritePackets.unlock(); } -void EQStream::WritePackets(EQ2Packet* app, bool outgoing) { +/** + * Write EQ2 packet to debug file with automatic version handling + * Convenience wrapper for WritePackets that handles version setup + * + * @param app - EQ2Packet to write to debug file + * @param outgoing - True if outgoing packet, false if incoming + */ +void EQStream::WritePackets(EQ2Packet* app, bool outgoing) +{ if (app->version == 0) + { app->version = client_version; + } WritePackets(app->GetOpcodeName(), app->pBuffer, app->size, outgoing); } #endif -void EQStream::PreparePacket(EQ2Packet* app, int8 offset){ +/** + * Prepare EQ2 packet for transmission + * Handles packet preparation, compression, and encryption in sequence + * + * @param app - EQ2Packet to prepare (modified in-place) + * @param offset - Additional offset for encryption + */ +void EQStream::PreparePacket(EQ2Packet* app, int8 offset) +{ + // Set client version and reset compression offset app->setVersion(client_version); compressed_offset = 0; #ifdef LE_DEBUG - printf( "Before A in %s, line %i:\n", __FUNCTION__, __LINE__); + std::printf("Before A in %s, line %i:\n", __FUNCTION__, __LINE__); DumpPacket(app); #endif - if(!app->packet_prepared){ - if(app->PreparePacket(MaxLen) == 255) //invalid version + + // Prepare packet headers and structure if not already done + if (!app->packet_prepared) + { + if (app->PreparePacket(MaxLen) == 255) // Invalid version + { return; + } } #ifdef LE_DEBUG - printf( "After Prepare in %s, line %i:\n", __FUNCTION__, __LINE__); + std::printf("After Prepare in %s, line %i:\n", __FUNCTION__, __LINE__); DumpPacket(app); #endif + #ifdef WRITE_PACKETS + // Write uncompressed/unencrypted packet to file if enabled if (!app->eq2_compressed && !app->packet_encrypted) + { WritePackets(app, true); + } #endif - if(!app->eq2_compressed && app->size>128){ + // Compress packet if it's large enough and not already compressed + if (!app->eq2_compressed && app->size > 128) + { compressed_offset = EQ2_Compress(app); if (compressed_offset) + { app->eq2_compressed = true; + } } - if(!app->packet_encrypted){ + + // Encrypt packet if not already encrypted + if (!app->packet_encrypted) + { EncryptPacket(app, compressed_offset, offset); - if(app->size > 2 && app->pBuffer[2] == 0){ - uchar* new_buffer = new uchar[app->size+1]; + + // Handle special case for zero byte at offset 2 + if (app->size > 2 && app->pBuffer[2] == 0) + { + auto new_buffer = std::make_unique(app->size + 1); new_buffer[2] = 0; - memcpy(new_buffer+3, app->pBuffer+2, app->size-2); + std::memcpy(new_buffer.get() + 3, app->pBuffer + 2, app->size - 2); delete[] app->pBuffer; - app->pBuffer = new_buffer; + app->pBuffer = new_buffer.release(); app->size++; } } #ifdef LE_DEBUG - printf( "After A in %s, line %i:\n", __FUNCTION__, __LINE__); + std::printf("After A in %s, line %i:\n", __FUNCTION__, __LINE__); DumpPacket(app); #endif - } +/** + * Send EQProtocolPacket with automatic fragmentation for large packets + * Fragments packets that exceed maximum transmission unit into smaller chunks + * + * @param p - EQProtocolPacket to send (ownership transferred, will be deleted) + */ void EQStream::SendPacket(EQProtocolPacket *p) { - uint32 chunksize,used; + uint32 chunksize, used; uint32 length; - // Convert the EQApplicationPacket to 1 or more EQProtocolPackets - if (p->size>( MaxLen-8)) { // proto-op(2), seq(2), app-op(2) ... data ... crc(2) - uchar* tmpbuff=p->pBuffer; - length=p->size - 2; + // Check if packet needs fragmentation + if (p->size > (MaxLen - 8)) // proto-op(2), seq(2), app-op(2) ... data ... crc(2) + { + uchar* tmpbuff = p->pBuffer; + length = p->size - 2; - EQProtocolPacket *out=new EQProtocolPacket(OP_Fragment,NULL,MaxLen-4); - *(uint32 *)(out->pBuffer+2)=htonl(length); - used=MaxLen-10; - memcpy(out->pBuffer+6,tmpbuff+2,used); + // Create first fragment with length header + auto out = new EQProtocolPacket(OP_Fragment, nullptr, MaxLen - 4); + *reinterpret_cast(out->pBuffer + 2) = htonl(length); + used = MaxLen - 10; + std::memcpy(out->pBuffer + 6, tmpbuff + 2, used); #ifdef LE_DEBUG - printf("(%s, %i) New Fragment:\n ", __FUNCTION__, __LINE__); + std::printf("(%s, %i) New Fragment:\n ", __FUNCTION__, __LINE__); DumpPacket(out); #endif SequencedPush(out); - while (usedpBuffer+2,tmpbuff,1); - memcpy(out->pBuffer+2,tmpbuff+used+2,chunksize); + // Create additional fragments for remaining data + while (used < length) + { + chunksize = std::min(length - used, MaxLen - 6); + out = new EQProtocolPacket(OP_Fragment, nullptr, chunksize + 2); + std::memcpy(out->pBuffer + 2, tmpbuff + used + 2, chunksize); + #ifdef LE_DEBUG - printf("Chunk: \n"); - DumpPacket(out); + std::printf("Chunk: \n"); + DumpPacket(out); #endif SequencedPush(out); - used+=chunksize; - + used += chunksize; } #ifdef LE_DEBUG - printf( "ChunkDelete: \n"); + std::printf("ChunkDelete: \n"); DumpPacket(out); - //cerr << "1: Deleting 0x" << hex << (uint32)(p) << dec << endl; #endif delete p; - } else { + } + else + { + // Packet fits within MTU, send directly SequencedPush(p); } } +/** + * Send EQApplicationPacket with automatic fragmentation for large packets + * Serializes application packet and fragments if necessary for transmission + * + * @param p - EQApplicationPacket to send (ownership transferred, will be deleted) + */ void EQStream::SendPacket(EQApplicationPacket *p) { -uint32 chunksize,used; -uint32 length; + uint32 chunksize, used; + uint32 length; - // Convert the EQApplicationPacket to 1 or more EQProtocolPackets - if (p->size>(MaxLen-8)) { // proto-op(2), seq(2), app-op(2) ... data ... crc(2) - //cout << "Making oversized packet for: " << endl; - //cout << p->size << endl; - //p->DumpRawHeader(); - //dump_message(p->pBuffer,p->size,timestamp()); - //cout << p->size << endl; - unsigned char *tmpbuff=new unsigned char[p->size+2]; - //cout << hex << (int)tmpbuff << dec << endl; - length=p->serialize(tmpbuff); + // Check if packet needs fragmentation + if (p->size > (MaxLen - 8)) // proto-op(2), seq(2), app-op(2) ... data ... crc(2) + { + // Serialize application packet to buffer + auto tmpbuff = std::make_unique(p->size + 2); + length = p->serialize(tmpbuff.get()); - EQProtocolPacket *out=new EQProtocolPacket(OP_Fragment,NULL,MaxLen-4); - *(uint32 *)(out->pBuffer+2)=htonl(p->Size()); - memcpy(out->pBuffer+6,tmpbuff,MaxLen-10); - used=MaxLen-10; + // Create first fragment with size header + auto out = new EQProtocolPacket(OP_Fragment, nullptr, MaxLen - 4); + *reinterpret_cast(out->pBuffer + 2) = htonl(p->Size()); + std::memcpy(out->pBuffer + 6, tmpbuff.get(), MaxLen - 10); + used = MaxLen - 10; SequencedPush(out); - //cout << "Chunk #" << ++i << " size=" << used << ", length-used=" << (length-used) << endl; - while (usedpBuffer+2,tmpbuff+used,chunksize); - out->size=chunksize+2; + + // Create additional fragments for remaining data + while (used < length) + { + out = new EQProtocolPacket(OP_Fragment, nullptr, MaxLen - 4); + chunksize = std::min(length - used, MaxLen - 6); + std::memcpy(out->pBuffer + 2, tmpbuff.get() + used, chunksize); + out->size = chunksize + 2; SequencedPush(out); - used+=chunksize; - //cout << "Chunk #"<< ++i << " size=" << chunksize << ", length-used=" << (length-used) << endl; + used += chunksize; } - //cerr << "1: Deleting 0x" << hex << (uint32)(p) << dec << endl; + delete p; - delete[] tmpbuff; - } else { - EQProtocolPacket *out=new EQProtocolPacket(OP_Packet,NULL,p->Size()+2); - p->serialize(out->pBuffer+2); + } + else + { + // Packet fits within MTU, serialize directly to protocol packet + auto out = new EQProtocolPacket(OP_Packet, nullptr, p->Size() + 2); + p->serialize(out->pBuffer + 2); SequencedPush(out); - //cerr << "2: Deleting 0x" << hex << (uint32)(p) << dec << endl; delete p; } } +/** + * Add packet to sequenced transmission queue + * Assigns sequence number and queues packet for reliable delivery + * + * @param p - EQProtocolPacket to queue (ownership transferred) + */ void EQStream::SequencedPush(EQProtocolPacket *p) { + // Set client version for compatibility p->setVersion(client_version); + MOutboundQueue.lock(); - *(uint16 *)(p->pBuffer)=htons(NextOutSeq); + + // Assign sequence number to packet header + *reinterpret_cast(p->pBuffer) = htons(NextOutSeq); SequencedQueue.push_back(p); p->sequence = NextOutSeq; NextOutSeq++; + MOutboundQueue.unlock(); } +/** + * Add packet to non-sequenced transmission queue + * Queues packet for immediate transmission without sequence numbering + * + * @param p - EQProtocolPacket to queue (ownership transferred) + */ void EQStream::NonSequencedPush(EQProtocolPacket *p) { + // Set client version for compatibility p->setVersion(client_version); + MOutboundQueue.lock(); NonSequencedQueue.push(p); MOutboundQueue.unlock(); } +/** + * Send acknowledgment packet for received sequence number + * Confirms successful receipt of sequenced packet to peer + * + * @param seq - Sequence number to acknowledge + */ void EQStream::SendAck(uint16 seq) { - uint16 Seq=htons(seq); + uint16 Seq = htons(seq); SetLastAckSent(seq); - NonSequencedPush(new EQProtocolPacket(OP_Ack,(unsigned char *)&Seq,sizeof(uint16))); + NonSequencedPush(new EQProtocolPacket(OP_Ack, reinterpret_cast(&Seq), sizeof(uint16))); } +/** + * Send out-of-order acknowledgment packet + * Notifies peer that a specific packet was received out of sequence + * + * @param seq - Sequence number that was received out of order + */ void EQStream::SendOutOfOrderAck(uint16 seq) { - uint16 Seq=htons(seq); - NonSequencedPush(new EQProtocolPacket(OP_OutOfOrderAck,(unsigned char *)&Seq,sizeof(uint16))); + uint16 Seq = htons(seq); + NonSequencedPush(new EQProtocolPacket(OP_OutOfOrderAck, reinterpret_cast(&Seq), sizeof(uint16))); } -bool EQStream::CheckCombineQueue(){ - bool ret = true; //processed all packets +/** + * Process packet combination queue to optimize transmission + * Combines multiple small packets into larger ones for efficiency + * + * @return true if all packets processed, false if processing was limited + */ +bool EQStream::CheckCombineQueue() +{ + bool ret = true; // Processed all packets flag + MCombineQueueLock.lock(); - if(combine_queue.size() > 0){ + + if (combine_queue.size() > 0) + { + // Get first packet from queue EQ2Packet* first = combine_queue.front(); combine_queue.pop_front(); - if(combine_queue.size() == 0){ //nothing to combine this with + + if (combine_queue.size() == 0) + { + // Nothing to combine with, send immediately EQ2QueuePacket(first, true); } - else{ + else + { + // Prepare first packet for combining PreparePacket(first); - EQ2Packet* second = 0; + EQ2Packet* second = nullptr; bool combine_worked = false; int16 count = 0; - while(combine_queue.size()){ + + // Attempt to combine with additional packets + while (combine_queue.size()) + { count++; second = combine_queue.front(); combine_queue.pop_front(); PreparePacket(second); - /*if(first->GetRawOpcode() != OP_AppCombined && first->pBuffer[2] == 0){ - EQ2Packet* tmp = second; - second = first; - first = tmp; - }*/ - if(!first->AppCombine(second)){ + + // Try to combine second packet with first + if (!first->AppCombine(second)) + { + // Combination failed, send first packet first->SetProtocolOpcode(OP_Packet); - if(combine_worked){ + if (combine_worked) + { SequencedPush(first); } - else{ + else + { EQ2QueuePacket(first, true); } + + // Make second packet the new first first = second; combine_worked = false; } - else{ + else + { + // Combination succeeded combine_worked = true; - //DumpPacket(first); } - if(count >= 60 || first->size > 4000){ //other clients need packets too + + // Limit processing to prevent blocking other clients + if (count >= 60 || first->size > 4000) + { ret = false; break; } } - if(first){ + + // Send final packet + if (first) + { first->SetProtocolOpcode(OP_Packet); - if(combine_worked){ + if (combine_worked) + { SequencedPush(first); } - else{ + else + { EQ2QueuePacket(first, true); } } } } + MCombineQueueLock.unlock(); return ret; } -void EQStream::CheckResend(int eq_fd){ +/** + * Check and handle packet retransmission for unacknowledged packets + * Retries failed packet deliveries up to maximum attempt limit + * + * @param eq_fd - Socket file descriptor for transmission + */ +void EQStream::CheckResend(int eq_fd) +{ int32 curr = Timer::GetCurrentTime2(); - EQProtocolPacket* packet = 0; - deque::iterator itr; + EQProtocolPacket* packet = nullptr; + MResendQue.lock(); - for(itr=resend_que.begin();itr!=resend_que.end();itr++){ + + for (auto itr = resend_que.begin(); itr != resend_que.end();) + { packet = *itr; - if(packet->attempt_count >= 5){//tried to resend this packet 5 times, client must already have it but didnt ack it + + // Check if packet has exceeded maximum retry attempts + if (packet->attempt_count >= 5) + { + // Give up on this packet - client likely has it but didn't ack safe_delete(packet); itr = resend_que.erase(itr); - if(itr == resend_que.end()) + if (itr == resend_que.end()) + { break; + } } - else{ - if((curr - packet->sent_time) < 1000) + else + { + // Check if enough time has passed for retry + if ((curr - packet->sent_time) < 1000) + { + ++itr; continue; - packet->sent_time -=1000; + } + + // Retry packet transmission + packet->sent_time -= 1000; packet->attempt_count++; WritePacket(eq_fd, packet); + ++itr; } } + MResendQue.unlock(); } -//returns SeqFuture if `seq` is later than `expected_seq` +/** + * Compare two sequence numbers considering wraparound behavior + * Determines temporal relationship between sequence numbers within sliding window + * + * @param expected_seq - Expected sequence number + * @param seq - Actual sequence number to compare + * @return SeqOrder indicating temporal relationship (past, in-order, future) + */ EQStream::SeqOrder EQStream::CompareSequence(uint16 expected_seq, uint16 seq) { - if (expected_seq == seq) { - // Curent + if (expected_seq == seq) + { + // Sequence numbers match exactly return SeqInOrder; } - else if ((seq > expected_seq && (uint32)seq < ((uint32)expected_seq + EQStream::MaxWindowSize)) || seq < (expected_seq - EQStream::MaxWindowSize)) { - // Future + else if ((seq > expected_seq && static_cast(seq) < (static_cast(expected_seq) + EQStream::MaxWindowSize)) || + seq < (expected_seq - EQStream::MaxWindowSize)) + { + // Sequence is in future window or wrapped around return SeqFuture; } - else { - // Past + else + { + // Sequence is in past window return SeqPast; } } +/** + * Process acknowledgment packet and remove acknowledged packets from queue + * Handles sliding window advancement and packet cleanup for reliable delivery + * + * @param seq - Highest sequence number being acknowledged + */ void EQStream::AckPackets(uint16 seq) { - std::deque::iterator itr, tmp; - MOutboundQueue.lock(); SeqOrder ord = CompareSequence(SequencedBase, seq); - if (ord == SeqInOrder) { - //they are not acking anything new... - LogWrite(PACKET__DEBUG, 9, "Packet", "Received an ack with no window advancement (seq %u)", seq); + + if (ord == SeqInOrder) + { + // No new acknowledgments - duplicate ack + LogWrite(PACKET__DEBUG, 9, "Packet", "Received an ack with no window advancement (seq %u)", seq); } - else if (ord == SeqPast) { - //they are nacking blocks going back before our buffer, wtf? - LogWrite(PACKET__DEBUG, 9, "Packet", "Received an ack with backward window advancement (they gave %u, our window starts at %u). This is bad" , seq, SequencedBase); + else if (ord == SeqPast) + { + // Backward acknowledgment - protocol error + LogWrite(PACKET__DEBUG, 9, "Packet", "Received an ack with backward window advancement (they gave %u, our window starts at %u). This is bad", seq, SequencedBase); } - else { - LogWrite(PACKET__DEBUG, 9, "Packet", "Received an ack up through sequence %u. Our base is %u", seq, SequencedBase); + else + { + // Valid acknowledgment - advance window + LogWrite(PACKET__DEBUG, 9, "Packet", "Received an ack up through sequence %u. Our base is %u", seq, SequencedBase); - - //this is a good ack, we get to ack some blocks. - seq++; //we stop at the block right after their ack, counting on the wrap of both numbers. - while (SequencedBase != seq) { - if (SequencedQueue.empty()) { - LogWrite(PACKET__DEBUG, 9, "Packet", "OUT OF PACKETS acked packet with sequence %u. Next send is %u before this", (unsigned long)SequencedBase, SequencedQueue.size()); + // Process all packets up to acknowledgment point + seq++; // Stop at block right after their ack + + while (SequencedBase != seq) + { + if (SequencedQueue.empty()) + { + LogWrite(PACKET__DEBUG, 9, "Packet", "OUT OF PACKETS acked packet with sequence %u. Next send is %u before this", + static_cast(SequencedBase), SequencedQueue.size()); SequencedBase = NextOutSeq; break; } - LogWrite(PACKET__DEBUG, 9, "Packet", "Removing acked packet with sequence %u", (unsigned long)SequencedBase); - //clean out the acked packet + + LogWrite(PACKET__DEBUG, 9, "Packet", "Removing acked packet with sequence %u", + static_cast(SequencedBase)); + + // Remove acknowledged packet from queue delete SequencedQueue.front(); SequencedQueue.pop_front(); - //advance the base sequence number to the seq of the block after the one we just got rid of. + + // Advance base sequence number SequencedBase++; } - if (uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) { - LogWrite(PACKET__DEBUG, 9, "Packet", "Post-Ack on %u Invalid Sequenced queue: BS %u + SQ %u != NOS %u", seq, SequencedBase, SequencedQueue.size(), NextOutSeq); + + // Validate queue consistency + if (uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) + { + LogWrite(PACKET__DEBUG, 9, "Packet", "Post-Ack on %u Invalid Sequenced queue: BS %u + SQ %u != NOS %u", + seq, SequencedBase, SequencedQueue.size(), NextOutSeq); } } MOutboundQueue.unlock(); } +/** + * Main packet transmission loop - processes outbound queues and sends packets + * Handles rate limiting, packet combining, acknowledgments, and transmission scheduling + * + * @param eq_fd - Socket file descriptor for transmission + */ void EQStream::Write(int eq_fd) { queue ReadyToSend; @@ -1499,13 +2042,22 @@ void EQStream::Write(int eq_fd) } } +/** + * Write individual protocol packet to network socket + * Handles packet serialization, compression, encoding, and UDP transmission + * + * @param eq_fd - Socket file descriptor for transmission + * @param p - EQProtocolPacket to transmit + */ void EQStream::WritePacket(int eq_fd, EQProtocolPacket *p) { -uint32 length = 0; -sockaddr_in address; + uint32 length = 0; + sockaddr_in address; + + // Set up destination address address.sin_family = AF_INET; - address.sin_addr.s_addr=remote_ip; - address.sin_port=remote_port; + address.sin_addr.s_addr = remote_ip; + address.sin_port = remote_port; #ifdef NOWAY uint32 ip=address.sin_addr.s_addr; cout << "Sending to: " @@ -1572,53 +2124,96 @@ char temp[15]; return p; } +/** + * Send session establishment response to client + * Provides session parameters including encryption keys and stream format flags + */ void EQStream::SendSessionResponse() { -EQProtocolPacket *out=new EQProtocolPacket(OP_SessionResponse,NULL,sizeof(SessionResponse)); - SessionResponse *Response=(SessionResponse *)out->pBuffer; - Response->Session=htonl(Session); - Response->MaxLength=htonl(MaxLen); - Response->UnknownA=2; - Response->Format=0; + auto out = new EQProtocolPacket(OP_SessionResponse, nullptr, sizeof(SessionResponse)); + auto Response = reinterpret_cast(out->pBuffer); + + // Set session parameters + Response->Session = htonl(Session); + Response->MaxLength = htonl(MaxLen); + Response->UnknownA = 2; + Response->Format = 0; + + // Set format flags based on stream configuration if (compressed) - Response->Format|=FLAG_COMPRESSED; + { + Response->Format |= FLAG_COMPRESSED; + } if (encoded) - Response->Format|=FLAG_ENCODED; - Response->Key=htonl(Key); - - out->size=sizeof(SessionResponse); - + { + Response->Format |= FLAG_ENCODED; + } + + Response->Key = htonl(Key); + out->size = sizeof(SessionResponse); + NonSequencedPush(out); } +/** + * Send session establishment request to server + * Initiates connection handshake with session parameters + */ void EQStream::SendSessionRequest() { - EQProtocolPacket *out=new EQProtocolPacket(OP_SessionRequest,NULL,sizeof(SessionRequest)); - SessionRequest *Request=(SessionRequest *)out->pBuffer; - memset(Request,0,sizeof(SessionRequest)); - Request->Session=htonl(time(NULL)); - Request->MaxLength=htonl(512); - + auto out = new EQProtocolPacket(OP_SessionRequest, nullptr, sizeof(SessionRequest)); + auto Request = reinterpret_cast(out->pBuffer); + + // Initialize request structure + std::memset(Request, 0, sizeof(SessionRequest)); + Request->Session = htonl(std::time(nullptr)); + Request->MaxLength = htonl(512); + NonSequencedPush(out); } +/** + * Send session disconnect packet to peer + * Initiates graceful connection termination sequence + * + * @param setstate - If true, set stream state to CLOSING + */ void EQStream::SendDisconnect(bool setstate) { - try{ - if(GetState() != ESTABLISHED && GetState() != WAIT_CLOSE) + try + { + // Only send disconnect if in valid state + if (GetState() != ESTABLISHED && GetState() != WAIT_CLOSE) + { return; + } - EQProtocolPacket *out=new EQProtocolPacket(OP_SessionDisconnect,NULL,sizeof(uint32)+sizeof(int16)); - *(uint32 *)out->pBuffer=htonl(Session); + // Create disconnect packet + auto out = new EQProtocolPacket(OP_SessionDisconnect, nullptr, sizeof(uint32) + sizeof(int16)); + *reinterpret_cast(out->pBuffer) = htonl(Session); out->pBuffer[4] = 0; out->pBuffer[5] = 6; + NonSequencedPush(out); - if(setstate) + + // Update state if requested + if (setstate) + { SetState(CLOSING); + } + } + catch (...) + { + // Ignore exceptions during disconnect } - catch(...){} } +/** + * Add application packet to inbound queue for processing + * Thread-safe method to queue received packets for application layer + * + * @param p - EQApplicationPacket to add to inbound queue + */ void EQStream::InboundQueuePush(EQApplicationPacket *p) { MInboundQueue.lock(); @@ -1626,84 +2221,141 @@ void EQStream::InboundQueuePush(EQApplicationPacket *p) MInboundQueue.unlock(); } +/** + * Retrieve next application packet from inbound queue + * Thread-safe method to get received packets for application processing + * + * @return Next EQApplicationPacket from queue, or nullptr if queue is empty + */ EQApplicationPacket *EQStream::PopPacket() { -EQApplicationPacket *p=NULL; + EQApplicationPacket *p = nullptr; MInboundQueue.lock(); - if (InboundQueue.size()) { - p=InboundQueue.front(); + if (InboundQueue.size()) + { + p = InboundQueue.front(); InboundQueue.pop_front(); } MInboundQueue.unlock(); - if(p) + + // Set client version for compatibility + if (p) + { p->setVersion(client_version); + } return p; } +/** + * Clear all packets from inbound queue + * Thread-safe method to empty and delete all queued packets + */ void EQStream::InboundQueueClear() { MInboundQueue.lock(); - while(InboundQueue.size()){ + while (InboundQueue.size()) + { delete InboundQueue.front(); InboundQueue.pop_front(); } MInboundQueue.unlock(); } -void EQStream::EncryptPacket(uchar* data, int16 size){ - if(size>6){ - +/** + * Legacy packet encryption function (currently unused) + * Placeholder for packet-level encryption functionality + * + * @param data - Packet data buffer + * @param size - Size of packet data + */ +void EQStream::EncryptPacket(uchar* data, int16 size) +{ + if (size > 6) + { + // Implementation placeholder } } +/** + * Check if stream has pending outbound data + * Determines if there are packets waiting to be transmitted + * + * @return true if outbound data is available, false otherwise + */ bool EQStream::HasOutgoingData() { -bool flag; + bool flag; - //once closed, we have nothing more to say - if(CheckClosed()) - return(false); + // Once closed, we have nothing more to say + if (CheckClosed()) + { + return false; + } + // Check for queued packets MOutboundQueue.lock(); - flag=(!NonSequencedQueue.empty()); - if (!flag) { + flag = (!NonSequencedQueue.empty()); + if (!flag) + { flag = (!SequencedQueue.empty()); } MOutboundQueue.unlock(); - if (!flag) { + // Check for pending acknowledgments + if (!flag) + { MAcks.lock(); - flag= (NextAckToSend>LastAckSent); + flag = (NextAckToSend > LastAckSent); MAcks.unlock(); } - if (!flag) { + // Check for combined application packets + if (!flag) + { MCombinedAppPacket.lock(); - flag=(CombinedAppPacket!=NULL); + flag = (CombinedAppPacket != nullptr); MCombinedAppPacket.unlock(); } return flag; } +/** + * Clear all packets from outbound queues + * Thread-safe method to empty and delete all queued outbound packets + */ void EQStream::OutboundQueueClear() { MOutboundQueue.lock(); - while(NonSequencedQueue.size()) { + + // Clear non-sequenced queue + while (NonSequencedQueue.size()) + { delete NonSequencedQueue.front(); NonSequencedQueue.pop(); } - while(SequencedQueue.size()) { + + // Clear sequenced queue + while (SequencedQueue.size()) + { delete SequencedQueue.front(); SequencedQueue.pop_front(); } + MOutboundQueue.unlock(); } +/** + * Process incoming raw network data into protocol packets + * Handles decompression, decoding, validation, and packet creation + * + * @param buffer - Raw network data buffer + * @param length - Length of network data + */ void EQStream::Process(const unsigned char *buffer, const uint32 length) { received_packets++; -static unsigned char newbuffer[2048]; -uint32 newlength=0; + static unsigned char newbuffer[2048]; + uint32 newlength = 0; #ifdef LE_DEBUG printf("ProcessBuffer:\n"); @@ -1751,171 +2403,278 @@ DumpPacket(buffer, length); } } +/** + * Get the highest sequence number that has been acknowledged + * Thread-safe accessor for maximum acknowledged sequence number + * + * @return Highest acknowledged sequence number + */ long EQStream::GetMaxAckReceived() { MAcks.lock(); - long l=MaxAckReceived; + long l = MaxAckReceived; MAcks.unlock(); return l; } +/** + * Get the next sequence number that should be acknowledged + * Thread-safe accessor for next acknowledgment to send + * + * @return Next sequence number to acknowledge + */ long EQStream::GetNextAckToSend() { MAcks.lock(); - long l=NextAckToSend; + long l = NextAckToSend; MAcks.unlock(); return l; } +/** + * Get the last sequence number that was acknowledged + * Thread-safe accessor for last sent acknowledgment + * + * @return Last acknowledged sequence number + */ long EQStream::GetLastAckSent() { MAcks.lock(); - long l=LastAckSent; + long l = LastAckSent; MAcks.unlock(); return l; } +/** + * Set the maximum acknowledged sequence number + * Updates acknowledgment tracking and cleans up acknowledged packets from resend queue + * + * @param seq - New maximum acknowledged sequence number + */ void EQStream::SetMaxAckReceived(uint32 seq) { - deque::iterator itr; - MAcks.lock(); - MaxAckReceived=seq; + MaxAckReceived = seq; MAcks.unlock(); + MOutboundQueue.lock(); - if (long(seq) > LastSeqSent) - LastSeqSent=seq; + if (static_cast(seq) > LastSeqSent) + { + LastSeqSent = seq; + } + + // Clean up acknowledged packets from resend queue MResendQue.lock(); - EQProtocolPacket* packet = 0; - for(itr=resend_que.begin();itr!=resend_que.end();itr++){ - packet = *itr; - if(packet && packet->sequence <= seq){ + for (auto itr = resend_que.begin(); itr != resend_que.end();) + { + EQProtocolPacket* packet = *itr; + if (packet && packet->sequence <= seq) + { safe_delete(packet); itr = resend_que.erase(itr); - if(itr == resend_que.end()) + if (itr == resend_que.end()) + { break; + } + } + else + { + ++itr; } } MResendQue.unlock(); MOutboundQueue.unlock(); } +/** + * Set the next sequence number to acknowledge + * Thread-safe setter for next acknowledgment to send + * + * @param seq - Next sequence number to acknowledge + */ void EQStream::SetNextAckToSend(uint32 seq) { MAcks.lock(); - NextAckToSend=seq; + NextAckToSend = seq; MAcks.unlock(); } +/** + * Set the last sequence number that was acknowledged + * Thread-safe setter for last sent acknowledgment + * + * @param seq - Last acknowledged sequence number + */ void EQStream::SetLastAckSent(uint32 seq) { MAcks.lock(); - LastAckSent=seq; + LastAckSent = seq; MAcks.unlock(); } +/** + * Set the last sequence number that was sent + * Thread-safe setter for last transmitted sequence number + * + * @param seq - Last sent sequence number + */ void EQStream::SetLastSeqSent(uint32 seq) { MOutboundQueue.lock(); - LastSeqSent=seq; + LastSeqSent = seq; MOutboundQueue.unlock(); } +/** + * Configure stream parameters based on stream type + * Sets opcode size, compression, and encoding flags for different stream types + * + * @param type - EQStreamType to configure stream for + */ void EQStream::SetStreamType(EQStreamType type) { - StreamType=type; - switch (StreamType) { + StreamType = type; + + switch (StreamType) + { case LoginStream: - app_opcode_size=1; - compressed=false; - encoded=false; + app_opcode_size = 1; + compressed = false; + encoded = false; break; + case EQ2Stream: - app_opcode_size=2; - compressed=false; - encoded=false; + app_opcode_size = 2; + compressed = false; + encoded = false; break; + case ChatOrMailStream: case ChatStream: case MailStream: - app_opcode_size=1; - compressed=false; - encoded=true; + app_opcode_size = 1; + compressed = false; + encoded = true; break; + case ZoneStream: case WorldStream: default: - app_opcode_size=2; - compressed=true; - encoded=false; + app_opcode_size = 2; + compressed = true; + encoded = false; break; } } +/** + * Process queued out-of-order packets in sequence + * Handles packets that arrived before their predecessors + */ void EQStream::ProcessQueue() { - if (OutOfOrderpackets.empty()) { + if (OutOfOrderpackets.empty()) + { return; } - EQProtocolPacket* qp = NULL; - while ((qp = RemoveQueue(NextInSeq)) != NULL) { - //_log(NET__DEBUG, _L "Processing Queued Packet: Seq=%d" __L, NextInSeq); + // Process packets in sequence order + EQProtocolPacket* qp = nullptr; + while ((qp = RemoveQueue(NextInSeq)) != nullptr) + { ProcessPacket(qp); delete qp; - //_log(NET__APP_TRACE, _L "OP_Packet Queue size=%d" __L, PacketQueue.size()); } } +/** + * Remove and return packet with specific sequence number from queue + * Searches out-of-order packet queue for specified sequence + * + * @param seq - Sequence number of packet to remove + * @return EQProtocolPacket if found, nullptr if not found + */ EQProtocolPacket* EQStream::RemoveQueue(uint16 seq) { - map::iterator itr; - EQProtocolPacket* qp = NULL; - if ((itr = OutOfOrderpackets.find(seq)) != OutOfOrderpackets.end()) { + EQProtocolPacket* qp = nullptr; + auto itr = OutOfOrderpackets.find(seq); + if (itr != OutOfOrderpackets.end()) + { qp = itr->second; OutOfOrderpackets.erase(itr); - //_log(NET__APP_TRACE, _L "OP_Packet Queue size=%d" __L, PacketQueue.size()); } return qp; } +/** + * Apply bandwidth decay and check for packet timeouts + * Reduces byte counter over time and flags timed-out packets for retransmission + */ void EQStream::Decay() { + // Apply bandwidth decay MRate.lock(); - uint32 rate=DecayRate; + uint32 rate = DecayRate; MRate.unlock(); - if (BytesWritten>0) { - BytesWritten-=rate; - if (BytesWritten<0) - BytesWritten=0; - } - - int count = 0; - MOutboundQueue.lock(); - for (auto sitr = SequencedQueue.begin(); sitr != SequencedQueue.end(); ++sitr, count++) { - if (!(*sitr)->acked && (*sitr)->sent_time > 0 && ((*sitr)->sent_time + retransmittimeout) < Timer::GetCurrentTime2()) { - (*sitr)->sent_time = 0; - LogWrite(PACKET__DEBUG, 9, "Packet", "Timeout exceeded for seq %u. Flagging packet for retransmission", SequencedBase + count); + + if (BytesWritten > 0) + { + BytesWritten -= rate; + if (BytesWritten < 0) + { + BytesWritten = 0; } } + + // Check for packet timeouts and flag for retransmission + int count = 0; + MOutboundQueue.lock(); + + for (auto sitr = SequencedQueue.begin(); sitr != SequencedQueue.end(); ++sitr, count++) + { + if (!(*sitr)->acked && (*sitr)->sent_time > 0 && + ((*sitr)->sent_time + retransmittimeout) < Timer::GetCurrentTime2()) + { + (*sitr)->sent_time = 0; // Flag for retransmission + LogWrite(PACKET__DEBUG, 9, "Packet", "Timeout exceeded for seq %u. Flagging packet for retransmission", + SequencedBase + count); + } + } + MOutboundQueue.unlock(); } +/** + * Adjust transmission rates based on network conditions + * Calculates rate thresholds and decay rates from average delta timing + * + * @param average_delta - Average packet transmission time in milliseconds + */ void EQStream::AdjustRates(uint32 average_delta) { - if (average_delta && (average_delta <= AVERAGE_DELTA_MAX)) { + if (average_delta && (average_delta <= AVERAGE_DELTA_MAX)) + { MRate.lock(); + + // Calculate transmission parameters based on network timing AverageDelta = average_delta; RateThreshold = RATEBASE / average_delta; DecayRate = DECAYBASE / average_delta; + + // Adjust current byte counter if needed if (BytesWritten > RateThreshold) + { BytesWritten = RateThreshold + DecayRate; + } + MRate.unlock(); } - else { + else + { + // Use maximum delta if invalid value provided AverageDelta = AVERAGE_DELTA_MAX; } } diff --git a/source/common/EQStream.h b/source/common/EQStream.h index 3ce45a8..2abfc31 100644 --- a/source/common/EQStream.h +++ b/source/common/EQStream.h @@ -1,22 +1,6 @@ -/* - EQ2Emulator: Everquest II Server Emulator - Copyright (C) 2007 EQ2EMulator Development Team (http://www.eq2emulator.net) - - This file is part of EQ2Emulator. - - EQ2Emulator is free software: you can redistribute it and/or modify - it under the terms of the GNU General Public License as published by - the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - EQ2Emulator is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU General Public License for more details. - - You should have received a copy of the GNU General Public License - along with EQ2Emulator. If not, see . -*/ +// EQ2Emulator: Everquest II Server Emulator +// Copyright (C) 2007 EQ2EMulator Development Team +// Licensed under GPL v3 - see #ifndef _EQPROTOCOL_H #define _EQPROTOCOL_H @@ -24,12 +8,15 @@ #include #include #include - #include #include -#ifndef WIN32 +#include +#include + +// Unix networking headers #include -#endif + +// Project headers #include "EQPacket.h" #include "Mutex.h" #include "opcodemgr.h" @@ -38,338 +25,499 @@ #include "Crypto.h" #include "zlib.h" #include "timer.h" + #ifdef WRITE_PACKETS #include #endif using namespace std; +// Forward declarations +class OpcodeManager; +class EQStreamFactory; + +/** + * EverQuest stream connection states. + * Represents the current state of a network stream connection. + */ typedef enum { - ESTABLISHED, - WAIT_CLOSE, - CLOSING, - DISCONNECTING, - CLOSED + ESTABLISHED, // Active connection ready for data + WAIT_CLOSE, // Waiting for graceful close + CLOSING, // In process of closing + DISCONNECTING, // Actively disconnecting + CLOSED // Connection fully closed } EQStreamState; -#define FLAG_COMPRESSED 0x01 -#define FLAG_ENCODED 0x04 +// Packet flags +#define FLAG_COMPRESSED 0x01 // Packet is compressed +#define FLAG_ENCODED 0x04 // Packet is encoded/encrypted -#define RATEBASE 1048576 // 1 MB -#define DECAYBASE 78642 // RATEBASE/10 +// Rate limiting and bandwidth constants +#define RATEBASE 1048576 // Base rate: 1 MB +#define DECAYBASE 78642 // Decay rate: RATEBASE/10 +// Retransmission timing constants #ifndef RETRANSMIT_TIMEOUT_MULT -#define RETRANSMIT_TIMEOUT_MULT 3.0 +#define RETRANSMIT_TIMEOUT_MULT 3.0 // Timeout multiplier #endif #ifndef RETRANSMIT_TIMEOUT_MAX -#define RETRANSMIT_TIMEOUT_MAX 5000 +#define RETRANSMIT_TIMEOUT_MAX 5000 // Maximum retransmit timeout (ms) #endif #ifndef AVERAGE_DELTA_MAX -#define AVERAGE_DELTA_MAX 2500 +#define AVERAGE_DELTA_MAX 2500 // Maximum average delta (ms) #endif #pragma pack(1) + +/** + * Session request packet structure. + * Sent by client to initiate a new session. + */ struct SessionRequest { - uint32 UnknownA; - uint32 Session; - uint32 MaxLength; + uint32 UnknownA; // Unknown field A + uint32 Session; // Requested session ID + uint32 MaxLength; // Maximum packet length }; +/** + * Session response packet structure. + * Sent by server in response to session request. + */ struct SessionResponse { - uint32 Session; - uint32 Key; - uint8 UnknownA; - uint8 Format; - uint8 UnknownB; - uint32 MaxLength; - uint32 UnknownD; + uint32 Session; // Assigned session ID + uint32 Key; // Encryption/authentication key + uint8 UnknownA; // Unknown field A + uint8 Format; // Packet format version + uint8 UnknownB; // Unknown field B + uint32 MaxLength; // Maximum packet length + uint32 UnknownD; // Unknown field D }; -//Deltas are in ms, representing round trip times +/** + * Client-side session statistics. + * Deltas are in milliseconds, representing round trip times. + */ struct ClientSessionStats { -/*000*/ uint16 RequestID; -/*002*/ uint32 last_local_delta; -/*006*/ uint32 average_delta; -/*010*/ uint32 low_delta; -/*014*/ uint32 high_delta; -/*018*/ uint32 last_remote_delta; -/*022*/ uint64 packets_sent; -/*030*/ uint64 packets_recieved; +/*000*/ uint16 RequestID; // Statistics request ID +/*002*/ uint32 last_local_delta; // Last local round trip time +/*006*/ uint32 average_delta; // Average round trip time +/*010*/ uint32 low_delta; // Lowest recorded round trip time +/*014*/ uint32 high_delta; // Highest recorded round trip time +/*018*/ uint32 last_remote_delta; // Last remote round trip time +/*022*/ uint64 packets_sent; // Total packets sent +/*030*/ uint64 packets_recieved; // Total packets received /*038*/ }; +/** + * Server-side session statistics. + * Provides server perspective on connection quality. + */ struct ServerSessionStats { - uint16 RequestID; - uint32 current_time; - uint32 unknown1; - uint32 received_packets; - uint32 unknown2; - uint32 sent_packets; - uint32 unknown3; - uint32 sent_packets2; - uint32 unknown4; - uint32 received_packets2; + uint16 RequestID; // Statistics request ID + uint32 current_time; // Current server time + uint32 unknown1; // Unknown field 1 + uint32 received_packets; // Packets received by server + uint32 unknown2; // Unknown field 2 + uint32 sent_packets; // Packets sent by server + uint32 unknown3; // Unknown field 3 + uint32 sent_packets2; // Duplicate sent packets count + uint32 unknown4; // Unknown field 4 + uint32 received_packets2; // Duplicate received packets count }; #pragma pack() -class OpcodeManager; -extern OpcodeManager *EQNetworkOpcodeManager; - -class EQStreamFactory; +// External opcode manager +extern OpcodeManager* EQNetworkOpcodeManager; +/** + * Types of EverQuest network streams. + * Each type handles different aspects of the game protocol. + */ typedef enum { - UnknownStream=0, - LoginStream, - WorldStream, - ZoneStream, - ChatOrMailStream, - ChatStream, - MailStream, - EQ2Stream, + UnknownStream = 0, // Unidentified stream type + LoginStream, // Login server authentication + WorldStream, // World server communication + ZoneStream, // Zone server gameplay + ChatOrMailStream, // Combined chat/mail (legacy) + ChatStream, // Chat system only + MailStream, // Mail system only + EQ2Stream, // EverQuest 2 specific stream } EQStreamType; +/** + * EverQuest network stream class. + * Handles reliable UDP communication with sequence numbers, acknowledgments, + * compression, encryption, and packet combining for EverQuest protocols. + */ class EQStream { - protected: - typedef enum { - SeqPast, - SeqInOrder, - SeqFuture - } SeqOrder; +protected: + /** + * Sequence number ordering enumeration. + * Used to determine packet sequence relationships. + */ + typedef enum { + SeqPast, // Sequence is from the past (duplicate/old) + SeqInOrder, // Sequence is the expected next sequence + SeqFuture // Sequence is from the future (out of order) + } SeqOrder; - uint32 received_packets; - uint32 sent_packets; - uint32 remote_ip; - uint16 remote_port; - uint8 buffer[8192]; - unsigned char *oversize_buffer; - uint32 oversize_offset,oversize_length; - unsigned char *rogue_buffer; - uint32 roguebuf_offset,roguebuf_size; - uint8 app_opcode_size; - EQStreamType StreamType; - bool compressed,encoded; - - unsigned char write_buffer[2048]; - - uint32 retransmittimer; - uint32 retransmittimeout; - //uint32 buffer_len; + // Packet statistics + uint32 received_packets; // Total packets received + uint32 sent_packets; // Total packets sent + + // Remote endpoint information + uint32 remote_ip; // Remote IP address + uint16 remote_port; // Remote port number + + // Packet buffers + uint8 buffer[8192]; // Main packet buffer + unsigned char* oversize_buffer; // Buffer for oversized packets + uint32 oversize_offset; // Offset in oversize buffer + uint32 oversize_length; // Length of oversize buffer data + unsigned char* rogue_buffer; // Buffer for rogue/malformed packets + uint32 roguebuf_offset; // Offset in rogue buffer + uint32 roguebuf_size; // Size of rogue buffer + + // Protocol configuration + uint8 app_opcode_size; // Size of application opcodes + EQStreamType StreamType; // Type of this stream + bool compressed; // Stream supports compression + bool encoded; // Stream supports encoding/encryption + + // Write buffer for outgoing packets + unsigned char write_buffer[2048]; + + // Retransmission timing + uint32 retransmittimer; // Current retransmit timer + uint32 retransmittimeout; // Retransmit timeout value - uint16 sessionAttempts; - uint16 reconnectAttempt; - bool streamactive; + // Session management + uint16 sessionAttempts; // Number of session attempts + uint16 reconnectAttempt; // Number of reconnect attempts + bool streamactive; // Stream is actively connected - uint32 Session, Key; - uint16 NextInSeq; - uint16 NextOutSeq; - uint16 SequencedBase; //the sequence number of SequencedQueue[0] - uint32 MaxLen; - uint16 MaxSends; - int8 timeout_delays; + // Session state + uint32 Session; // Session ID + uint32 Key; // Session encryption key + uint16 NextInSeq; // Next expected incoming sequence + uint16 NextOutSeq; // Next outgoing sequence number + uint16 SequencedBase; // Base sequence for SequencedQueue[0] + uint32 MaxLen; // Maximum packet length + uint16 MaxSends; // Maximum send attempts + int8 timeout_delays; // Number of timeout delays accumulated - uint8 active_users; //how many things are actively using this - Mutex MInUse; + // Thread safety for stream usage + uint8 active_users; // Number of active users of this stream + Mutex MInUse; // Mutex for usage tracking #ifdef WRITE_PACKETS - FILE* write_packets = NULL; - char GetChar(uchar in); - void WriteToFile(char* pFormat, ...); - void WritePackets(const char* opcodeName, uchar* data, int32 size, bool outgoing); - void WritePackets(EQ2Packet* app, bool outgoing); - Mutex MWritePackets; + // Packet logging for debugging + FILE* write_packets = nullptr; // File handle for packet dumps + char GetChar(uchar in); // Convert byte to printable character + void WriteToFile(char* pFormat, ...); // Write formatted data to file + void WritePackets(const char* opcodeName, uchar* data, int32 size, bool outgoing); + void WritePackets(EQ2Packet* app, bool outgoing); + Mutex MWritePackets; // Mutex for packet writing #endif - EQStreamState State; - Mutex MState; + // Stream connection state + EQStreamState State; // Current connection state + Mutex MState; // Mutex for state access - uint32 LastPacket; - Mutex MVarlock; + // Timing and general variables + uint32 LastPacket; // Timestamp of last packet activity + Mutex MVarlock; // General variable lock - EQApplicationPacket* CombinedAppPacket; - Mutex MCombinedAppPacket; + // Combined application packet handling + EQApplicationPacket* CombinedAppPacket; // Current combined packet + Mutex MCombinedAppPacket; // Mutex for combined packet access - long LastSeqSent; - Mutex MLastSeqSent; - void SetLastSeqSent(uint32); + // Sequence number tracking + long LastSeqSent; // Last sequence number sent + Mutex MLastSeqSent; // Mutex for last sequence sent + void SetLastSeqSent(uint32 seq); // Set last sent sequence number - // Ack sequence tracking. - long MaxAckReceived,NextAckToSend,LastAckSent; - long GetMaxAckReceived(); - long GetNextAckToSend(); - long GetLastAckSent(); - void SetMaxAckReceived(uint32 seq); - void SetNextAckToSend(uint32); - void SetLastAckSent(uint32); + // Acknowledgment sequence tracking + long MaxAckReceived; // Highest ack sequence received + long NextAckToSend; // Next ack sequence to send + long LastAckSent; // Last ack sequence sent + + // Acknowledgment accessor methods + long GetMaxAckReceived(); + long GetNextAckToSend(); + long GetLastAckSent(); + void SetMaxAckReceived(uint32 seq); + void SetNextAckToSend(uint32 seq); + void SetLastAckSent(uint32 seq); - Mutex MAcks; + Mutex MAcks; // Mutex for acknowledgment data - // Packets waiting to be sent - queue NonSequencedQueue; - deque SequencedQueue; - map OutOfOrderpackets; - Mutex MOutboundQueue; + // Outbound packet queues + queue NonSequencedQueue; // Non-sequenced packets + deque SequencedQueue; // Sequenced packets + map OutOfOrderpackets; // Out-of-order packets + Mutex MOutboundQueue; // Mutex for outbound queues - // Packes waiting to be processed - deque InboundQueue; - Mutex MInboundQueue; + // Inbound packet queue + deque InboundQueue; // Packets waiting processing + Mutex MInboundQueue; // Mutex for inbound queue - static uint16 MaxWindowSize; + // Static configuration + static uint16 MaxWindowSize; // Maximum window size for flow control - sint32 BytesWritten; + // Rate limiting and flow control + sint32 BytesWritten; // Bytes written this period + Mutex MRate; // Mutex for rate limiting + sint32 RateThreshold; // Rate limiting threshold + sint32 DecayRate; // Rate decay per time period + uint32 AverageDelta; // Average round-trip time - Mutex MRate; - sint32 RateThreshold; - sint32 DecayRate; - uint32 AverageDelta; + // Factory reference + EQStreamFactory* Factory; // Factory that created this stream - EQStreamFactory *Factory; +public: + // EQ2-specific packet combining + Mutex MCombineQueueLock; // Mutex for combine queue operations + bool CheckCombineQueue(); // Check and process combine queue + deque combine_queue; // Queue of packets to combine + Timer* combine_timer; // Timer for combine operations - public: - Mutex MCombineQueueLock; - bool CheckCombineQueue(); - deque combine_queue; - Timer* combine_timer; - - Crypto* crypto; - int8 EQ2_Compress(EQ2Packet* app, int8 offset = 3); - z_stream stream; - uchar* stream_buffer; - int32 stream_buffer_size; - bool eq2_compressed; - int8 compressed_offset; - int16 client_version; - int16 GetClientVersion(){ return client_version; } - void SetClientVersion(int16 version){ client_version = version; } - void ResetSessionAttempts() { reconnectAttempt = 0; } - bool HasSessionAttempts() { return reconnectAttempt>0; } - EQStream() { init(); remote_ip = 0; remote_port = 0; State = CLOSED; StreamType = UnknownStream; compressed = true; - encoded = false; app_opcode_size = 2;} - EQStream(sockaddr_in addr); - virtual ~EQStream() { - MOutboundQueue.lock(); - SetState(CLOSED); - MOutboundQueue.unlock(); - RemoveData(); - safe_delete(crypto); - safe_delete(combine_timer); - safe_delete(resend_que_timer); - safe_delete_array(oversize_buffer); - safe_delete_array(rogue_buffer); - deque::iterator cmb; - MCombineQueueLock.lock(); - for (cmb = combine_queue.begin(); cmb != combine_queue.end(); cmb++){ - safe_delete(*cmb); - } - MCombineQueueLock.unlock(); - deflateEnd(&stream); - map::iterator oop; - for (oop = OutOfOrderpackets.begin(); oop != OutOfOrderpackets.end(); oop++){ - safe_delete(oop->second); - } -#ifdef WRITE_PACKETS - if (write_packets) - fclose(write_packets); -#endif + // Encryption and compression + Crypto* crypto; // Cryptographic handler + int8 EQ2_Compress(EQ2Packet* app, int8 offset = 3); // Compress EQ2 packet + z_stream stream; // zlib compression stream + uchar* stream_buffer; // Compression buffer + int32 stream_buffer_size; // Size of compression buffer + bool eq2_compressed; // Stream uses EQ2 compression + int8 compressed_offset; // Offset for compressed data + + // Client version management + int16 client_version; // Client protocol version + int16 GetClientVersion() { return client_version; } + void SetClientVersion(int16 version) { client_version = version; } + + // Session attempt management + void ResetSessionAttempts() { reconnectAttempt = 0; } + bool HasSessionAttempts() { return reconnectAttempt > 0; } + + // Constructors + EQStream() { + init(); + remote_ip = 0; + remote_port = 0; + State = CLOSED; + StreamType = UnknownStream; + compressed = true; + encoded = false; + app_opcode_size = 2; + } + + EQStream(sockaddr_in addr); + // Destructor + virtual ~EQStream() { + // Close stream safely + MOutboundQueue.lock(); + SetState(CLOSED); + MOutboundQueue.unlock(); + + // Clean up data structures + RemoveData(); + + // Clean up allocated resources + safe_delete(crypto); + safe_delete(combine_timer); + safe_delete(resend_que_timer); + safe_delete_array(oversize_buffer); + safe_delete_array(rogue_buffer); + + // Clean up combine queue + MCombineQueueLock.lock(); + for (auto cmb = combine_queue.begin(); cmb != combine_queue.end(); ++cmb) { + safe_delete(*cmb); } - inline void SetFactory(EQStreamFactory *f) { Factory=f; } - void init(bool resetSession = true); - void SetMaxLen(uint32 length) { MaxLen=length; } - int8 getTimeoutDelays(){ return timeout_delays; } - void addTimeoutDelay(){ timeout_delays++; } - void EQ2QueuePacket(EQ2Packet* app, bool attempted_combine = false); - void PreparePacket(EQ2Packet* app, int8 offset = 0); - void UnPreparePacket(EQ2Packet* app); - void EncryptPacket(EQ2Packet* app, int8 compress_offset, int8 offset); - void FlushCombinedPacket(); - void SendPacket(EQApplicationPacket *p); - void QueuePacket(EQProtocolPacket *p); - void SendPacket(EQProtocolPacket *p); - vector convert(EQApplicationPacket *p); - void NonSequencedPush(EQProtocolPacket *p); - void SequencedPush(EQProtocolPacket *p); - - Mutex MResendQue; - Mutex MCompressData; - dequeresend_que; - void CheckResend(int eq_fd); - - void AckPackets(uint16 seq); - void Write(int eq_fd); - - void SetActive(bool val) { streamactive = val; } - - void WritePacket(int fd,EQProtocolPacket *p); - - void EncryptPacket(uchar* data, int16 size); - uint32 GetKey() { return Key; } - void SetKey(uint32 k) { Key=k; } - void SetSession(uint32 s) { Session=s; } - void SetLastPacketTime(uint32 t) {LastPacket=t;} - - void Process(const unsigned char *data, const uint32 length); - void ProcessPacket(EQProtocolPacket *p, EQProtocolPacket* lastp=NULL); + MCombineQueueLock.unlock(); - bool ProcessEmbeddedPacket(uchar* pBuffer, uint16 length, int8 opcode = OP_Packet); - bool HandleEmbeddedPacket(EQProtocolPacket *p, int16 offset = 2, int16 length = 0); - - EQProtocolPacket * ProcessEncryptedPacket(EQProtocolPacket *p); - EQProtocolPacket * ProcessEncryptedData(uchar* data, int32 size, int16 opcode); - - virtual void DispatchPacket(EQApplicationPacket *p) { p->DumpRaw(); } - - void SendSessionResponse(); - void SendSessionRequest(); - void SendDisconnect(bool setstate = true); - void SendAck(uint16 seq); - void SendOutOfOrderAck(uint16 seq); - - bool CheckTimeout(uint32 now, uint32 timeout=30) { return (LastPacket && (now-LastPacket) > timeout); } - bool Stale(uint32 now, uint32 timeout=30) { return (LastPacket && (now-LastPacket) > timeout); } - - void InboundQueuePush(EQApplicationPacket *p); - EQApplicationPacket *PopPacket(); // InboundQueuePop - void InboundQueueClear(); - - void OutboundQueueClear(); - bool HasOutgoingData(); - void SendKeyRequest(); - int16 processRSAKey(EQProtocolPacket *p, uint16 subpacket_length = 0); - void RemoveData() { InboundQueueClear(); OutboundQueueClear(); if (CombinedAppPacket) delete CombinedAppPacket; } - - // - inline bool IsInUse() { bool flag; MInUse.lock(); flag=(active_users>0); MInUse.unlock(); return flag; } - inline void PutInUse() { MInUse.lock(); active_users++; MInUse.unlock(); } - inline void ReleaseFromUse() { MInUse.lock(); if(active_users > 0) active_users--; MInUse.unlock(); } - - static SeqOrder CompareSequence(uint16 expected_seq, uint16 seq); - - inline EQStreamState GetState() { return State; } - inline void SetState(EQStreamState state) { MState.lock(); State = state; MState.unlock(); } - - inline uint32 GetRemoteIP() { return remote_ip; } - inline uint32 GetrIP() { return remote_ip; } - inline uint16 GetRemotePort() { return remote_port; } - inline uint16 GetrPort() { return remote_port; } + // Clean up compression stream + deflateEnd(&stream); + // Clean up out-of-order packets + for (auto oop = OutOfOrderpackets.begin(); oop != OutOfOrderpackets.end(); ++oop) { + safe_delete(oop->second); + } + +#ifdef WRITE_PACKETS + if (write_packets) { + fclose(write_packets); + } +#endif + } + // Factory and initialization + void SetFactory(EQStreamFactory* f) { Factory = f; } + void init(bool resetSession = true); + + // Configuration + void SetMaxLen(uint32 length) { MaxLen = length; } + int8 getTimeoutDelays() { return timeout_delays; } + void addTimeoutDelay() { timeout_delays++; } + + // EQ2-specific packet handling + void EQ2QueuePacket(EQ2Packet* app, bool attempted_combine = false); + void PreparePacket(EQ2Packet* app, int8 offset = 0); + void UnPreparePacket(EQ2Packet* app); + void EncryptPacket(EQ2Packet* app, int8 compress_offset, int8 offset); + void FlushCombinedPacket(); + + // General packet transmission + void SendPacket(EQApplicationPacket* p); + void QueuePacket(EQProtocolPacket* p); + void SendPacket(EQProtocolPacket* p); + vector convert(EQApplicationPacket* p); + void NonSequencedPush(EQProtocolPacket* p); + void SequencedPush(EQProtocolPacket* p); - static EQProtocolPacket *Read(int eq_fd, sockaddr_in *from); + // Resend queue management + Mutex MResendQue; // Mutex for resend queue + Mutex MCompressData; // Mutex for compression operations + deque resend_que; // Queue of packets needing resend + void CheckResend(int eq_fd); // Check and handle packet resends - void Close() { SendDisconnect(); } - bool CheckActive() { return (GetState()==ESTABLISHED); } - bool CheckClosed() { return GetState()==CLOSED; } - void SetOpcodeSize(uint8 s) { app_opcode_size = s; } - void SetStreamType(EQStreamType t); - inline const EQStreamType GetStreamType() const { return StreamType; } + // Acknowledgment and writing + void AckPackets(uint16 seq); // Acknowledge received packets + void Write(int eq_fd); // Write packets to socket - void ProcessQueue(); - EQProtocolPacket* RemoveQueue(uint16 seq); + // Stream state management + void SetActive(bool val) { streamactive = val; } + + // Low-level packet operations + void WritePacket(int fd, EQProtocolPacket* p); + void EncryptPacket(uchar* data, int16 size); + + // Session management + uint32 GetKey() { return Key; } + void SetKey(uint32 k) { Key = k; } + void SetSession(uint32 s) { Session = s; } + void SetLastPacketTime(uint32 t) { LastPacket = t; } - void Decay(); - void AdjustRates(uint32 average_delta); - Timer* resend_que_timer; + // Packet processing + void Process(const unsigned char* data, uint32 length); + void ProcessPacket(EQProtocolPacket* p, EQProtocolPacket* lastp = nullptr); + + // Embedded packet handling + bool ProcessEmbeddedPacket(uchar* pBuffer, uint16 length, int8 opcode = OP_Packet); + bool HandleEmbeddedPacket(EQProtocolPacket* p, int16 offset = 2, int16 length = 0); + + // Encryption handling + EQProtocolPacket* ProcessEncryptedPacket(EQProtocolPacket* p); + EQProtocolPacket* ProcessEncryptedData(uchar* data, int32 size, int16 opcode); + + // Virtual packet dispatch (override in derived classes) + virtual void DispatchPacket(EQApplicationPacket* p) { p->DumpRaw(); } + + // Session protocol messages + void SendSessionResponse(); + void SendSessionRequest(); + void SendDisconnect(bool setstate = true); + void SendAck(uint16 seq); + void SendOutOfOrderAck(uint16 seq); + + // Connection health checks + bool CheckTimeout(uint32 now, uint32 timeout = 30) { + return (LastPacket && (now - LastPacket) > timeout); + } + bool Stale(uint32 now, uint32 timeout = 30) { + return (LastPacket && (now - LastPacket) > timeout); + } + + // Inbound queue management + void InboundQueuePush(EQApplicationPacket* p); + EQApplicationPacket* PopPacket(); // Pop packet from inbound queue + void InboundQueueClear(); + + // Outbound queue management + void OutboundQueueClear(); + bool HasOutgoingData(); + + // Key exchange and RSA + void SendKeyRequest(); + int16 processRSAKey(EQProtocolPacket* p, uint16 subpacket_length = 0); + + // Data cleanup + void RemoveData() { + InboundQueueClear(); + OutboundQueueClear(); + if (CombinedAppPacket) { + delete CombinedAppPacket; + CombinedAppPacket = nullptr; + } + } + + // Usage tracking (thread-safe reference counting) + bool IsInUse() { + bool flag; + MInUse.lock(); + flag = (active_users > 0); + MInUse.unlock(); + return flag; + } + + void PutInUse() { + MInUse.lock(); + active_users++; + MInUse.unlock(); + } + + void ReleaseFromUse() { + MInUse.lock(); + if (active_users > 0) { + active_users--; + } + MInUse.unlock(); + } + + // Sequence number utilities + static SeqOrder CompareSequence(uint16 expected_seq, uint16 seq); + + // State management + EQStreamState GetState() { return State; } + void SetState(EQStreamState state) { + MState.lock(); + State = state; + MState.unlock(); + } + + // Remote endpoint access + uint32 GetRemoteIP() { return remote_ip; } + uint32 GetrIP() { return remote_ip; } // Legacy alias + uint16 GetRemotePort() { return remote_port; } + uint16 GetrPort() { return remote_port; } // Legacy alias + + // Static packet reading + static EQProtocolPacket* Read(int eq_fd, sockaddr_in* from); + + // Connection management + void Close() { SendDisconnect(); } + bool CheckActive() { return (GetState() == ESTABLISHED); } + bool CheckClosed() { return GetState() == CLOSED; } + + // Stream configuration + void SetOpcodeSize(uint8 s) { app_opcode_size = s; } + void SetStreamType(EQStreamType t); + EQStreamType GetStreamType() const { return StreamType; } + + // Queue processing + void ProcessQueue(); + EQProtocolPacket* RemoveQueue(uint16 seq); + + // Rate limiting and flow control + void Decay(); + void AdjustRates(uint32 average_delta); + + // Resend timer + Timer* resend_que_timer; // Timer for checking resend queue }; #endif