package stream import ( "encoding/binary" "sync" "sync/atomic" "time" "git.sharkk.net/EQ2/Protocol/crypto" "git.sharkk.net/EQ2/Protocol/opcodes" "git.sharkk.net/EQ2/Protocol/packets" "github.com/panjf2000/gnet/v2" ) // Stream implements EQ2's reliable UDP protocol type Stream struct { conn gnet.Conn mu sync.RWMutex // Connection state state atomic.Value // StreamState sessionID uint32 crcKey uint32 maxLen uint16 encodeKey int decodeKey int // Sequence management seqOut uint16 seqIn uint16 seqLastAck uint16 // Acknowledgment tracking pendingAcks map[uint16]*pendingPacket ackTimer *time.Timer lastAckSent time.Time ackThreshold time.Duration // Fragment assembly fragments map[uint16]*fragmentBuffer nextFragID uint16 // Packet queues reliableQueue []*packets.ProtoPacket unreliableQueue []*packets.ProtoPacket resendQueue []*pendingPacket // Opcode management opcodeManager opcodes.Manager opcodeSize uint8 // Cipher for encryption cipher *crypto.Ciphers // Timers keepAliveTimer *time.Timer timeoutTimer *time.Timer // Stats packetsOut uint64 packetsIn uint64 bytesOut uint64 bytesIn uint64 // Callbacks onPacket func(*packets.AppPacket) onDisconnect func() } type pendingPacket struct { packet *packets.ProtoPacket seq uint16 sentTime time.Time attempts int nextRetry time.Time } type fragmentBuffer struct { totalSize uint32 chunks map[uint16][]byte received uint32 startTime time.Time } type StreamState int const ( StateDisconnected StreamState = iota StateConnecting StateEstablished StateClosing StateClosed ) // Config holds stream configuration type Config struct { SessionID uint32 CRCKey uint32 MaxPacketSize uint16 EncodeKey int DecodeKey int OpcodeManager opcodes.Manager OpcodeSize uint8 AckThreshold time.Duration KeepaliveTime time.Duration TimeoutDuration time.Duration } // NewStream creates a new EQStream instance func NewStream(conn gnet.Conn, cfg *Config) *Stream { s := &Stream{ conn: conn, sessionID: cfg.SessionID, crcKey: cfg.CRCKey, maxLen: cfg.MaxPacketSize, encodeKey: cfg.EncodeKey, decodeKey: cfg.DecodeKey, opcodeManager: cfg.OpcodeManager, opcodeSize: cfg.OpcodeSize, ackThreshold: cfg.AckThreshold, pendingAcks: make(map[uint16]*pendingPacket), fragments: make(map[uint16]*fragmentBuffer), seqOut: 0, seqIn: 0, } if s.maxLen == 0 { s.maxLen = packets.DefaultMTU } if s.ackThreshold == 0 { s.ackThreshold = 200 * time.Millisecond } s.state.Store(StateDisconnected) // Initialize cipher if keys provided if cfg.EncodeKey != 0 || cfg.DecodeKey != 0 { cipher, _ := crypto.NewCiphers(int64(cfg.EncodeKey)) s.cipher = cipher } // Start keepalive timer if cfg.KeepaliveTime > 0 { s.keepAliveTimer = time.AfterFunc(cfg.KeepaliveTime, s.sendKeepalive) } return s } // Process handles incoming data from gnet func (s *Stream) Process(data []byte) error { // Validate CRC if len(data) < 2 { return nil } // Check for CRC (last 2 bytes) if len(data) > 2 { providedCRC := binary.BigEndian.Uint16(data[len(data)-2:]) calculatedCRC := crypto.CalculateCRC(data[:len(data)-2], s.crcKey) if providedCRC != calculatedCRC { return nil // Drop packet with bad CRC } data = data[:len(data)-2] // Strip CRC } // Decrypt if needed if s.cipher != nil { s.cipher.Decrypt(data) } // Parse protocol opcode if len(data) < 2 { return nil } opcode := binary.BigEndian.Uint16(data[:2]) switch opcode { case opcodes.OP_SessionRequest: return s.handleSessionRequest(data[2:]) case opcodes.OP_SessionResponse: return s.handleSessionResponse(data[2:]) case opcodes.OP_Packet: return s.handlePacket(data[2:]) case opcodes.OP_Fragment: return s.handleFragment(data[2:]) case opcodes.OP_Ack: return s.handleAck(data[2:]) case opcodes.OP_Combined: return s.handleCombined(data[2:]) case opcodes.OP_AppCombined: return s.handleAppCombined(data[2:]) case opcodes.OP_KeepAlive: s.resetTimeout() return nil case opcodes.OP_SessionDisconnect: return s.handleDisconnect() } atomic.AddUint64(&s.packetsIn, 1) atomic.AddUint64(&s.bytesIn, uint64(len(data))) return nil } // handlePacket processes reliable packets func (s *Stream) handlePacket(data []byte) error { if len(data) < 2 { return nil } seq := binary.BigEndian.Uint16(data[:2]) data = data[2:] s.mu.Lock() defer s.mu.Unlock() // Check sequence if seq != s.seqIn { // Out of order - send immediate ACK s.sendAckImmediate(seq) return nil } // Update sequence s.seqIn++ // Schedule ACK s.scheduleAck(seq) // Process packet data return s.processPacketData(data) } // handleFragment assembles fragmented packets func (s *Stream) handleFragment(data []byte) error { if len(data) < 8 { return nil } seq := binary.BigEndian.Uint16(data[:2]) fragSeq := binary.BigEndian.Uint16(data[2:4]) fragTotal := binary.BigEndian.Uint16(data[4:6]) fragCur := binary.BigEndian.Uint16(data[6:8]) data = data[8:] s.mu.Lock() defer s.mu.Unlock() // Get or create fragment buffer frag, exists := s.fragments[fragSeq] if !exists { frag = &fragmentBuffer{ totalSize: uint32(fragTotal), chunks: make(map[uint16][]byte), startTime: time.Now(), } s.fragments[fragSeq] = frag } // Store chunk frag.chunks[fragCur] = append([]byte(nil), data...) frag.received++ // Check if complete if frag.received == uint32(fragTotal) { // Reassemble complete := make([]byte, 0) for i := uint16(0); i < fragTotal; i++ { if chunk, ok := frag.chunks[i]; ok { complete = append(complete, chunk...) } } delete(s.fragments, fragSeq) // Process reassembled packet return s.processPacketData(complete) } // Schedule ACK for fragment s.scheduleAck(seq) return nil } // handleAck processes acknowledgments func (s *Stream) handleAck(data []byte) error { if len(data) < 2 { return nil } ackSeq := binary.BigEndian.Uint16(data[:2]) s.mu.Lock() defer s.mu.Unlock() // Remove from pending if pending, exists := s.pendingAcks[ackSeq]; exists { delete(s.pendingAcks, ackSeq) // Update RTT stats here if needed _ = pending } // Update last ACK seen if ackSeq > s.seqLastAck { s.seqLastAck = ackSeq } return nil } // SendPacket sends an application packet func (s *Stream) SendPacket(app *packets.AppPacket) error { // Convert to protocol packet proto := s.appToProto(app) // Check if needs fragmentation if proto.Size() > uint32(s.maxLen) { return s.sendFragmented(proto) } // Add to appropriate queue if app.Priority > packets.PriorityNormal { s.mu.Lock() s.reliableQueue = append(s.reliableQueue, proto) s.mu.Unlock() } else { s.mu.Lock() s.unreliableQueue = append(s.unreliableQueue, proto) s.mu.Unlock() } // Process queues return s.processQueues() } // sendReliable sends a reliable packet with sequence number func (s *Stream) sendReliable(proto *packets.ProtoPacket) error { s.mu.Lock() seq := s.seqOut s.seqOut++ s.mu.Unlock() // Build packet with sequence data := make([]byte, proto.Size()+2) binary.BigEndian.PutUint16(data[:2], seq) proto.Serialize(data[2:], 0) // Track for retransmission pending := &pendingPacket{ packet: proto, seq: seq, sentTime: time.Now(), attempts: 1, nextRetry: time.Now().Add(time.Second), } s.mu.Lock() s.pendingAcks[seq] = pending s.mu.Unlock() // Send with protocol header return s.sendRaw(opcodes.OP_Packet, data) } // sendRaw sends raw data with protocol opcode func (s *Stream) sendRaw(opcode uint16, data []byte) error { // Build packet: opcode + data + CRC packet := make([]byte, 2+len(data)+2) binary.BigEndian.PutUint16(packet[:2], opcode) copy(packet[2:], data) // Add CRC crc := crypto.CalculateCRC(packet[:len(packet)-2], s.crcKey) binary.BigEndian.PutUint16(packet[len(packet)-2:], crc) // Encrypt if needed if s.cipher != nil { s.cipher.Encrypt(packet) } // Send via gnet atomic.AddUint64(&s.packetsOut, 1) atomic.AddUint64(&s.bytesOut, uint64(len(packet))) return s.conn.AsyncWrite(packet, nil) } // Helper methods func (s *Stream) processPacketData(data []byte) error { // Create ProtoPacket from raw data proto := packets.NewProtoPacketFromRaw(data, -1, s.opcodeManager) // Decompress if needed proto.DecompressPacket() // Convert to AppPacket app := proto.MakeApplicationPacket(s.opcodeSize) // Deliver to callback if s.onPacket != nil { s.onPacket(app) } return nil } func (s *Stream) scheduleAck(seq uint16) { if s.ackTimer == nil { s.ackTimer = time.AfterFunc(s.ackThreshold, func() { s.sendAckImmediate(seq) }) } } func (s *Stream) sendAckImmediate(seq uint16) error { data := make([]byte, 2) binary.BigEndian.PutUint16(data, seq) return s.sendRaw(opcodes.OP_Ack, data) } func (s *Stream) sendKeepalive() { s.sendRaw(opcodes.OP_KeepAlive, nil) // Reschedule if s.keepAliveTimer != nil { s.keepAliveTimer.Reset(10 * time.Second) } } func (s *Stream) resetTimeout() { if s.timeoutTimer != nil { s.timeoutTimer.Reset(30 * time.Second) } } func (s *Stream) appToProto(app *packets.AppPacket) *packets.ProtoPacket { proto := packets.NewProtoPacket(app.Opcode, app.Buffer, s.opcodeManager) proto.CopyInfo(app.Packet) return proto } func (s *Stream) processQueues() error { // Process reliable queue first s.mu.Lock() for len(s.reliableQueue) > 0 { proto := s.reliableQueue[0] s.reliableQueue = s.reliableQueue[1:] s.mu.Unlock() if err := s.sendReliable(proto); err != nil { return err } s.mu.Lock() } s.mu.Unlock() return nil } func (s *Stream) sendFragmented(proto *packets.ProtoPacket) error { // Fragment implementation return nil // TODO } func (s *Stream) handleSessionRequest(data []byte) error { // Session setup s.state.Store(StateConnecting) return nil } func (s *Stream) handleSessionResponse(data []byte) error { s.state.Store(StateEstablished) return nil } func (s *Stream) handleCombined(data []byte) error { // Process combined packets return nil } func (s *Stream) handleAppCombined(data []byte) error { // Process app-combined packets return nil } func (s *Stream) handleDisconnect() error { s.state.Store(StateClosed) if s.onDisconnect != nil { s.onDisconnect() } return nil } // SetPacketCallback sets the callback for received packets func (s *Stream) SetPacketCallback(fn func(*packets.AppPacket)) { s.onPacket = fn } // Close closes the stream func (s *Stream) Close() error { s.state.Store(StateClosed) // Send disconnect s.sendRaw(opcodes.OP_SessionDisconnect, nil) // Clean up timers if s.keepAliveTimer != nil { s.keepAliveTimer.Stop() } if s.ackTimer != nil { s.ackTimer.Stop() } return nil }