diff --git a/packets/protopacket.go b/packets/protopacket.go index cb627e6..5480b1b 100644 --- a/packets/protopacket.go +++ b/packets/protopacket.go @@ -8,7 +8,6 @@ import ( ) // ProtoPacket handles low-level protocol features including EQ2-specific operations -// Merges functionality from EQProtocolPacket and EQ2Packet type ProtoPacket struct { *Packet @@ -16,7 +15,7 @@ type ProtoPacket struct { flags uint8 // bit 0: compressed, bit 1: prepared, bit 2: encrypted, bit 3: acked // EQ2-specific - LoginOp opcodes.EmuOpcode // From EQ2Packet + LoginOp opcodes.EmuOpcode // Reliability and sequencing Sequence int32 @@ -39,7 +38,7 @@ const ( FlagAcked ) -// Default compression threshold (compress packets larger than this) +// Default compression threshold const DefaultCompressThreshold = 100 // NewProtoPacket creates a protocol packet with opcode and buffer @@ -190,7 +189,7 @@ func (p *ProtoPacket) EncodeChat() { return } - if IsChatPacket(p.Opcode) { + if p.IsChatPacket() { ChatEncode(p.Buffer, p.EncodeKey) p.SetEncrypted(true) } @@ -206,6 +205,29 @@ func (p *ProtoPacket) DecodeChat() { p.SetEncrypted(false) } +// IsChatPacket checks if this is a chat packet using opcode manager +func (p *ProtoPacket) IsChatPacket() bool { + if p.manager == nil { + return false + } + + // Get emulator opcode + emuOp := p.manager.EQToEmu(p.Opcode) + + // Check against known chat opcodes + switch emuOp { + case opcodes.OP_ChatMsg, + opcodes.OP_TellMsg, + opcodes.OP_ChatLeaveChannelMsg, + opcodes.OP_ChatTellChannelMsg, + opcodes.OP_ChatTellUserMsg, + opcodes.OP_GuildsayMsg: + return true + default: + return false + } +} + // Copy creates a deep copy of this protocol packet func (p *ProtoPacket) Copy() *ProtoPacket { newPacket := &ProtoPacket{ @@ -255,7 +277,7 @@ func (p *ProtoPacket) Serialize(dest []byte, offset int8) uint32 { return uint32(len(p.Buffer)-int(offset)) + uint32(pos) } -// PreparePacket prepares an EQ2 packet for transmission (from EQ2Packet) +// PreparePacket prepares an EQ2 packet for transmission func (p *ProtoPacket) PreparePacket(maxLen int16) int8 { if p.IsPrepared() { return 0 @@ -367,53 +389,122 @@ func (p *ProtoPacket) MakeApplicationPacket(opcodeSize uint8) *AppPacket { return app } -// AppCombine combines this packet with another for efficient transmission (from EQ2Packet) +// AppCombine combines this packet with another for efficient transmission func (p *ProtoPacket) AppCombine(rhs *ProtoPacket) bool { - const opAppCombined = 0x19 // OP_AppCombined value + const opAppCombined = 0x19 // OP_AppCombined - // Apply compression to both packets before combining - p.CompressPacket() - rhs.CompressPacket() + // Calculate sizes + lhsSize := p.Size() + rhsSize := rhs.Size() - // Case 1: This packet is already a combined packet - if p.Opcode == opAppCombined && (len(p.Buffer)+len(rhs.Buffer)+3) < 255 { - tmpSize := len(rhs.Buffer) - 2 - overSized := tmpSize >= 255 + // Check max combined size + if lhsSize+rhsSize > MaxCombinedSize { + return false + } + // If this packet is already combined, add to it + if p.Opcode == opAppCombined { + // Calculate new size var newSize int - if overSized { - newSize = len(p.Buffer) + tmpSize + 3 + if rhsSize >= 255 { + newSize = len(p.Buffer) + int(rhsSize) + 3 // oversized header } else { - newSize = len(p.Buffer) + tmpSize + 1 + newSize = len(p.Buffer) + int(rhsSize) + 1 // normal header } - tmpBuffer := make([]byte, newSize) + // Check size limit + if newSize > MaxCombinedSize { + return false + } + + // Create new buffer + newBuffer := make([]byte, newSize) pos := 0 - // Copy existing combined packet data - copy(tmpBuffer, p.Buffer) - pos += len(p.Buffer) + // Copy existing combined data + copy(newBuffer, p.Buffer) + pos = len(p.Buffer) - // Add size information for the new packet - if overSized { - tmpBuffer[pos] = 255 + // Add new packet with size header + if rhsSize >= 255 { + newBuffer[pos] = 255 // Oversized marker pos++ - binary.BigEndian.PutUint16(tmpBuffer[pos:], uint16(tmpSize)) + binary.BigEndian.PutUint16(newBuffer[pos:], uint16(rhsSize)) pos += 2 } else { - tmpBuffer[pos] = byte(tmpSize) + newBuffer[pos] = byte(rhsSize) pos++ } - // Copy the new packet data (skip first 2 bytes which are opcode) - if len(rhs.Buffer) > 2 { - copy(tmpBuffer[pos:], rhs.Buffer[2:]) - } + // Serialize rhs packet + tmpBuf := make([]byte, rhsSize) + rhs.Serialize(tmpBuf, 0) + copy(newBuffer[pos:], tmpBuf) - p.Buffer = tmpBuffer + p.Buffer = newBuffer return true } - // Case 2: Create new combined packet (simplified) - return false + // Create new combined packet from two non-combined packets + // Calculate total size: size1(1-3) + packet1 + size2(1-3) + packet2 + var totalSize int + if lhsSize >= 255 { + totalSize += 3 + int(lhsSize) + } else { + totalSize += 1 + int(lhsSize) + } + if rhsSize >= 255 { + totalSize += 3 + int(rhsSize) + } else { + totalSize += 1 + int(rhsSize) + } + + // Check size limit + if totalSize > MaxCombinedSize { + return false + } + + // Build combined packet + newBuffer := make([]byte, totalSize) + pos := 0 + + // Add first packet with size header + if lhsSize >= 255 { + newBuffer[pos] = 255 + pos++ + binary.BigEndian.PutUint16(newBuffer[pos:], uint16(lhsSize)) + pos += 2 + } else { + newBuffer[pos] = byte(lhsSize) + pos++ + } + + // Serialize first packet + tmpBuf := make([]byte, lhsSize) + p.Serialize(tmpBuf, 0) + copy(newBuffer[pos:], tmpBuf) + pos += int(lhsSize) + + // Add second packet with size header + if rhsSize >= 255 { + newBuffer[pos] = 255 + pos++ + binary.BigEndian.PutUint16(newBuffer[pos:], uint16(rhsSize)) + pos += 2 + } else { + newBuffer[pos] = byte(rhsSize) + pos++ + } + + // Serialize second packet + tmpBuf = make([]byte, rhsSize) + rhs.Serialize(tmpBuf, 0) + copy(newBuffer[pos:], tmpBuf) + + // Update this packet to be combined + p.Opcode = opAppCombined + p.Buffer = newBuffer + p.SetPrepared(false) // Need to re-prepare + + return true } diff --git a/stream/factory.go b/stream/factory.go index e0e8fdb..40161c5 100644 --- a/stream/factory.go +++ b/stream/factory.go @@ -95,7 +95,7 @@ func (f *StreamFactory) OnBoot(eng gnet.Engine) gnet.Action { func (f *StreamFactory) OnOpen(c gnet.Conn) ([]byte, gnet.Action) { // Create stream configuration streamCfg := &Config{ - SessionID: uint32(time.Now().UnixNano()), // Generate session ID + SessionID: uint32(time.Now().UnixNano()), CRCKey: f.crcKey, MaxPacketSize: f.maxPacketSize, EncodeKey: f.encodeKey, @@ -181,13 +181,17 @@ func (f *StreamFactory) OnTraffic(c gnet.Conn) gnet.Action { return gnet.None } -// OnTick called periodically +// OnTick called periodically to process stream queues func (f *StreamFactory) OnTick() (time.Duration, gnet.Action) { f.mu.RLock() - defer f.mu.RUnlock() - - // Process retransmissions for all streams + streams := make([]*Stream, 0, len(f.streams)) for _, stream := range f.streams { + streams = append(streams, stream) + } + f.mu.RUnlock() + + // Process queues for all streams + for _, stream := range streams { stream.processQueues() } diff --git a/stream/stream.go b/stream/stream.go index f2bd68d..24f1931 100644 --- a/stream/stream.go +++ b/stream/stream.go @@ -26,25 +26,26 @@ type Stream struct { encodeKey int decodeKey int - // Sequence management + // Sequence management with wraparound handling seqOut uint16 seqIn uint16 seqLastAck uint16 seqExpected uint16 // Acknowledgment tracking - pendingAcks map[uint16]*pendingPacket - ackTimer *time.Timer - lastAckSent time.Time - ackThreshold time.Duration - ackQueue []uint16 // Sequences needing ACK + pendingAcks map[uint16]*pendingPacket + ackTimer *time.Timer + lastAckSent time.Time + ackThreshold time.Duration + ackQueue []uint16 + duplicateAckCnt map[uint16]int - // Fragment assembly + // Fragment assembly with expiry fragments map[uint16]*fragmentBuffer nextFragID uint16 - // Out of order handling - outOfOrder map[uint16][]byte + // Out of order handling with expiry + outOfOrder map[uint16]*outOfOrderPacket // Packet queues reliableQueue []*packets.ProtoPacket @@ -59,8 +60,10 @@ type Stream struct { cipher *crypto.Ciphers // Timers - keepAliveTimer *time.Timer - timeoutTimer *time.Timer + keepAliveTimer *time.Timer + timeoutTimer *time.Timer + retransmitTimer *time.Timer + cleanupTimer *time.Timer // Retransmission settings rtt time.Duration @@ -97,6 +100,11 @@ type fragmentBuffer struct { startTime time.Time } +type outOfOrderPacket struct { + data []byte + timestamp time.Time +} + type StreamState int const ( @@ -107,6 +115,14 @@ const ( StateClosed ) +const ( + retransmitInterval = 100 * time.Millisecond + cleanupInterval = 5 * time.Second + fragmentTimeout = 30 * time.Second + outOfOrderTimeout = 10 * time.Second + duplicateAckThreshold = 3 +) + // Config holds stream configuration type Config struct { SessionID uint32 @@ -124,28 +140,29 @@ type Config struct { // NewStream creates a new EQStream instance func NewStream(conn gnet.Conn, cfg *Config) *Stream { s := &Stream{ - conn: conn, - sessionID: cfg.SessionID, - crcKey: cfg.CRCKey, - maxLen: cfg.MaxPacketSize, - encodeKey: cfg.EncodeKey, - decodeKey: cfg.DecodeKey, - opcodeManager: cfg.OpcodeManager, - opcodeSize: cfg.OpcodeSize, - ackThreshold: cfg.AckThreshold, - pendingAcks: make(map[uint16]*pendingPacket), - fragments: make(map[uint16]*fragmentBuffer), - outOfOrder: make(map[uint16][]byte), - ackQueue: make([]uint16, 0), - seqOut: 0, - seqIn: 0, - seqExpected: 0, - rtt: time.Second, - rttVar: 500 * time.Millisecond, - rto: time.Second, - minRTO: 200 * time.Millisecond, - maxRTO: 10 * time.Second, - maxRetries: 10, + conn: conn, + sessionID: cfg.SessionID, + crcKey: cfg.CRCKey, + maxLen: cfg.MaxPacketSize, + encodeKey: cfg.EncodeKey, + decodeKey: cfg.DecodeKey, + opcodeManager: cfg.OpcodeManager, + opcodeSize: cfg.OpcodeSize, + ackThreshold: cfg.AckThreshold, + pendingAcks: make(map[uint16]*pendingPacket), + duplicateAckCnt: make(map[uint16]int), + fragments: make(map[uint16]*fragmentBuffer), + outOfOrder: make(map[uint16]*outOfOrderPacket), + ackQueue: make([]uint16, 0), + seqOut: 0, + seqIn: 0, + seqExpected: 0, + rtt: time.Second, + rttVar: 500 * time.Millisecond, + rto: time.Second, + minRTO: 200 * time.Millisecond, + maxRTO: 10 * time.Second, + maxRetries: 10, } if s.maxLen == 0 { @@ -157,40 +174,38 @@ func NewStream(conn gnet.Conn, cfg *Config) *Stream { s.state.Store(StateDisconnected) - // Initialize cipher if keys provided if cfg.EncodeKey != 0 || cfg.DecodeKey != 0 { cipher, _ := crypto.NewCiphers(int64(cfg.EncodeKey)) s.cipher = cipher } - // Start keepalive timer + // Start timers if cfg.KeepaliveTime > 0 { s.keepAliveTimer = time.AfterFunc(cfg.KeepaliveTime, s.sendKeepalive) } - - // Start timeout timer if cfg.TimeoutDuration > 0 { s.timeoutTimer = time.AfterFunc(cfg.TimeoutDuration, s.handleTimeout) } + s.retransmitTimer = time.AfterFunc(retransmitInterval, s.processRetransmits) + s.cleanupTimer = time.AfterFunc(cleanupInterval, s.cleanup) return s } // Process handles incoming data from gnet func (s *Stream) Process(data []byte) error { - // Validate CRC if len(data) < 2 { return nil } - // Check for CRC (last 2 bytes) + // Check for CRC if len(data) > 2 { providedCRC := binary.BigEndian.Uint16(data[len(data)-2:]) calculatedCRC := crypto.CalculateCRC(data[:len(data)-2], s.crcKey) if providedCRC != calculatedCRC { - return nil // Drop packet with bad CRC + return nil } - data = data[:len(data)-2] // Strip CRC + data = data[:len(data)-2] } // Decrypt if needed @@ -198,7 +213,6 @@ func (s *Stream) Process(data []byte) error { s.cipher.Decrypt(data) } - // Parse protocol opcode if len(data) < 2 { return nil } @@ -234,459 +248,62 @@ func (s *Stream) Process(data []byte) error { return nil } -// handleSessionRequest processes session establishment request -func (s *Stream) handleSessionRequest(data []byte) error { - if len(data) < 10 { - return fmt.Errorf("session request too small") - } - - // Parse session request - // Format: version(4) + sessionID(4) + maxLen(2) - version := binary.BigEndian.Uint32(data[:4]) - sessionID := binary.BigEndian.Uint32(data[4:8]) - maxLen := binary.BigEndian.Uint16(data[8:10]) - - // Update session info - s.mu.Lock() - s.sessionID = sessionID - if maxLen < s.maxLen { - s.maxLen = maxLen // Use smaller of the two - } - s.mu.Unlock() - - // Send session response - response := make([]byte, 14) - binary.BigEndian.PutUint32(response[0:4], sessionID) - binary.BigEndian.PutUint32(response[4:8], s.crcKey) - response[8] = 2 // Encoding type - binary.BigEndian.PutUint16(response[9:11], s.maxLen) - binary.BigEndian.PutUint32(response[11:15], version) - - s.state.Store(StateEstablished) - return s.sendRaw(opcodes.OP_SessionResponse, response) -} - -// handleSessionResponse processes session establishment response -func (s *Stream) handleSessionResponse(data []byte) error { - if len(data) < 14 { - return fmt.Errorf("session response too small") - } - - // Parse response - sessionID := binary.BigEndian.Uint32(data[0:4]) - crcKey := binary.BigEndian.Uint32(data[4:8]) - // encodingType := data[8] - maxLen := binary.BigEndian.Uint16(data[9:11]) - - s.mu.Lock() - s.sessionID = sessionID - s.crcKey = crcKey - if maxLen < s.maxLen { - s.maxLen = maxLen - } - s.mu.Unlock() - - s.state.Store(StateEstablished) - return nil -} - -// handlePacket processes reliable packets -func (s *Stream) handlePacket(data []byte) error { - if len(data) < 2 { - return nil - } - - seq := binary.BigEndian.Uint16(data[:2]) - data = data[2:] - - s.mu.Lock() - defer s.mu.Unlock() - - // Check if this is the expected sequence - if seq == s.seqExpected { - // Process in-order packet - s.seqExpected++ - s.ackQueue = append(s.ackQueue, seq) - - // Process packet data - if err := s.processPacketData(data); err != nil { - return err - } - - // Check for buffered out-of-order packets - for { - if buffered, exists := s.outOfOrder[s.seqExpected]; exists { - delete(s.outOfOrder, s.seqExpected) - s.ackQueue = append(s.ackQueue, s.seqExpected) - s.seqExpected++ - if err := s.processPacketData(buffered); err != nil { - return err - } - } else { - break - } - } - } else if seq > s.seqExpected { - // Out of order - buffer it - s.outOfOrder[seq] = append([]byte(nil), data...) - // Send out-of-order ACK - go s.sendOutOfOrderAck(seq) - } else { - // Duplicate packet - just ACK it - go s.sendAckImmediate(seq) - } - - // Schedule grouped ACK - s.scheduleAck() - - return nil -} - -// handleFragment assembles fragmented packets -func (s *Stream) handleFragment(data []byte) error { - if len(data) < 10 { - return nil - } - - seq := binary.BigEndian.Uint16(data[:2]) - fragID := binary.BigEndian.Uint32(data[2:6]) - fragTotal := binary.BigEndian.Uint16(data[6:8]) - fragCur := binary.BigEndian.Uint16(data[8:10]) - data = data[10:] - - s.mu.Lock() - defer s.mu.Unlock() - - // Get or create fragment buffer - frag, exists := s.fragments[uint16(fragID)] - if !exists { - frag = &fragmentBuffer{ - totalSize: uint32(fragTotal), - chunks: make(map[uint16][]byte), - startTime: time.Now(), - } - s.fragments[uint16(fragID)] = frag - } - - // Store chunk - frag.chunks[fragCur] = append([]byte(nil), data...) - frag.received++ - - // Check if complete - if frag.received == uint32(fragTotal) { - // Reassemble in order - complete := make([]byte, 0) - for i := uint16(0); i < fragTotal; i++ { - if chunk, ok := frag.chunks[i]; ok { - complete = append(complete, chunk...) - } else { - // Missing chunk - wait for retransmit - return nil - } - } - delete(s.fragments, uint16(fragID)) - - // Process reassembled packet - return s.processPacketData(complete) - } - - // ACK the fragment - s.ackQueue = append(s.ackQueue, seq) - s.scheduleAck() - - return nil -} - -// handleAck processes acknowledgments -func (s *Stream) handleAck(data []byte) error { - if len(data) < 2 { - return nil - } - - ackSeq := binary.BigEndian.Uint16(data[:2]) - - s.mu.Lock() - defer s.mu.Unlock() - - // Remove from pending and update RTT - if pending, exists := s.pendingAcks[ackSeq]; exists { - delete(s.pendingAcks, ackSeq) - - // Update RTT estimates (TCP-like algorithm) - sample := time.Since(pending.sentTime) - if s.rtt == 0 { - s.rtt = sample - s.rttVar = sample / 2 - } else { - alpha := 0.125 - beta := 0.25 - s.rttVar = time.Duration((1-beta)*float64(s.rttVar) + beta*float64(absTime(s.rtt-sample))) - s.rtt = time.Duration((1-alpha)*float64(s.rtt) + alpha*float64(sample)) - } - s.rto = s.rtt + 4*s.rttVar - if s.rto < s.minRTO { - s.rto = s.minRTO - } - if s.rto > s.maxRTO { - s.rto = s.maxRTO - } - } - - // Fast retransmit: if ACK is higher than pending packets, trigger retransmit - if ackSeq > s.seqLastAck { - s.seqLastAck = ackSeq - - // Check for gaps that need immediate retransmit - for seq, pending := range s.pendingAcks { - if seq < ackSeq { - // This packet was likely lost, retransmit immediately - s.resendQueue = append(s.resendQueue, pending) - } - } - } - - return nil -} - -// handleOutOfOrderAck handles out-of-order acknowledgments -func (s *Stream) handleOutOfOrderAck(data []byte) error { - if len(data) < 2 { - return nil - } - - seq := binary.BigEndian.Uint16(data[:2]) - - s.mu.Lock() - defer s.mu.Unlock() - - // Immediately retransmit if we have this packet pending - if pending, exists := s.pendingAcks[seq]; exists { - s.resendQueue = append(s.resendQueue, pending) - } - - return nil -} - -// handleCombined processes combined protocol packets -func (s *Stream) handleCombined(data []byte) error { - pos := 0 - - for pos < len(data) { - if pos+1 > len(data) { - break - } - - // Read packet size - size := uint16(data[pos]) - pos++ - - // Check for oversized marker - if size == 0xff { - if pos+2 > len(data) { - break - } - size = binary.BigEndian.Uint16(data[pos : pos+2]) - pos += 2 - } - - // Extract packet - if pos+int(size) > len(data) { - break - } - - packet := data[pos : pos+int(size)] - pos += int(size) - - // Process each sub-packet - if err := s.Process(packet); err != nil { - return err - } - } - - return nil -} - -// handleAppCombined processes combined application packets -func (s *Stream) handleAppCombined(data []byte) error { - pos := 0 - - for pos < len(data) { - if pos+1 > len(data) { - break - } - - // Read packet size - size := uint16(data[pos]) - pos++ - - // Check for oversized marker - if size == 0xff { - if pos+2 > len(data) { - break - } - size = binary.BigEndian.Uint16(data[pos : pos+2]) - pos += 2 - } - - // Extract packet - if pos+int(size) > len(data) { - break - } - - packet := data[pos : pos+int(size)] - pos += int(size) - - // Process as application packet - if err := s.processPacketData(packet); err != nil { - return err - } - } - - return nil -} - // SendPacket sends an application packet func (s *Stream) SendPacket(app *packets.AppPacket) error { - // Check state if s.state.Load() != StateEstablished { return fmt.Errorf("stream not established") } - // Convert to protocol packet - proto := s.appToProto(app) - - // Check if needs fragmentation - if proto.Size() > uint32(s.maxLen-10) { // Reserve space for headers + // Validate packet size + if app.Size() > uint32(s.maxLen) { + proto := s.appToProto(app) return s.sendFragmented(proto) } - // Add to appropriate queue based on reliability + proto := s.appToProto(app) + isUnreliable := s.isUnreliableOpcode(app.GetOpcode()) + s.mu.Lock() - if app.Priority > packets.PriorityNormal { - s.reliableQueue = append(s.reliableQueue, proto) - } else { + if isUnreliable { s.unreliableQueue = append(s.unreliableQueue, proto) + } else { + s.reliableQueue = append(s.reliableQueue, proto) } s.mu.Unlock() - // Process queues return s.processQueues() } -// sendFragmented fragments and sends large packets -func (s *Stream) sendFragmented(proto *packets.ProtoPacket) error { - data := make([]byte, proto.Size()) - proto.Serialize(data, 0) - - // Calculate fragment size (leave room for headers) - fragSize := int(s.maxLen) - 12 // Fragment header is 10 bytes + 2 for protocol - numFrags := (len(data) + fragSize - 1) / fragSize - - if numFrags > 0xFFFF { - return fmt.Errorf("packet too large to fragment") - } - - s.mu.Lock() - fragID := s.nextFragID - s.nextFragID++ - s.mu.Unlock() - - // Send each fragment - for i := 0; i < numFrags; i++ { - start := i * fragSize - end := start + fragSize - if end > len(data) { - end = len(data) - } - - // Build fragment header - fragHeader := make([]byte, 10) - // Sequence added by sendReliable - binary.BigEndian.PutUint32(fragHeader[0:4], uint32(fragID)) - binary.BigEndian.PutUint16(fragHeader[4:6], uint16(numFrags)) - binary.BigEndian.PutUint16(fragHeader[6:8], uint16(i)) - binary.BigEndian.PutUint16(fragHeader[8:10], uint16(end-start)) - - // Combine header and data - fragment := append(fragHeader, data[start:end]...) - - // Send as reliable packet - fragProto := packets.NewProtoPacket(opcodes.OP_Fragment, fragment, s.opcodeManager) - if err := s.sendReliable(fragProto); err != nil { - return err - } - } - - return nil -} - -// sendReliable sends a reliable packet with sequence number -func (s *Stream) sendReliable(proto *packets.ProtoPacket) error { - s.mu.Lock() - seq := s.seqOut - s.seqOut++ - s.mu.Unlock() - - // Build packet with sequence - data := make([]byte, proto.Size()+2) - binary.BigEndian.PutUint16(data[:2], seq) - proto.Serialize(data[2:], 0) - - // Track for retransmission - pending := &pendingPacket{ - packet: proto.Copy(), - seq: seq, - sentTime: time.Now(), - attempts: 1, - nextRetry: time.Now().Add(s.rto), - } - - s.mu.Lock() - s.pendingAcks[seq] = pending - s.mu.Unlock() - - // Send with protocol header - return s.sendRaw(opcodes.OP_Packet, data) -} - // processQueues processes all packet queues func (s *Stream) processQueues() error { s.mu.Lock() + defer s.mu.Unlock() - // Process resends first (highest priority) + // Process resends first for len(s.resendQueue) > 0 { pending := s.resendQueue[0] s.resendQueue = s.resendQueue[1:] if time.Now().After(pending.nextRetry) { - s.mu.Unlock() - - // Rebuild packet data := make([]byte, pending.packet.Size()+2) binary.BigEndian.PutUint16(data[:2], pending.seq) pending.packet.Serialize(data[2:], 0) - // Update pending info pending.attempts++ pending.sentTime = time.Now() pending.nextRetry = time.Now().Add(s.rto * time.Duration(pending.attempts)) - // Check max retries if pending.attempts > s.maxRetries { - s.mu.Lock() delete(s.pendingAcks, pending.seq) - s.mu.Unlock() - // Connection likely dead - s.handleTimeout() + go s.handleTimeout() return fmt.Errorf("max retransmissions exceeded") } + s.mu.Unlock() atomic.AddUint64(&s.retransmits, 1) s.sendRaw(opcodes.OP_Packet, data) - s.mu.Lock() } else { - // Not time yet, re-queue s.resendQueue = append(s.resendQueue, pending) } } @@ -708,54 +325,462 @@ func (s *Stream) processQueues() error { for len(s.unreliableQueue) > 0 { proto := s.unreliableQueue[0] s.unreliableQueue = s.unreliableQueue[1:] - s.mu.Unlock() - // Send unreliable (no sequence/retransmit) data := make([]byte, proto.Size()) proto.Serialize(data, 0) - s.sendRaw(proto.Opcode, data) + s.mu.Unlock() + s.sendRaw(proto.Opcode, data) s.mu.Lock() } - // Check for retransmissions needed - now := time.Now() - for _, pending := range s.pendingAcks { - if now.After(pending.nextRetry) { - s.resendQueue = append(s.resendQueue, pending) - } - } - s.mu.Unlock() - - // Process any pending ACKs s.sendPendingAcks() return nil } // Helper methods +func (s *Stream) isUnreliableOpcode(emuOp opcodes.EmuOpcode) bool { + switch emuOp { + case opcodes.OP_UpdatePositionMsg, + opcodes.OP_WorldPingMsg: + return true + default: + return false + } +} -func (s *Stream) processPacketData(data []byte) error { - // Create ProtoPacket from raw data - proto := packets.NewProtoPacketFromRaw(data, -1, s.opcodeManager) +func (s *Stream) incrementSequence(seq uint16) uint16 { + if seq == 0xFFFF { + return 0 + } + return seq + 1 +} - // Decompress if needed - if err := proto.DecompressPacket(); err != nil { - return err +func (s *Stream) isSequenceAhead(seq, base uint16) bool { + diff := seq - base + return diff > 0 && diff < 0x8000 +} + +func (s *Stream) handleSessionRequest(data []byte) error { + if len(data) < 10 { + return fmt.Errorf("session request too small") } - // Convert to AppPacket - app := proto.MakeApplicationPacket(s.opcodeSize) + version := binary.BigEndian.Uint32(data[:4]) + sessionID := binary.BigEndian.Uint32(data[4:8]) + maxLen := binary.BigEndian.Uint16(data[8:10]) - // Deliver to callback - if s.onPacket != nil { - go s.onPacket(app) + s.mu.Lock() + s.sessionID = sessionID + if maxLen < s.maxLen { + s.maxLen = maxLen + } + s.mu.Unlock() + + response := make([]byte, 15) + binary.BigEndian.PutUint32(response[0:4], sessionID) + binary.BigEndian.PutUint32(response[4:8], s.crcKey) + response[8] = 2 + binary.BigEndian.PutUint16(response[9:11], s.maxLen) + binary.BigEndian.PutUint32(response[11:15], version) + + s.state.Store(StateEstablished) + return s.sendRaw(opcodes.OP_SessionResponse, response) +} + +func (s *Stream) handleSessionResponse(data []byte) error { + if len(data) < 14 { + return fmt.Errorf("session response too small") + } + + sessionID := binary.BigEndian.Uint32(data[0:4]) + crcKey := binary.BigEndian.Uint32(data[4:8]) + maxLen := binary.BigEndian.Uint16(data[9:11]) + + s.mu.Lock() + s.sessionID = sessionID + s.crcKey = crcKey + if maxLen < s.maxLen { + s.maxLen = maxLen + } + s.mu.Unlock() + + s.state.Store(StateEstablished) + return nil +} + +func (s *Stream) handlePacket(data []byte) error { + if len(data) < 2 { + return nil + } + + seq := binary.BigEndian.Uint16(data[:2]) + data = data[2:] + + s.mu.Lock() + defer s.mu.Unlock() + + if seq == s.seqExpected { + s.seqExpected = s.incrementSequence(s.seqExpected) + s.ackQueue = append(s.ackQueue, seq) + + if err := s.processPacketData(data); err != nil { + return err + } + + for { + if buffered, exists := s.outOfOrder[s.seqExpected]; exists { + delete(s.outOfOrder, s.seqExpected) + s.ackQueue = append(s.ackQueue, s.seqExpected) + s.seqExpected = s.incrementSequence(s.seqExpected) + if err := s.processPacketData(buffered.data); err != nil { + return err + } + } else { + break + } + } + } else if s.isSequenceAhead(seq, s.seqExpected) { + s.outOfOrder[seq] = &outOfOrderPacket{ + data: append([]byte(nil), data...), + timestamp: time.Now(), + } + go s.sendOutOfOrderAck(seq) + } else { + go s.sendAckImmediate(seq) + } + + s.scheduleAck() + return nil +} + +func (s *Stream) handleFragment(data []byte) error { + if len(data) < 10 { + return nil + } + + seq := binary.BigEndian.Uint16(data[:2]) + fragID := binary.BigEndian.Uint32(data[2:6]) + fragTotal := binary.BigEndian.Uint16(data[6:8]) + fragCur := binary.BigEndian.Uint16(data[8:10]) + data = data[10:] + + s.mu.Lock() + defer s.mu.Unlock() + + frag, exists := s.fragments[uint16(fragID)] + if !exists { + frag = &fragmentBuffer{ + totalSize: uint32(fragTotal), + chunks: make(map[uint16][]byte), + startTime: time.Now(), + } + s.fragments[uint16(fragID)] = frag + } + + frag.chunks[fragCur] = append([]byte(nil), data...) + frag.received++ + + if frag.received == uint32(fragTotal) { + complete := make([]byte, 0) + for i := uint16(0); i < fragTotal; i++ { + if chunk, ok := frag.chunks[i]; ok { + complete = append(complete, chunk...) + } else { + return nil + } + } + delete(s.fragments, uint16(fragID)) + return s.processPacketData(complete) + } + + s.ackQueue = append(s.ackQueue, seq) + s.scheduleAck() + return nil +} + +func (s *Stream) handleAck(data []byte) error { + if len(data) < 2 { + return nil + } + + ackSeq := binary.BigEndian.Uint16(data[:2]) + + s.mu.Lock() + defer s.mu.Unlock() + + if pending, exists := s.pendingAcks[ackSeq]; exists { + delete(s.pendingAcks, ackSeq) + delete(s.duplicateAckCnt, ackSeq) + + sample := time.Since(pending.sentTime) + if s.rtt == 0 { + s.rtt = sample + s.rttVar = sample / 2 + } else { + alpha := 0.125 + beta := 0.25 + s.rttVar = time.Duration((1-beta)*float64(s.rttVar) + beta*float64(absTime(s.rtt-sample))) + s.rtt = time.Duration((1-alpha)*float64(s.rtt) + alpha*float64(sample)) + } + s.rto = s.rtt + 4*s.rttVar + if s.rto < s.minRTO { + s.rto = s.minRTO + } + if s.rto > s.maxRTO { + s.rto = s.maxRTO + } + } else { + s.duplicateAckCnt[ackSeq]++ + + if s.duplicateAckCnt[ackSeq] >= duplicateAckThreshold { + for seq, pending := range s.pendingAcks { + if s.isSequenceAhead(seq, ackSeq) { + s.resendQueue = append(s.resendQueue, pending) + } + } + delete(s.duplicateAckCnt, ackSeq) + } + } + + if s.isSequenceAhead(ackSeq, s.seqLastAck) { + s.seqLastAck = ackSeq } return nil } +func (s *Stream) handleCombined(data []byte) error { + pos := 0 + for pos < len(data) { + if pos+1 > len(data) { + break + } + + size := uint16(data[pos]) + pos++ + + if size == 0xff { + if pos+2 > len(data) { + break + } + size = binary.BigEndian.Uint16(data[pos : pos+2]) + pos += 2 + } + + if pos+int(size) > len(data) { + break + } + + packet := data[pos : pos+int(size)] + pos += int(size) + + if err := s.Process(packet); err != nil { + return err + } + } + return nil +} + +func (s *Stream) handleAppCombined(data []byte) error { + pos := 0 + for pos < len(data) { + if pos+1 > len(data) { + break + } + + size := uint16(data[pos]) + pos++ + + if size == 0xff { + if pos+2 > len(data) { + break + } + size = binary.BigEndian.Uint16(data[pos : pos+2]) + pos += 2 + } + + if pos+int(size) > len(data) { + break + } + + packet := data[pos : pos+int(size)] + pos += int(size) + + if err := s.processPacketData(packet); err != nil { + return err + } + } + return nil +} + +func (s *Stream) handleOutOfOrderAck(data []byte) error { + if len(data) < 2 { + return nil + } + + seq := binary.BigEndian.Uint16(data[:2]) + + s.mu.Lock() + defer s.mu.Unlock() + + if pending, exists := s.pendingAcks[seq]; exists { + s.resendQueue = append(s.resendQueue, pending) + } + + return nil +} + +func (s *Stream) handleDisconnect() error { + s.state.Store(StateClosed) + if s.onDisconnect != nil { + go s.onDisconnect() + } + return nil +} + +func (s *Stream) sendFragmented(proto *packets.ProtoPacket) error { + data := make([]byte, proto.Size()) + proto.Serialize(data, 0) + + fragSize := int(s.maxLen) - 12 + numFrags := (len(data) + fragSize - 1) / fragSize + + if numFrags > 0xFFFF { + return fmt.Errorf("packet too large to fragment") + } + + s.mu.Lock() + fragID := s.nextFragID + s.nextFragID++ + s.mu.Unlock() + + for i := 0; i < numFrags; i++ { + start := i * fragSize + end := start + fragSize + if end > len(data) { + end = len(data) + } + + fragHeader := make([]byte, 10) + binary.BigEndian.PutUint32(fragHeader[0:4], uint32(fragID)) + binary.BigEndian.PutUint16(fragHeader[4:6], uint16(numFrags)) + binary.BigEndian.PutUint16(fragHeader[6:8], uint16(i)) + binary.BigEndian.PutUint16(fragHeader[8:10], uint16(end-start)) + + fragment := append(fragHeader, data[start:end]...) + fragProto := packets.NewProtoPacket(opcodes.OP_Fragment, fragment, s.opcodeManager) + + if err := s.sendReliable(fragProto); err != nil { + return err + } + } + + return nil +} + +func (s *Stream) sendReliable(proto *packets.ProtoPacket) error { + s.mu.Lock() + seq := s.seqOut + s.seqOut = s.incrementSequence(s.seqOut) + s.mu.Unlock() + + data := make([]byte, proto.Size()+2) + binary.BigEndian.PutUint16(data[:2], seq) + proto.Serialize(data[2:], 0) + + pending := &pendingPacket{ + packet: proto.Copy(), + seq: seq, + sentTime: time.Now(), + attempts: 1, + nextRetry: time.Now().Add(s.rto), + } + + s.mu.Lock() + s.pendingAcks[seq] = pending + s.mu.Unlock() + + return s.sendRaw(opcodes.OP_Packet, data) +} + +func (s *Stream) processPacketData(data []byte) error { + proto := packets.NewProtoPacketFromRaw(data, -1, s.opcodeManager) + if err := proto.DecompressPacket(); err != nil { + return err + } + app := proto.MakeApplicationPacket(s.opcodeSize) + if s.onPacket != nil { + go s.onPacket(app) + } + return nil +} + +func (s *Stream) processRetransmits() { + if s.state.Load() != StateEstablished { + return + } + + s.mu.Lock() + now := time.Now() + + for seq, pending := range s.pendingAcks { + if now.After(pending.nextRetry) { + newPending := &pendingPacket{ + packet: pending.packet.Copy(), + seq: seq, + sentTime: pending.sentTime, + attempts: pending.attempts, + nextRetry: pending.nextRetry, + } + s.resendQueue = append(s.resendQueue, newPending) + } + } + s.mu.Unlock() + + if len(s.resendQueue) > 0 { + s.processQueues() + } + + if s.retransmitTimer != nil { + s.retransmitTimer.Reset(retransmitInterval) + } +} + +func (s *Stream) cleanup() { + if s.state.Load() != StateEstablished { + return + } + + s.mu.Lock() + now := time.Now() + + for id, frag := range s.fragments { + if now.Sub(frag.startTime) > fragmentTimeout { + delete(s.fragments, id) + } + } + + for seq, oop := range s.outOfOrder { + if now.Sub(oop.timestamp) > outOfOrderTimeout { + delete(s.outOfOrder, seq) + } + } + + for seq := range s.duplicateAckCnt { + if _, exists := s.pendingAcks[seq]; !exists { + delete(s.duplicateAckCnt, seq) + } + } + s.mu.Unlock() + + if s.cleanupTimer != nil { + s.cleanupTimer.Reset(cleanupInterval) + } +} + func (s *Stream) scheduleAck() { if s.ackTimer == nil { s.ackTimer = time.AfterFunc(s.ackThreshold, func() { @@ -771,7 +796,6 @@ func (s *Stream) sendPendingAcks() { return } - // Send all pending ACKs for _, seq := range s.ackQueue { data := make([]byte, 2) binary.BigEndian.PutUint16(data, seq) @@ -799,7 +823,6 @@ func (s *Stream) sendKeepalive() { if s.state.Load() == StateEstablished { s.sendRaw(opcodes.OP_KeepAlive, nil) } - // Reschedule if s.keepAliveTimer != nil { s.keepAliveTimer.Reset(10 * time.Second) } @@ -818,31 +841,18 @@ func (s *Stream) handleTimeout() { } } -func (s *Stream) handleDisconnect() error { - s.state.Store(StateClosed) - if s.onDisconnect != nil { - go s.onDisconnect() - } - return nil -} - -// sendRaw sends raw data with protocol opcode func (s *Stream) sendRaw(opcode uint16, data []byte) error { - // Build packet: opcode + data + CRC packet := make([]byte, 2+len(data)+2) binary.BigEndian.PutUint16(packet[:2], opcode) copy(packet[2:], data) - // Add CRC crc := crypto.CalculateCRC(packet[:len(packet)-2], s.crcKey) binary.BigEndian.PutUint16(packet[len(packet)-2:], crc) - // Encrypt if needed if s.cipher != nil { s.cipher.Encrypt(packet) } - // Send via gnet atomic.AddUint64(&s.packetsOut, 1) atomic.AddUint64(&s.bytesOut, uint64(len(packet))) @@ -858,33 +868,25 @@ func (s *Stream) appToProto(app *packets.AppPacket) *packets.ProtoPacket { } // Public methods - -// SetPacketCallback sets the callback for received packets func (s *Stream) SetPacketCallback(fn func(*packets.AppPacket)) { s.onPacket = fn } -// SetDisconnectCallback sets the callback for disconnection func (s *Stream) SetDisconnectCallback(fn func()) { s.onDisconnect = fn } -// GetState returns current stream state func (s *Stream) GetState() StreamState { return s.state.Load().(StreamState) } -// IsConnected returns true if stream is established func (s *Stream) IsConnected() bool { return s.GetState() == StateEstablished } -// SendSessionRequest initiates client connection func (s *Stream) SendSessionRequest() error { - // Build session request packet - // Format: version(4) + sessionID(4) + maxLen(2) data := make([]byte, 10) - binary.BigEndian.PutUint32(data[0:4], 2) // Protocol version + binary.BigEndian.PutUint32(data[0:4], 2) binary.BigEndian.PutUint32(data[4:8], s.sessionID) binary.BigEndian.PutUint16(data[8:10], s.maxLen) @@ -892,35 +894,18 @@ func (s *Stream) SendSessionRequest() error { return s.sendRaw(opcodes.OP_SessionRequest, data) } -// validateSession checks if packet belongs to this session -func (s *Stream) validateSession(sessionID uint32) bool { - if s.sessionID == 0 { - // Not yet established, accept any - return true - } - if sessionID != s.sessionID { - // Wrong session - send out of session response - go s.sendRaw(opcodes.OP_OutOfSession, nil) - return false - } - return true -} - -// GetSessionID returns current session ID func (s *Stream) GetSessionID() uint32 { s.mu.RLock() defer s.mu.RUnlock() return s.sessionID } -// SetSessionID sets the session ID (for client mode) func (s *Stream) SetSessionID(id uint32) { s.mu.Lock() s.sessionID = id s.mu.Unlock() } -// GetRemoteAddr returns remote address func (s *Stream) GetRemoteAddr() string { if s.conn != nil { return s.conn.RemoteAddr().String() @@ -928,18 +913,14 @@ func (s *Stream) GetRemoteAddr() string { return "" } -// Close closes the stream func (s *Stream) Close() error { if s.state.Load() == StateClosed { return nil } s.state.Store(StateClosed) - - // Send disconnect s.sendRaw(opcodes.OP_SessionDisconnect, nil) - // Clean up timers if s.keepAliveTimer != nil { s.keepAliveTimer.Stop() } @@ -949,8 +930,13 @@ func (s *Stream) Close() error { if s.timeoutTimer != nil { s.timeoutTimer.Stop() } + if s.retransmitTimer != nil { + s.retransmitTimer.Stop() + } + if s.cleanupTimer != nil { + s.cleanupTimer.Stop() + } - // Clear queues s.mu.Lock() s.reliableQueue = nil s.unreliableQueue = nil @@ -963,8 +949,6 @@ func (s *Stream) Close() error { return nil } -// Utility functions - func absTime(d time.Duration) time.Duration { if d < 0 { return -d