// EQ2Emulator: Everquest II Server Emulator // Copyright (C) 2007 EQ2EMulator Development Team // Licensed under GPL v3 - see #include "debug.h" // Standard library headers #include #include #include #include #include #include #include #include // Unix networking headers #include #include #include #include #include #include #include // Project headers #include "EQPacket.h" #include "EQStream.h" #include "EQStreamFactory.h" #include "misc.h" #include "Mutex.h" #include "op_codes.h" #include "CRC16.h" #include "packet_dump.h" #include "EQ2_Common_Structs.h" #include "Log.h" #ifdef LOGIN #include "../LoginServer/login_structs.h" #endif //#define DEBUG_EMBEDDED_PACKETS 1 // Static configuration uint16 EQStream::MaxWindowSize = 2048; /** * Initialize the EQ stream with default values. * * @param resetSession - Whether to reset session-specific data */ void EQStream::init(bool resetSession) { if (resetSession) { streamactive = false; sessionAttempts = 0; } timeout_delays = 0; // Initialize usage tracking MInUse.lock(); active_users = 0; MInUse.unlock(); // Initialize session state Session = 0; Key = 0; MaxLen = 0; NextInSeq = 0; NextOutSeq = 0; CombinedAppPacket = nullptr; // Initialize acknowledgment tracking MAcks.lock(); MaxAckReceived = -1; NextAckToSend = -1; LastAckSent = -1; MAcks.unlock(); // Initialize sequence tracking LastSeqSent = -1; MaxSends = 5; LastPacket = Timer::GetCurrentTime2(); // Initialize buffers oversize_buffer = nullptr; oversize_length = 0; oversize_offset = 0; Factory = nullptr; rogue_buffer = nullptr; roguebuf_offset = 0; roguebuf_size = 0; // Initialize rate limiting MRate.lock(); RateThreshold = RATEBASE / 250; DecayRate = DECAYBASE / 250; MRate.unlock(); // Initialize flow control BytesWritten = 0; SequencedBase = 0; AverageDelta = 500; // Initialize encryption crypto->setRC4Key(0); // Initialize retransmission retransmittimer = Timer::GetCurrentTime2(); retransmittimeout = 500 * RETRANSMIT_TIMEOUT_MULT; // Initialize reconnection reconnectAttempt = 0; // Validate queue consistency if (uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) { LogWrite(PACKET__DEBUG, 9, "Packet", "init Invalid Sequenced queue: BS %u + SQ %u != NOS %u", SequencedBase, SequencedQueue.size(), NextOutSeq); } } /** * EQStream constructor with socket address. * Initializes all stream components and sets up compression. * * @param addr - Socket address of remote endpoint */ EQStream::EQStream(sockaddr_in addr) { // Initialize crypto and timers crypto = new Crypto(); resend_que_timer = new Timer(1000); combine_timer = new Timer(250); // 250 milliseconds combine_timer->Start(); resend_que_timer->Start(); // Initialize base stream init(); // Set remote endpoint remote_ip = addr.sin_addr.s_addr; remote_port = addr.sin_port; // Set initial state State = CLOSED; StreamType = UnknownStream; compressed = true; encoded = false; app_opcode_size = 2; // Initialize compression stream (Unix only) bzero(&stream, sizeof(z_stream)); stream.zalloc = (alloc_func)0; stream.zfree = (free_func)0; stream.opaque = (voidpf)0; deflateInit2(&stream, 9, Z_DEFLATED, 13, 9, Z_DEFAULT_STRATEGY); // Initialize compression state compressed_offset = 0; client_version = 0; received_packets = 0; sent_packets = 0; #ifdef WRITE_PACKETS // Initialize packet logging write_packets = nullptr; char write_packets_filename[64]; snprintf(write_packets_filename, sizeof(write_packets_filename), "PacketLog%i.log", Timer::GetCurrentTime2()); write_packets = fopen(write_packets_filename, "w+"); #endif } /** * Process encrypted packet data and extract the embedded packet. * Decrypts the data and extracts the opcode and payload. * * @param data - Encrypted data buffer * @param size - Size of encrypted data * @param opcode - Reference to opcode (will be modified) * @return Newly created EQProtocolPacket with decrypted data */ EQProtocolPacket* EQStream::ProcessEncryptedData(uchar* data, int32 size, int16 opcode) { // Decrypt the data using RC4 crypto->RC4Decrypt(data, size); int8 offset = 0; if (data[0] == 0xFF && size > 2) { // Extended opcode format (2-byte opcode) offset = 3; memcpy(&opcode, data + sizeof(int8), sizeof(int16)); } else { // Standard opcode format (1-byte opcode) offset = 1; memcpy(&opcode, data, sizeof(int8)); } return new EQProtocolPacket(opcode, data + offset, size - offset); } /** * Process an encrypted protocol packet. * Determines the correct offset and calls ProcessEncryptedData. * * @param p - Encrypted protocol packet to process * @return Newly created EQProtocolPacket with decrypted data */ EQProtocolPacket* EQStream::ProcessEncryptedPacket(EQProtocolPacket* p) { EQProtocolPacket* ret = nullptr; if (p->opcode == OP_Packet && p->size > 2) { // Skip the first 2 bytes for OP_Packet ret = ProcessEncryptedData(p->pBuffer + 2, p->size - 2, p->opcode); } else { // Process entire buffer for other opcodes ret = ProcessEncryptedData(p->pBuffer, p->size, p->opcode); } return ret; } /** * Process an embedded encrypted packet within another packet. * Decrypts and queues the embedded packet for processing. * * @param pBuffer - Buffer containing embedded packet data * @param length - Length of embedded packet * @param opcode - Opcode for the embedded packet * @return true if packet was successfully processed */ bool EQStream::ProcessEmbeddedPacket(uchar* pBuffer, int16 length, int8 opcode) { if (!pBuffer || !crypto->isEncrypted()) { return false; } // Process encrypted data with thread safety MCombineQueueLock.lock(); EQProtocolPacket* newpacket = ProcessEncryptedData(pBuffer, length, opcode); MCombineQueueLock.unlock(); if (newpacket) { #ifdef DEBUG_EMBEDDED_PACKETS printf("Embedded Packet - Opcode: %u\n", newpacket->opcode); DumpPacket(newpacket->pBuffer, newpacket->size); #endif // Convert to application packet and queue EQApplicationPacket* ap = newpacket->MakeApplicationPacket(2); if (ap->version == 0) { ap->version = client_version; } InboundQueuePush(ap); #ifdef WRITE_PACKETS WritePackets(ap->GetOpcodeName(), pBuffer, length, false); #endif safe_delete(newpacket); return true; } return false; } /** * Handle embedded packets within a protocol packet. * Processes different types of embedded packet formats. * * @param p - Protocol packet containing embedded data * @param offset - Offset to start of embedded data * @param length - Length of embedded data (0 = auto-detect) * @return true if embedded packet was successfully processed */ bool EQStream::HandleEmbeddedPacket(EQProtocolPacket* p, int16 offset, int16 length) { if (!p) { return false; } #ifdef DEBUG_EMBEDDED_PACKETS printf("HandleEmbeddedPacket - offset: %u, length: %u, size: %u\n", offset, length, p->size); #endif // Ensure we have enough data for the offset if (p->size < static_cast(offset + 2)) { return false; } // Check for OP_AppCombined embedded packet (0x00 0x19) if (p->pBuffer[offset] == 0 && p->pBuffer[offset + 1] == 0x19) { uint32 data_length = 0; if (length == 0) { // Auto-detect length data_length = p->size - offset - 2; } else { // Use provided length if (length < 2) { return false; // Length too short } data_length = length - 2; } #ifdef DEBUG_EMBEDDED_PACKETS printf("Processing OP_AppCombined - offset: %u, data_length: %u\n", offset, data_length); DumpPacket(p->pBuffer, p->size); #endif // Verify bounds if (offset + 2 + data_length > p->size) { return false; // Out of bounds } // Create and process sub-packet auto subp = new EQProtocolPacket(OP_AppCombined, p->pBuffer + offset + 2, data_length); subp->copyInfo(p); ProcessPacket(subp, p); safe_delete(subp); return true; } // Check for null opcode embedded packet (0x00 0x00) else if (p->pBuffer[offset] == 0 && p->pBuffer[offset + 1] == 0) { if (length == 0) { length = p->size - 1 - offset; } else { length--; } #ifdef DEBUG_EMBEDDED_PACKETS printf("Processing null opcode embedded packet\n"); DumpPacket(p->pBuffer + 1 + offset, length); #endif uchar* buffer = (p->pBuffer + 1 + offset); bool valid = ProcessEmbeddedPacket(buffer, length); if (valid) { return true; } } // Check for general embedded packet (not 0xffffffff) else if (offset + 4 < p->size && ntohl(*(uint32*)(p->pBuffer + offset)) != 0xffffffff) { #ifdef DEBUG_EMBEDDED_PACKETS uint16 seq = NextInSeq - 1; sint8 check = 0; if (offset == 2) { seq = ntohs(*(uint16*)(p->pBuffer)); check = CompareSequence(NextInSeq, seq); } printf("Processing general embedded packet - offset: %u, length: %u, size: %u, check: %i, seq: %u\n", offset, length, p->size, check, seq); DumpPacket(p->pBuffer, p->size); #endif if (length == 0) { length = p->size - offset; } uchar* buffer = (p->pBuffer + offset); bool valid = ProcessEmbeddedPacket(buffer, length); if (valid) { return true; } } // Check for length-prefixed embedded packet (not 0xff, then 0xff) else if (p->pBuffer[offset] != 0xff && p->pBuffer[offset + 1] == 0xff && p->size >= offset + 3) { // Get total length from first byte uint16 total_length = p->pBuffer[offset]; // Verify packet size matches expected length if (total_length + offset + 2 == p->size && total_length >= 2) { uint32 data_length = total_length - 2; // Create and process sub-packet auto subp = new EQProtocolPacket(p->pBuffer + offset + 2, data_length, OP_Packet); subp->copyInfo(p); ProcessPacket(subp, p); delete subp; return true; } } return false; } /** * Process a protocol packet and handle various packet types. * This is the main packet processing dispatcher for the stream. * * @param p - Protocol packet to process * @param lastp - Previous packet in sequence (for context) */ void EQStream::ProcessPacket(EQProtocolPacket* p, EQProtocolPacket* lastp) { uint32 processed = 0; uint32 subpacket_length = 0; if (!p) { return; } // Ensure session is initialized for non-session packets if (p->opcode != OP_SessionRequest && p->opcode != OP_SessionResponse && !Session) { #ifdef EQN_DEBUG LogWrite(PACKET__ERROR, 0, "Packet", "Session not initialized, packet ignored"); #endif return; } // Process packet based on opcode switch (p->opcode) { case OP_Combined: { // Handle combined packets containing multiple sub-packets processed = 0; int8 offset = 0; int count = 0; #ifdef LE_DEBUG printf("Processing OP_Combined packet\n"); DumpPacket(p); #endif // Process all sub-packets within the combined packet while (processed < p->size) { // Determine sub-packet length and offset subpacket_length = static_cast(*(p->pBuffer + processed)); if (subpacket_length == 0xff) { // Extended length format (2-byte length) subpacket_length = ntohs(*(uint16*)(p->pBuffer + processed + 1)); offset = 3; } else { // Standard format (1-byte length) offset = 1; } count++; #ifdef LE_DEBUG printf("Processing sub-packet %i: length=%u, offset=%u\n", count, subpacket_length, processed); #endif // Check if this is a valid protocol packet bool isSubPacket = EQProtocolPacket::IsProtocolPacket( p->pBuffer + processed + offset, subpacket_length, false); if (isSubPacket) { // Create and process sub-packet auto subp = new EQProtocolPacket( p->pBuffer + processed + offset, subpacket_length); subp->copyInfo(p); #ifdef LE_DEBUG printf("Sub-packet opcode: %i\n", subp->opcode); DumpPacket(subp); #endif ProcessPacket(subp, p); delete subp; } else { // Handle non-protocol packet data (encrypted) offset = 1; // Check for potential garbage packet uint16 header_check = ntohs(*reinterpret_cast( p->pBuffer + processed + offset)); if (header_check <= 0x1e) { // Process as potential OP_Packet subpacket_length = static_cast( *(p->pBuffer + processed)); LogWrite(PACKET__ERROR, 0, "Packet", "Processing suspected garbage packet as OP_Packet"); DumpPacket(p->pBuffer + processed + offset, subpacket_length); uchar* newbuf = p->pBuffer + processed + offset; auto subp = new EQProtocolPacket(newbuf, subpacket_length); subp->copyInfo(p); ProcessPacket(subp, p); delete subp; } else { // Decrypt the data and process crypto->RC4Decrypt(p->pBuffer + processed + offset, subpacket_length); LogWrite(PACKET__ERROR, 0, "Packet", "Processing encrypted sub-packet: " "processed=%u, offset=%u, count=%i, length=%u", processed, offset, count, subpacket_length); if (p->pBuffer[processed + offset] == 0xff) { // Skip 0xff marker and process data uchar* newbuf = p->pBuffer + processed + offset + 1; DumpPacket(p->pBuffer + processed + offset, subpacket_length); auto subp = new EQProtocolPacket( newbuf, subpacket_length, OP_Packet); subp->copyInfo(p); ProcessPacket(subp, p); delete subp; } else { // Invalid packet format - stop processing break; } } } // Move to next sub-packet processed += subpacket_length + offset; } break; } case OP_AppCombined: { // Handle application combined packets processed = 0; int8 offset = 0; int count = 0; #ifdef DEBUG_EMBEDDED_PACKETS printf("Processing OP_AppCombined packet\n"); DumpPacket(p); #endif while (processed < p->size) { count++; // Determine sub-packet length and offset subpacket_length = static_cast(*(p->pBuffer + processed)); if (subpacket_length == 0xff) { // Extended length format subpacket_length = ntohs(*(uint16*)(p->pBuffer + processed + 1)); offset = 3; } else { // Standard length format offset = 1; } // Handle RSA key exchange if no encryption key is set if (crypto->getRC4Key() == 0 && p && subpacket_length > 8 + offset) { #ifdef DEBUG_EMBEDDED_PACKETS DumpPacket(p->pBuffer, p->size); #endif p->pBuffer += offset; processRSAKey(p, subpacket_length); p->pBuffer -= offset; } else if (crypto->isEncrypted()) { // Process encrypted embedded packet #ifdef DEBUG_EMBEDDED_PACKETS printf("Processing encrypted sub-packet %i: length=%u, offset=%u\n", count, subpacket_length, processed); DumpPacket(p->pBuffer + processed + offset, subpacket_length); #endif if (!HandleEmbeddedPacket(p, processed + offset, subpacket_length)) { uchar* buffer = (p->pBuffer + processed + offset); if (!ProcessEmbeddedPacket(buffer, subpacket_length, OP_AppCombined)) { LogWrite(PACKET__ERROR, 0, "Packet", "ProcessEmbeddedPacket failed for OP_AppCombined"); } } } // Move to next sub-packet processed += subpacket_length + offset; } break; } case OP_Packet: { // Handle sequenced data packets if (!p->pBuffer || (p->Size() < 4)) { break; // Invalid packet } // Extract sequence number and check ordering uint16 seq = ntohs(*(uint16*)(p->pBuffer)); sint8 check = CompareSequence(NextInSeq, seq); if (check == SeqFuture) { // Future packet - store for later processing #ifdef EQN_DEBUG LogWrite(PACKET__DEBUG, 1, "Packet", "Future packet: Expected=%i, Got=%i", NextInSeq, seq); p->DumpRawHeader(seq); #endif OutOfOrderpackets[seq] = p->Copy(); // Note: Not sending out-of-order ACK to prevent loops } else if (check == SeqPast) { // Duplicate/old packet - acknowledge but don't process #ifdef EQN_DEBUG LogWrite(PACKET__DEBUG, 1, "Packet", "Duplicate packet: Expected=%i, Got=%i", NextInSeq, seq); p->DumpRawHeader(seq); #endif SendOutOfOrderAck(seq); } else { // In-order packet - process normally EQProtocolPacket* qp = RemoveQueue(seq); if (qp) { LogWrite(PACKET__DEBUG, 1, "Packet", "Removing older queued packet with sequence %i", seq); delete qp; } // Update sequence tracking SetNextAckToSend(seq); NextInSeq++; // Try to handle as embedded packet first if (HandleEmbeddedPacket(p)) { break; } // Handle RSA key exchange if no encryption key is set if (crypto->getRC4Key() == 0 && p && p->size >= 69) { #ifdef DEBUG_EMBEDDED_PACKETS DumpPacket(p->pBuffer, p->size); #endif processRSAKey(p); } else if (crypto->isEncrypted() && p) { // Process encrypted packet MCombineQueueLock.lock(); EQProtocolPacket* newpacket = ProcessEncryptedPacket(p); MCombineQueueLock.unlock(); if (newpacket) { // Convert to application packet and queue EQApplicationPacket* ap = newpacket->MakeApplicationPacket(2); if (ap->version == 0) { ap->version = client_version; } #ifdef WRITE_PACKETS WritePackets(ap->GetOpcodeName(), p->pBuffer, p->size, false); #endif InboundQueuePush(ap); safe_delete(newpacket); } } } break; } case OP_Fragment: { if (!p->pBuffer || (p->Size() < 4)) { break; } uint16 seq=ntohs(*(uint16 *)(p->pBuffer)); sint8 check=CompareSequence(NextInSeq,seq); if (check == SeqFuture) { #ifdef EQN_DEBUG LogWrite(PACKET__DEBUG, 1, "Packet", "*** Future packet2: Expecting Seq=%i, but got Seq=%i", NextInSeq, seq); LogWrite(PACKET__DEBUG, 1, "Packet", "[Start]"); //p->DumpRawHeader(seq); LogWrite(PACKET__DEBUG, 1, "Packet", "[End]"); #endif OutOfOrderpackets[seq] = p->Copy(); //SendOutOfOrderAck(seq); } else if (check == SeqPast) { #ifdef EQN_DEBUG LogWrite(PACKET__DEBUG, 1, "Packet", "*** Duplicate packet2: Expecting Seq=%i, but got Seq=%i", NextInSeq, seq); LogWrite(PACKET__DEBUG, 1, "Packet", "[Start]"); //p->DumpRawHeader(seq); LogWrite(PACKET__DEBUG, 1, "Packet", "[End]"); #endif //OutOfOrderpackets[seq] = p->Copy(); SendOutOfOrderAck(seq); } else { // In case we did queue one before as well. EQProtocolPacket* qp = RemoveQueue(seq); if (qp) { LogWrite(PACKET__DEBUG, 1, "Packet", "OP_Fragment: Removing older queued packet with sequence %i", seq); delete qp; } SetNextAckToSend(seq); NextInSeq++; if (oversize_buffer) { memcpy(oversize_buffer+oversize_offset,p->pBuffer+2,p->size-2); oversize_offset+=p->size-2; //cout << "Oversized is " << oversize_offset << "/" << oversize_length << " (" << (p->size-2) << ") Seq=" << seq << endl; if (oversize_offset==oversize_length) { if (*(p->pBuffer+2)==0x00 && *(p->pBuffer+3)==0x19) { EQProtocolPacket *subp=new EQProtocolPacket(oversize_buffer,oversize_offset); subp->copyInfo(p); ProcessPacket(subp, p); delete subp; } else { if(crypto->isEncrypted() && p && p->size > 2){ MCombineQueueLock.lock(); EQProtocolPacket* p2 = ProcessEncryptedData(oversize_buffer, oversize_offset, p->opcode); MCombineQueueLock.unlock(); EQApplicationPacket* ap = p2->MakeApplicationPacket(2); ap->copyInfo(p); if (ap->version == 0) ap->version = client_version; #ifdef WRITE_PACKETS WritePackets(ap->GetOpcodeName(), oversize_buffer, oversize_offset, false); #endif ap->copyInfo(p); InboundQueuePush(ap); safe_delete(p2); } } delete[] oversize_buffer; oversize_buffer=NULL; oversize_offset=0; } } else if (!oversize_buffer) { oversize_length=ntohl(*(uint32 *)(p->pBuffer+2)); oversize_buffer=new unsigned char[oversize_length]; memcpy(oversize_buffer,p->pBuffer+6,p->size-6); oversize_offset=p->size-6; //cout << "Oversized is " << oversize_offset << "/" << oversize_length << " (" << (p->size-6) << ") Seq=" << seq << endl; } } } break; case OP_KeepAlive: { #ifndef COLLECTOR NonSequencedPush(new EQProtocolPacket(p->opcode,p->pBuffer,p->size)); #endif } break; case OP_Ack: { if (!p->pBuffer || (p->Size() < 4)) { LogWrite(PACKET__DEBUG, 9, "Packet", "Received OP_Ack that was of malformed size"); break; } uint16 seq = ntohs(*(uint16*)(p->pBuffer)); AckPackets(seq); retransmittimer = Timer::GetCurrentTime2(); } break; case OP_SessionRequest: { if (p->Size() < sizeof(SessionRequest)) { break; } if (GetState() == ESTABLISHED) { //_log(NET__ERROR, _L "Received OP_SessionRequest in ESTABLISHED state (%d) streamactive (%i) attempt (%i)" __L, GetState(), streamactive, sessionAttempts); // client seems to try a max of 4 times (initial +3 retries) then gives up, giving it a few more attempts just in case // streamactive means we identified the opcode, we cannot re-establish this connection if (streamactive || (sessionAttempts > 30)) { SendDisconnect(false); SetState(CLOSED); break; } } sessionAttempts++; if(GetState() == WAIT_CLOSE) { printf("WAIT_CLOSE Reconnect with streamactive %u, sessionAttempts %u\n", streamactive, sessionAttempts); reconnectAttempt++; } init(GetState() != ESTABLISHED); OutboundQueueClear(); SessionRequest *Request=(SessionRequest *)p->pBuffer; Session=ntohl(Request->Session); SetMaxLen(ntohl(Request->MaxLength)); #ifndef COLLECTOR NextInSeq=0; Key=0x33624702; SendSessionResponse(); #endif SetState(ESTABLISHED); } break; /** * OP_SessionResponse: Handle session establishment response from server * Contains stream parameters, encryption keys, and stream type information */ case OP_SessionResponse: { // Validate packet size for SessionResponse structure if (p->Size() < sizeof(SessionResponse)) { break; } // Initialize stream and clear outbound queue init(); OutboundQueueClear(); SetActive(true); // Extract session response data auto Response = reinterpret_cast(p->pBuffer); SetMaxLen(ntohl(Response->MaxLength)); Key = ntohl(Response->Key); NextInSeq = 0; SetState(ESTABLISHED); // Set session ID if not already established if (!Session) { Session = ntohl(Response->Session); } // Extract compression and encoding flags compressed = (Response->Format & FLAG_COMPRESSED); encoded = (Response->Format & FLAG_ENCODED); // Determine stream type based on format flags and port if (compressed) { if (remote_port == 9000 || (remote_port == 0 && p->src_port == 9000)) { SetStreamType(WorldStream); } else { SetStreamType(ZoneStream); } } else if (encoded) { SetStreamType(ChatOrMailStream); } else { SetStreamType(LoginStream); } } break; /** * OP_SessionDisconnect: Handle graceful disconnect request * Initiates proper connection termination sequence */ case OP_SessionDisconnect: { // Send disconnect acknowledgment to peer SendDisconnect(); } break; /** * OP_OutOfOrderAck: Handle out-of-order packet acknowledgment * Marks specific packets as acknowledged and flags earlier packets for retransmission */ case OP_OutOfOrderAck: { // Validate packet structure and size if (!p->pBuffer || (p->Size() < 4)) { LogWrite(PACKET__DEBUG, 9, "Packet", "Received OP_OutOfOrderAck that was of malformed size"); break; } // Extract sequence number from packet uint16 seq = ntohs(*reinterpret_cast(p->pBuffer)); MOutboundQueue.lock(); // Validate sequenced queue integrity (pre-processing) if (uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) { LogWrite(PACKET__DEBUG, 9, "Packet", "Pre-OOA Invalid Sequenced queue: BS %u + SQ %u != NOS %u", SequencedBase, SequencedQueue.size(), NextOutSeq); } // Verify that acknowledged packet is within valid window if (CompareSequence(SequencedBase, seq) != SeqPast && CompareSequence(NextOutSeq, seq) == SeqPast) { uint16 sqsize = SequencedQueue.size(); uint16 index = seq - SequencedBase; LogWrite(PACKET__DEBUG, 9, "Packet", "OP_OutOfOrderAck marking packet acked in queue (queue index = %u, queue size = %u)", index, sqsize); // Mark the specific packet as acknowledged if (index < sqsize) { SequencedQueue[index]->acked = true; // Flag earlier unacknowledged packets for retransmission uint16 count = 0; uint32 timeout = AverageDelta * 2 + 100; for (auto sitr = SequencedQueue.begin(); sitr != SequencedQueue.end() && count < index; ++sitr, ++count) { // Check if packet needs retransmission based on timeout if (!(*sitr)->acked && (*sitr)->sent_time > 0 && (((*sitr)->sent_time + timeout) < Timer::GetCurrentTime2())) { (*sitr)->sent_time = 0; // Mark for resend LogWrite(PACKET__DEBUG, 9, "Packet", "OP_OutOfOrderAck Flagging packet %u for retransmission", SequencedBase + count); } } } // Update retransmit timer if enabled if (RETRANSMIT_TIMEOUT_MULT) { retransmittimer = Timer::GetCurrentTime2(); } } else { LogWrite(PACKET__DEBUG, 9, "Packet", "Received OP_OutOfOrderAck for out-of-window %u. Window (%u->%u)", seq, SequencedBase, NextOutSeq); } // Validate sequenced queue integrity (post-processing) if (uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) { LogWrite(PACKET__DEBUG, 9, "Packet", "Post-OOA Invalid Sequenced queue: BS %u + SQ %u != NOS %u", SequencedBase, SequencedQueue.size(), NextOutSeq); } MOutboundQueue.unlock(); } break; /** * OP_ServerKeyRequest: Handle server key exchange request * Processes client session statistics and initiates key exchange */ case OP_ServerKeyRequest: { // Validate packet size for ClientSessionStats structure if (p->Size() < sizeof(ClientSessionStats)) { break; } // Extract client session statistics auto Stats = reinterpret_cast(p->pBuffer); int16 request_id = Stats->RequestID; // Adjust transmission rates based on client feedback AdjustRates(ntohl(Stats->average_delta)); // Prepare server session statistics response auto stats = reinterpret_cast(p->pBuffer); memset(stats, 0, sizeof(ServerSessionStats)); stats->RequestID = request_id; stats->current_time = ntohl(Timer::GetCurrentTime2()); stats->sent_packets = ntohl(sent_packets); stats->sent_packets2 = ntohl(sent_packets); stats->received_packets = ntohl(received_packets); stats->received_packets2 = ntohl(received_packets); // Send session statistics response NonSequencedPush(new EQProtocolPacket(OP_SessionStatResponse, p->pBuffer, p->size)); // Initiate appropriate response based on encryption state if (!crypto->isEncrypted()) { SendKeyRequest(); } else { SendSessionResponse(); } } break; /** * OP_SessionStatResponse: Handle session statistics response * Currently just logs the receipt for debugging purposes */ case OP_SessionStatResponse: { LogWrite(PACKET__INFO, 0, "Packet", "OP_SessionStatResponse"); } break; /** * OP_OutOfSession: Handle out-of-session packet * Indicates client is no longer in valid session state */ case OP_OutOfSession: { LogWrite(PACKET__INFO, 0, "Packet", "OP_OutOfSession"); SendDisconnect(); SetState(CLOSED); } break; /** * Default case: Handle unknown/unprocessed packet types * Attempts RSA key processing and packet decryption for debugging */ default: { // Debug output for original packet std::cout << "Orig Packet: " << p->opcode << std::endl; DumpPacket(p->pBuffer, p->size); // Process RSA key if packet is large enough if (p && p->size >= 69) { processRSAKey(p); } // Attempt to decrypt packet data MCombineQueueLock.lock(); EQProtocolPacket* p2 = ProcessEncryptedData(p->pBuffer, p->size, OP_Packet); MCombineQueueLock.unlock(); // Debug output for decrypted packet if (p2) { std::cout << "Decrypted Packet: " << p2->opcode << std::endl; DumpPacket(p2->pBuffer, p2->size); safe_delete(p2); } #ifdef WRITE_PACKETS // Write packet data to file if enabled WritePackets("Unknown", p->pBuffer, p->size, false); #endif // Log unknown packet type LogWrite(PACKET__INFO, 0, "Packet", "Received unknown packet type, not adding to inbound queue"); } break; } } } /** * Compress EQ2 packet data using zlib deflate compression * Compresses packet data starting from the specified offset and updates packet buffer * * @param app - EQ2Packet to compress (modified in-place) * @param offset - Starting offset for compression data * @return Compression flag offset, or 0 on failure */ int8 EQStream::EQ2_Compress(EQ2Packet* app, int8 offset) { #ifdef LE_DEBUG std::printf("Before Compress in %s, line %i:\n", __FUNCTION__, __LINE__); DumpPacket(app); #endif // Set up compression parameters uchar* pDataPtr = app->pBuffer + offset; int xpandSize = app->size * 2; // Allocate double size for compression buffer auto deflate_buff = std::make_unique(xpandSize); // Lock compression data and configure zlib stream MCompressData.lock(); stream.next_in = pDataPtr; stream.avail_in = app->size - offset; stream.next_out = deflate_buff.get(); stream.avail_out = xpandSize; // Perform compression int ret = deflate(&stream, Z_SYNC_FLUSH); // Check for compression errors if (ret != Z_OK) { std::printf("ZLIB COMPRESSION RETFAIL: %i, %i (Ret: %i)\n", app->size, stream.avail_out, ret); MCompressData.unlock(); return 0; } // Calculate new size and rebuild packet buffer int32 newsize = xpandSize - stream.avail_out; safe_delete_array(app->pBuffer); app->size = newsize + offset; app->pBuffer = new uchar[app->size]; // Set compression flag and copy compressed data app->pBuffer[offset - 1] = 1; // Compression flag std::memcpy(app->pBuffer + offset, deflate_buff.get(), newsize); MCompressData.unlock(); #ifdef LE_DEBUG std::printf("After Compress in %s, line %i:\n", __FUNCTION__, __LINE__); DumpPacket(app); #endif return offset - 1; } /** * Process RSA key from protocol packet and initialize RC4 encryption * Extracts RSA-encrypted RC4 key from packet and configures stream encryption * * @param p - Protocol packet containing RSA key data * @param subpacket_length - Length of subpacket, 0 to use full packet * @return Always returns 0 (legacy return value) */ int16 EQStream::processRSAKey(EQProtocolPacket *p, uint16 subpacket_length) { // Extract RSA key from appropriate location in packet if (subpacket_length) { // Use subpacket-relative position for key extraction crypto->setRC4Key(Crypto::RSADecrypt(p->pBuffer + subpacket_length - 8, 8)); } else { // Use full packet size for key extraction crypto->setRC4Key(Crypto::RSADecrypt(p->pBuffer + p->size - 8, 8)); } return 0; } /** * Send encryption key request to establish secure communication * Creates and sends a key generation packet to initiate encryption handshake */ void EQStream::SendKeyRequest() { // Define key generation packet parameters constexpr int32 crypto_key_size = 60; const int16 size = sizeof(KeyGen_Struct) + sizeof(KeyGen_End_Struct) + crypto_key_size; // Create key request packet auto outapp = new EQ2Packet(OP_WSLoginRequestMsg, nullptr, size); // Set crypto key size in packet header std::memcpy(&outapp->pBuffer[0], &crypto_key_size, sizeof(int32)); // Fill crypto key area with placeholder data std::memset(&outapp->pBuffer[4], 0xFF, crypto_key_size); // Set packet termination markers std::memset(&outapp->pBuffer[size - 5], 1, 1); std::memset(&outapp->pBuffer[size - 1], 1, 1); // Queue packet for transmission EQ2QueuePacket(outapp, true); } /** * Encrypt packet data using RC4 encryption * Applies encryption to packet payload, skipping headers and unencrypted sections * * @param app - EQ2Packet to encrypt (modified in-place) * @param compress_offset - Offset for compressed data encryption * @param offset - Additional offset for uncompressed data */ void EQStream::EncryptPacket(EQ2Packet* app, int8 compress_offset, int8 offset) { // Only encrypt packets that are large enough and encryption is enabled if (app->size > 2 && crypto->isEncrypted()) { app->packet_encrypted = true; uchar* crypt_buff = app->pBuffer; // Choose encryption range based on compression status if (app->eq2_compressed) { // Encrypt from compression offset to end of packet crypto->RC4Encrypt(crypt_buff + compress_offset, app->size - compress_offset); } else { // Encrypt from header end plus offset to end of packet crypto->RC4Encrypt(crypt_buff + 2 + offset, app->size - 2 - offset); } } } /** * Queue EQ2 packet for transmission with optional combining * Either adds packet to combine queue for batching or sends immediately * * @param app - EQ2Packet to queue for transmission * @param attempted_combine - If true, send immediately; if false, queue for combining */ void EQStream::EQ2QueuePacket(EQ2Packet* app, bool attempted_combine) { // Only process packets if stream is active if (CheckActive()) { if (!attempted_combine) { // Add packet to combine queue for batching MCombineQueueLock.lock(); combine_queue.push_back(app); MCombineQueueLock.unlock(); } else { // Prepare and send packet immediately MCombineQueueLock.lock(); PreparePacket(app); MCombineQueueLock.unlock(); #ifdef LE_DEBUG std::printf("After B in %s, line %i:\n", __FUNCTION__, __LINE__); DumpPacket(app); #endif SendPacket(app); } } } /** * Remove packet preparation headers from EQ2 packet * Strips protocol-specific headers that were added during packet preparation * * @param app - EQ2Packet to unprepare (modified in-place) */ void EQStream::UnPreparePacket(EQ2Packet* app) { // Check for specific header pattern that indicates prepared packet if (app->pBuffer[2] == 0 && app->pBuffer[3] == 19) { // Create new buffer without preparation headers auto new_buffer = std::make_unique(app->size - 3); std::memcpy(new_buffer.get() + 2, app->pBuffer + 5, app->size - 3); // Replace packet buffer with unprepared version delete[] app->pBuffer; app->size -= 3; app->pBuffer = new_buffer.release(); } } #ifdef WRITE_PACKETS /** * Convert byte to printable character for packet dumps * Returns '.' for non-printable characters * * @param in - Input byte to convert * @return Printable character representation */ char EQStream::GetChar(uchar in) { if (in < ' ' || in > '~') { return '.'; } return static_cast(in); } /** * Write formatted output to packet dump file * Provides printf-style formatting for packet logging * * @param pFormat - Printf-style format string * @param ... - Variable arguments for formatting */ void EQStream::WriteToFile(char* pFormat, ...) { va_list args; va_start(args, pFormat); std::vfprintf(write_packets, pFormat, args); va_end(args); } /** * Write packet data to debug file with hex dump format * Creates formatted hex dump with ASCII representation for debugging * * @param opcodeName - Name of packet opcode for header * @param data - Raw packet data to dump * @param size - Size of packet data in bytes * @param outgoing - True if outgoing packet, false if incoming */ void EQStream::WritePackets(const char* opcodeName, uchar* data, int32 size, bool outgoing) { MWritePackets.lock(); // Format connection information struct in_addr ip_addr; ip_addr.s_addr = remote_ip; char timebuffer[80]; time_t rawtime; struct tm* timeinfo; std::time(&rawtime); timeinfo = std::localtime(&rawtime); std::strftime(timebuffer, 80, "%m/%d/%Y %H:%M:%S", timeinfo); // Write packet header if (outgoing) { WriteToFile("-- %s --\n%s\nSERVER -> %s\n", opcodeName, timebuffer, inet_ntoa(ip_addr)); } else { WriteToFile("-- %s --\n%s\n%s -> SERVER\n", opcodeName, timebuffer, inet_ntoa(ip_addr)); } // Calculate hex dump layout int nLines = size / 16; int nExtra = size % 16; uchar* pPtr = data; // Write full 16-byte lines for (int i = 0; i < nLines; i++) { WriteToFile("%4.4X:\t%2.2X %2.2X %2.2X %2.2X %2.2X %2.2X %2.2X %2.2X %2.2X %2.2X %2.2X %2.2X %2.2X %2.2X %2.2X %2.2X %c%c%c%c%c%c%c%c%c%c%c%c%c%c%c%c\n", i * 16, pPtr[0], pPtr[1], pPtr[2], pPtr[3], pPtr[4], pPtr[5], pPtr[6], pPtr[7], pPtr[8], pPtr[9], pPtr[10], pPtr[11], pPtr[12], pPtr[13], pPtr[14], pPtr[15], GetChar(pPtr[0]), GetChar(pPtr[1]), GetChar(pPtr[2]), GetChar(pPtr[3]), GetChar(pPtr[4]), GetChar(pPtr[5]), GetChar(pPtr[6]), GetChar(pPtr[7]), GetChar(pPtr[8]), GetChar(pPtr[9]), GetChar(pPtr[10]), GetChar(pPtr[11]), GetChar(pPtr[12]), GetChar(pPtr[13]), GetChar(pPtr[14]), GetChar(pPtr[15])); pPtr += 16; } // Write partial line if remaining bytes if (nExtra) { WriteToFile("%4.4X\t", nLines * 16); // Write hex bytes for (int i = 0; i < nExtra; i++) { WriteToFile("%2.2X ", pPtr[i]); } // Pad with spaces for (int i = nExtra; i < 16; i++) { WriteToFile(" "); } // Write ASCII representation for (int i = 0; i < nExtra; i++) { WriteToFile("%c", GetChar(pPtr[i])); } WriteToFile("\n"); } WriteToFile("\n\n"); std::fflush(write_packets); MWritePackets.unlock(); } /** * Write EQ2 packet to debug file with automatic version handling * Convenience wrapper for WritePackets that handles version setup * * @param app - EQ2Packet to write to debug file * @param outgoing - True if outgoing packet, false if incoming */ void EQStream::WritePackets(EQ2Packet* app, bool outgoing) { if (app->version == 0) { app->version = client_version; } WritePackets(app->GetOpcodeName(), app->pBuffer, app->size, outgoing); } #endif /** * Prepare EQ2 packet for transmission * Handles packet preparation, compression, and encryption in sequence * * @param app - EQ2Packet to prepare (modified in-place) * @param offset - Additional offset for encryption */ void EQStream::PreparePacket(EQ2Packet* app, int8 offset) { // Set client version and reset compression offset app->setVersion(client_version); compressed_offset = 0; #ifdef LE_DEBUG std::printf("Before A in %s, line %i:\n", __FUNCTION__, __LINE__); DumpPacket(app); #endif // Prepare packet headers and structure if not already done if (!app->packet_prepared) { if (app->PreparePacket(MaxLen) == 255) // Invalid version { return; } } #ifdef LE_DEBUG std::printf("After Prepare in %s, line %i:\n", __FUNCTION__, __LINE__); DumpPacket(app); #endif #ifdef WRITE_PACKETS // Write uncompressed/unencrypted packet to file if enabled if (!app->eq2_compressed && !app->packet_encrypted) { WritePackets(app, true); } #endif // Compress packet if it's large enough and not already compressed if (!app->eq2_compressed && app->size > 128) { compressed_offset = EQ2_Compress(app); if (compressed_offset) { app->eq2_compressed = true; } } // Encrypt packet if not already encrypted if (!app->packet_encrypted) { EncryptPacket(app, compressed_offset, offset); // Handle special case for zero byte at offset 2 if (app->size > 2 && app->pBuffer[2] == 0) { auto new_buffer = std::make_unique(app->size + 1); new_buffer[2] = 0; std::memcpy(new_buffer.get() + 3, app->pBuffer + 2, app->size - 2); delete[] app->pBuffer; app->pBuffer = new_buffer.release(); app->size++; } } #ifdef LE_DEBUG std::printf("After A in %s, line %i:\n", __FUNCTION__, __LINE__); DumpPacket(app); #endif } /** * Send EQProtocolPacket with automatic fragmentation for large packets * Fragments packets that exceed maximum transmission unit into smaller chunks * * @param p - EQProtocolPacket to send (ownership transferred, will be deleted) */ void EQStream::SendPacket(EQProtocolPacket *p) { uint32 chunksize, used; uint32 length; // Check if packet needs fragmentation if (p->size > (MaxLen - 8)) // proto-op(2), seq(2), app-op(2) ... data ... crc(2) { uchar* tmpbuff = p->pBuffer; length = p->size - 2; // Create first fragment with length header auto out = new EQProtocolPacket(OP_Fragment, nullptr, MaxLen - 4); *reinterpret_cast(out->pBuffer + 2) = htonl(length); used = MaxLen - 10; std::memcpy(out->pBuffer + 6, tmpbuff + 2, used); #ifdef LE_DEBUG std::printf("(%s, %i) New Fragment:\n ", __FUNCTION__, __LINE__); DumpPacket(out); #endif SequencedPush(out); // Create additional fragments for remaining data while (used < length) { chunksize = std::min(length - used, MaxLen - 6); out = new EQProtocolPacket(OP_Fragment, nullptr, chunksize + 2); std::memcpy(out->pBuffer + 2, tmpbuff + used + 2, chunksize); #ifdef LE_DEBUG std::printf("Chunk: \n"); DumpPacket(out); #endif SequencedPush(out); used += chunksize; } #ifdef LE_DEBUG std::printf("ChunkDelete: \n"); DumpPacket(out); #endif delete p; } else { // Packet fits within MTU, send directly SequencedPush(p); } } /** * Send EQApplicationPacket with automatic fragmentation for large packets * Serializes application packet and fragments if necessary for transmission * * @param p - EQApplicationPacket to send (ownership transferred, will be deleted) */ void EQStream::SendPacket(EQApplicationPacket *p) { uint32 chunksize, used; uint32 length; // Check if packet needs fragmentation if (p->size > (MaxLen - 8)) // proto-op(2), seq(2), app-op(2) ... data ... crc(2) { // Serialize application packet to buffer auto tmpbuff = std::make_unique(p->size + 2); length = p->serialize(tmpbuff.get()); // Create first fragment with size header auto out = new EQProtocolPacket(OP_Fragment, nullptr, MaxLen - 4); *reinterpret_cast(out->pBuffer + 2) = htonl(p->Size()); std::memcpy(out->pBuffer + 6, tmpbuff.get(), MaxLen - 10); used = MaxLen - 10; SequencedPush(out); // Create additional fragments for remaining data while (used < length) { out = new EQProtocolPacket(OP_Fragment, nullptr, MaxLen - 4); chunksize = std::min(length - used, MaxLen - 6); std::memcpy(out->pBuffer + 2, tmpbuff.get() + used, chunksize); out->size = chunksize + 2; SequencedPush(out); used += chunksize; } delete p; } else { // Packet fits within MTU, serialize directly to protocol packet auto out = new EQProtocolPacket(OP_Packet, nullptr, p->Size() + 2); p->serialize(out->pBuffer + 2); SequencedPush(out); delete p; } } /** * Add packet to sequenced transmission queue * Assigns sequence number and queues packet for reliable delivery * * @param p - EQProtocolPacket to queue (ownership transferred) */ void EQStream::SequencedPush(EQProtocolPacket *p) { // Set client version for compatibility p->setVersion(client_version); MOutboundQueue.lock(); // Assign sequence number to packet header *reinterpret_cast(p->pBuffer) = htons(NextOutSeq); SequencedQueue.push_back(p); p->sequence = NextOutSeq; NextOutSeq++; MOutboundQueue.unlock(); } /** * Add packet to non-sequenced transmission queue * Queues packet for immediate transmission without sequence numbering * * @param p - EQProtocolPacket to queue (ownership transferred) */ void EQStream::NonSequencedPush(EQProtocolPacket *p) { // Set client version for compatibility p->setVersion(client_version); MOutboundQueue.lock(); NonSequencedQueue.push(p); MOutboundQueue.unlock(); } /** * Send acknowledgment packet for received sequence number * Confirms successful receipt of sequenced packet to peer * * @param seq - Sequence number to acknowledge */ void EQStream::SendAck(uint16 seq) { uint16 Seq = htons(seq); SetLastAckSent(seq); NonSequencedPush(new EQProtocolPacket(OP_Ack, reinterpret_cast(&Seq), sizeof(uint16))); } /** * Send out-of-order acknowledgment packet * Notifies peer that a specific packet was received out of sequence * * @param seq - Sequence number that was received out of order */ void EQStream::SendOutOfOrderAck(uint16 seq) { uint16 Seq = htons(seq); NonSequencedPush(new EQProtocolPacket(OP_OutOfOrderAck, reinterpret_cast(&Seq), sizeof(uint16))); } /** * Process packet combination queue to optimize transmission * Combines multiple small packets into larger ones for efficiency * * @return true if all packets processed, false if processing was limited */ bool EQStream::CheckCombineQueue() { bool ret = true; // Processed all packets flag MCombineQueueLock.lock(); if (combine_queue.size() > 0) { // Get first packet from queue EQ2Packet* first = combine_queue.front(); combine_queue.pop_front(); if (combine_queue.size() == 0) { // Nothing to combine with, send immediately EQ2QueuePacket(first, true); } else { // Prepare first packet for combining PreparePacket(first); EQ2Packet* second = nullptr; bool combine_worked = false; int16 count = 0; // Attempt to combine with additional packets while (combine_queue.size()) { count++; second = combine_queue.front(); combine_queue.pop_front(); PreparePacket(second); // Try to combine second packet with first if (!first->AppCombine(second)) { // Combination failed, send first packet first->SetProtocolOpcode(OP_Packet); if (combine_worked) { SequencedPush(first); } else { EQ2QueuePacket(first, true); } // Make second packet the new first first = second; combine_worked = false; } else { // Combination succeeded combine_worked = true; } // Limit processing to prevent blocking other clients if (count >= 60 || first->size > 4000) { ret = false; break; } } // Send final packet if (first) { first->SetProtocolOpcode(OP_Packet); if (combine_worked) { SequencedPush(first); } else { EQ2QueuePacket(first, true); } } } } MCombineQueueLock.unlock(); return ret; } /** * Check and handle packet retransmission for unacknowledged packets * Retries failed packet deliveries up to maximum attempt limit * * @param eq_fd - Socket file descriptor for transmission */ void EQStream::CheckResend(int eq_fd) { int32 curr = Timer::GetCurrentTime2(); EQProtocolPacket* packet = nullptr; MResendQue.lock(); for (auto itr = resend_que.begin(); itr != resend_que.end();) { packet = *itr; // Check if packet has exceeded maximum retry attempts if (packet->attempt_count >= 5) { // Give up on this packet - client likely has it but didn't ack safe_delete(packet); itr = resend_que.erase(itr); if (itr == resend_que.end()) { break; } } else { // Check if enough time has passed for retry if ((curr - packet->sent_time) < 1000) { ++itr; continue; } // Retry packet transmission packet->sent_time -= 1000; packet->attempt_count++; WritePacket(eq_fd, packet); ++itr; } } MResendQue.unlock(); } /** * Compare two sequence numbers considering wraparound behavior * Determines temporal relationship between sequence numbers within sliding window * * @param expected_seq - Expected sequence number * @param seq - Actual sequence number to compare * @return SeqOrder indicating temporal relationship (past, in-order, future) */ EQStream::SeqOrder EQStream::CompareSequence(uint16 expected_seq, uint16 seq) { if (expected_seq == seq) { // Sequence numbers match exactly return SeqInOrder; } else if ((seq > expected_seq && static_cast(seq) < (static_cast(expected_seq) + EQStream::MaxWindowSize)) || seq < (expected_seq - EQStream::MaxWindowSize)) { // Sequence is in future window or wrapped around return SeqFuture; } else { // Sequence is in past window return SeqPast; } } /** * Process acknowledgment packet and remove acknowledged packets from queue * Handles sliding window advancement and packet cleanup for reliable delivery * * @param seq - Highest sequence number being acknowledged */ void EQStream::AckPackets(uint16 seq) { MOutboundQueue.lock(); SeqOrder ord = CompareSequence(SequencedBase, seq); if (ord == SeqInOrder) { // No new acknowledgments - duplicate ack LogWrite(PACKET__DEBUG, 9, "Packet", "Received an ack with no window advancement (seq %u)", seq); } else if (ord == SeqPast) { // Backward acknowledgment - protocol error LogWrite(PACKET__DEBUG, 9, "Packet", "Received an ack with backward window advancement (they gave %u, our window starts at %u). This is bad", seq, SequencedBase); } else { // Valid acknowledgment - advance window LogWrite(PACKET__DEBUG, 9, "Packet", "Received an ack up through sequence %u. Our base is %u", seq, SequencedBase); // Process all packets up to acknowledgment point seq++; // Stop at block right after their ack while (SequencedBase != seq) { if (SequencedQueue.empty()) { LogWrite(PACKET__DEBUG, 9, "Packet", "OUT OF PACKETS acked packet with sequence %u. Next send is %u before this", static_cast(SequencedBase), SequencedQueue.size()); SequencedBase = NextOutSeq; break; } LogWrite(PACKET__DEBUG, 9, "Packet", "Removing acked packet with sequence %u", static_cast(SequencedBase)); // Remove acknowledged packet from queue delete SequencedQueue.front(); SequencedQueue.pop_front(); // Advance base sequence number SequencedBase++; } // Validate queue consistency if (uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) { LogWrite(PACKET__DEBUG, 9, "Packet", "Post-Ack on %u Invalid Sequenced queue: BS %u + SQ %u != NOS %u", seq, SequencedBase, SequencedQueue.size(), NextOutSeq); } } MOutboundQueue.unlock(); } /** * Main packet transmission loop - processes outbound queues and sends packets * Handles rate limiting, packet combining, acknowledgments, and transmission scheduling * * @param eq_fd - Socket file descriptor for transmission */ void EQStream::Write(int eq_fd) { queue ReadyToSend; long maxack; // Check our rate to make sure we can send more MRate.lock(); sint32 threshold=RateThreshold; MRate.unlock(); if (BytesWritten > threshold) { //cout << "Over threshold: " << BytesWritten << " > " << threshold << endl; return; } MCombinedAppPacket.lock(); EQApplicationPacket *CombPack=CombinedAppPacket; CombinedAppPacket=NULL; MCombinedAppPacket.unlock(); if (CombPack) { SendPacket(CombPack); } // If we got more packets to we need to ack, send an ack on the highest one MAcks.lock(); maxack=MaxAckReceived; // Added from peaks findings if (NextAckToSend>LastAckSent || LastAckSent == 0x0000ffff) SendAck(NextAckToSend); MAcks.unlock(); // Lock the outbound queues while we process MOutboundQueue.lock(); // Adjust where we start sending in case we get a late ack //if (maxack>LastSeqSent) // LastSeqSent=maxack; // Place to hold the base packet t combine into EQProtocolPacket *p=NULL; std::deque::iterator sitr; // Find the next sequenced packet to send from the "queue" sitr = SequencedQueue.begin(); uint16 count = 0; // get to start of packets while (sitr != SequencedQueue.end() && (*sitr)->sent_time > 0) { ++sitr; ++count; } bool SeqEmpty = false, NonSeqEmpty = false; // Loop until both are empty or MaxSends is reached while (!SeqEmpty || !NonSeqEmpty) { // See if there are more non-sequenced packets left if (!NonSequencedQueue.empty()) { if (!p) { // If we don't have a packet to try to combine into, use this one as the base // And remove it form the queue p = NonSequencedQueue.front(); LogWrite(PACKET__DEBUG, 9, "Packet", "Starting combined packet with non-seq packet of len %u",p->size); NonSequencedQueue.pop(); } else if (!p->combine(NonSequencedQueue.front())) { // Trying to combine this packet with the base didn't work (too big maybe) // So just send the base packet (we'll try this packet again later) LogWrite(PACKET__DEBUG, 9, "Packet", "Combined packet full at len %u, next non-seq packet is len %u", p->size, (NonSequencedQueue.front())->size); ReadyToSend.push(p); BytesWritten += p->size; p = nullptr; if (BytesWritten > threshold) { // Sent enough this round, lets stop to be fair LogWrite(PACKET__DEBUG, 9, "Packet", "Exceeded write threshold in nonseq (%u > %u)", BytesWritten, threshold); break; } } else { // Combine worked, so just remove this packet and it's spot in the queue LogWrite(PACKET__DEBUG, 9, "Packet", "Combined non-seq packet of len %u, yeilding %u combined", (NonSequencedQueue.front())->size, p->size); delete NonSequencedQueue.front(); NonSequencedQueue.pop(); } } else { // No more non-sequenced packets NonSeqEmpty = true; } if (sitr != SequencedQueue.end()) { uint16 seq_send = SequencedBase + count; //just for logging... if (SequencedQueue.empty()) { LogWrite(PACKET__DEBUG, 9, "Packet", "Tried to write a packet with an empty queue (%u is past next out %u)", seq_send, NextOutSeq); SeqEmpty = true; continue; } if ((*sitr)->acked || (*sitr)->sent_time != 0) { ++sitr; ++count; if (p) { LogWrite(PACKET__DEBUG, 9, "Packet", "Final combined packet not full, len %u", p->size); ReadyToSend.push(p); BytesWritten += p->size; p = nullptr; } LogWrite(PACKET__DEBUG, 9, "Packet", "Not retransmitting seq packet %u because already marked as acked", seq_send); } else if (!p) { // If we don't have a packet to try to combine into, use this one as the base // Copy it first as it will still live until it is acked p = (*sitr)->Copy(); LogWrite(PACKET__DEBUG, 9, "Packet", "Starting combined packet with seq packet %u of len %u", seq_send, p->size); (*sitr)->sent_time = Timer::GetCurrentTime2(); ++sitr; ++count; } else if (!p->combine(*sitr)) { // Trying to combine this packet with the base didn't work (too big maybe) // So just send the base packet (we'll try this packet again later) LogWrite(PACKET__DEBUG, 9, "Packet", "Combined packet full at len %u, next seq packet %u is len %u", p->size, seq_send + 1, (*sitr)->size); ReadyToSend.push(p); BytesWritten += p->size; p = nullptr; if ((*sitr)->opcode != OP_Fragment && BytesWritten > threshold) { // Sent enough this round, lets stop to be fair LogWrite(PACKET__DEBUG, 9, "Packet", "Exceeded write threshold in seq (%u > %u)", BytesWritten, threshold); break; } } else { // Combine worked LogWrite(PACKET__DEBUG, 9, "Packet", "Combined seq packet %u of len %u, yeilding %u combined", seq_send, (*sitr)->size, p->size); (*sitr)->sent_time = Timer::GetCurrentTime2(); ++sitr; ++count; } if (uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) { LogWrite(PACKET__DEBUG, 9, "Packet", "Post send Invalid Sequenced queue: BS %u + SQ %u != NOS %u", SequencedBase, SequencedQueue.size(), NextOutSeq); } } else { // No more sequenced packets SeqEmpty = true; } } MOutboundQueue.unlock(); // Unlock the queue // We have a packet still, must have run out of both seq and non-seq, so send it if (p) { LogWrite(PACKET__DEBUG, 9, "Packet", "Final combined packet not full, len %u", p->size); ReadyToSend.push(p); BytesWritten += p->size; } // Send all the packets we "made" while (!ReadyToSend.empty()) { p = ReadyToSend.front(); WritePacket(eq_fd, p); delete p; ReadyToSend.pop(); } //see if we need to send our disconnect and finish our close if (SeqEmpty && NonSeqEmpty) { //no more data to send if (GetState() == CLOSING) { MOutboundQueue.lock(); if (SequencedQueue.size() > 0 ) { // retransmission attempts } else { LogWrite(PACKET__DEBUG, 9, "Packet", "All outgoing data flushed, disconnecting client."); //we are waiting for the queues to empty, now we can do our disconnect. //this packet will not actually go out until the next call to Write(). SendDisconnect(); //SetState(CLOSED); } MOutboundQueue.unlock(); } } } /** * Write individual protocol packet to network socket * Handles packet serialization, compression, encoding, and UDP transmission * * @param eq_fd - Socket file descriptor for transmission * @param p - EQProtocolPacket to transmit */ void EQStream::WritePacket(int eq_fd, EQProtocolPacket *p) { uint32 length = 0; sockaddr_in address; // Set up destination address address.sin_family = AF_INET; address.sin_addr.s_addr = remote_ip; address.sin_port = remote_port; #ifdef NOWAY uint32 ip=address.sin_addr.s_addr; cout << "Sending to: " << (int)*(unsigned char *)&ip << "." << (int)*((unsigned char *)&ip+1) << "." << (int)*((unsigned char *)&ip+2) << "." << (int)*((unsigned char *)&ip+3) << "," << (int)ntohs(address.sin_port) << "(" << p->size << ")" << endl; p->DumpRaw(); cout << "-------------" << endl; #endif length=p->serialize(buffer); if (p->opcode!=OP_SessionRequest && p->opcode!=OP_SessionResponse) { if (compressed) { BytesWritten -= p->size; uint32 newlen=EQProtocolPacket::Compress(buffer,length,write_buffer,2048); memcpy(buffer,write_buffer,newlen); length=newlen; BytesWritten += newlen; } if (encoded) { EQProtocolPacket::ChatEncode(buffer,length,Key); } *(uint16 *)(buffer+length)=htons(CRC16(buffer,length,Key)); length+=2; } sent_packets++; //dump_message_column(buffer,length,"Writer: "); //cout << "Raw Data:\n"; //DumpPacket(buffer, length); sendto(eq_fd,(char *)buffer,length,0,(sockaddr *)&address,sizeof(address)); } EQProtocolPacket *EQStream::Read(int eq_fd, sockaddr_in *from) { int socklen; int length=0; unsigned char buffer[2048]; EQProtocolPacket *p=NULL; char temp[15]; socklen=sizeof(sockaddr); #ifdef WIN32 length=recvfrom(eq_fd, (char *)buffer, 2048, 0, (struct sockaddr*)from, (int *)&socklen); #else length=recvfrom(eq_fd, buffer, 2048, 0, (struct sockaddr*)from, (socklen_t *)&socklen); #endif if (length>=2) { DumpPacket(buffer, length); p=new EQProtocolPacket(buffer[1],&buffer[2],length-2); //printf("Read packet: opcode %i length %u, expected-length: %u\n",buffer[1], length, p->size); uint32 ip=from->sin_addr.s_addr; sprintf(temp,"%d.%d.%d.%d:%d", *(unsigned char *)&ip, *((unsigned char *)&ip+1), *((unsigned char *)&ip+2), *((unsigned char *)&ip+3), ntohs(from->sin_port)); //cout << timestamp() << "Data from: " << temp << " OpCode 0x" << hex << setw(2) << setfill('0') << (int)p->opcode << dec << endl; //dump_message(p->pBuffer,p->size,timestamp()); } return p; } /** * Send session establishment response to client * Provides session parameters including encryption keys and stream format flags */ void EQStream::SendSessionResponse() { auto out = new EQProtocolPacket(OP_SessionResponse, nullptr, sizeof(SessionResponse)); auto Response = reinterpret_cast(out->pBuffer); // Set session parameters Response->Session = htonl(Session); Response->MaxLength = htonl(MaxLen); Response->UnknownA = 2; Response->Format = 0; // Set format flags based on stream configuration if (compressed) { Response->Format |= FLAG_COMPRESSED; } if (encoded) { Response->Format |= FLAG_ENCODED; } Response->Key = htonl(Key); out->size = sizeof(SessionResponse); NonSequencedPush(out); } /** * Send session establishment request to server * Initiates connection handshake with session parameters */ void EQStream::SendSessionRequest() { auto out = new EQProtocolPacket(OP_SessionRequest, nullptr, sizeof(SessionRequest)); auto Request = reinterpret_cast(out->pBuffer); // Initialize request structure std::memset(Request, 0, sizeof(SessionRequest)); Request->Session = htonl(std::time(nullptr)); Request->MaxLength = htonl(512); NonSequencedPush(out); } /** * Send session disconnect packet to peer * Initiates graceful connection termination sequence * * @param setstate - If true, set stream state to CLOSING */ void EQStream::SendDisconnect(bool setstate) { try { // Only send disconnect if in valid state if (GetState() != ESTABLISHED && GetState() != WAIT_CLOSE) { return; } // Create disconnect packet auto out = new EQProtocolPacket(OP_SessionDisconnect, nullptr, sizeof(uint32) + sizeof(int16)); *reinterpret_cast(out->pBuffer) = htonl(Session); out->pBuffer[4] = 0; out->pBuffer[5] = 6; NonSequencedPush(out); // Update state if requested if (setstate) { SetState(CLOSING); } } catch (...) { // Ignore exceptions during disconnect } } /** * Add application packet to inbound queue for processing * Thread-safe method to queue received packets for application layer * * @param p - EQApplicationPacket to add to inbound queue */ void EQStream::InboundQueuePush(EQApplicationPacket *p) { MInboundQueue.lock(); InboundQueue.push_back(p); MInboundQueue.unlock(); } /** * Retrieve next application packet from inbound queue * Thread-safe method to get received packets for application processing * * @return Next EQApplicationPacket from queue, or nullptr if queue is empty */ EQApplicationPacket *EQStream::PopPacket() { EQApplicationPacket *p = nullptr; MInboundQueue.lock(); if (InboundQueue.size()) { p = InboundQueue.front(); InboundQueue.pop_front(); } MInboundQueue.unlock(); // Set client version for compatibility if (p) { p->setVersion(client_version); } return p; } /** * Clear all packets from inbound queue * Thread-safe method to empty and delete all queued packets */ void EQStream::InboundQueueClear() { MInboundQueue.lock(); while (InboundQueue.size()) { delete InboundQueue.front(); InboundQueue.pop_front(); } MInboundQueue.unlock(); } /** * Legacy packet encryption function (currently unused) * Placeholder for packet-level encryption functionality * * @param data - Packet data buffer * @param size - Size of packet data */ void EQStream::EncryptPacket(uchar* data, int16 size) { if (size > 6) { // Implementation placeholder } } /** * Check if stream has pending outbound data * Determines if there are packets waiting to be transmitted * * @return true if outbound data is available, false otherwise */ bool EQStream::HasOutgoingData() { bool flag; // Once closed, we have nothing more to say if (CheckClosed()) { return false; } // Check for queued packets MOutboundQueue.lock(); flag = (!NonSequencedQueue.empty()); if (!flag) { flag = (!SequencedQueue.empty()); } MOutboundQueue.unlock(); // Check for pending acknowledgments if (!flag) { MAcks.lock(); flag = (NextAckToSend > LastAckSent); MAcks.unlock(); } // Check for combined application packets if (!flag) { MCombinedAppPacket.lock(); flag = (CombinedAppPacket != nullptr); MCombinedAppPacket.unlock(); } return flag; } /** * Clear all packets from outbound queues * Thread-safe method to empty and delete all queued outbound packets */ void EQStream::OutboundQueueClear() { MOutboundQueue.lock(); // Clear non-sequenced queue while (NonSequencedQueue.size()) { delete NonSequencedQueue.front(); NonSequencedQueue.pop(); } // Clear sequenced queue while (SequencedQueue.size()) { delete SequencedQueue.front(); SequencedQueue.pop_front(); } MOutboundQueue.unlock(); } /** * Process incoming raw network data into protocol packets * Handles decompression, decoding, validation, and packet creation * * @param buffer - Raw network data buffer * @param length - Length of network data */ void EQStream::Process(const unsigned char *buffer, const uint32 length) { received_packets++; static unsigned char newbuffer[2048]; uint32 newlength = 0; #ifdef LE_DEBUG printf("ProcessBuffer:\n"); DumpPacket(buffer, length); #endif if (EQProtocolPacket::ValidateCRC(buffer,length,Key)) { if (compressed) { newlength=EQProtocolPacket::Decompress(buffer,length,newbuffer,2048); #ifdef LE_DEBUG printf("ProcessBufferDecompress:\n"); DumpPacket(buffer, newlength); #endif } else { memcpy(newbuffer,buffer,length); newlength=length; if (encoded) EQProtocolPacket::ChatDecode(newbuffer,newlength-2,Key); } #ifdef LE_DEBUG printf("ResultProcessBuffer:\n"); DumpPacket(buffer, newlength); #endif uint16 opcode=ntohs(*(const uint16 *)newbuffer); //printf("Read packet: opcode %i newlength %u, newbuffer2len: %u, newbuffer3len: %u\n",opcode, newlength, newbuffer[2], newbuffer[3]); if(opcode > 0 && opcode <= OP_OutOfSession) { if (buffer[1]!=0x01 && buffer[1]!=0x02 && buffer[1]!=0x1d) newlength-=2; EQProtocolPacket p(newbuffer,newlength); ProcessPacket(&p); } else { cout << "2Orig Packet: " << opcode << endl; DumpPacket(newbuffer, newlength); ProcessEmbeddedPacket(newbuffer, newlength, OP_Fragment); } ProcessQueue(); } else { cout << "Incoming packet failed checksum:" <(buffer),length,"CRC failed: "); } } /** * Get the highest sequence number that has been acknowledged * Thread-safe accessor for maximum acknowledged sequence number * * @return Highest acknowledged sequence number */ long EQStream::GetMaxAckReceived() { MAcks.lock(); long l = MaxAckReceived; MAcks.unlock(); return l; } /** * Get the next sequence number that should be acknowledged * Thread-safe accessor for next acknowledgment to send * * @return Next sequence number to acknowledge */ long EQStream::GetNextAckToSend() { MAcks.lock(); long l = NextAckToSend; MAcks.unlock(); return l; } /** * Get the last sequence number that was acknowledged * Thread-safe accessor for last sent acknowledgment * * @return Last acknowledged sequence number */ long EQStream::GetLastAckSent() { MAcks.lock(); long l = LastAckSent; MAcks.unlock(); return l; } /** * Set the maximum acknowledged sequence number * Updates acknowledgment tracking and cleans up acknowledged packets from resend queue * * @param seq - New maximum acknowledged sequence number */ void EQStream::SetMaxAckReceived(uint32 seq) { MAcks.lock(); MaxAckReceived = seq; MAcks.unlock(); MOutboundQueue.lock(); if (static_cast(seq) > LastSeqSent) { LastSeqSent = seq; } // Clean up acknowledged packets from resend queue MResendQue.lock(); for (auto itr = resend_que.begin(); itr != resend_que.end();) { EQProtocolPacket* packet = *itr; if (packet && packet->sequence <= seq) { safe_delete(packet); itr = resend_que.erase(itr); if (itr == resend_que.end()) { break; } } else { ++itr; } } MResendQue.unlock(); MOutboundQueue.unlock(); } /** * Set the next sequence number to acknowledge * Thread-safe setter for next acknowledgment to send * * @param seq - Next sequence number to acknowledge */ void EQStream::SetNextAckToSend(uint32 seq) { MAcks.lock(); NextAckToSend = seq; MAcks.unlock(); } /** * Set the last sequence number that was acknowledged * Thread-safe setter for last sent acknowledgment * * @param seq - Last acknowledged sequence number */ void EQStream::SetLastAckSent(uint32 seq) { MAcks.lock(); LastAckSent = seq; MAcks.unlock(); } /** * Set the last sequence number that was sent * Thread-safe setter for last transmitted sequence number * * @param seq - Last sent sequence number */ void EQStream::SetLastSeqSent(uint32 seq) { MOutboundQueue.lock(); LastSeqSent = seq; MOutboundQueue.unlock(); } /** * Configure stream parameters based on stream type * Sets opcode size, compression, and encoding flags for different stream types * * @param type - EQStreamType to configure stream for */ void EQStream::SetStreamType(EQStreamType type) { StreamType = type; switch (StreamType) { case LoginStream: app_opcode_size = 1; compressed = false; encoded = false; break; case EQ2Stream: app_opcode_size = 2; compressed = false; encoded = false; break; case ChatOrMailStream: case ChatStream: case MailStream: app_opcode_size = 1; compressed = false; encoded = true; break; case ZoneStream: case WorldStream: default: app_opcode_size = 2; compressed = true; encoded = false; break; } } /** * Process queued out-of-order packets in sequence * Handles packets that arrived before their predecessors */ void EQStream::ProcessQueue() { if (OutOfOrderpackets.empty()) { return; } // Process packets in sequence order EQProtocolPacket* qp = nullptr; while ((qp = RemoveQueue(NextInSeq)) != nullptr) { ProcessPacket(qp); delete qp; } } /** * Remove and return packet with specific sequence number from queue * Searches out-of-order packet queue for specified sequence * * @param seq - Sequence number of packet to remove * @return EQProtocolPacket if found, nullptr if not found */ EQProtocolPacket* EQStream::RemoveQueue(uint16 seq) { EQProtocolPacket* qp = nullptr; auto itr = OutOfOrderpackets.find(seq); if (itr != OutOfOrderpackets.end()) { qp = itr->second; OutOfOrderpackets.erase(itr); } return qp; } /** * Apply bandwidth decay and check for packet timeouts * Reduces byte counter over time and flags timed-out packets for retransmission */ void EQStream::Decay() { // Apply bandwidth decay MRate.lock(); uint32 rate = DecayRate; MRate.unlock(); if (BytesWritten > 0) { BytesWritten -= rate; if (BytesWritten < 0) { BytesWritten = 0; } } // Check for packet timeouts and flag for retransmission int count = 0; MOutboundQueue.lock(); for (auto sitr = SequencedQueue.begin(); sitr != SequencedQueue.end(); ++sitr, count++) { if (!(*sitr)->acked && (*sitr)->sent_time > 0 && ((*sitr)->sent_time + retransmittimeout) < Timer::GetCurrentTime2()) { (*sitr)->sent_time = 0; // Flag for retransmission LogWrite(PACKET__DEBUG, 9, "Packet", "Timeout exceeded for seq %u. Flagging packet for retransmission", SequencedBase + count); } } MOutboundQueue.unlock(); } /** * Adjust transmission rates based on network conditions * Calculates rate thresholds and decay rates from average delta timing * * @param average_delta - Average packet transmission time in milliseconds */ void EQStream::AdjustRates(uint32 average_delta) { if (average_delta && (average_delta <= AVERAGE_DELTA_MAX)) { MRate.lock(); // Calculate transmission parameters based on network timing AverageDelta = average_delta; RateThreshold = RATEBASE / average_delta; DecayRate = DECAYBASE / average_delta; // Adjust current byte counter if needed if (BytesWritten > RateThreshold) { BytesWritten = RateThreshold + DecayRate; } MRate.unlock(); } else { // Use maximum delta if invalid value provided AverageDelta = AVERAGE_DELTA_MAX; } }