package stream import ( "encoding/binary" "fmt" "sync" "sync/atomic" "time" "git.sharkk.net/EQ2/Protocol/crypto" "git.sharkk.net/EQ2/Protocol/opcodes" "git.sharkk.net/EQ2/Protocol/packets" "github.com/panjf2000/gnet/v2" ) // Stream implements EQ2's reliable UDP protocol type Stream struct { conn gnet.Conn mu sync.RWMutex // Connection state state atomic.Value // StreamState sessionID uint32 crcKey uint32 maxLen uint16 encodeKey int decodeKey int // Sequence management seqOut uint16 seqIn uint16 seqLastAck uint16 seqExpected uint16 // Acknowledgment tracking pendingAcks map[uint16]*pendingPacket ackTimer *time.Timer lastAckSent time.Time ackThreshold time.Duration ackQueue []uint16 // Sequences needing ACK // Fragment assembly fragments map[uint16]*fragmentBuffer nextFragID uint16 // Out of order handling outOfOrder map[uint16][]byte // Packet queues reliableQueue []*packets.ProtoPacket unreliableQueue []*packets.ProtoPacket resendQueue []*pendingPacket // Opcode management opcodeManager opcodes.Manager opcodeSize uint8 // Cipher for encryption cipher *crypto.Ciphers // Timers keepAliveTimer *time.Timer timeoutTimer *time.Timer // Retransmission settings rtt time.Duration rttVar time.Duration rto time.Duration minRTO time.Duration maxRTO time.Duration maxRetries int // Stats packetsOut uint64 packetsIn uint64 bytesOut uint64 bytesIn uint64 retransmits uint64 // Callbacks onPacket func(*packets.AppPacket) onDisconnect func() } type pendingPacket struct { packet *packets.ProtoPacket seq uint16 sentTime time.Time attempts int nextRetry time.Time } type fragmentBuffer struct { totalSize uint32 chunks map[uint16][]byte received uint32 startTime time.Time } type StreamState int const ( StateDisconnected StreamState = iota StateConnecting StateEstablished StateClosing StateClosed ) // Config holds stream configuration type Config struct { SessionID uint32 CRCKey uint32 MaxPacketSize uint16 EncodeKey int DecodeKey int OpcodeManager opcodes.Manager OpcodeSize uint8 AckThreshold time.Duration KeepaliveTime time.Duration TimeoutDuration time.Duration } // NewStream creates a new EQStream instance func NewStream(conn gnet.Conn, cfg *Config) *Stream { s := &Stream{ conn: conn, sessionID: cfg.SessionID, crcKey: cfg.CRCKey, maxLen: cfg.MaxPacketSize, encodeKey: cfg.EncodeKey, decodeKey: cfg.DecodeKey, opcodeManager: cfg.OpcodeManager, opcodeSize: cfg.OpcodeSize, ackThreshold: cfg.AckThreshold, pendingAcks: make(map[uint16]*pendingPacket), fragments: make(map[uint16]*fragmentBuffer), outOfOrder: make(map[uint16][]byte), ackQueue: make([]uint16, 0), seqOut: 0, seqIn: 0, seqExpected: 0, rtt: time.Second, rttVar: 500 * time.Millisecond, rto: time.Second, minRTO: 200 * time.Millisecond, maxRTO: 10 * time.Second, maxRetries: 10, } if s.maxLen == 0 { s.maxLen = packets.DefaultMTU } if s.ackThreshold == 0 { s.ackThreshold = 200 * time.Millisecond } s.state.Store(StateDisconnected) // Initialize cipher if keys provided if cfg.EncodeKey != 0 || cfg.DecodeKey != 0 { cipher, _ := crypto.NewCiphers(int64(cfg.EncodeKey)) s.cipher = cipher } // Start keepalive timer if cfg.KeepaliveTime > 0 { s.keepAliveTimer = time.AfterFunc(cfg.KeepaliveTime, s.sendKeepalive) } // Start timeout timer if cfg.TimeoutDuration > 0 { s.timeoutTimer = time.AfterFunc(cfg.TimeoutDuration, s.handleTimeout) } return s } // Process handles incoming data from gnet func (s *Stream) Process(data []byte) error { // Validate CRC if len(data) < 2 { return nil } // Check for CRC (last 2 bytes) if len(data) > 2 { providedCRC := binary.BigEndian.Uint16(data[len(data)-2:]) calculatedCRC := crypto.CalculateCRC(data[:len(data)-2], s.crcKey) if providedCRC != calculatedCRC { return nil // Drop packet with bad CRC } data = data[:len(data)-2] // Strip CRC } // Decrypt if needed if s.cipher != nil { s.cipher.Decrypt(data) } // Parse protocol opcode if len(data) < 2 { return nil } opcode := binary.BigEndian.Uint16(data[:2]) switch opcode { case opcodes.OP_SessionRequest: return s.handleSessionRequest(data[2:]) case opcodes.OP_SessionResponse: return s.handleSessionResponse(data[2:]) case opcodes.OP_Packet: return s.handlePacket(data[2:]) case opcodes.OP_Fragment: return s.handleFragment(data[2:]) case opcodes.OP_Ack: return s.handleAck(data[2:]) case opcodes.OP_Combined: return s.handleCombined(data[2:]) case opcodes.OP_AppCombined: return s.handleAppCombined(data[2:]) case opcodes.OP_KeepAlive: s.resetTimeout() return nil case opcodes.OP_SessionDisconnect: return s.handleDisconnect() case opcodes.OP_OutOfOrderAck: return s.handleOutOfOrderAck(data[2:]) } atomic.AddUint64(&s.packetsIn, 1) atomic.AddUint64(&s.bytesIn, uint64(len(data))) return nil } // handleSessionRequest processes session establishment request func (s *Stream) handleSessionRequest(data []byte) error { if len(data) < 10 { return fmt.Errorf("session request too small") } // Parse session request // Format: version(4) + sessionID(4) + maxLen(2) version := binary.BigEndian.Uint32(data[:4]) sessionID := binary.BigEndian.Uint32(data[4:8]) maxLen := binary.BigEndian.Uint16(data[8:10]) // Update session info s.mu.Lock() s.sessionID = sessionID if maxLen < s.maxLen { s.maxLen = maxLen // Use smaller of the two } s.mu.Unlock() // Send session response response := make([]byte, 14) binary.BigEndian.PutUint32(response[0:4], sessionID) binary.BigEndian.PutUint32(response[4:8], s.crcKey) response[8] = 2 // Encoding type binary.BigEndian.PutUint16(response[9:11], s.maxLen) binary.BigEndian.PutUint32(response[11:15], version) s.state.Store(StateEstablished) return s.sendRaw(opcodes.OP_SessionResponse, response) } // handleSessionResponse processes session establishment response func (s *Stream) handleSessionResponse(data []byte) error { if len(data) < 14 { return fmt.Errorf("session response too small") } // Parse response sessionID := binary.BigEndian.Uint32(data[0:4]) crcKey := binary.BigEndian.Uint32(data[4:8]) // encodingType := data[8] maxLen := binary.BigEndian.Uint16(data[9:11]) s.mu.Lock() s.sessionID = sessionID s.crcKey = crcKey if maxLen < s.maxLen { s.maxLen = maxLen } s.mu.Unlock() s.state.Store(StateEstablished) return nil } // handlePacket processes reliable packets func (s *Stream) handlePacket(data []byte) error { if len(data) < 2 { return nil } seq := binary.BigEndian.Uint16(data[:2]) data = data[2:] s.mu.Lock() defer s.mu.Unlock() // Check if this is the expected sequence if seq == s.seqExpected { // Process in-order packet s.seqExpected++ s.ackQueue = append(s.ackQueue, seq) // Process packet data if err := s.processPacketData(data); err != nil { return err } // Check for buffered out-of-order packets for { if buffered, exists := s.outOfOrder[s.seqExpected]; exists { delete(s.outOfOrder, s.seqExpected) s.ackQueue = append(s.ackQueue, s.seqExpected) s.seqExpected++ if err := s.processPacketData(buffered); err != nil { return err } } else { break } } } else if seq > s.seqExpected { // Out of order - buffer it s.outOfOrder[seq] = append([]byte(nil), data...) // Send out-of-order ACK go s.sendOutOfOrderAck(seq) } else { // Duplicate packet - just ACK it go s.sendAckImmediate(seq) } // Schedule grouped ACK s.scheduleAck() return nil } // handleFragment assembles fragmented packets func (s *Stream) handleFragment(data []byte) error { if len(data) < 10 { return nil } seq := binary.BigEndian.Uint16(data[:2]) fragID := binary.BigEndian.Uint32(data[2:6]) fragTotal := binary.BigEndian.Uint16(data[6:8]) fragCur := binary.BigEndian.Uint16(data[8:10]) data = data[10:] s.mu.Lock() defer s.mu.Unlock() // Get or create fragment buffer frag, exists := s.fragments[uint16(fragID)] if !exists { frag = &fragmentBuffer{ totalSize: uint32(fragTotal), chunks: make(map[uint16][]byte), startTime: time.Now(), } s.fragments[uint16(fragID)] = frag } // Store chunk frag.chunks[fragCur] = append([]byte(nil), data...) frag.received++ // Check if complete if frag.received == uint32(fragTotal) { // Reassemble in order complete := make([]byte, 0) for i := uint16(0); i < fragTotal; i++ { if chunk, ok := frag.chunks[i]; ok { complete = append(complete, chunk...) } else { // Missing chunk - wait for retransmit return nil } } delete(s.fragments, uint16(fragID)) // Process reassembled packet return s.processPacketData(complete) } // ACK the fragment s.ackQueue = append(s.ackQueue, seq) s.scheduleAck() return nil } // handleAck processes acknowledgments func (s *Stream) handleAck(data []byte) error { if len(data) < 2 { return nil } ackSeq := binary.BigEndian.Uint16(data[:2]) s.mu.Lock() defer s.mu.Unlock() // Remove from pending and update RTT if pending, exists := s.pendingAcks[ackSeq]; exists { delete(s.pendingAcks, ackSeq) // Update RTT estimates (TCP-like algorithm) sample := time.Since(pending.sentTime) if s.rtt == 0 { s.rtt = sample s.rttVar = sample / 2 } else { alpha := 0.125 beta := 0.25 s.rttVar = time.Duration((1-beta)*float64(s.rttVar) + beta*float64(absTime(s.rtt-sample))) s.rtt = time.Duration((1-alpha)*float64(s.rtt) + alpha*float64(sample)) } s.rto = s.rtt + 4*s.rttVar if s.rto < s.minRTO { s.rto = s.minRTO } if s.rto > s.maxRTO { s.rto = s.maxRTO } } // Fast retransmit: if ACK is higher than pending packets, trigger retransmit if ackSeq > s.seqLastAck { s.seqLastAck = ackSeq // Check for gaps that need immediate retransmit for seq, pending := range s.pendingAcks { if seq < ackSeq { // This packet was likely lost, retransmit immediately s.resendQueue = append(s.resendQueue, pending) } } } return nil } // handleOutOfOrderAck handles out-of-order acknowledgments func (s *Stream) handleOutOfOrderAck(data []byte) error { if len(data) < 2 { return nil } seq := binary.BigEndian.Uint16(data[:2]) s.mu.Lock() defer s.mu.Unlock() // Immediately retransmit if we have this packet pending if pending, exists := s.pendingAcks[seq]; exists { s.resendQueue = append(s.resendQueue, pending) } return nil } // handleCombined processes combined protocol packets func (s *Stream) handleCombined(data []byte) error { pos := 0 for pos < len(data) { if pos+1 > len(data) { break } // Read packet size size := uint16(data[pos]) pos++ // Check for oversized marker if size == 0xff { if pos+2 > len(data) { break } size = binary.BigEndian.Uint16(data[pos : pos+2]) pos += 2 } // Extract packet if pos+int(size) > len(data) { break } packet := data[pos : pos+int(size)] pos += int(size) // Process each sub-packet if err := s.Process(packet); err != nil { return err } } return nil } // handleAppCombined processes combined application packets func (s *Stream) handleAppCombined(data []byte) error { pos := 0 for pos < len(data) { if pos+1 > len(data) { break } // Read packet size size := uint16(data[pos]) pos++ // Check for oversized marker if size == 0xff { if pos+2 > len(data) { break } size = binary.BigEndian.Uint16(data[pos : pos+2]) pos += 2 } // Extract packet if pos+int(size) > len(data) { break } packet := data[pos : pos+int(size)] pos += int(size) // Process as application packet if err := s.processPacketData(packet); err != nil { return err } } return nil } // SendPacket sends an application packet func (s *Stream) SendPacket(app *packets.AppPacket) error { // Check state if s.state.Load() != StateEstablished { return fmt.Errorf("stream not established") } // Convert to protocol packet proto := s.appToProto(app) // Check if needs fragmentation if proto.Size() > uint32(s.maxLen-10) { // Reserve space for headers return s.sendFragmented(proto) } // Add to appropriate queue based on reliability s.mu.Lock() if app.Priority > packets.PriorityNormal { s.reliableQueue = append(s.reliableQueue, proto) } else { s.unreliableQueue = append(s.unreliableQueue, proto) } s.mu.Unlock() // Process queues return s.processQueues() } // sendFragmented fragments and sends large packets func (s *Stream) sendFragmented(proto *packets.ProtoPacket) error { data := make([]byte, proto.Size()) proto.Serialize(data, 0) // Calculate fragment size (leave room for headers) fragSize := int(s.maxLen) - 12 // Fragment header is 10 bytes + 2 for protocol numFrags := (len(data) + fragSize - 1) / fragSize if numFrags > 0xFFFF { return fmt.Errorf("packet too large to fragment") } s.mu.Lock() fragID := s.nextFragID s.nextFragID++ s.mu.Unlock() // Send each fragment for i := 0; i < numFrags; i++ { start := i * fragSize end := start + fragSize if end > len(data) { end = len(data) } // Build fragment header fragHeader := make([]byte, 10) // Sequence added by sendReliable binary.BigEndian.PutUint32(fragHeader[0:4], uint32(fragID)) binary.BigEndian.PutUint16(fragHeader[4:6], uint16(numFrags)) binary.BigEndian.PutUint16(fragHeader[6:8], uint16(i)) binary.BigEndian.PutUint16(fragHeader[8:10], uint16(end-start)) // Combine header and data fragment := append(fragHeader, data[start:end]...) // Send as reliable packet fragProto := packets.NewProtoPacket(opcodes.OP_Fragment, fragment, s.opcodeManager) if err := s.sendReliable(fragProto); err != nil { return err } } return nil } // sendReliable sends a reliable packet with sequence number func (s *Stream) sendReliable(proto *packets.ProtoPacket) error { s.mu.Lock() seq := s.seqOut s.seqOut++ s.mu.Unlock() // Build packet with sequence data := make([]byte, proto.Size()+2) binary.BigEndian.PutUint16(data[:2], seq) proto.Serialize(data[2:], 0) // Track for retransmission pending := &pendingPacket{ packet: proto.Copy(), seq: seq, sentTime: time.Now(), attempts: 1, nextRetry: time.Now().Add(s.rto), } s.mu.Lock() s.pendingAcks[seq] = pending s.mu.Unlock() // Send with protocol header return s.sendRaw(opcodes.OP_Packet, data) } // processQueues processes all packet queues func (s *Stream) processQueues() error { s.mu.Lock() // Process resends first (highest priority) for len(s.resendQueue) > 0 { pending := s.resendQueue[0] s.resendQueue = s.resendQueue[1:] if time.Now().After(pending.nextRetry) { s.mu.Unlock() // Rebuild packet data := make([]byte, pending.packet.Size()+2) binary.BigEndian.PutUint16(data[:2], pending.seq) pending.packet.Serialize(data[2:], 0) // Update pending info pending.attempts++ pending.sentTime = time.Now() pending.nextRetry = time.Now().Add(s.rto * time.Duration(pending.attempts)) // Check max retries if pending.attempts > s.maxRetries { s.mu.Lock() delete(s.pendingAcks, pending.seq) s.mu.Unlock() // Connection likely dead s.handleTimeout() return fmt.Errorf("max retransmissions exceeded") } atomic.AddUint64(&s.retransmits, 1) s.sendRaw(opcodes.OP_Packet, data) s.mu.Lock() } else { // Not time yet, re-queue s.resendQueue = append(s.resendQueue, pending) } } // Process reliable queue for len(s.reliableQueue) > 0 { proto := s.reliableQueue[0] s.reliableQueue = s.reliableQueue[1:] s.mu.Unlock() if err := s.sendReliable(proto); err != nil { return err } s.mu.Lock() } // Process unreliable queue for len(s.unreliableQueue) > 0 { proto := s.unreliableQueue[0] s.unreliableQueue = s.unreliableQueue[1:] s.mu.Unlock() // Send unreliable (no sequence/retransmit) data := make([]byte, proto.Size()) proto.Serialize(data, 0) s.sendRaw(proto.Opcode, data) s.mu.Lock() } // Check for retransmissions needed now := time.Now() for _, pending := range s.pendingAcks { if now.After(pending.nextRetry) { s.resendQueue = append(s.resendQueue, pending) } } s.mu.Unlock() // Process any pending ACKs s.sendPendingAcks() return nil } // Helper methods func (s *Stream) processPacketData(data []byte) error { // Create ProtoPacket from raw data proto := packets.NewProtoPacketFromRaw(data, -1, s.opcodeManager) // Decompress if needed if err := proto.DecompressPacket(); err != nil { return err } // Convert to AppPacket app := proto.MakeApplicationPacket(s.opcodeSize) // Deliver to callback if s.onPacket != nil { go s.onPacket(app) } return nil } func (s *Stream) scheduleAck() { if s.ackTimer == nil { s.ackTimer = time.AfterFunc(s.ackThreshold, func() { s.sendPendingAcks() }) } } func (s *Stream) sendPendingAcks() { s.mu.Lock() if len(s.ackQueue) == 0 { s.mu.Unlock() return } // Send all pending ACKs for _, seq := range s.ackQueue { data := make([]byte, 2) binary.BigEndian.PutUint16(data, seq) s.sendRaw(opcodes.OP_Ack, data) } s.ackQueue = s.ackQueue[:0] s.lastAckSent = time.Now() s.mu.Unlock() } func (s *Stream) sendAckImmediate(seq uint16) error { data := make([]byte, 2) binary.BigEndian.PutUint16(data, seq) return s.sendRaw(opcodes.OP_Ack, data) } func (s *Stream) sendOutOfOrderAck(seq uint16) error { data := make([]byte, 2) binary.BigEndian.PutUint16(data, seq) return s.sendRaw(opcodes.OP_OutOfOrderAck, data) } func (s *Stream) sendKeepalive() { if s.state.Load() == StateEstablished { s.sendRaw(opcodes.OP_KeepAlive, nil) } // Reschedule if s.keepAliveTimer != nil { s.keepAliveTimer.Reset(10 * time.Second) } } func (s *Stream) resetTimeout() { if s.timeoutTimer != nil { s.timeoutTimer.Reset(30 * time.Second) } } func (s *Stream) handleTimeout() { s.state.Store(StateClosed) if s.onDisconnect != nil { s.onDisconnect() } } func (s *Stream) handleDisconnect() error { s.state.Store(StateClosed) if s.onDisconnect != nil { go s.onDisconnect() } return nil } // sendRaw sends raw data with protocol opcode func (s *Stream) sendRaw(opcode uint16, data []byte) error { // Build packet: opcode + data + CRC packet := make([]byte, 2+len(data)+2) binary.BigEndian.PutUint16(packet[:2], opcode) copy(packet[2:], data) // Add CRC crc := crypto.CalculateCRC(packet[:len(packet)-2], s.crcKey) binary.BigEndian.PutUint16(packet[len(packet)-2:], crc) // Encrypt if needed if s.cipher != nil { s.cipher.Encrypt(packet) } // Send via gnet atomic.AddUint64(&s.packetsOut, 1) atomic.AddUint64(&s.bytesOut, uint64(len(packet))) return s.conn.AsyncWrite(packet, nil) } func (s *Stream) appToProto(app *packets.AppPacket) *packets.ProtoPacket { proto := packets.NewProtoPacket(app.Opcode, app.Buffer, s.opcodeManager) proto.CopyInfo(app.Packet) proto.CompressThreshold = 100 proto.EncodeKey = s.encodeKey return proto } // Public methods // SetPacketCallback sets the callback for received packets func (s *Stream) SetPacketCallback(fn func(*packets.AppPacket)) { s.onPacket = fn } // SetDisconnectCallback sets the callback for disconnection func (s *Stream) SetDisconnectCallback(fn func()) { s.onDisconnect = fn } // GetState returns current stream state func (s *Stream) GetState() StreamState { return s.state.Load().(StreamState) } // IsConnected returns true if stream is established func (s *Stream) IsConnected() bool { return s.GetState() == StateEstablished } // SendSessionRequest initiates client connection func (s *Stream) SendSessionRequest() error { // Build session request packet // Format: version(4) + sessionID(4) + maxLen(2) data := make([]byte, 10) binary.BigEndian.PutUint32(data[0:4], 2) // Protocol version binary.BigEndian.PutUint32(data[4:8], s.sessionID) binary.BigEndian.PutUint16(data[8:10], s.maxLen) s.state.Store(StateConnecting) return s.sendRaw(opcodes.OP_SessionRequest, data) } // validateSession checks if packet belongs to this session func (s *Stream) validateSession(sessionID uint32) bool { if s.sessionID == 0 { // Not yet established, accept any return true } if sessionID != s.sessionID { // Wrong session - send out of session response go s.sendRaw(opcodes.OP_OutOfSession, nil) return false } return true } // GetSessionID returns current session ID func (s *Stream) GetSessionID() uint32 { s.mu.RLock() defer s.mu.RUnlock() return s.sessionID } // SetSessionID sets the session ID (for client mode) func (s *Stream) SetSessionID(id uint32) { s.mu.Lock() s.sessionID = id s.mu.Unlock() } // GetRemoteAddr returns remote address func (s *Stream) GetRemoteAddr() string { if s.conn != nil { return s.conn.RemoteAddr().String() } return "" } // Close closes the stream func (s *Stream) Close() error { if s.state.Load() == StateClosed { return nil } s.state.Store(StateClosed) // Send disconnect s.sendRaw(opcodes.OP_SessionDisconnect, nil) // Clean up timers if s.keepAliveTimer != nil { s.keepAliveTimer.Stop() } if s.ackTimer != nil { s.ackTimer.Stop() } if s.timeoutTimer != nil { s.timeoutTimer.Stop() } // Clear queues s.mu.Lock() s.reliableQueue = nil s.unreliableQueue = nil s.resendQueue = nil s.pendingAcks = nil s.fragments = nil s.outOfOrder = nil s.mu.Unlock() return nil } // Utility functions func absTime(d time.Duration) time.Duration { if d < 0 { return -d } return d }