// Copyright (C) 2007 EQ2EMulator Development Team - GPLv3 License #pragma once #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "eq_stream_factory.hpp" #include "../log.hpp" #include "../misc.hpp" #include "../timer.hpp" #include "../crc16.hpp" #include "../debug.hpp" #include "../crypto.hpp" #include "../eq_common_structs.hpp" #include "../packet/eq_packet.hpp" #include "../packet/packet_dump.hpp" #include "../opcodes/opcodes.hpp" #ifdef LOGIN #include "../LoginServer/login_structs.hpp" #endif #ifdef WRITE_PACKETS #include #endif using namespace std; typedef enum { ESTABLISHED, WAIT_CLOSE, CLOSING, DISCONNECTING, CLOSED } EQStreamState; #define FLAG_COMPRESSED 0x01 #define FLAG_ENCODED 0x04 #define RATEBASE 1048576 // 1 MB #define DECAYBASE 78642 // RATEBASE/10 #ifndef RETRANSMIT_TIMEOUT_MULT #define RETRANSMIT_TIMEOUT_MULT 3.0 #endif #ifndef RETRANSMIT_TIMEOUT_MAX #define RETRANSMIT_TIMEOUT_MAX 5000 #endif #ifndef AVERAGE_DELTA_MAX #define AVERAGE_DELTA_MAX 2500 #endif #pragma pack(1) struct SessionRequest { uint32 UnknownA; uint32 Session; uint32 MaxLength; }; struct SessionResponse { uint32 Session; uint32 Key; uint8 UnknownA; uint8 Format; uint8 UnknownB; uint32 MaxLength; uint32 UnknownD; }; struct ClientSessionStats { /*000*/ uint16 RequestID; /*002*/ uint32 last_local_delta; /*006*/ uint32 average_delta; /*010*/ uint32 low_delta; /*014*/ uint32 high_delta; /*018*/ uint32 last_remote_delta; /*022*/ uint64 packets_sent; /*030*/ uint64 packets_recieved; /*038*/ }; struct ServerSessionStats { uint16 RequestID; uint32 current_time; uint32 unknown1; uint32 received_packets; uint32 unknown2; uint32 sent_packets; uint32 unknown3; uint32 sent_packets2; uint32 unknown4; uint32 received_packets2; }; #pragma pack() class OpcodeManager; extern OpcodeManager *EQNetworkOpcodeManager; class EQStreamFactory; typedef enum { UnknownStream=0, LoginStream, WorldStream, ZoneStream, ChatOrMailStream, ChatStream, MailStream, EQ2Stream, } EQStreamType; class EQStream { protected: typedef enum { SeqPast, SeqInOrder, SeqFuture } SeqOrder; uint32 received_packets; // Number of packets received uint32 sent_packets; // Number of packets sent uint32 remote_ip; // Remote client IP address uint16 remote_port; // Remote client port uint8 buffer[8192]; // Main I/O buffer unsigned char *oversize_buffer; // Buffer for oversized packets uint32 oversize_offset, oversize_length; // Oversize buffer management unsigned char *rogue_buffer; // Buffer for handling rogue packets uint32 roguebuf_offset, roguebuf_size; // Rogue buffer management uint8 app_opcode_size; // Size of application opcodes EQStreamType StreamType; // Type of stream (login, world, zone, etc.) bool compressed, encoded; // Stream compression and encoding flags unsigned char write_buffer[2048]; // Temporary write buffer uint32 retransmittimer; // Timer for retransmission uint32 retransmittimeout; // Timeout value for retransmission uint16 sessionAttempts; // Number of session establishment attempts uint16 reconnectAttempt; // Number of reconnection attempts bool streamactive; // Whether stream is actively processing packets uint32 Session, Key; // Session ID and encryption key uint16 NextInSeq; // Next expected incoming sequence number uint16 NextOutSeq; // Next outgoing sequence number uint16 SequencedBase; // Base sequence number for sequenced queue uint32 MaxLen; // Maximum packet length uint16 MaxSends; // Maximum sends per cycle int8 timeout_delays; // Number of timeout delays encountered uint8 active_users; // Number of active users of this stream Mutex MInUse; // Mutex for active user count #ifdef WRITE_PACKETS FILE* write_packets = NULL; // File handle for packet logging Mutex MWritePackets; // Mutex for packet writing #endif EQStreamState State; // Current stream state Mutex MState; // Mutex for state changes uint32 LastPacket; // Timestamp of last packet Mutex MVarlock; // General variable mutex EQApplicationPacket* CombinedAppPacket; // Combined application packet Mutex MCombinedAppPacket; // Mutex for combined packet long LastSeqSent; // Last sequence number sent Mutex MLastSeqSent; // Mutex for last sequence sent // Acknowledgment sequence tracking long MaxAckReceived, NextAckToSend, LastAckSent; Mutex MAcks; // Mutex for acknowledgment tracking // Outbound packet queues queue NonSequencedQueue; // Non-sequenced packets deque SequencedQueue; // Sequenced packets map OutOfOrderpackets; // Out-of-order packets Mutex MOutboundQueue; // Mutex for outbound queues // Inbound packet queue deque InboundQueue; // Incoming application packets Mutex MInboundQueue; // Mutex for inbound queue static uint16 MaxWindowSize; // Maximum window size for sequencing sint32 BytesWritten; // Bytes written in current cycle Mutex MRate; // Mutex for rate limiting sint32 RateThreshold; // Rate limiting threshold sint32 DecayRate; // Rate decay value uint32 AverageDelta; // Average delta time EQStreamFactory *Factory; // Factory that created this stream public: Mutex MCombineQueueLock; // Mutex for combine queue deque combine_queue; // Queue for packet combining Timer* combine_timer; // Timer for packet combining Crypto* crypto; // Cryptography handler z_stream stream; // Zlib compression stream uchar* stream_buffer; // Compression buffer int32 stream_buffer_size; // Size of compression buffer bool eq2_compressed; // EQ2 compression flag int8 compressed_offset; // Compression offset int16 client_version; // Client version Mutex MResendQue; // Mutex for resend queue Mutex MCompressData; // Mutex for compression data deque resend_que; // Resend queue Timer* resend_que_timer; // Resend queue timer // Default constructor - initializes basic stream EQStream() { init(); remote_ip = 0; remote_port = 0; State = CLOSED; StreamType = UnknownStream; compressed = true; encoded = false; app_opcode_size = 2; } // Constructor with socket address - creates stream for specific client EQStream(sockaddr_in addr); // Destructor - cleans up all resources virtual ~EQStream() { MOutboundQueue.lock(); SetState(CLOSED); MOutboundQueue.unlock(); RemoveData(); safe_delete(crypto); safe_delete(combine_timer); safe_delete(resend_que_timer); safe_delete_array(oversize_buffer); safe_delete_array(rogue_buffer); deque::iterator cmb; MCombineQueueLock.lock(); for (cmb = combine_queue.begin(); cmb != combine_queue.end(); cmb++){ safe_delete(*cmb); } MCombineQueueLock.unlock(); deflateEnd(&stream); map::iterator oop; for (oop = OutOfOrderpackets.begin(); oop != OutOfOrderpackets.end(); oop++){ safe_delete(oop->second); } #ifdef WRITE_PACKETS if (write_packets) fclose(write_packets); #endif } // Factory management inline void SetFactory(EQStreamFactory *f) { Factory=f; } // Initialize or reset stream state void init(bool resetSession = true); // Packet length management void SetMaxLen(uint32 length) { MaxLen = length; } // Timeout management int8 getTimeoutDelays() { return timeout_delays; } void addTimeoutDelay() { timeout_delays++; } // EQ2 specific packet operations void EQ2QueuePacket(EQ2Packet* app, bool attempted_combine = false); void PreparePacket(EQ2Packet* app, int8 offset = 0); void UnPreparePacket(EQ2Packet* app); void EncryptPacket(EQ2Packet* app, int8 compress_offset, int8 offset); int8 EQ2_Compress(EQ2Packet* app, int8 offset = 3); // Generic packet operations void SendPacket(EQApplicationPacket *p); void SendPacket(EQProtocolPacket *p); void NonSequencedPush(EQProtocolPacket *p); void SequencedPush(EQProtocolPacket *p); // Network I/O void Write(int eq_fd); void WritePacket(int fd, EQProtocolPacket *p); static EQProtocolPacket *Read(int eq_fd, sockaddr_in *from); // Packet processing void Process(const unsigned char *data, const uint32 length); void ProcessPacket(EQProtocolPacket *p, EQProtocolPacket* lastp = NULL); bool ProcessEmbeddedPacket(uchar* pBuffer, uint16 length, int8 opcode = OP_Packet); bool HandleEmbeddedPacket(EQProtocolPacket *p, int16 offset = 2, int16 length = 0); EQProtocolPacket* ProcessEncryptedPacket(EQProtocolPacket *p); EQProtocolPacket* ProcessEncryptedData(uchar* data, int32 size, int16 opcode); // Acknowledgment handling void AckPackets(uint16 seq); void SendAck(uint16 seq); void SendOutOfOrderAck(uint16 seq); // Session management void SendSessionResponse(); void SendSessionRequest(); void SendDisconnect(bool setstate = true); void SendKeyRequest(); int16 processRSAKey(EQProtocolPacket *p, uint16 subpacket_length = 0); // Queue management void InboundQueuePush(EQApplicationPacket *p); EQApplicationPacket *PopPacket(); void InboundQueueClear(); void OutboundQueueClear(); bool HasOutgoingData(); bool CheckCombineQueue(); void CheckResend(int eq_fd); // State and properties inline EQStreamState GetState() { return State; } inline void SetState(EQStreamState state) { MState.lock(); State = state; MState.unlock(); } inline uint32 GetRemoteIP() { return remote_ip; } inline uint32 GetrIP() { return remote_ip; } inline uint16 GetRemotePort() { return remote_port; } inline uint16 GetrPort() { return remote_port; } inline const EQStreamType GetStreamType() const { return StreamType; } // Client version management int16 GetClientVersion() { return client_version; } void SetClientVersion(int16 version) { client_version = version; } // Connection management void SetActive(bool val) { streamactive = val; } void ResetSessionAttempts() { reconnectAttempt = 0; } bool HasSessionAttempts() { return reconnectAttempt > 0; } bool CheckActive() { return (GetState() == ESTABLISHED); } bool CheckClosed() { return GetState() == CLOSED; } bool CheckTimeout(uint32 now, uint32 timeout = 30) { return (LastPacket && (now - LastPacket) > timeout); } bool Stale(uint32 now, uint32 timeout = 30) { return (LastPacket && (now - LastPacket) > timeout); } void Close() { SendDisconnect(); } // Resource management inline bool IsInUse() { bool flag; MInUse.lock(); flag = (active_users > 0); MInUse.unlock(); return flag; } inline void PutInUse() { MInUse.lock(); active_users++; MInUse.unlock(); } inline void ReleaseFromUse() { MInUse.lock(); if(active_users > 0) active_users--; MInUse.unlock(); } void RemoveData() { InboundQueueClear(); OutboundQueueClear(); if (CombinedAppPacket) delete CombinedAppPacket; } // Configuration void SetOpcodeSize(uint8 s) { app_opcode_size = s; } void SetStreamType(EQStreamType t); uint32 GetKey() { return Key; } void SetKey(uint32 k) { Key = k; } void SetSession(uint32 s) { Session = s; } void SetLastPacketTime(uint32 t) { LastPacket = t; } // Sequence management static SeqOrder CompareSequence(uint16 expected_seq, uint16 seq); void ProcessQueue(); EQProtocolPacket* RemoveQueue(uint16 seq); // Rate limiting and performance void Decay(); void AdjustRates(uint32 average_delta); // Utility functions virtual void DispatchPacket(EQApplicationPacket *p) { p->DumpRaw(); } void EncryptPacket(uchar* data, int16 size); #ifdef WRITE_PACKETS char GetChar(uchar in); void WriteToFile(char* pFormat, ...); void WritePackets(const char* opcodeName, uchar* data, int32 size, bool outgoing); void WritePackets(EQ2Packet* app, bool outgoing); #endif private: // Acknowledgment tracking methods long GetMaxAckReceived(); long GetNextAckToSend(); long GetLastAckSent(); void SetMaxAckReceived(uint32 seq); void SetNextAckToSend(uint32); void SetLastAckSent(uint32); void SetLastSeqSent(uint32); }; // Implementation uint16 EQStream::MaxWindowSize = 2048; // Initialize stream state and variables void EQStream::init(bool resetSession) { if (resetSession) { streamactive = false; sessionAttempts = 0; } timeout_delays = 0; MInUse.lock(); active_users = 0; MInUse.unlock(); Session = 0; Key = 0; MaxLen = 0; NextInSeq = 0; NextOutSeq = 0; CombinedAppPacket = NULL; MAcks.lock(); MaxAckReceived = -1; NextAckToSend = -1; LastAckSent = -1; MAcks.unlock(); LastSeqSent = -1; MaxSends = 5; LastPacket = Timer::GetCurrentTime2(); oversize_buffer = NULL; oversize_length = 0; oversize_offset = 0; Factory = NULL; rogue_buffer = NULL; roguebuf_offset = 0; roguebuf_size = 0; MRate.lock(); RateThreshold = RATEBASE / 250; DecayRate = DECAYBASE / 250; MRate.unlock(); BytesWritten = 0; SequencedBase = 0; AverageDelta = 500; crypto->setRC4Key(0); retransmittimer = Timer::GetCurrentTime2(); retransmittimeout = 500 * RETRANSMIT_TIMEOUT_MULT; reconnectAttempt = 0; if (uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) { LogWrite(PACKET__DEBUG, 9, "Packet", "init Invalid Sequenced queue: BS %u + SQ %u != NOS %u", SequencedBase, SequencedQueue.size(), NextOutSeq); } } // Constructor with socket address EQStream::EQStream(sockaddr_in addr) { crypto = new Crypto(); resend_que_timer = new Timer(1000); combine_timer = new Timer(250); combine_timer->Start(); resend_que_timer->Start(); init(); remote_ip = addr.sin_addr.s_addr; remote_port = addr.sin_port; State = CLOSED; StreamType = UnknownStream; compressed = true; encoded = false; app_opcode_size = 2; memset(&stream, 0, sizeof(z_stream)); stream.zalloc = (alloc_func)0; stream.zfree = (free_func)0; stream.opaque = (voidpf)0; deflateInit2(&stream, 9, Z_DEFLATED, 13, 9, Z_DEFAULT_STRATEGY); compressed_offset = 0; client_version = 0; received_packets = 0; sent_packets = 0; #ifdef WRITE_PACKETS write_packets = 0; char write_packets_filename[64]; snprintf(write_packets_filename, sizeof(write_packets_filename), "PacketLog%i.log", Timer::GetCurrentTime2()); write_packets = fopen(write_packets_filename, "w+"); #endif } // Process encrypted data and return protocol packet EQProtocolPacket* EQStream::ProcessEncryptedData(uchar* data, int32 size, int16 opcode) { crypto->RC4Decrypt(data, size); int8 offset = 0; if (data[0] == 0xFF && size > 2) { offset = 3; memcpy(&opcode, data + sizeof(int8), sizeof(int16)); } else { offset = 1; memcpy(&opcode, data, sizeof(int8)); } return new EQProtocolPacket(opcode, data + offset, size - offset); } // Process encrypted protocol packet EQProtocolPacket* EQStream::ProcessEncryptedPacket(EQProtocolPacket *p) { EQProtocolPacket* ret = NULL; if (p->opcode == OP_Packet && p->size > 2) ret = ProcessEncryptedData(p->pBuffer + 2, p->size - 2, p->opcode); else ret = ProcessEncryptedData(p->pBuffer, p->size, p->opcode); return ret; } // Process embedded packet within another packet bool EQStream::ProcessEmbeddedPacket(uchar* pBuffer, int16 length, int8 opcode) { if (!pBuffer || !crypto->isEncrypted()) return false; MCombineQueueLock.lock(); EQProtocolPacket* newpacket = ProcessEncryptedData(pBuffer, length, opcode); MCombineQueueLock.unlock(); if (newpacket) { EQApplicationPacket* ap = newpacket->MakeApplicationPacket(2); if (ap->version == 0) ap->version = client_version; InboundQueuePush(ap); #ifdef WRITE_PACKETS WritePackets(ap->GetOpcodeName(), pBuffer, length, false); #endif safe_delete(newpacket); return true; } return false; } // Handle embedded packet with offset and length validation bool EQStream::HandleEmbeddedPacket(EQProtocolPacket *p, int16 offset, int16 length) { if (!p) return false; if (p->size >= ((uint32)(offset + 2))) { if (p->pBuffer[offset] == 0 && p->pBuffer[offset + 1] == 0x19) { uint32 data_length = 0; if (length == 0) { if (p->size < offset + 2) { return false; } data_length = p->size - offset - 2; } else { if (length < 2) { return false; } data_length = length - 2; } if (offset + 2 + data_length > p->size) { return false; } EQProtocolPacket *subp = new EQProtocolPacket(OP_AppCombined, p->pBuffer + offset + 2, data_length); subp->copyInfo(p); ProcessPacket(subp, p); safe_delete(subp); return true; } else if (p->pBuffer[offset] == 0 && p->pBuffer[offset + 1] == 0) { if (length == 0) length = p->size - 1 - offset; else length--; uchar* buffer = (p->pBuffer + 1 + offset); bool valid = ProcessEmbeddedPacket(buffer, length); if (valid) return true; } else if (offset + 4 < p->size && ntohl(*(uint32 *)(p->pBuffer + offset)) != 0xffffffff) { if (length == 0) length = p->size - offset; uchar* buffer = (p->pBuffer + offset); bool valid = ProcessEmbeddedPacket(buffer, length); if (valid) return true; } else if (p->pBuffer[offset] != 0xff && p->pBuffer[offset + 1] == 0xff && p->size >= offset + 3) { uint16 total_length = p->pBuffer[offset]; if (total_length + offset + 2 == p->size && total_length >= 2) { uint32 data_length = total_length - 2; EQProtocolPacket *subp = new EQProtocolPacket(p->pBuffer + offset + 2, data_length, OP_Packet); subp->copyInfo(p); ProcessPacket(subp, p); delete subp; return true; } } } return false; } // Main packet processing function - handles all protocol packet types void EQStream::ProcessPacket(EQProtocolPacket *p, EQProtocolPacket* lastp) { uint32 processed = 0, subpacket_length = 0; if (p) { if (p->opcode != OP_SessionRequest && p->opcode != OP_SessionResponse && !Session) { #ifdef EQN_DEBUG LogWrite(PACKET__ERROR, 0, "Packet", "*** Session not initialized, packet ignored "); #endif return; } switch (p->opcode) { case OP_Combined: { processed = 0; int8 offset = 0; int count = 0; while (processed < p->size) { if ((subpacket_length = (unsigned char)*(p->pBuffer + processed)) == 0xff) { subpacket_length = ntohs(*(uint16*)(p->pBuffer + processed + 1)); offset = 3; } else { offset = 1; } count++; bool isSubPacket = EQProtocolPacket::IsProtocolPacket(p->pBuffer + processed + offset, subpacket_length, false); if (isSubPacket) { EQProtocolPacket* subp = new EQProtocolPacket(p->pBuffer + processed + offset, subpacket_length); subp->copyInfo(p); ProcessPacket(subp, p); delete subp; } else { offset = 1; if (ntohs(*reinterpret_cast(p->pBuffer + processed + offset)) <= 0x1e) { subpacket_length = (unsigned char)*(p->pBuffer + processed); LogWrite(PACKET__ERROR, 0, "Packet", "!!!!!!!!!Garbage Packet Unknown Process as OP_Packet!!!!!!!!!!!!!\n"); DumpPacket(p->pBuffer + processed + offset, subpacket_length); uchar* newbuf = p->pBuffer; newbuf += processed + offset; EQProtocolPacket *subp = new EQProtocolPacket(newbuf, subpacket_length); subp->copyInfo(p); ProcessPacket(subp, p); delete subp; } else { crypto->RC4Decrypt(p->pBuffer + processed + offset, subpacket_length); LogWrite(PACKET__ERROR, 0, "Packet", "!!!!!!!!!Garbage Packet!!!!!!!!!!!!! processed: %u, offset: %u, count: %i, subpacket_length: %u, offset_pos_1: %u, oversized_buffer_present: %u, offset size: %u, offset length: %u\n", processed, offset, count, subpacket_length, p->pBuffer[processed + offset], oversize_buffer ? 1 : 0, oversize_offset, oversize_length); if (p->pBuffer[processed + offset] == 0xff) { uchar* newbuf = p->pBuffer; newbuf += processed + offset + 1; DumpPacket(p->pBuffer + processed + offset, subpacket_length); EQProtocolPacket *subp = new EQProtocolPacket(newbuf, subpacket_length, OP_Packet); subp->copyInfo(p); ProcessPacket(subp, p); delete subp; } else break; } } processed += subpacket_length + offset; } break; } case OP_AppCombined: { processed = 0; EQProtocolPacket* newpacket = 0; int8 offset = 0; int count = 0; while (processed < p->size) { count++; if ((subpacket_length = (unsigned char)*(p->pBuffer + processed)) == 0xff) { subpacket_length = ntohs(*(uint16 *)(p->pBuffer + processed + 1)); offset = 3; } else offset = 1; if (crypto->getRC4Key() == 0 && p && subpacket_length > 8 + offset) { p->pBuffer += offset; processRSAKey(p, subpacket_length); p->pBuffer -= offset; } else if (crypto->isEncrypted()) { if (!HandleEmbeddedPacket(p, processed + offset, subpacket_length)) { uchar* buffer = (p->pBuffer + processed + offset); if (!ProcessEmbeddedPacket(buffer, subpacket_length, OP_AppCombined)) { LogWrite(PACKET__ERROR, 0, "Packet", "*** This is bad, ProcessEmbeddedPacket failed, report to Image!"); } } } processed += subpacket_length + offset; } } break; case OP_Packet: { if (!p->pBuffer || (p->Size() < 4)) { break; } uint16 seq = ntohs(*(uint16 *)(p->pBuffer)); sint8 check = CompareSequence(NextInSeq, seq); if (check == SeqFuture) { #ifdef EQN_DEBUG LogWrite(PACKET__DEBUG, 1, "Packet", "*** Future packet: Expecting Seq=%i, but got Seq=%i", NextInSeq, seq); LogWrite(PACKET__DEBUG, 1, "Packet", "[Start]"); p->DumpRawHeader(seq); LogWrite(PACKET__DEBUG, 1, "Packet", "[End]"); #endif OutOfOrderpackets[seq] = p->Copy(); } else if (check == SeqPast) { #ifdef EQN_DEBUG LogWrite(PACKET__DEBUG, 1, "Packet", "*** Duplicate packet: Expecting Seq=%i, but got Seq=%i", NextInSeq, seq); LogWrite(PACKET__DEBUG, 1, "Packet", "[Start]"); p->DumpRawHeader(seq); LogWrite(PACKET__DEBUG, 1, "Packet", "[End]"); #endif SendOutOfOrderAck(seq); } else { EQProtocolPacket* qp = RemoveQueue(seq); if (qp) { LogWrite(PACKET__DEBUG, 1, "Packet", "OP_Fragment: Removing older queued packet with sequence %i", seq); delete qp; } SetNextAckToSend(seq); NextInSeq++; if (HandleEmbeddedPacket(p)) break; if (crypto->getRC4Key() == 0 && p && p->size >= 69) { processRSAKey(p); } else if (crypto->isEncrypted() && p) { MCombineQueueLock.lock(); EQProtocolPacket* newpacket = ProcessEncryptedPacket(p); MCombineQueueLock.unlock(); if (newpacket) { EQApplicationPacket* ap = newpacket->MakeApplicationPacket(2); if (ap->version == 0) ap->version = client_version; #ifdef WRITE_PACKETS WritePackets(ap->GetOpcodeName(), p->pBuffer, p->size, false); #endif InboundQueuePush(ap); safe_delete(newpacket); } } } } break; case OP_Fragment: { if (!p->pBuffer || (p->Size() < 4)) { break; } uint16 seq = ntohs(*(uint16 *)(p->pBuffer)); sint8 check = CompareSequence(NextInSeq, seq); if (check == SeqFuture) { #ifdef EQN_DEBUG LogWrite(PACKET__DEBUG, 1, "Packet", "*** Future packet2: Expecting Seq=%i, but got Seq=%i", NextInSeq, seq); LogWrite(PACKET__DEBUG, 1, "Packet", "[Start]"); LogWrite(PACKET__DEBUG, 1, "Packet", "[End]"); #endif OutOfOrderpackets[seq] = p->Copy(); } else if (check == SeqPast) { #ifdef EQN_DEBUG LogWrite(PACKET__DEBUG, 1, "Packet", "*** Duplicate packet2: Expecting Seq=%i, but got Seq=%i", NextInSeq, seq); LogWrite(PACKET__DEBUG, 1, "Packet", "[Start]"); LogWrite(PACKET__DEBUG, 1, "Packet", "[End]"); #endif SendOutOfOrderAck(seq); } else { EQProtocolPacket* qp = RemoveQueue(seq); if (qp) { LogWrite(PACKET__DEBUG, 1, "Packet", "OP_Fragment: Removing older queued packet with sequence %i", seq); delete qp; } SetNextAckToSend(seq); NextInSeq++; if (oversize_buffer) { memcpy(oversize_buffer + oversize_offset, p->pBuffer + 2, p->size - 2); oversize_offset += p->size - 2; if (oversize_offset == oversize_length) { if (*(p->pBuffer + 2) == 0x00 && *(p->pBuffer + 3) == 0x19) { EQProtocolPacket *subp = new EQProtocolPacket(oversize_buffer, oversize_offset); subp->copyInfo(p); ProcessPacket(subp, p); delete subp; } else { if (crypto->isEncrypted() && p && p->size > 2) { MCombineQueueLock.lock(); EQProtocolPacket* p2 = ProcessEncryptedData(oversize_buffer, oversize_offset, p->opcode); MCombineQueueLock.unlock(); EQApplicationPacket* ap = p2->MakeApplicationPacket(2); ap->copyInfo(p); if (ap->version == 0) ap->version = client_version; #ifdef WRITE_PACKETS WritePackets(ap->GetOpcodeName(), oversize_buffer, oversize_offset, false); #endif ap->copyInfo(p); InboundQueuePush(ap); safe_delete(p2); } } delete[] oversize_buffer; oversize_buffer = NULL; oversize_offset = 0; } } else if (!oversize_buffer) { oversize_length = ntohl(*(uint32 *)(p->pBuffer + 2)); oversize_buffer = new unsigned char[oversize_length]; memcpy(oversize_buffer, p->pBuffer + 6, p->size - 6); oversize_offset = p->size - 6; } } } break; case OP_KeepAlive: { #ifndef COLLECTOR NonSequencedPush(new EQProtocolPacket(p->opcode, p->pBuffer, p->size)); #endif } break; case OP_Ack: { if (!p->pBuffer || (p->Size() < 4)) { LogWrite(PACKET__DEBUG, 9, "Packet", "Received OP_Ack that was of malformed size"); break; } uint16 seq = ntohs(*(uint16*)(p->pBuffer)); AckPackets(seq); retransmittimer = Timer::GetCurrentTime2(); } break; case OP_SessionRequest: { if (p->Size() < sizeof(SessionRequest)) { break; } if (GetState() == ESTABLISHED) { if (streamactive || (sessionAttempts > 30)) { SendDisconnect(false); SetState(CLOSED); break; } } sessionAttempts++; if (GetState() == WAIT_CLOSE) { printf("WAIT_CLOSE Reconnect with streamactive %u, sessionAttempts %u\n", streamactive, sessionAttempts); reconnectAttempt++; } init(GetState() != ESTABLISHED); OutboundQueueClear(); SessionRequest *Request = (SessionRequest *)p->pBuffer; Session = ntohl(Request->Session); SetMaxLen(ntohl(Request->MaxLength)); #ifndef COLLECTOR NextInSeq = 0; Key = 0x33624702; SendSessionResponse(); #endif SetState(ESTABLISHED); } break; case OP_SessionResponse: { if (p->Size() < sizeof(SessionResponse)) { break; } init(); OutboundQueueClear(); SetActive(true); SessionResponse *Response = (SessionResponse *)p->pBuffer; SetMaxLen(ntohl(Response->MaxLength)); Key = ntohl(Response->Key); NextInSeq = 0; SetState(ESTABLISHED); if (!Session) Session = ntohl(Response->Session); compressed = (Response->Format & FLAG_COMPRESSED); encoded = (Response->Format & FLAG_ENCODED); if (compressed) { if (remote_port == 9000 || (remote_port == 0 && p->src_port == 9000)) SetStreamType(WorldStream); else SetStreamType(ZoneStream); } else if (encoded) SetStreamType(ChatOrMailStream); else SetStreamType(LoginStream); } break; case OP_SessionDisconnect: { SendDisconnect(); } break; case OP_OutOfOrderAck: { if (!p->pBuffer || (p->Size() < 4)) { LogWrite(PACKET__DEBUG, 9, "Packet", "Received OP_OutOfOrderAck that was of malformed size"); break; } uint16 seq = ntohs(*(uint16*)(p->pBuffer)); MOutboundQueue.lock(); if (uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) { LogWrite(PACKET__DEBUG, 9, "Packet", "Pre-OOA Invalid Sequenced queue: BS %u + SQ %u != NOS %u", SequencedBase, SequencedQueue.size(), NextOutSeq); } if (CompareSequence(SequencedBase, seq) != SeqPast && CompareSequence(NextOutSeq, seq) == SeqPast) { uint16 sqsize = SequencedQueue.size(); uint16 index = seq - SequencedBase; LogWrite(PACKET__DEBUG, 9, "Packet", "OP_OutOfOrderAck marking packet acked in queue (queue index = %u, queue size = %u)", index, sqsize); if (index < sqsize) { SequencedQueue[index]->acked = true; uint16 count = 0; uint32 timeout = AverageDelta * 2 + 100; for (auto sitr = SequencedQueue.begin(); sitr != SequencedQueue.end() && count < index; ++sitr, ++count) { if (!(*sitr)->acked && (*sitr)->sent_time > 0 && (((*sitr)->sent_time + timeout) < Timer::GetCurrentTime2())) { (*sitr)->sent_time = 0; LogWrite(PACKET__DEBUG, 9, "Packet", "OP_OutOfOrderAck Flagging packet %u for retransmission", SequencedBase + count); } } } if (RETRANSMIT_TIMEOUT_MULT) { retransmittimer = Timer::GetCurrentTime2(); } } else { LogWrite(PACKET__DEBUG, 9, "Packet", "Received OP_OutOfOrderAck for out-of-window %u. Window (%u->%u)", seq, SequencedBase, NextOutSeq); } if (uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) { LogWrite(PACKET__DEBUG, 9, "Packet", "Post-OOA Invalid Sequenced queue: BS %u + SQ %u != NOS %u", SequencedBase, SequencedQueue.size(), NextOutSeq); } MOutboundQueue.unlock(); } break; case OP_ServerKeyRequest: { if (p->Size() < sizeof(ClientSessionStats)) { break; } ClientSessionStats* Stats = (ClientSessionStats*)p->pBuffer; int16 request_id = Stats->RequestID; AdjustRates(ntohl(Stats->average_delta)); ServerSessionStats* stats = (ServerSessionStats*)p->pBuffer; memset(stats, 0, sizeof(ServerSessionStats)); stats->RequestID = request_id; stats->current_time = ntohl(Timer::GetCurrentTime2()); stats->sent_packets = ntohl(sent_packets); stats->sent_packets2 = ntohl(sent_packets); stats->received_packets = ntohl(received_packets); stats->received_packets2 = ntohl(received_packets); NonSequencedPush(new EQProtocolPacket(OP_SessionStatResponse, p->pBuffer, p->size)); if (!crypto->isEncrypted()) SendKeyRequest(); else SendSessionResponse(); } break; case OP_SessionStatResponse: { LogWrite(PACKET__INFO, 0, "Packet", "OP_SessionStatResponse"); } break; case OP_OutOfSession: { LogWrite(PACKET__INFO, 0, "Packet", "OP_OutOfSession"); SendDisconnect(); SetState(CLOSED); } break; default: cout << "Orig Packet: " << p->opcode << endl; DumpPacket(p->pBuffer, p->size); if (p && p->size >= 69) { processRSAKey(p); } MCombineQueueLock.lock(); EQProtocolPacket* p2 = ProcessEncryptedData(p->pBuffer, p->size, OP_Packet); MCombineQueueLock.unlock(); cout << "Decrypted Packet: " << p2->opcode << endl; DumpPacket(p2->pBuffer, p2->size); safe_delete(p2); LogWrite(PACKET__INFO, 0, "Packet", "Received unknown packet type, not adding to inbound queue"); break; } } } // Compress EQ2 packet data using zlib compression int8 EQStream::EQ2_Compress(EQ2Packet* app, int8 offset) { uchar* pDataPtr = app->pBuffer + offset; int xpandSize = app->size * 2; uchar* deflate_buff = new uchar[xpandSize]; MCompressData.lock(); stream.next_in = pDataPtr; stream.avail_in = app->size - offset; stream.next_out = deflate_buff; stream.avail_out = xpandSize; int ret = deflate(&stream, Z_SYNC_FLUSH); if (ret != Z_OK) { printf("ZLIB COMPRESSION RETFAIL: %i, %i (Ret: %i)\n", app->size, stream.avail_out, ret); MCompressData.unlock(); safe_delete_array(deflate_buff); return 0; } int32 newsize = xpandSize - stream.avail_out; safe_delete_array(app->pBuffer); app->size = newsize + offset; app->pBuffer = new uchar[app->size]; app->pBuffer[(offset - 1)] = 1; memcpy(app->pBuffer + offset, deflate_buff, newsize); MCompressData.unlock(); safe_delete_array(deflate_buff); return offset - 1; } // Process RSA key from packet for encryption setup int16 EQStream::processRSAKey(EQProtocolPacket *p, uint16 subpacket_length) { if (subpacket_length) crypto->setRC4Key(Crypto::RSADecrypt(p->pBuffer + subpacket_length - 8, 8)); else crypto->setRC4Key(Crypto::RSADecrypt(p->pBuffer + p->size - 8, 8)); return 0; } // Send encryption key request to client void EQStream::SendKeyRequest() { int32 crypto_key_size = 60; int16 size = sizeof(KeyGen_Struct) + sizeof(KeyGen_End_Struct) + crypto_key_size; EQ2Packet *outapp = new EQ2Packet(OP_WSLoginRequestMsg, NULL, size); memcpy(&outapp->pBuffer[0], &crypto_key_size, sizeof(int32)); memset(&outapp->pBuffer[4], 0xFF, crypto_key_size); memset(&outapp->pBuffer[size - 5], 1, 1); memset(&outapp->pBuffer[size - 1], 1, 1); EQ2QueuePacket(outapp, true); } // Encrypt packet data using RC4 encryption void EQStream::EncryptPacket(EQ2Packet* app, int8 compress_offset, int8 offset) { if (app->size > 2 && crypto->isEncrypted()) { app->packet_encrypted = true; uchar* crypt_buff = app->pBuffer; if (app->eq2_compressed) crypto->RC4Encrypt(crypt_buff + compress_offset, app->size - compress_offset); else crypto->RC4Encrypt(crypt_buff + 2 + offset, app->size - 2 - offset); } } // Queue EQ2 packet for transmission with optional combining void EQStream::EQ2QueuePacket(EQ2Packet* app, bool attempted_combine) { if (CheckActive()) { if (!attempted_combine) { MCombineQueueLock.lock(); combine_queue.push_back(app); MCombineQueueLock.unlock(); } else { MCombineQueueLock.lock(); PreparePacket(app); MCombineQueueLock.unlock(); SendPacket(app); } } } // Reverse packet preparation for debugging purposes void EQStream::UnPreparePacket(EQ2Packet* app) { if (app->pBuffer[2] == 0 && app->pBuffer[3] == 19) { uchar* new_buffer = new uchar[app->size - 3]; memcpy(new_buffer + 2, app->pBuffer + 5, app->size - 3); delete[] app->pBuffer; app->size -= 3; app->pBuffer = new_buffer; } } #ifdef WRITE_PACKETS // Convert unprintable characters to dots for packet logging char EQStream::GetChar(uchar in) { if (in < ' ' || in > '~') return '.'; return (char)in; } // Write formatted output to packet log file void EQStream::WriteToFile(char* pFormat, ...) { va_list args; va_start(args, pFormat); vfprintf(write_packets, pFormat, args); va_end(args); } // Log packet data in hex dump format void EQStream::WritePackets(const char* opcodeName, uchar* data, int32 size, bool outgoing) { MWritePackets.lock(); struct in_addr ip_addr; ip_addr.s_addr = remote_ip; char timebuffer[80]; time_t rawtime; struct tm* timeinfo; time(&rawtime); timeinfo = localtime(&rawtime); strftime(timebuffer, 80, "%m/%d/%Y %H:%M:%S", timeinfo); if (outgoing) WriteToFile("-- %s --\n%s\nSERVER -> %s\n", opcodeName, timebuffer, inet_ntoa(ip_addr)); else WriteToFile("-- %s --\n%s\n%s -> SERVER\n", opcodeName, timebuffer, inet_ntoa(ip_addr)); int i; int nLines = size / 16; int nExtra = size % 16; uchar* pPtr = data; for (i = 0; i < nLines; i++) { WriteToFile("%4.4X:\t%2.2X %2.2X %2.2X %2.2X %2.2X %2.2X %2.2X %2.2X %2.2X %2.2X %2.2X %2.2X %2.2X %2.2X %2.2X %2.2X %c%c%c%c%c%c%c%c%c%c%c%c%c%c%c%c\n", i * 16, pPtr[0], pPtr[1], pPtr[2], pPtr[3], pPtr[4], pPtr[5], pPtr[6], pPtr[7], pPtr[8], pPtr[9], pPtr[10], pPtr[11], pPtr[12], pPtr[13], pPtr[14], pPtr[15], GetChar(pPtr[0]), GetChar(pPtr[1]), GetChar(pPtr[2]), GetChar(pPtr[3]), GetChar(pPtr[4]), GetChar(pPtr[5]), GetChar(pPtr[6]), GetChar(pPtr[7]), GetChar(pPtr[8]), GetChar(pPtr[9]), GetChar(pPtr[10]), GetChar(pPtr[11]), GetChar(pPtr[12]), GetChar(pPtr[13]), GetChar(pPtr[14]), GetChar(pPtr[15])); pPtr += 16; } if (nExtra) { WriteToFile("%4.4X\t", nLines * 16); for (i = 0; i < nExtra; i++) { WriteToFile("%2.2X ", pPtr[i]); } for (i; i < 16; i++) WriteToFile(" "); for (i = 0; i < nExtra; i++) { WriteToFile("%c", GetChar(pPtr[i])); } WriteToFile("\n"); } WriteToFile("\n\n"); fflush(write_packets); MWritePackets.unlock(); } // Log EQ2 packet with opcode name resolution void EQStream::WritePackets(EQ2Packet* app, bool outgoing) { if (app->version == 0) app->version = client_version; WritePackets(app->GetOpcodeName(), app->pBuffer, app->size, outgoing); } #endif // Prepare EQ2 packet for transmission (compression, encryption) void EQStream::PreparePacket(EQ2Packet* app, int8 offset) { app->setVersion(client_version); compressed_offset = 0; if (!app->packet_prepared) { if (app->PreparePacket(MaxLen) == 255) return; } #ifdef WRITE_PACKETS if (!app->eq2_compressed && !app->packet_encrypted) WritePackets(app, true); #endif if (!app->eq2_compressed && app->size > 128) { compressed_offset = EQ2_Compress(app); if (compressed_offset) app->eq2_compressed = true; } if (!app->packet_encrypted) { EncryptPacket(app, compressed_offset, offset); if (app->size > 2 && app->pBuffer[2] == 0) { uchar* new_buffer = new uchar[app->size + 1]; new_buffer[2] = 0; memcpy(new_buffer + 3, app->pBuffer + 2, app->size - 2); delete[] app->pBuffer; app->pBuffer = new_buffer; app->size++; } } } // Send protocol packet, fragmenting if necessary void EQStream::SendPacket(EQProtocolPacket *p) { uint32 chunksize, used; uint32 length; if (p->size > (MaxLen - 8)) { uchar* tmpbuff = p->pBuffer; length = p->size - 2; EQProtocolPacket *out = new EQProtocolPacket(OP_Fragment, NULL, MaxLen - 4); *(uint32 *)(out->pBuffer + 2) = htonl(length); used = MaxLen - 10; memcpy(out->pBuffer + 6, tmpbuff + 2, used); SequencedPush(out); while (used < length) { chunksize = min(length - used, MaxLen - 6); out = new EQProtocolPacket(OP_Fragment, NULL, chunksize + 2); memcpy(out->pBuffer + 2, tmpbuff + used + 2, chunksize); SequencedPush(out); used += chunksize; } delete p; } else { SequencedPush(p); } } // Send application packet, converting to protocol packet first void EQStream::SendPacket(EQApplicationPacket *p) { uint32 chunksize, used; uint32 length; if (p->size > (MaxLen - 8)) { unsigned char *tmpbuff = new unsigned char[p->size + 2]; length = p->serialize(tmpbuff); EQProtocolPacket *out = new EQProtocolPacket(OP_Fragment, NULL, MaxLen - 4); *(uint32 *)(out->pBuffer + 2) = htonl(p->Size()); memcpy(out->pBuffer + 6, tmpbuff, MaxLen - 10); used = MaxLen - 10; SequencedPush(out); while (used < length) { out = new EQProtocolPacket(OP_Fragment, NULL, MaxLen - 4); chunksize = min(length - used, MaxLen - 6); memcpy(out->pBuffer + 2, tmpbuff + used, chunksize); out->size = chunksize + 2; SequencedPush(out); used += chunksize; } delete p; delete[] tmpbuff; } else { EQProtocolPacket *out = new EQProtocolPacket(OP_Packet, NULL, p->Size() + 2); p->serialize(out->pBuffer + 2); SequencedPush(out); delete p; } } // Add packet to sequenced queue with sequence number void EQStream::SequencedPush(EQProtocolPacket *p) { p->setVersion(client_version); MOutboundQueue.lock(); *(uint16 *)(p->pBuffer) = htons(NextOutSeq); SequencedQueue.push_back(p); p->sequence = NextOutSeq; NextOutSeq++; MOutboundQueue.unlock(); } // Add packet to non-sequenced queue void EQStream::NonSequencedPush(EQProtocolPacket *p) { p->setVersion(client_version); MOutboundQueue.lock(); NonSequencedQueue.push(p); MOutboundQueue.unlock(); } // Send acknowledgment for received packet void EQStream::SendAck(uint16 seq) { uint16 Seq = htons(seq); SetLastAckSent(seq); NonSequencedPush(new EQProtocolPacket(OP_Ack, (unsigned char *)&Seq, sizeof(uint16))); } // Send out-of-order acknowledgment void EQStream::SendOutOfOrderAck(uint16 seq) { uint16 Seq = htons(seq); NonSequencedPush(new EQProtocolPacket(OP_OutOfOrderAck, (unsigned char *)&Seq, sizeof(uint16))); } // Check and process combine queue for packet combining bool EQStream::CheckCombineQueue() { bool ret = true; MCombineQueueLock.lock(); if (combine_queue.size() > 0) { EQ2Packet* first = combine_queue.front(); combine_queue.pop_front(); if (combine_queue.size() == 0) { EQ2QueuePacket(first, true); } else { PreparePacket(first); EQ2Packet* second = 0; bool combine_worked = false; int16 count = 0; while (combine_queue.size()) { count++; second = combine_queue.front(); combine_queue.pop_front(); PreparePacket(second); if (!first->AppCombine(second)) { first->SetProtocolOpcode(OP_Packet); if (combine_worked) { SequencedPush(first); } else { EQ2QueuePacket(first, true); } first = second; combine_worked = false; } else { combine_worked = true; } if (count >= 60 || first->size > 4000) { ret = false; break; } } if (first) { first->SetProtocolOpcode(OP_Packet); if (combine_worked) { SequencedPush(first); } else { EQ2QueuePacket(first, true); } } } } MCombineQueueLock.unlock(); return ret; } // Check resend queue and retransmit packets as needed void EQStream::CheckResend(int eq_fd) { int32 curr = Timer::GetCurrentTime2(); EQProtocolPacket* packet = 0; deque::iterator itr; MResendQue.lock(); for (itr = resend_que.begin(); itr != resend_que.end(); itr++) { packet = *itr; if (packet->attempt_count >= 5) { safe_delete(packet); itr = resend_que.erase(itr); if (itr == resend_que.end()) break; } else { if ((curr - packet->sent_time) < 1000) continue; packet->sent_time -= 1000; packet->attempt_count++; WritePacket(eq_fd, packet); } } MResendQue.unlock(); } // Compare sequence numbers accounting for wrap-around EQStream::SeqOrder EQStream::CompareSequence(uint16 expected_seq, uint16 seq) { if (expected_seq == seq) { return SeqInOrder; } else if ((seq > expected_seq && (uint32)seq < ((uint32)expected_seq + EQStream::MaxWindowSize)) || seq < (expected_seq - EQStream::MaxWindowSize)) { return SeqFuture; } else { return SeqPast; } } // Process acknowledgments and remove acked packets from queue void EQStream::AckPackets(uint16 seq) { std::deque::iterator itr, tmp; MOutboundQueue.lock(); SeqOrder ord = CompareSequence(SequencedBase, seq); if (ord == SeqInOrder) { LogWrite(PACKET__DEBUG, 9, "Packet", "Received an ack with no window advancement (seq %u)", seq); } else if (ord == SeqPast) { LogWrite(PACKET__DEBUG, 9, "Packet", "Received an ack with backward window advancement (they gave %u, our window starts at %u). This is bad", seq, SequencedBase); } else { LogWrite(PACKET__DEBUG, 9, "Packet", "Received an ack up through sequence %u. Our base is %u", seq, SequencedBase); seq++; while (SequencedBase != seq) { if (SequencedQueue.empty()) { LogWrite(PACKET__DEBUG, 9, "Packet", "OUT OF PACKETS acked packet with sequence %u. Next send is %u before this", (unsigned long)SequencedBase, SequencedQueue.size()); SequencedBase = NextOutSeq; break; } LogWrite(PACKET__DEBUG, 9, "Packet", "Removing acked packet with sequence %u", (unsigned long)SequencedBase); delete SequencedQueue.front(); SequencedQueue.pop_front(); SequencedBase++; } if (uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) { LogWrite(PACKET__DEBUG, 9, "Packet", "Post-Ack on %u Invalid Sequenced queue: BS %u + SQ %u != NOS %u", seq, SequencedBase, SequencedQueue.size(), NextOutSeq); } } MOutboundQueue.unlock(); } // Main write function - sends packets to network void EQStream::Write(int eq_fd) { queue ReadyToSend; long maxack; MRate.lock(); sint32 threshold = RateThreshold; MRate.unlock(); if (BytesWritten > threshold) { return; } MCombinedAppPacket.lock(); EQApplicationPacket *CombPack = CombinedAppPacket; CombinedAppPacket = NULL; MCombinedAppPacket.unlock(); if (CombPack) { SendPacket(CombPack); } MAcks.lock(); maxack = MaxAckReceived; if (NextAckToSend > LastAckSent || LastAckSent == 0x0000ffff) SendAck(NextAckToSend); MAcks.unlock(); MOutboundQueue.lock(); EQProtocolPacket *p = NULL; std::deque::iterator sitr; sitr = SequencedQueue.begin(); uint16 count = 0; while (sitr != SequencedQueue.end() && (*sitr)->sent_time > 0) { ++sitr; ++count; } bool SeqEmpty = false, NonSeqEmpty = false; while (!SeqEmpty || !NonSeqEmpty) { if (!NonSequencedQueue.empty()) { if (!p) { p = NonSequencedQueue.front(); LogWrite(PACKET__DEBUG, 9, "Packet", "Starting combined packet with non-seq packet of len %u", p->size); NonSequencedQueue.pop(); } else if (!p->combine(NonSequencedQueue.front())) { LogWrite(PACKET__DEBUG, 9, "Packet", "Combined packet full at len %u, next non-seq packet is len %u", p->size, (NonSequencedQueue.front())->size); ReadyToSend.push(p); BytesWritten += p->size; p = nullptr; if (BytesWritten > threshold) { LogWrite(PACKET__DEBUG, 9, "Packet", "Exceeded write threshold in nonseq (%u > %u)", BytesWritten, threshold); break; } } else { LogWrite(PACKET__DEBUG, 9, "Packet", "Combined non-seq packet of len %u, yeilding %u combined", (NonSequencedQueue.front())->size, p->size); delete NonSequencedQueue.front(); NonSequencedQueue.pop(); } } else { NonSeqEmpty = true; } if (sitr != SequencedQueue.end()) { uint16 seq_send = SequencedBase + count; if (SequencedQueue.empty()) { LogWrite(PACKET__DEBUG, 9, "Packet", "Tried to write a packet with an empty queue (%u is past next out %u)", seq_send, NextOutSeq); SeqEmpty = true; continue; } if ((*sitr)->acked || (*sitr)->sent_time != 0) { ++sitr; ++count; if (p) { LogWrite(PACKET__DEBUG, 9, "Packet", "Final combined packet not full, len %u", p->size); ReadyToSend.push(p); BytesWritten += p->size; p = nullptr; } LogWrite(PACKET__DEBUG, 9, "Packet", "Not retransmitting seq packet %u because already marked as acked", seq_send); } else if (!p) { p = (*sitr)->Copy(); LogWrite(PACKET__DEBUG, 9, "Packet", "Starting combined packet with seq packet %u of len %u", seq_send, p->size); (*sitr)->sent_time = Timer::GetCurrentTime2(); ++sitr; ++count; } else if (!p->combine(*sitr)) { LogWrite(PACKET__DEBUG, 9, "Packet", "Combined packet full at len %u, next seq packet %u is len %u", p->size, seq_send + 1, (*sitr)->size); ReadyToSend.push(p); BytesWritten += p->size; p = nullptr; if ((*sitr)->opcode != OP_Fragment && BytesWritten > threshold) { LogWrite(PACKET__DEBUG, 9, "Packet", "Exceeded write threshold in seq (%u > %u)", BytesWritten, threshold); break; } } else { LogWrite(PACKET__DEBUG, 9, "Packet", "Combined seq packet %u of len %u, yeilding %u combined", seq_send, (*sitr)->size, p->size); (*sitr)->sent_time = Timer::GetCurrentTime2(); ++sitr; ++count; } if (uint16(SequencedBase + SequencedQueue.size()) != NextOutSeq) { LogWrite(PACKET__DEBUG, 9, "Packet", "Post send Invalid Sequenced queue: BS %u + SQ %u != NOS %u", SequencedBase, SequencedQueue.size(), NextOutSeq); } } else { SeqEmpty = true; } } MOutboundQueue.unlock(); if (p) { LogWrite(PACKET__DEBUG, 9, "Packet", "Final combined packet not full, len %u", p->size); ReadyToSend.push(p); BytesWritten += p->size; } while (!ReadyToSend.empty()) { p = ReadyToSend.front(); WritePacket(eq_fd, p); delete p; ReadyToSend.pop(); } if (SeqEmpty && NonSeqEmpty) { if (GetState() == CLOSING) { MOutboundQueue.lock(); if (SequencedQueue.size() > 0) { // retransmission attempts } else { LogWrite(PACKET__DEBUG, 9, "Packet", "All outgoing data flushed, disconnecting client."); SendDisconnect(); } MOutboundQueue.unlock(); } } } // Write packet to socket void EQStream::WritePacket(int eq_fd, EQProtocolPacket *p) { uint32 length = 0; sockaddr_in address; address.sin_family = AF_INET; address.sin_addr.s_addr = remote_ip; address.sin_port = remote_port; length = p->serialize(buffer); if (p->opcode != OP_SessionRequest && p->opcode != OP_SessionResponse) { if (compressed) { BytesWritten -= p->size; uint32 newlen = EQProtocolPacket::Compress(buffer, length, write_buffer, 2048); memcpy(buffer, write_buffer, newlen); length = newlen; BytesWritten += newlen; } if (encoded) { EQProtocolPacket::ChatEncode(buffer, length, Key); } *(uint16 *)(buffer + length) = htons(CRC16(buffer, length, Key)); length += 2; } sent_packets++; sendto(eq_fd, (char *)buffer, length, 0, (sockaddr *)&address, sizeof(address)); } // Read packet from socket EQProtocolPacket *EQStream::Read(int eq_fd, sockaddr_in *from) { int socklen; int length = 0; unsigned char buffer[2048]; EQProtocolPacket *p = NULL; char temp[15]; socklen = sizeof(sockaddr); length = recvfrom(eq_fd, buffer, 2048, 0, (struct sockaddr*)from, (socklen_t *)&socklen); if (length >= 2) { DumpPacket(buffer, length); p = new EQProtocolPacket(buffer[1], &buffer[2], length - 2); uint32 ip = from->sin_addr.s_addr; sprintf(temp, "%d.%d.%d.%d:%d", *(unsigned char *)&ip, *((unsigned char *)&ip + 1), *((unsigned char *)&ip + 2), *((unsigned char *)&ip + 3), ntohs(from->sin_port)); } return p; } // Send session response to client void EQStream::SendSessionResponse() { EQProtocolPacket *out = new EQProtocolPacket(OP_SessionResponse, NULL, sizeof(SessionResponse)); SessionResponse *Response = (SessionResponse *)out->pBuffer; Response->Session = htonl(Session); Response->MaxLength = htonl(MaxLen); Response->UnknownA = 2; Response->Format = 0; if (compressed) Response->Format |= FLAG_COMPRESSED; if (encoded) Response->Format |= FLAG_ENCODED; Response->Key = htonl(Key); out->size = sizeof(SessionResponse); NonSequencedPush(out); } // Send session request to server void EQStream::SendSessionRequest() { EQProtocolPacket *out = new EQProtocolPacket(OP_SessionRequest, NULL, sizeof(SessionRequest)); SessionRequest *Request = (SessionRequest *)out->pBuffer; memset(Request, 0, sizeof(SessionRequest)); Request->Session = htonl(time(NULL)); Request->MaxLength = htonl(512); NonSequencedPush(out); } // Send disconnect message to peer void EQStream::SendDisconnect(bool setstate) { try { if (GetState() != ESTABLISHED && GetState() != WAIT_CLOSE) return; EQProtocolPacket *out = new EQProtocolPacket(OP_SessionDisconnect, NULL, sizeof(uint32) + sizeof(int16)); *(uint32 *)out->pBuffer = htonl(Session); out->pBuffer[4] = 0; out->pBuffer[5] = 6; NonSequencedPush(out); if (setstate) SetState(CLOSING); } catch (...) {} } // Add packet to inbound queue void EQStream::InboundQueuePush(EQApplicationPacket *p) { MInboundQueue.lock(); InboundQueue.push_back(p); MInboundQueue.unlock(); } // Remove and return next packet from inbound queue EQApplicationPacket *EQStream::PopPacket() { EQApplicationPacket *p = NULL; MInboundQueue.lock(); if (InboundQueue.size()) { p = InboundQueue.front(); InboundQueue.pop_front(); } MInboundQueue.unlock(); if (p) p->setVersion(client_version); return p; } // Clear all packets from inbound queue void EQStream::InboundQueueClear() { MInboundQueue.lock(); while (InboundQueue.size()) { delete InboundQueue.front(); InboundQueue.pop_front(); } MInboundQueue.unlock(); } // Encrypt packet data (placeholder implementation) void EQStream::EncryptPacket(uchar* data, int16 size) { if (size > 6) { // Encryption implementation would go here } } // Check if stream has outgoing data ready to send bool EQStream::HasOutgoingData() { bool flag; if (CheckClosed()) return false; MOutboundQueue.lock(); flag = (!NonSequencedQueue.empty()); if (!flag) { flag = (!SequencedQueue.empty()); } MOutboundQueue.unlock(); if (!flag) { MAcks.lock(); flag = (NextAckToSend > LastAckSent); MAcks.unlock(); } if (!flag) { MCombinedAppPacket.lock(); flag = (CombinedAppPacket != NULL); MCombinedAppPacket.unlock(); } return flag; } // Clear all outbound queues void EQStream::OutboundQueueClear() { MOutboundQueue.lock(); while (NonSequencedQueue.size()) { delete NonSequencedQueue.front(); NonSequencedQueue.pop(); } while (SequencedQueue.size()) { delete SequencedQueue.front(); SequencedQueue.pop_front(); } MOutboundQueue.unlock(); } // Process incoming data buffer void EQStream::Process(const unsigned char *buffer, const uint32 length) { received_packets++; static unsigned char newbuffer[2048]; uint32 newlength = 0; if (EQProtocolPacket::ValidateCRC(buffer, length, Key)) { if (compressed) { newlength = EQProtocolPacket::Decompress(buffer, length, newbuffer, 2048); } else { memcpy(newbuffer, buffer, length); newlength = length; if (encoded) EQProtocolPacket::ChatDecode(newbuffer, newlength - 2, Key); } uint16 opcode = ntohs(*(const uint16 *)newbuffer); if (opcode > 0 && opcode <= OP_OutOfSession) { if (buffer[1] != 0x01 && buffer[1] != 0x02 && buffer[1] != 0x1d) newlength -= 2; EQProtocolPacket p(newbuffer, newlength); ProcessPacket(&p); } else { cout << "2Orig Packet: " << opcode << endl; DumpPacket(newbuffer, newlength); ProcessEmbeddedPacket(newbuffer, newlength, OP_Fragment); } ProcessQueue(); } else { cout << "Incoming packet failed checksum:" << endl; dump_message_column(const_cast(buffer), length, "CRC failed: "); } } // Acknowledgment tracking getters long EQStream::GetMaxAckReceived() { MAcks.lock(); long l = MaxAckReceived; MAcks.unlock(); return l; } long EQStream::GetNextAckToSend() { MAcks.lock(); long l = NextAckToSend; MAcks.unlock(); return l; } long EQStream::GetLastAckSent() { MAcks.lock(); long l = LastAckSent; MAcks.unlock(); return l; } // Acknowledgment tracking setters void EQStream::SetMaxAckReceived(uint32 seq) { deque::iterator itr; MAcks.lock(); MaxAckReceived = seq; MAcks.unlock(); MOutboundQueue.lock(); if (long(seq) > LastSeqSent) LastSeqSent = seq; MResendQue.lock(); EQProtocolPacket* packet = 0; for (itr = resend_que.begin(); itr != resend_que.end(); itr++) { packet = *itr; if (packet && packet->sequence <= seq) { safe_delete(packet); itr = resend_que.erase(itr); if (itr == resend_que.end()) break; } } MResendQue.unlock(); MOutboundQueue.unlock(); } void EQStream::SetNextAckToSend(uint32 seq) { MAcks.lock(); NextAckToSend = seq; MAcks.unlock(); } void EQStream::SetLastAckSent(uint32 seq) { MAcks.lock(); LastAckSent = seq; MAcks.unlock(); } void EQStream::SetLastSeqSent(uint32 seq) { MOutboundQueue.lock(); LastSeqSent = seq; MOutboundQueue.unlock(); } // Set stream type and configure related settings void EQStream::SetStreamType(EQStreamType type) { StreamType = type; switch (StreamType) { case LoginStream: app_opcode_size = 1; compressed = false; encoded = false; break; case EQ2Stream: app_opcode_size = 2; compressed = false; encoded = false; break; case ChatOrMailStream: case ChatStream: case MailStream: app_opcode_size = 1; compressed = false; encoded = true; break; case ZoneStream: case WorldStream: default: app_opcode_size = 2; compressed = true; encoded = false; break; } } // Process out-of-order packet queue void EQStream::ProcessQueue() { if (OutOfOrderpackets.empty()) { return; } EQProtocolPacket* qp = NULL; while ((qp = RemoveQueue(NextInSeq)) != NULL) { ProcessPacket(qp); delete qp; } } // Remove specific sequence from out-of-order queue EQProtocolPacket* EQStream::RemoveQueue(uint16 seq) { map::iterator itr; EQProtocolPacket* qp = NULL; if ((itr = OutOfOrderpackets.find(seq)) != OutOfOrderpackets.end()) { qp = itr->second; OutOfOrderpackets.erase(itr); } return qp; } // Decay byte counters and check for packet timeouts void EQStream::Decay() { MRate.lock(); uint32 rate = DecayRate; MRate.unlock(); if (BytesWritten > 0) { BytesWritten -= rate; if (BytesWritten < 0) BytesWritten = 0; } int count = 0; MOutboundQueue.lock(); for (auto sitr = SequencedQueue.begin(); sitr != SequencedQueue.end(); ++sitr, count++) { if (!(*sitr)->acked && (*sitr)->sent_time > 0 && ((*sitr)->sent_time + retransmittimeout) < Timer::GetCurrentTime2()) { (*sitr)->sent_time = 0; LogWrite(PACKET__DEBUG, 9, "Packet", "Timeout exceeded for seq %u. Flagging packet for retransmission", SequencedBase + count); } } MOutboundQueue.unlock(); } // Adjust transmission rates based on network conditions void EQStream::AdjustRates(uint32 average_delta) { if (average_delta && (average_delta <= AVERAGE_DELTA_MAX)) { MRate.lock(); AverageDelta = average_delta; RateThreshold = RATEBASE / average_delta; DecayRate = DECAYBASE / average_delta; if (BytesWritten > RateThreshold) BytesWritten = RateThreshold + DecayRate; MRate.unlock(); } else { AverageDelta = AVERAGE_DELTA_MAX; } }