package stream import ( "encoding/binary" "fmt" "sync" "sync/atomic" "time" "git.sharkk.net/EQ2/Protocol/crypto" "git.sharkk.net/EQ2/Protocol/opcodes" "git.sharkk.net/EQ2/Protocol/packets" "github.com/panjf2000/gnet/v2" ) // Stream implements EQ2's reliable UDP protocol (matches EQStream) type Stream struct { conn gnet.Conn mu sync.RWMutex // Connection state state atomic.Value // StreamState sessionID uint32 crcKey uint32 maxLen uint16 encodeKey int decodeKey int // Sequence management with wraparound handling seqOut uint16 seqIn uint16 seqLastAck uint16 seqExpected uint16 // Acknowledgment tracking pendingAcks map[uint16]*pendingPacket ackTimer *time.Timer lastAckSent time.Time ackThreshold time.Duration ackQueue []uint16 duplicateAckCnt map[uint16]int // Fragment assembly with expiry fragments map[uint16]*fragmentBuffer currentFragment *fragmentBuffer // Out of order handling with expiry outOfOrder map[uint16]*outOfOrderPacket // Combined packet for efficient sending combinedPacket *packets.ProtoPacket // Packet queues reliableQueue []*packets.ProtoPacket unreliableQueue []*packets.ProtoPacket resendQueue []*pendingPacket // Opcode management opcodeManager opcodes.Manager opcodeSize uint8 clientVersion int16 // Cipher for encryption cipher *crypto.Ciphers // Timers keepAliveTimer *time.Timer timeoutTimer *time.Timer retransmitTimer *time.Timer cleanupTimer *time.Timer combineTimer *time.Timer // Retransmission settings rtt time.Duration rttVar time.Duration rto time.Duration minRTO time.Duration maxRTO time.Duration maxRetries int // Stats packetsOut uint64 packetsIn uint64 bytesOut uint64 bytesIn uint64 retransmits uint64 // Callbacks onPacket func(*packets.AppPacket) onDisconnect func() } type pendingPacket struct { packet *packets.ProtoPacket seq uint16 sentTime time.Time attempts int nextRetry time.Time } type fragmentBuffer struct { totalSize uint32 chunks map[uint16][]byte received uint32 startTime time.Time } type outOfOrderPacket struct { data []byte timestamp time.Time } type StreamState int const ( StateDisconnected StreamState = iota StateConnecting StateEstablished StateClosing StateClosed ) const ( retransmitInterval = 100 * time.Millisecond cleanupInterval = 5 * time.Second fragmentTimeout = 30 * time.Second outOfOrderTimeout = 10 * time.Second duplicateAckThreshold = 3 combineFlushInterval = 10 * time.Millisecond ) // Config holds stream configuration type Config struct { SessionID uint32 CRCKey uint32 MaxPacketSize uint16 EncodeKey int DecodeKey int OpcodeManager opcodes.Manager OpcodeSize uint8 AckThreshold time.Duration KeepaliveTime time.Duration TimeoutDuration time.Duration } // NewStream creates a new EQStream instance func NewStream(conn gnet.Conn, cfg *Config) *Stream { s := &Stream{ conn: conn, sessionID: cfg.SessionID, crcKey: cfg.CRCKey, maxLen: cfg.MaxPacketSize, encodeKey: cfg.EncodeKey, decodeKey: cfg.DecodeKey, opcodeManager: cfg.OpcodeManager, opcodeSize: cfg.OpcodeSize, ackThreshold: cfg.AckThreshold, pendingAcks: make(map[uint16]*pendingPacket), duplicateAckCnt: make(map[uint16]int), fragments: make(map[uint16]*fragmentBuffer), outOfOrder: make(map[uint16]*outOfOrderPacket), ackQueue: make([]uint16, 0), seqOut: 0, seqIn: 0, seqExpected: 0, rtt: time.Second, rttVar: 500 * time.Millisecond, rto: time.Second, minRTO: 200 * time.Millisecond, maxRTO: 10 * time.Second, maxRetries: 10, } if s.maxLen == 0 { s.maxLen = packets.DefaultMTU } if s.ackThreshold == 0 { s.ackThreshold = 200 * time.Millisecond } s.state.Store(StateDisconnected) if cfg.EncodeKey != 0 || cfg.DecodeKey != 0 { cipher, _ := crypto.NewCiphers(int64(cfg.EncodeKey)) s.cipher = cipher } // Start timers if cfg.KeepaliveTime > 0 { s.keepAliveTimer = time.AfterFunc(cfg.KeepaliveTime, s.sendKeepalive) } if cfg.TimeoutDuration > 0 { s.timeoutTimer = time.AfterFunc(cfg.TimeoutDuration, s.handleTimeout) } s.retransmitTimer = time.AfterFunc(retransmitInterval, s.processRetransmits) s.cleanupTimer = time.AfterFunc(cleanupInterval, s.cleanup) s.combineTimer = time.AfterFunc(combineFlushInterval, s.flushCombined) return s } // Process handles incoming data with proper CRC validation func (s *Stream) Process(data []byte) error { if len(data) < 2 { return nil } // Validate and strip CRC16 if len(data) > 2 { if !packets.ValidateCRC(data, s.crcKey) { return nil } data = packets.StripCRC(data) } // Decrypt if needed if s.cipher != nil { s.cipher.Decrypt(data) } if len(data) < 2 { return nil } opcode := binary.BigEndian.Uint16(data[:2]) switch opcode { case opcodes.OP_SessionRequest: return s.handleSessionRequest(data[2:]) case opcodes.OP_SessionResponse: return s.handleSessionResponse(data[2:]) case opcodes.OP_Packet: return s.handlePacket(data[2:]) case opcodes.OP_Fragment: return s.handleFragment(data[2:]) case opcodes.OP_Ack: return s.handleAck(data[2:]) case opcodes.OP_Combined: return s.handleCombined(data[2:]) case opcodes.OP_AppCombined: return s.handleAppCombined(data[2:]) case opcodes.OP_KeepAlive: s.resetTimeout() return nil case opcodes.OP_SessionDisconnect: return s.handleDisconnect() case opcodes.OP_OutOfOrderAck: return s.handleOutOfOrderAck(data[2:]) } atomic.AddUint64(&s.packetsIn, 1) atomic.AddUint64(&s.bytesIn, uint64(len(data))) return nil } // SendPacket sends an application packet with proper preparation func (s *Stream) SendPacket(app *packets.AppPacket) error { if s.state.Load() != StateEstablished { return fmt.Errorf("stream not established") } // Convert to EQ2 protocol packet proto := packets.NewEQ2Packet(app.GetOpcode(), app.Buffer, s.opcodeManager) proto.SetVersion(s.clientVersion) // Check if packet needs fragmentation BEFORE preparation // C++ checks size > (MaxLen - 8) for: proto-op(2), seq(2), app-op(2) ... data ... crc(2) if len(app.Buffer) > int(s.maxLen-8) { return s.sendFragmented(proto) } // Try to combine with pending combined packet s.mu.Lock() defer s.mu.Unlock() if s.combinedPacket != nil { if s.combinedPacket.AppCombine(proto) { if s.combinedPacket.Size() > uint32(s.maxLen/2) { s.flushCombinedPacketLocked() } return nil } s.flushCombinedPacketLocked() } // Queue based on reliability isUnreliable := s.isUnreliableOpcode(app.GetOpcode()) if isUnreliable { s.unreliableQueue = append(s.unreliableQueue, proto) } else { // Start new combined packet if small enough if proto.Size() < uint32(s.maxLen/4) { s.combinedPacket = proto s.combineTimer.Reset(combineFlushInterval) } else { s.reliableQueue = append(s.reliableQueue, proto) } } return s.processQueues() } // flushCombined timer callback func (s *Stream) flushCombined() { s.mu.Lock() s.flushCombinedPacketLocked() s.mu.Unlock() s.processQueues() s.combineTimer.Reset(combineFlushInterval) } // flushCombinedPacketLocked sends any pending combined packet (must hold lock) func (s *Stream) flushCombinedPacketLocked() { if s.combinedPacket != nil { s.reliableQueue = append(s.reliableQueue, s.combinedPacket) s.combinedPacket = nil } } // processQueues processes all packet queues func (s *Stream) processQueues() error { s.mu.Lock() defer s.mu.Unlock() // Process resends first for len(s.resendQueue) > 0 { pending := s.resendQueue[0] s.resendQueue = s.resendQueue[1:] if time.Now().After(pending.nextRetry) { // Resend already prepared packet proto := pending.packet var encryptOffset int8 if proto.LoginOp != opcodes.OP_Unknown { encryptOffset = proto.PreparePacket(s.maxLen) } data := make([]byte, len(proto.Buffer)+2) binary.BigEndian.PutUint16(data[:2], pending.seq) copy(data[2:], proto.Buffer) // Apply encryption if needed (BEFORE CRC, with proper offset) if s.cipher != nil && len(data) > 2 { if proto.IsCompressed() { // Compressed packet: encrypt from offset 3 if len(data) > 3 { s.cipher.Encrypt(data[3:]) } } else { // Uncompressed packet: encrypt from 2 + offset offset := 2 + int(encryptOffset) if len(data) > offset { s.cipher.Encrypt(data[offset:]) } } } pending.attempts++ pending.sentTime = time.Now() pending.nextRetry = time.Now().Add(s.rto * time.Duration(pending.attempts)) if pending.attempts > s.maxRetries { delete(s.pendingAcks, pending.seq) go s.handleTimeout() return fmt.Errorf("max retransmissions exceeded") } s.mu.Unlock() atomic.AddUint64(&s.retransmits, 1) s.sendRawWithCRC(opcodes.OP_Packet, data) s.mu.Lock() } else { s.resendQueue = append(s.resendQueue, pending) } } // Process reliable queue for len(s.reliableQueue) > 0 { proto := s.reliableQueue[0] s.reliableQueue = s.reliableQueue[1:] s.mu.Unlock() if err := s.sendReliable(proto); err != nil { return err } s.mu.Lock() } // Process unreliable queue for len(s.unreliableQueue) > 0 { proto := s.unreliableQueue[0] s.unreliableQueue = s.unreliableQueue[1:] // Prepare unreliable packets too var encryptOffset int8 if proto.LoginOp != opcodes.OP_Unknown { encryptOffset = proto.PreparePacket(s.maxLen) } data := make([]byte, len(proto.Buffer)) copy(data, proto.Buffer) // Apply encryption if needed (BEFORE CRC) if s.cipher != nil && len(data) > 0 { if proto.IsCompressed() { // Compressed: encrypt from offset 1 (skip compress flag) if len(data) > 1 { s.cipher.Encrypt(data[1:]) } } else if encryptOffset > 0 { // Uncompressed: encrypt from offset if len(data) > int(encryptOffset) { s.cipher.Encrypt(data[encryptOffset:]) } } } s.mu.Unlock() s.sendRawWithCRC(proto.Opcode, data) s.mu.Lock() } s.mu.Unlock() s.sendPendingAcks() return nil } // sendFragmented sends a fragmented packet (matches EQStream::SendPacket) func (s *Stream) sendFragmented(proto *packets.ProtoPacket) error { // Prepare packet first encryptOffset := proto.PreparePacket(s.maxLen) if encryptOffset < 0 { return fmt.Errorf("failed to prepare packet") } // Encrypt the prepared packet data BEFORE fragmentation data := proto.Buffer if s.cipher != nil && len(data) > 0 { if proto.IsCompressed() { // Compressed: encrypt from offset 1 (skip compress flag) if len(data) > 1 { s.cipher.Encrypt(data[1:]) } } else if encryptOffset > 0 { // Uncompressed: encrypt from offset if len(data) > int(encryptOffset) { s.cipher.Encrypt(data[encryptOffset:]) } } } length := uint32(len(data)) // First fragment with total length out := packets.NewProtoPacket(opcodes.OP_Fragment, nil, s.opcodeManager) out.Buffer = make([]byte, s.maxLen-4) binary.BigEndian.PutUint32(out.Buffer[:4], length) used := copy(out.Buffer[4:], data) out.Buffer = out.Buffer[:4+used] // Fragment packets are sent without additional encryption if err := s.sendReliableFragment(out); err != nil { return err } // Send remaining fragments pos := used for pos < int(length) { chunkSize := int(length) - pos if chunkSize > int(s.maxLen)-4 { chunkSize = int(s.maxLen) - 4 } frag := packets.NewProtoPacket(opcodes.OP_Fragment, data[pos:pos+chunkSize], s.opcodeManager) if err := s.sendReliableFragment(frag); err != nil { return err } pos += chunkSize } return nil } // sendReliableFragment sends a fragment packet (already encrypted data) func (s *Stream) sendReliableFragment(proto *packets.ProtoPacket) error { s.mu.Lock() seq := s.seqOut s.seqOut = s.incrementSequence(s.seqOut) s.mu.Unlock() // Build packet with sequence (data is already encrypted) data := make([]byte, len(proto.Buffer)+2) binary.BigEndian.PutUint16(data[:2], seq) copy(data[2:], proto.Buffer) pending := &pendingPacket{ packet: proto.Copy(), seq: seq, sentTime: time.Now(), attempts: 1, nextRetry: time.Now().Add(s.rto), } s.mu.Lock() s.pendingAcks[seq] = pending s.mu.Unlock() return s.sendRawWithCRC(opcodes.OP_Packet, data) } // sendReliable sends a packet reliably with sequencing (matches C++ flow) func (s *Stream) sendReliable(proto *packets.ProtoPacket) error { // Prepare packet (matches C++ PreparePacket) if !proto.IsPrepared() { // C++ checks (app->PreparePacket(MaxLen) == 255) but PreparePacket returns int8(-1) // Since -1 != 255 as int8, this check would never trigger in C++ // For 1:1 compatibility, we don't check the return value proto.PreparePacket(s.maxLen) } // Compress if needed (matches C++ EQ2_Compress) compressedOffset := int8(0) if !proto.IsCompressed() && proto.Size() > 128 { compressedOffset = proto.EQ2Compress(3) // Default offset 3 } // Encrypt if needed (matches C++ EncryptPacket) if s.cipher != nil && !proto.IsEncrypted() { proto.EncryptPacket(s.cipher, compressedOffset, 0) } // Assign sequence number (matches C++ SequencedPush) s.mu.Lock() seq := s.seqOut // Set sequence in packet buffer (first 2 bytes) binary.BigEndian.PutUint16(proto.Buffer[:2], seq) proto.SetSequence(int32(seq)) s.seqOut = s.incrementSequence(s.seqOut) pending := &pendingPacket{ packet: proto.Copy(), seq: seq, sentTime: time.Now(), attempts: 1, nextRetry: time.Now().Add(s.rto), } s.pendingAcks[seq] = pending s.mu.Unlock() // Send via WritePacket equivalent return s.writePacket(proto) } // writePacket sends a protocol packet (matches C++ WritePacket) func (s *Stream) writePacket(p *packets.ProtoPacket) error { // Serialize packet data := make([]byte, p.Size()+2) length := p.Serialize(data, 0) // Add CRC for non-session packets if p.Opcode != opcodes.OP_SessionRequest && p.Opcode != opcodes.OP_SessionResponse { data = data[:length] data = packets.AppendCRC(data, s.crcKey) } atomic.AddUint64(&s.packetsOut, 1) atomic.AddUint64(&s.bytesOut, uint64(len(data))) return s.conn.AsyncWrite(data, nil) } // Helper methods func (s *Stream) isUnreliableOpcode(emuOp opcodes.EmuOpcode) bool { switch emuOp { case opcodes.OP_UpdatePositionMsg, opcodes.OP_WorldPingMsg: return true default: return false } } func (s *Stream) incrementSequence(seq uint16) uint16 { if seq == 0xFFFF { return 0 } return seq + 1 } func (s *Stream) isSequenceAhead(seq, base uint16) bool { diff := seq - base return diff > 0 && diff < 0x8000 } // Protocol handlers func (s *Stream) handleSessionRequest(data []byte) error { if len(data) < 10 { return fmt.Errorf("session request too small") } version := binary.BigEndian.Uint32(data[:4]) sessionID := binary.BigEndian.Uint32(data[4:8]) maxLen := binary.BigEndian.Uint16(data[8:10]) s.mu.Lock() s.sessionID = sessionID if maxLen < s.maxLen { s.maxLen = maxLen } s.mu.Unlock() response := make([]byte, 15) binary.BigEndian.PutUint32(response[0:4], sessionID) binary.BigEndian.PutUint32(response[4:8], s.crcKey) response[8] = 2 binary.BigEndian.PutUint16(response[9:11], s.maxLen) binary.BigEndian.PutUint32(response[11:15], version) s.state.Store(StateEstablished) // Session packets don't have CRC or encryption (matches C++) return s.sendRaw(opcodes.OP_SessionResponse, response) } func (s *Stream) handleSessionResponse(data []byte) error { if len(data) < 14 { return fmt.Errorf("session response too small") } sessionID := binary.BigEndian.Uint32(data[0:4]) crcKey := binary.BigEndian.Uint32(data[4:8]) maxLen := binary.BigEndian.Uint16(data[9:11]) s.mu.Lock() s.sessionID = sessionID s.crcKey = crcKey if maxLen < s.maxLen { s.maxLen = maxLen } s.mu.Unlock() s.state.Store(StateEstablished) return nil } func (s *Stream) handlePacket(data []byte) error { if len(data) < 2 { return nil } seq := binary.BigEndian.Uint16(data[:2]) data = data[2:] s.mu.Lock() defer s.mu.Unlock() if seq == s.seqExpected { s.seqExpected = s.incrementSequence(s.seqExpected) s.ackQueue = append(s.ackQueue, seq) if err := s.processPacketData(data); err != nil { return err } // Process any buffered out-of-order packets for { if buffered, exists := s.outOfOrder[s.seqExpected]; exists { delete(s.outOfOrder, s.seqExpected) s.ackQueue = append(s.ackQueue, s.seqExpected) s.seqExpected = s.incrementSequence(s.seqExpected) if err := s.processPacketData(buffered.data); err != nil { return err } } else { break } } } else if s.isSequenceAhead(seq, s.seqExpected) { // Out of order - buffer it s.outOfOrder[seq] = &outOfOrderPacket{ data: append([]byte(nil), data...), timestamp: time.Now(), } go s.sendOutOfOrderAck(seq) } else { // Duplicate packet - just ack it go s.sendAckImmediate(seq) } s.scheduleAck() return nil } func (s *Stream) handleFragment(data []byte) error { if len(data) < 2 { return nil } seq := binary.BigEndian.Uint16(data[:2]) data = data[2:] s.mu.Lock() defer s.mu.Unlock() // Check if this is start of new fragment stream if s.currentFragment == nil && len(data) >= 4 { totalLen := binary.BigEndian.Uint32(data[:4]) s.currentFragment = &fragmentBuffer{ totalSize: totalLen, chunks: make(map[uint16][]byte), startTime: time.Now(), } if len(data) > 4 { s.currentFragment.chunks[0] = append([]byte(nil), data[4:]...) s.currentFragment.received = uint32(len(data) - 4) } } else if s.currentFragment != nil { // Continuation fragment chunkNum := uint16(len(s.currentFragment.chunks)) s.currentFragment.chunks[chunkNum] = append([]byte(nil), data...) s.currentFragment.received += uint32(len(data)) // Check if complete if s.currentFragment.received >= s.currentFragment.totalSize { complete := make([]byte, 0, s.currentFragment.totalSize) for i := uint16(0); i < uint16(len(s.currentFragment.chunks)); i++ { if chunk, ok := s.currentFragment.chunks[i]; ok { complete = append(complete, chunk...) } } s.currentFragment = nil return s.processPacketData(complete) } } s.ackQueue = append(s.ackQueue, seq) s.scheduleAck() return nil } func (s *Stream) handleAck(data []byte) error { if len(data) < 2 { return nil } ackSeq := binary.BigEndian.Uint16(data[:2]) s.mu.Lock() defer s.mu.Unlock() if pending, exists := s.pendingAcks[ackSeq]; exists { delete(s.pendingAcks, ackSeq) delete(s.duplicateAckCnt, ackSeq) // Update RTT sample := time.Since(pending.sentTime) if s.rtt == 0 { s.rtt = sample s.rttVar = sample / 2 } else { alpha := 0.125 beta := 0.25 s.rttVar = time.Duration((1-beta)*float64(s.rttVar) + beta*float64(absTime(s.rtt-sample))) s.rtt = time.Duration((1-alpha)*float64(s.rtt) + alpha*float64(sample)) } s.rto = s.rtt + 4*s.rttVar if s.rto < s.minRTO { s.rto = s.minRTO } if s.rto > s.maxRTO { s.rto = s.maxRTO } } else { // Duplicate ACK s.duplicateAckCnt[ackSeq]++ if s.duplicateAckCnt[ackSeq] >= duplicateAckThreshold { // Fast retransmit for seq, pending := range s.pendingAcks { if s.isSequenceAhead(seq, ackSeq) { s.resendQueue = append(s.resendQueue, pending) } } delete(s.duplicateAckCnt, ackSeq) } } if s.isSequenceAhead(ackSeq, s.seqLastAck) { s.seqLastAck = ackSeq } return nil } func (s *Stream) handleCombined(data []byte) error { pos := 0 for pos < len(data) { if pos+1 > len(data) { break } size := uint16(data[pos]) pos++ if size == 0xff { if pos+2 > len(data) { break } size = binary.BigEndian.Uint16(data[pos : pos+2]) pos += 2 } if pos+int(size) > len(data) { break } packet := data[pos : pos+int(size)] pos += int(size) if err := s.Process(packet); err != nil { return err } } return nil } func (s *Stream) handleAppCombined(data []byte) error { pos := 0 for pos < len(data) { if pos+1 > len(data) { break } size := uint16(data[pos]) pos++ if size == 0xff { if pos+2 > len(data) { break } size = binary.BigEndian.Uint16(data[pos : pos+2]) pos += 2 } if pos+int(size) > len(data) { break } packet := data[pos : pos+int(size)] pos += int(size) if err := s.processPacketData(packet); err != nil { return err } } return nil } func (s *Stream) handleOutOfOrderAck(data []byte) error { if len(data) < 2 { return nil } seq := binary.BigEndian.Uint16(data[:2]) s.mu.Lock() defer s.mu.Unlock() if pending, exists := s.pendingAcks[seq]; exists { s.resendQueue = append(s.resendQueue, pending) } return nil } func (s *Stream) handleDisconnect() error { s.state.Store(StateClosed) if s.onDisconnect != nil { go s.onDisconnect() } return nil } // processPacketData processes received packet data func (s *Stream) processPacketData(data []byte) error { proto := packets.NewProtoPacketFromRaw(data, -1, s.opcodeManager) app := proto.MakeApplicationPacket(s.opcodeSize) if s.onPacket != nil { go s.onPacket(app) } return nil } // Timer handlers func (s *Stream) processRetransmits() { if s.state.Load() != StateEstablished { return } s.mu.Lock() now := time.Now() for seq, pending := range s.pendingAcks { if now.After(pending.nextRetry) { newPending := &pendingPacket{ packet: pending.packet.Copy(), seq: seq, sentTime: pending.sentTime, attempts: pending.attempts, nextRetry: pending.nextRetry, } s.resendQueue = append(s.resendQueue, newPending) } } s.mu.Unlock() if len(s.resendQueue) > 0 { s.processQueues() } if s.retransmitTimer != nil { s.retransmitTimer.Reset(retransmitInterval) } } func (s *Stream) cleanup() { if s.state.Load() != StateEstablished { return } s.mu.Lock() now := time.Now() // Clean up old fragments for id, frag := range s.fragments { if now.Sub(frag.startTime) > fragmentTimeout { delete(s.fragments, id) } } // Clean up old out-of-order packets for seq, oop := range s.outOfOrder { if now.Sub(oop.timestamp) > outOfOrderTimeout { delete(s.outOfOrder, seq) } } // Clean up duplicate ACK counts for seq := range s.duplicateAckCnt { if _, exists := s.pendingAcks[seq]; !exists { delete(s.duplicateAckCnt, seq) } } s.mu.Unlock() if s.cleanupTimer != nil { s.cleanupTimer.Reset(cleanupInterval) } } func (s *Stream) scheduleAck() { if s.ackTimer == nil { s.ackTimer = time.AfterFunc(s.ackThreshold, func() { s.sendPendingAcks() }) } } func (s *Stream) sendPendingAcks() { s.mu.Lock() if len(s.ackQueue) == 0 { s.mu.Unlock() return } for _, seq := range s.ackQueue { data := make([]byte, 2) binary.BigEndian.PutUint16(data, seq) s.sendRawWithCRC(opcodes.OP_Ack, data) } s.ackQueue = s.ackQueue[:0] s.lastAckSent = time.Now() s.mu.Unlock() } func (s *Stream) sendAckImmediate(seq uint16) error { data := make([]byte, 2) binary.BigEndian.PutUint16(data, seq) return s.sendRawWithCRC(opcodes.OP_Ack, data) } func (s *Stream) sendOutOfOrderAck(seq uint16) error { data := make([]byte, 2) binary.BigEndian.PutUint16(data, seq) return s.sendRawWithCRC(opcodes.OP_OutOfOrderAck, data) } func (s *Stream) sendKeepalive() { if s.state.Load() == StateEstablished { s.sendRawWithCRC(opcodes.OP_KeepAlive, nil) } if s.keepAliveTimer != nil { s.keepAliveTimer.Reset(10 * time.Second) } } func (s *Stream) resetTimeout() { if s.timeoutTimer != nil { s.timeoutTimer.Reset(30 * time.Second) } } func (s *Stream) handleTimeout() { s.state.Store(StateClosed) if s.onDisconnect != nil { s.onDisconnect() } } // sendRaw sends raw packet WITHOUT encryption or CRC (for special packets) func (s *Stream) sendRaw(opcode uint16, data []byte) error { packet := make([]byte, 2+len(data)) binary.BigEndian.PutUint16(packet[:2], opcode) copy(packet[2:], data) atomic.AddUint64(&s.packetsOut, 1) atomic.AddUint64(&s.bytesOut, uint64(len(packet))) return s.conn.AsyncWrite(packet, nil) } // sendRawWithCRC sends raw packet with CRC16 (matches C++ WritePacket) func (s *Stream) sendRawWithCRC(opcode uint16, data []byte) error { packet := make([]byte, 2+len(data)) binary.BigEndian.PutUint16(packet[:2], opcode) copy(packet[2:], data) // Add CRC16 AFTER encryption (matches C++ order) packet = packets.AppendCRC(packet, s.crcKey) atomic.AddUint64(&s.packetsOut, 1) atomic.AddUint64(&s.bytesOut, uint64(len(packet))) return s.conn.AsyncWrite(packet, nil) } // Public methods func (s *Stream) SetPacketCallback(fn func(*packets.AppPacket)) { s.onPacket = fn } func (s *Stream) SetDisconnectCallback(fn func()) { s.onDisconnect = fn } func (s *Stream) GetState() StreamState { return s.state.Load().(StreamState) } func (s *Stream) IsConnected() bool { return s.GetState() == StateEstablished } func (s *Stream) SendSessionRequest() error { data := make([]byte, 10) binary.BigEndian.PutUint32(data[0:4], 2) // protocol version binary.BigEndian.PutUint32(data[4:8], s.sessionID) binary.BigEndian.PutUint16(data[8:10], s.maxLen) s.state.Store(StateConnecting) // Session packets don't have CRC or encryption (matches C++) return s.sendRaw(opcodes.OP_SessionRequest, data) } func (s *Stream) GetSessionID() uint32 { s.mu.RLock() defer s.mu.RUnlock() return s.sessionID } func (s *Stream) SetSessionID(id uint32) { s.mu.Lock() s.sessionID = id s.mu.Unlock() } func (s *Stream) GetRemoteAddr() string { if s.conn != nil { return s.conn.RemoteAddr().String() } return "" } func (s *Stream) Close() error { if s.state.Load() == StateClosed { return nil } s.state.Store(StateClosed) s.sendRawWithCRC(opcodes.OP_SessionDisconnect, nil) // Stop all timers if s.keepAliveTimer != nil { s.keepAliveTimer.Stop() } if s.ackTimer != nil { s.ackTimer.Stop() } if s.timeoutTimer != nil { s.timeoutTimer.Stop() } if s.retransmitTimer != nil { s.retransmitTimer.Stop() } if s.cleanupTimer != nil { s.cleanupTimer.Stop() } if s.combineTimer != nil { s.combineTimer.Stop() } // Clear queues s.mu.Lock() s.reliableQueue = nil s.unreliableQueue = nil s.resendQueue = nil s.pendingAcks = nil s.fragments = nil s.outOfOrder = nil s.combinedPacket = nil s.currentFragment = nil s.mu.Unlock() return nil } // GetStats returns stream statistics func (s *Stream) GetStats() map[string]any { s.mu.RLock() defer s.mu.RUnlock() return map[string]any{ "packets_out": atomic.LoadUint64(&s.packetsOut), "packets_in": atomic.LoadUint64(&s.packetsIn), "bytes_out": atomic.LoadUint64(&s.bytesOut), "bytes_in": atomic.LoadUint64(&s.bytesIn), "retransmits": atomic.LoadUint64(&s.retransmits), "pending_acks": len(s.pendingAcks), "out_of_order": len(s.outOfOrder), "fragments": len(s.fragments), "rtt": s.rtt, "rto": s.rto, } } func absTime(d time.Duration) time.Duration { if d < 0 { return -d } return d }