package stream import ( "encoding/binary" "fmt" "sync" "sync/atomic" "time" "git.sharkk.net/EQ2/Protocol/crypto" "git.sharkk.net/EQ2/Protocol/opcodes" "git.sharkk.net/EQ2/Protocol/packets" "github.com/panjf2000/gnet/v2" ) // Stream implements EQ2's reliable UDP protocol type Stream struct { conn gnet.Conn mu sync.RWMutex // Connection state state atomic.Value // StreamState sessionID uint32 crcKey uint32 maxLen uint16 encodeKey int decodeKey int // Sequence management with wraparound handling seqOut uint16 seqIn uint16 seqLastAck uint16 seqExpected uint16 // Acknowledgment tracking pendingAcks map[uint16]*pendingPacket ackTimer *time.Timer lastAckSent time.Time ackThreshold time.Duration ackQueue []uint16 duplicateAckCnt map[uint16]int // Fragment assembly with expiry fragments map[uint16]*fragmentBuffer nextFragID uint16 // Out of order handling with expiry outOfOrder map[uint16]*outOfOrderPacket // Packet queues reliableQueue []*packets.ProtoPacket unreliableQueue []*packets.ProtoPacket resendQueue []*pendingPacket // Opcode management opcodeManager opcodes.Manager opcodeSize uint8 // Cipher for encryption cipher *crypto.Ciphers // Timers keepAliveTimer *time.Timer timeoutTimer *time.Timer retransmitTimer *time.Timer cleanupTimer *time.Timer // Retransmission settings rtt time.Duration rttVar time.Duration rto time.Duration minRTO time.Duration maxRTO time.Duration maxRetries int // Stats packetsOut uint64 packetsIn uint64 bytesOut uint64 bytesIn uint64 retransmits uint64 // Callbacks onPacket func(*packets.AppPacket) onDisconnect func() } type pendingPacket struct { packet *packets.ProtoPacket seq uint16 sentTime time.Time attempts int nextRetry time.Time } type fragmentBuffer struct { totalSize uint32 chunks map[uint16][]byte received uint32 startTime time.Time } type outOfOrderPacket struct { data []byte timestamp time.Time } type StreamState int const ( StateDisconnected StreamState = iota StateConnecting StateEstablished StateClosing StateClosed ) const ( retransmitInterval = 100 * time.Millisecond cleanupInterval = 5 * time.Second fragmentTimeout = 30 * time.Second outOfOrderTimeout = 10 * time.Second duplicateAckThreshold = 3 ) // Config holds stream configuration type Config struct { SessionID uint32 CRCKey uint32 MaxPacketSize uint16 EncodeKey int DecodeKey int OpcodeManager opcodes.Manager OpcodeSize uint8 AckThreshold time.Duration KeepaliveTime time.Duration TimeoutDuration time.Duration } // NewStream creates a new EQStream instance func NewStream(conn gnet.Conn, cfg *Config) *Stream { s := &Stream{ conn: conn, sessionID: cfg.SessionID, crcKey: cfg.CRCKey, maxLen: cfg.MaxPacketSize, encodeKey: cfg.EncodeKey, decodeKey: cfg.DecodeKey, opcodeManager: cfg.OpcodeManager, opcodeSize: cfg.OpcodeSize, ackThreshold: cfg.AckThreshold, pendingAcks: make(map[uint16]*pendingPacket), duplicateAckCnt: make(map[uint16]int), fragments: make(map[uint16]*fragmentBuffer), outOfOrder: make(map[uint16]*outOfOrderPacket), ackQueue: make([]uint16, 0), seqOut: 0, seqIn: 0, seqExpected: 0, rtt: time.Second, rttVar: 500 * time.Millisecond, rto: time.Second, minRTO: 200 * time.Millisecond, maxRTO: 10 * time.Second, maxRetries: 10, } if s.maxLen == 0 { s.maxLen = packets.DefaultMTU } if s.ackThreshold == 0 { s.ackThreshold = 200 * time.Millisecond } s.state.Store(StateDisconnected) if cfg.EncodeKey != 0 || cfg.DecodeKey != 0 { cipher, _ := crypto.NewCiphers(int64(cfg.EncodeKey)) s.cipher = cipher } // Start timers if cfg.KeepaliveTime > 0 { s.keepAliveTimer = time.AfterFunc(cfg.KeepaliveTime, s.sendKeepalive) } if cfg.TimeoutDuration > 0 { s.timeoutTimer = time.AfterFunc(cfg.TimeoutDuration, s.handleTimeout) } s.retransmitTimer = time.AfterFunc(retransmitInterval, s.processRetransmits) s.cleanupTimer = time.AfterFunc(cleanupInterval, s.cleanup) return s } // Process handles incoming data from gnet func (s *Stream) Process(data []byte) error { if len(data) < 2 { return nil } // Check for CRC if len(data) > 2 { providedCRC := binary.BigEndian.Uint16(data[len(data)-2:]) calculatedCRC := crypto.CalculateCRC(data[:len(data)-2], s.crcKey) if providedCRC != calculatedCRC { return nil } data = data[:len(data)-2] } // Decrypt if needed if s.cipher != nil { s.cipher.Decrypt(data) } if len(data) < 2 { return nil } opcode := binary.BigEndian.Uint16(data[:2]) switch opcode { case opcodes.OP_SessionRequest: return s.handleSessionRequest(data[2:]) case opcodes.OP_SessionResponse: return s.handleSessionResponse(data[2:]) case opcodes.OP_Packet: return s.handlePacket(data[2:]) case opcodes.OP_Fragment: return s.handleFragment(data[2:]) case opcodes.OP_Ack: return s.handleAck(data[2:]) case opcodes.OP_Combined: return s.handleCombined(data[2:]) case opcodes.OP_AppCombined: return s.handleAppCombined(data[2:]) case opcodes.OP_KeepAlive: s.resetTimeout() return nil case opcodes.OP_SessionDisconnect: return s.handleDisconnect() case opcodes.OP_OutOfOrderAck: return s.handleOutOfOrderAck(data[2:]) } atomic.AddUint64(&s.packetsIn, 1) atomic.AddUint64(&s.bytesIn, uint64(len(data))) return nil } // SendPacket sends an application packet func (s *Stream) SendPacket(app *packets.AppPacket) error { if s.state.Load() != StateEstablished { return fmt.Errorf("stream not established") } // Validate packet size if app.Size() > uint32(s.maxLen) { proto := s.appToProto(app) return s.sendFragmented(proto) } proto := s.appToProto(app) isUnreliable := s.isUnreliableOpcode(app.GetOpcode()) s.mu.Lock() if isUnreliable { s.unreliableQueue = append(s.unreliableQueue, proto) } else { s.reliableQueue = append(s.reliableQueue, proto) } s.mu.Unlock() return s.processQueues() } // processQueues processes all packet queues func (s *Stream) processQueues() error { s.mu.Lock() defer s.mu.Unlock() // Process resends first for len(s.resendQueue) > 0 { pending := s.resendQueue[0] s.resendQueue = s.resendQueue[1:] if time.Now().After(pending.nextRetry) { data := make([]byte, pending.packet.Size()+2) binary.BigEndian.PutUint16(data[:2], pending.seq) pending.packet.Serialize(data[2:], 0) pending.attempts++ pending.sentTime = time.Now() pending.nextRetry = time.Now().Add(s.rto * time.Duration(pending.attempts)) if pending.attempts > s.maxRetries { delete(s.pendingAcks, pending.seq) go s.handleTimeout() return fmt.Errorf("max retransmissions exceeded") } s.mu.Unlock() atomic.AddUint64(&s.retransmits, 1) s.sendRaw(opcodes.OP_Packet, data) s.mu.Lock() } else { s.resendQueue = append(s.resendQueue, pending) } } // Process reliable queue for len(s.reliableQueue) > 0 { proto := s.reliableQueue[0] s.reliableQueue = s.reliableQueue[1:] s.mu.Unlock() if err := s.sendReliable(proto); err != nil { return err } s.mu.Lock() } // Process unreliable queue for len(s.unreliableQueue) > 0 { proto := s.unreliableQueue[0] s.unreliableQueue = s.unreliableQueue[1:] data := make([]byte, proto.Size()) proto.Serialize(data, 0) s.mu.Unlock() s.sendRaw(proto.Opcode, data) s.mu.Lock() } s.mu.Unlock() s.sendPendingAcks() return nil } // Helper methods func (s *Stream) isUnreliableOpcode(emuOp opcodes.EmuOpcode) bool { switch emuOp { case opcodes.OP_UpdatePositionMsg, opcodes.OP_WorldPingMsg: return true default: return false } } func (s *Stream) incrementSequence(seq uint16) uint16 { if seq == 0xFFFF { return 0 } return seq + 1 } func (s *Stream) isSequenceAhead(seq, base uint16) bool { diff := seq - base return diff > 0 && diff < 0x8000 } func (s *Stream) handleSessionRequest(data []byte) error { if len(data) < 10 { return fmt.Errorf("session request too small") } version := binary.BigEndian.Uint32(data[:4]) sessionID := binary.BigEndian.Uint32(data[4:8]) maxLen := binary.BigEndian.Uint16(data[8:10]) s.mu.Lock() s.sessionID = sessionID if maxLen < s.maxLen { s.maxLen = maxLen } s.mu.Unlock() response := make([]byte, 15) binary.BigEndian.PutUint32(response[0:4], sessionID) binary.BigEndian.PutUint32(response[4:8], s.crcKey) response[8] = 2 binary.BigEndian.PutUint16(response[9:11], s.maxLen) binary.BigEndian.PutUint32(response[11:15], version) s.state.Store(StateEstablished) return s.sendRaw(opcodes.OP_SessionResponse, response) } func (s *Stream) handleSessionResponse(data []byte) error { if len(data) < 14 { return fmt.Errorf("session response too small") } sessionID := binary.BigEndian.Uint32(data[0:4]) crcKey := binary.BigEndian.Uint32(data[4:8]) maxLen := binary.BigEndian.Uint16(data[9:11]) s.mu.Lock() s.sessionID = sessionID s.crcKey = crcKey if maxLen < s.maxLen { s.maxLen = maxLen } s.mu.Unlock() s.state.Store(StateEstablished) return nil } func (s *Stream) handlePacket(data []byte) error { if len(data) < 2 { return nil } seq := binary.BigEndian.Uint16(data[:2]) data = data[2:] s.mu.Lock() defer s.mu.Unlock() if seq == s.seqExpected { s.seqExpected = s.incrementSequence(s.seqExpected) s.ackQueue = append(s.ackQueue, seq) if err := s.processPacketData(data); err != nil { return err } for { if buffered, exists := s.outOfOrder[s.seqExpected]; exists { delete(s.outOfOrder, s.seqExpected) s.ackQueue = append(s.ackQueue, s.seqExpected) s.seqExpected = s.incrementSequence(s.seqExpected) if err := s.processPacketData(buffered.data); err != nil { return err } } else { break } } } else if s.isSequenceAhead(seq, s.seqExpected) { s.outOfOrder[seq] = &outOfOrderPacket{ data: append([]byte(nil), data...), timestamp: time.Now(), } go s.sendOutOfOrderAck(seq) } else { go s.sendAckImmediate(seq) } s.scheduleAck() return nil } func (s *Stream) handleFragment(data []byte) error { if len(data) < 10 { return nil } seq := binary.BigEndian.Uint16(data[:2]) fragID := binary.BigEndian.Uint32(data[2:6]) fragTotal := binary.BigEndian.Uint16(data[6:8]) fragCur := binary.BigEndian.Uint16(data[8:10]) data = data[10:] s.mu.Lock() defer s.mu.Unlock() frag, exists := s.fragments[uint16(fragID)] if !exists { frag = &fragmentBuffer{ totalSize: uint32(fragTotal), chunks: make(map[uint16][]byte), startTime: time.Now(), } s.fragments[uint16(fragID)] = frag } frag.chunks[fragCur] = append([]byte(nil), data...) frag.received++ if frag.received == uint32(fragTotal) { complete := make([]byte, 0) for i := uint16(0); i < fragTotal; i++ { if chunk, ok := frag.chunks[i]; ok { complete = append(complete, chunk...) } else { return nil } } delete(s.fragments, uint16(fragID)) return s.processPacketData(complete) } s.ackQueue = append(s.ackQueue, seq) s.scheduleAck() return nil } func (s *Stream) handleAck(data []byte) error { if len(data) < 2 { return nil } ackSeq := binary.BigEndian.Uint16(data[:2]) s.mu.Lock() defer s.mu.Unlock() if pending, exists := s.pendingAcks[ackSeq]; exists { delete(s.pendingAcks, ackSeq) delete(s.duplicateAckCnt, ackSeq) sample := time.Since(pending.sentTime) if s.rtt == 0 { s.rtt = sample s.rttVar = sample / 2 } else { alpha := 0.125 beta := 0.25 s.rttVar = time.Duration((1-beta)*float64(s.rttVar) + beta*float64(absTime(s.rtt-sample))) s.rtt = time.Duration((1-alpha)*float64(s.rtt) + alpha*float64(sample)) } s.rto = s.rtt + 4*s.rttVar if s.rto < s.minRTO { s.rto = s.minRTO } if s.rto > s.maxRTO { s.rto = s.maxRTO } } else { s.duplicateAckCnt[ackSeq]++ if s.duplicateAckCnt[ackSeq] >= duplicateAckThreshold { for seq, pending := range s.pendingAcks { if s.isSequenceAhead(seq, ackSeq) { s.resendQueue = append(s.resendQueue, pending) } } delete(s.duplicateAckCnt, ackSeq) } } if s.isSequenceAhead(ackSeq, s.seqLastAck) { s.seqLastAck = ackSeq } return nil } func (s *Stream) handleCombined(data []byte) error { pos := 0 for pos < len(data) { if pos+1 > len(data) { break } size := uint16(data[pos]) pos++ if size == 0xff { if pos+2 > len(data) { break } size = binary.BigEndian.Uint16(data[pos : pos+2]) pos += 2 } if pos+int(size) > len(data) { break } packet := data[pos : pos+int(size)] pos += int(size) if err := s.Process(packet); err != nil { return err } } return nil } func (s *Stream) handleAppCombined(data []byte) error { pos := 0 for pos < len(data) { if pos+1 > len(data) { break } size := uint16(data[pos]) pos++ if size == 0xff { if pos+2 > len(data) { break } size = binary.BigEndian.Uint16(data[pos : pos+2]) pos += 2 } if pos+int(size) > len(data) { break } packet := data[pos : pos+int(size)] pos += int(size) if err := s.processPacketData(packet); err != nil { return err } } return nil } func (s *Stream) handleOutOfOrderAck(data []byte) error { if len(data) < 2 { return nil } seq := binary.BigEndian.Uint16(data[:2]) s.mu.Lock() defer s.mu.Unlock() if pending, exists := s.pendingAcks[seq]; exists { s.resendQueue = append(s.resendQueue, pending) } return nil } func (s *Stream) handleDisconnect() error { s.state.Store(StateClosed) if s.onDisconnect != nil { go s.onDisconnect() } return nil } func (s *Stream) sendFragmented(proto *packets.ProtoPacket) error { data := make([]byte, proto.Size()) proto.Serialize(data, 0) fragSize := int(s.maxLen) - 12 numFrags := (len(data) + fragSize - 1) / fragSize if numFrags > 0xFFFF { return fmt.Errorf("packet too large to fragment") } s.mu.Lock() fragID := s.nextFragID s.nextFragID++ s.mu.Unlock() for i := 0; i < numFrags; i++ { start := i * fragSize end := start + fragSize if end > len(data) { end = len(data) } fragHeader := make([]byte, 10) binary.BigEndian.PutUint32(fragHeader[0:4], uint32(fragID)) binary.BigEndian.PutUint16(fragHeader[4:6], uint16(numFrags)) binary.BigEndian.PutUint16(fragHeader[6:8], uint16(i)) binary.BigEndian.PutUint16(fragHeader[8:10], uint16(end-start)) fragment := append(fragHeader, data[start:end]...) fragProto := packets.NewProtoPacket(opcodes.OP_Fragment, fragment, s.opcodeManager) if err := s.sendReliable(fragProto); err != nil { return err } } return nil } func (s *Stream) sendReliable(proto *packets.ProtoPacket) error { s.mu.Lock() seq := s.seqOut s.seqOut = s.incrementSequence(s.seqOut) s.mu.Unlock() data := make([]byte, proto.Size()+2) binary.BigEndian.PutUint16(data[:2], seq) proto.Serialize(data[2:], 0) pending := &pendingPacket{ packet: proto.Copy(), seq: seq, sentTime: time.Now(), attempts: 1, nextRetry: time.Now().Add(s.rto), } s.mu.Lock() s.pendingAcks[seq] = pending s.mu.Unlock() return s.sendRaw(opcodes.OP_Packet, data) } func (s *Stream) processPacketData(data []byte) error { proto := packets.NewProtoPacketFromRaw(data, -1, s.opcodeManager) if err := proto.DecompressPacket(); err != nil { return err } app := proto.MakeApplicationPacket(s.opcodeSize) if s.onPacket != nil { go s.onPacket(app) } return nil } func (s *Stream) processRetransmits() { if s.state.Load() != StateEstablished { return } s.mu.Lock() now := time.Now() for seq, pending := range s.pendingAcks { if now.After(pending.nextRetry) { newPending := &pendingPacket{ packet: pending.packet.Copy(), seq: seq, sentTime: pending.sentTime, attempts: pending.attempts, nextRetry: pending.nextRetry, } s.resendQueue = append(s.resendQueue, newPending) } } s.mu.Unlock() if len(s.resendQueue) > 0 { s.processQueues() } if s.retransmitTimer != nil { s.retransmitTimer.Reset(retransmitInterval) } } func (s *Stream) cleanup() { if s.state.Load() != StateEstablished { return } s.mu.Lock() now := time.Now() for id, frag := range s.fragments { if now.Sub(frag.startTime) > fragmentTimeout { delete(s.fragments, id) } } for seq, oop := range s.outOfOrder { if now.Sub(oop.timestamp) > outOfOrderTimeout { delete(s.outOfOrder, seq) } } for seq := range s.duplicateAckCnt { if _, exists := s.pendingAcks[seq]; !exists { delete(s.duplicateAckCnt, seq) } } s.mu.Unlock() if s.cleanupTimer != nil { s.cleanupTimer.Reset(cleanupInterval) } } func (s *Stream) scheduleAck() { if s.ackTimer == nil { s.ackTimer = time.AfterFunc(s.ackThreshold, func() { s.sendPendingAcks() }) } } func (s *Stream) sendPendingAcks() { s.mu.Lock() if len(s.ackQueue) == 0 { s.mu.Unlock() return } for _, seq := range s.ackQueue { data := make([]byte, 2) binary.BigEndian.PutUint16(data, seq) s.sendRaw(opcodes.OP_Ack, data) } s.ackQueue = s.ackQueue[:0] s.lastAckSent = time.Now() s.mu.Unlock() } func (s *Stream) sendAckImmediate(seq uint16) error { data := make([]byte, 2) binary.BigEndian.PutUint16(data, seq) return s.sendRaw(opcodes.OP_Ack, data) } func (s *Stream) sendOutOfOrderAck(seq uint16) error { data := make([]byte, 2) binary.BigEndian.PutUint16(data, seq) return s.sendRaw(opcodes.OP_OutOfOrderAck, data) } func (s *Stream) sendKeepalive() { if s.state.Load() == StateEstablished { s.sendRaw(opcodes.OP_KeepAlive, nil) } if s.keepAliveTimer != nil { s.keepAliveTimer.Reset(10 * time.Second) } } func (s *Stream) resetTimeout() { if s.timeoutTimer != nil { s.timeoutTimer.Reset(30 * time.Second) } } func (s *Stream) handleTimeout() { s.state.Store(StateClosed) if s.onDisconnect != nil { s.onDisconnect() } } func (s *Stream) sendRaw(opcode uint16, data []byte) error { packet := make([]byte, 2+len(data)+2) binary.BigEndian.PutUint16(packet[:2], opcode) copy(packet[2:], data) crc := crypto.CalculateCRC(packet[:len(packet)-2], s.crcKey) binary.BigEndian.PutUint16(packet[len(packet)-2:], crc) if s.cipher != nil { s.cipher.Encrypt(packet) } atomic.AddUint64(&s.packetsOut, 1) atomic.AddUint64(&s.bytesOut, uint64(len(packet))) return s.conn.AsyncWrite(packet, nil) } func (s *Stream) appToProto(app *packets.AppPacket) *packets.ProtoPacket { proto := packets.NewProtoPacket(app.Opcode, app.Buffer, s.opcodeManager) proto.CopyInfo(app.Packet) proto.CompressThreshold = 100 proto.EncodeKey = s.encodeKey return proto } // Public methods func (s *Stream) SetPacketCallback(fn func(*packets.AppPacket)) { s.onPacket = fn } func (s *Stream) SetDisconnectCallback(fn func()) { s.onDisconnect = fn } func (s *Stream) GetState() StreamState { return s.state.Load().(StreamState) } func (s *Stream) IsConnected() bool { return s.GetState() == StateEstablished } func (s *Stream) SendSessionRequest() error { data := make([]byte, 10) binary.BigEndian.PutUint32(data[0:4], 2) binary.BigEndian.PutUint32(data[4:8], s.sessionID) binary.BigEndian.PutUint16(data[8:10], s.maxLen) s.state.Store(StateConnecting) return s.sendRaw(opcodes.OP_SessionRequest, data) } func (s *Stream) GetSessionID() uint32 { s.mu.RLock() defer s.mu.RUnlock() return s.sessionID } func (s *Stream) SetSessionID(id uint32) { s.mu.Lock() s.sessionID = id s.mu.Unlock() } func (s *Stream) GetRemoteAddr() string { if s.conn != nil { return s.conn.RemoteAddr().String() } return "" } func (s *Stream) Close() error { if s.state.Load() == StateClosed { return nil } s.state.Store(StateClosed) s.sendRaw(opcodes.OP_SessionDisconnect, nil) if s.keepAliveTimer != nil { s.keepAliveTimer.Stop() } if s.ackTimer != nil { s.ackTimer.Stop() } if s.timeoutTimer != nil { s.timeoutTimer.Stop() } if s.retransmitTimer != nil { s.retransmitTimer.Stop() } if s.cleanupTimer != nil { s.cleanupTimer.Stop() } s.mu.Lock() s.reliableQueue = nil s.unreliableQueue = nil s.resendQueue = nil s.pendingAcks = nil s.fragments = nil s.outOfOrder = nil s.mu.Unlock() return nil } func absTime(d time.Duration) time.Duration { if d < 0 { return -d } return d }