package eq2net

import (
	"context"
	"encoding/binary"
	"errors"
	"fmt"
	"net"
	"sync"
	"sync/atomic"
	"time"
)

// StreamState represents the state of an EQStream connection.
type StreamState int32

// Lifecycle states of an EQStream.
const (
	// StreamStateDisconnected is the initial state set by NewEQStream.
	StreamStateDisconnected StreamState = iota
	// StreamStateConnecting is entered by Connect while it waits for the
	// stream to become connected.
	StreamStateConnecting
	// StreamStateConnected is the state Connect waits for; the transition
	// into it is not performed in this file.
	StreamStateConnected
	// StreamStateDisconnecting is entered by Close while workers shut down.
	StreamStateDisconnecting
	// StreamStateClosed is the terminal state stored at the end of Close.
	StreamStateClosed
)

// StreamConfig contains configuration for an EQStream.
//
// NewEQStream works on a private copy of the configuration and replaces
// every non-positive tunable that documents a default with that default.
type StreamConfig struct {
	// Network settings

	// MaxPacketSize is the maximum packet size (default: 512).
	MaxPacketSize int
	// WindowSize is the sliding window size for flow control (default: 2048).
	WindowSize uint16
	// RetransmitTimeMs is the initial retransmit time in milliseconds
	// (default: 500).
	RetransmitTimeMs int64
	// MaxRetransmits is the maximum number of transmission attempts
	// (default: 5).
	MaxRetransmits int
	// ConnectTimeout is the connection timeout (default: 30s).
	ConnectTimeout time.Duration
	// KeepAliveTime is the keep-alive interval (default: 5s).
	KeepAliveTime time.Duration

	// Session settings

	// SessionID is the session identifier.
	SessionID uint32
	// MaxBandwidth is the maximum bandwidth in bytes/sec (0 = unlimited).
	MaxBandwidth uint32
	// CRCKey is the CRC key for packet validation.
	CRCKey uint32
	// EncodeKey is the encryption key for chat packets.
	EncodeKey uint32
	// DecodeKey is the decryption key for chat packets.
	DecodeKey uint32
	// CompressEnable enables packet compression.
	CompressEnable bool

	// Performance settings

	// SendBufferSize is the capacity of the send queue (default: 1024).
	SendBufferSize int
	// RecvBufferSize is the capacity of the receive queue (default: 1024).
	RecvBufferSize int
}

// DefaultStreamConfig returns a default configuration.
func DefaultStreamConfig() *StreamConfig {
	return &StreamConfig{
		MaxPacketSize:    512,
		WindowSize:       2048,
		RetransmitTimeMs: 500,
		MaxRetransmits:   5,
		ConnectTimeout:   30 * time.Second,
		KeepAliveTime:    5 * time.Second,
		SendBufferSize:   1024,
		RecvBufferSize:   1024,
		CompressEnable:   true,
	}
}

// normalizeStreamConfig returns a private copy of config in which every
// non-positive tunable is replaced by the value from DefaultStreamConfig.
// A nil config yields the defaults. The caller's struct is never modified.
//
// Without this, a zero KeepAliveTime makes time.NewTicker panic, a negative
// buffer size makes make(chan ...) panic, and a zero ConnectTimeout makes
// Connect time out immediately.
func normalizeStreamConfig(config *StreamConfig) *StreamConfig {
	defaults := DefaultStreamConfig()
	if config == nil {
		return defaults
	}

	cfg := *config
	if cfg.MaxPacketSize <= 0 {
		cfg.MaxPacketSize = defaults.MaxPacketSize
	}
	if cfg.WindowSize == 0 {
		cfg.WindowSize = defaults.WindowSize
	}
	if cfg.RetransmitTimeMs <= 0 {
		cfg.RetransmitTimeMs = defaults.RetransmitTimeMs
	}
	if cfg.MaxRetransmits <= 0 {
		cfg.MaxRetransmits = defaults.MaxRetransmits
	}
	if cfg.ConnectTimeout <= 0 {
		cfg.ConnectTimeout = defaults.ConnectTimeout
	}
	if cfg.KeepAliveTime <= 0 {
		cfg.KeepAliveTime = defaults.KeepAliveTime
	}
	if cfg.SendBufferSize <= 0 {
		cfg.SendBufferSize = defaults.SendBufferSize
	}
	if cfg.RecvBufferSize <= 0 {
		cfg.RecvBufferSize = defaults.RecvBufferSize
	}
	return &cfg
}

// StreamStats tracks stream statistics. All counters are atomic so they can
// be updated from the worker goroutines without additional locking.
type StreamStats struct {
	PacketsSent     atomic.Uint64
	PacketsReceived atomic.Uint64
	BytesSent       atomic.Uint64
	BytesReceived   atomic.Uint64
	PacketsDropped  atomic.Uint64
	Retransmits     atomic.Uint64
	RTT             atomic.Int64 // Round-trip time in microseconds
	Bandwidth       atomic.Uint64
}

// EQStream implements reliable UDP communication for EQ2.
//
// A stream is created with NewEQStream, attached to a socket with Connect
// and torn down with Close. Connect starts four worker goroutines (send,
// receive, retransmit, keep-alive) that run until the stream's context is
// cancelled by Close.
type EQStream struct {
	// Configuration
	config *StreamConfig

	// Network
	conn       net.PacketConn
	remoteAddr net.Addr
	localAddr  net.Addr

	// State management
	state      atomic.Int32 // holds a StreamState
	sessionID  uint32
	nextSeqOut atomic.Uint32
	nextSeqIn  atomic.Uint32
	lastAckSeq atomic.Uint32

	// Packet queues - using channels for lock-free operations
	sendQueue     chan *EQProtocolPacket
	recvQueue     chan *EQApplicationPacket
	ackQueue      chan uint32
	resendQueue   chan *EQProtocolPacket
	fragmentQueue map[uint32][]*EQProtocolPacket // Fragments being assembled

	// Sliding window for flow control
	sendWindow   map[uint32]*sendPacket
	sendWindowMu sync.RWMutex
	recvWindow   map[uint32]*EQProtocolPacket
	recvWindowMu sync.RWMutex

	// Retransmission management
	rtt    atomic.Int64 // Smoothed RTT in microseconds
	rttVar atomic.Int64 // RTT variance
	rto    atomic.Int64 // Retransmission timeout in microseconds

	// Statistics
	stats *StreamStats

	// Lifecycle management
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup

	// Callbacks
	onConnect    func()
	onDisconnect func(reason string)
	onError      func(error)
}

// sendPacket tracks packets awaiting acknowledgment.
type sendPacket struct {
	packet    *EQProtocolPacket
	sentTime  time.Time
	attempts  int // number of transmissions so far, including the first
	nextRetry time.Time
}

// NewEQStream creates a new EQ2 stream in the disconnected state.
//
// A nil config selects DefaultStreamConfig. A non-nil config is copied, and
// non-positive tunables in the copy are replaced by their defaults, so the
// caller's struct is never modified and later changes to it do not affect
// the stream.
func NewEQStream(config *StreamConfig) *EQStream {
	cfg := normalizeStreamConfig(config)

	ctx, cancel := context.WithCancel(context.Background())

	s := &EQStream{
		config:        cfg,
		sendQueue:     make(chan *EQProtocolPacket, cfg.SendBufferSize),
		recvQueue:     make(chan *EQApplicationPacket, cfg.RecvBufferSize),
		ackQueue:      make(chan uint32, 256),
		resendQueue:   make(chan *EQProtocolPacket, 256),
		fragmentQueue: make(map[uint32][]*EQProtocolPacket),
		sendWindow:    make(map[uint32]*sendPacket),
		recvWindow:    make(map[uint32]*EQProtocolPacket),
		stats:         &StreamStats{},
		ctx:           ctx,
		cancel:        cancel,
	}

	// Initialize state
	s.state.Store(int32(StreamStateDisconnected))
	s.sessionID = cfg.SessionID

	// Set initial RTO
	s.rto.Store(cfg.RetransmitTimeMs * 1000) // Convert to microseconds

	return s
}

// Connect establishes a connection to the remote endpoint.
//
// It moves the stream from disconnected to connecting, starts the worker
// goroutines, sends a session request and then blocks until the stream
// reaches StreamStateConnected, the configured ConnectTimeout elapses, or
// the stream is closed. On success the onConnect callback, if set, is
// invoked before Connect returns.
//
// It returns an error if conn or remoteAddr is nil, if the stream is not in
// the disconnected state, if the session request cannot be written, on
// timeout, or if the stream is closed while waiting. On a write failure or
// timeout the stream is closed and cannot be reused.
func (s *EQStream) Connect(conn net.PacketConn, remoteAddr net.Addr) error {
	// Validate before touching the state so a bad call leaves the stream
	// reusable instead of panicking halfway through setup.
	if conn == nil || remoteAddr == nil {
		return errors.New("connection and remote address are required")
	}

	// Check state
	if !s.compareAndSwapState(StreamStateDisconnected, StreamStateConnecting) {
		return errors.New("stream not in disconnected state")
	}

	s.conn = conn
	s.remoteAddr = remoteAddr
	s.localAddr = conn.LocalAddr()

	// Start workers
	s.wg.Add(4)
	go s.sendWorker()
	go s.recvWorker()
	go s.retransmitWorker()
	go s.keepAliveWorker()

	// Send session request
	sessionReq := s.createSessionRequest()
	if err := s.sendPacket(sessionReq); err != nil {
		_ = s.Close() // Close always returns nil
		return fmt.Errorf("failed to send session request: %w", err)
	}

	// Wait for connection or timeout. The state is polled because the
	// transition to connected happens outside this function.
	timeout := time.NewTimer(s.config.ConnectTimeout)
	defer timeout.Stop()

	poll := time.NewTicker(10 * time.Millisecond)
	defer poll.Stop()

	for {
		if s.GetState() == StreamStateConnected {
			if s.onConnect != nil {
				s.onConnect()
			}
			return nil
		}

		select {
		case <-timeout.C:
			_ = s.Close() // Close always returns nil
			return errors.New("connection timeout")
		case <-s.ctx.Done():
			return errors.New("connection cancelled")
		case <-poll.C:
			// Re-check the state at the top of the loop.
		}
	}
}

// Send queues an application packet for transmission.
//
// Send never blocks: it returns an error if packet is nil, if the stream is
// not connected, if the stream has been closed, or if the send queue is
// full.
func (s *EQStream) Send(packet *EQApplicationPacket) error {
	if packet == nil {
		return errors.New("nil packet")
	}
	if s.GetState() != StreamStateConnected {
		return errors.New("stream not connected")
	}

	// Convert to protocol packet
	protoPacket := s.applicationToProtocol(packet)

	select {
	case s.sendQueue <- protoPacket:
		return nil
	case <-s.ctx.Done():
		return errors.New("stream closed")
	default:
		return errors.New("send queue full")
	}
}

// Receive gets the next application packet from the receive queue.
//
// Receive never blocks. It returns (nil, nil) when no packet is currently
// queued and (nil, error) once the stream has been closed.
func (s *EQStream) Receive() (*EQApplicationPacket, error) {
	select {
	case packet := <-s.recvQueue:
		return packet, nil
	case <-s.ctx.Done():
		return nil, errors.New("stream closed")
	default:
		return nil, nil // Non-blocking
	}
}

// sendWorker handles outgoing packets.
//
// Packets taken from the send queue are assigned a sequence number when
// their opcode requires one, recorded in the send window, and buffered.
// Buffered packets are flushed as soon as three are pending or on the next
// 1ms tick, whichever comes first. Packets from the resend queue bypass the
// buffer and are written immediately.
func (s *EQStream) sendWorker() {
	defer s.wg.Done()

	combiner := NewPacketCombiner(s.config.MaxPacketSize - 10) // Leave room for headers/CRC
	combineTimer := time.NewTicker(1 * time.Millisecond)
	defer combineTimer.Stop()

	var pendingPackets []*EQProtocolPacket

	for {
		select {
		case <-s.ctx.Done():
			return

		case packet := <-s.sendQueue:
			// Add sequence number if needed
			if s.needsSequence(packet.Opcode) {
				packet.Sequence = s.nextSeqOut.Add(1)

				// Track in send window. A copy is stored so the
				// retransmit path does not share the queued packet.
				now := time.Now()
				s.sendWindowMu.Lock()
				s.sendWindow[packet.Sequence] = &sendPacket{
					packet:    packet.Copy(),
					sentTime:  now,
					attempts:  1,
					nextRetry: now.Add(time.Duration(s.rto.Load()) * time.Microsecond),
				}
				s.sendWindowMu.Unlock()
			}
			pendingPackets = append(pendingPackets, packet)

		case packet := <-s.resendQueue:
			// Priority resend
			s.sendPacketNow(packet)

		case <-combineTimer.C:
			// Send any pending combined packets
			if len(pendingPackets) > 0 {
				s.sendCombined(pendingPackets, combiner)
				pendingPackets = nil
			}
		}

		// Try to combine and send if we have enough packets
		if len(pendingPackets) >= 3 {
			s.sendCombined(pendingPackets, combiner)
			pendingPackets = nil
		}
	}
}

// recvWorker handles incoming packets from the network.
//
// It reads datagrams with a 100ms deadline so cancellation is noticed
// promptly, drops datagrams whose source differs from the remote address,
// and hands everything else to handleIncomingPacket. Read errors other than
// timeouts are reported through onError. The worker exits when the stream's
// context is cancelled or the socket has been closed.
func (s *EQStream) recvWorker() {
	defer s.wg.Done()

	buffer := make([]byte, s.config.MaxPacketSize)

	for {
		select {
		case <-s.ctx.Done():
			return
		default:
		}

		// Read from network with timeout. A failure to set the deadline is
		// deliberately ignored: if the socket is unusable, ReadFrom below
		// fails as well and is handled there.
		_ = s.conn.SetReadDeadline(time.Now().Add(100 * time.Millisecond))

		n, addr, err := s.conn.ReadFrom(buffer)
		if err != nil {
			var netErr net.Error
			if errors.As(err, &netErr) && netErr.Timeout() {
				continue
			}
			// Errors seen after shutdown has begun are not worth reporting.
			if s.ctx.Err() != nil {
				return
			}
			if s.onError != nil {
				s.onError(err)
			}
			// A closed socket never recovers; retrying would spin and flood
			// the error callback.
			if errors.Is(err, net.ErrClosed) {
				return
			}
			continue
		}

		// Verify source address
		if addr.String() != s.remoteAddr.String() {
			continue // Ignore packets from other sources
		}

		// Process packet
		s.handleIncomingPacket(buffer[:n])
	}
}

// retransmitWorker handles packet retransmissions.
//
// Every 10ms it scans the send window for entries whose retry time has
// passed. An entry that has already been transmitted MaxRetransmits times
// (the first send counts) is removed and counted as dropped. Any other due
// entry is queued for resend and its next retry is scheduled at
// RTO * attempts from now, i.e. with linear back-off.
func (s *EQStream) retransmitWorker() {
	defer s.wg.Done()

	ticker := time.NewTicker(10 * time.Millisecond)
	defer ticker.Stop()

	for {
		select {
		case <-s.ctx.Done():
			return

		case <-ticker.C:
			now := time.Now()

			s.sendWindowMu.Lock()
			for seq, sp := range s.sendWindow {
				if !now.After(sp.nextRetry) {
					continue
				}

				if sp.attempts >= s.config.MaxRetransmits {
					// Max retransmits reached; give up on this packet.
					delete(s.sendWindow, seq)
					s.stats.PacketsDropped.Add(1)
					continue
				}

				// Retransmit
				sp.attempts++
				sp.nextRetry = now.Add(time.Duration(s.rto.Load()) * time.Microsecond * time.Duration(sp.attempts))
				s.stats.Retransmits.Add(1)

				// Queue for immediate send
				select {
				case s.resendQueue <- sp.packet:
				default:
					// Resend queue full, try next time
				}
			}
			s.sendWindowMu.Unlock()
		}
	}
}

// keepAliveWorker sends periodic keep-alive packets.
//
// A keep-alive is queued every KeepAliveTime while the stream is connected.
// If the send queue is full the keep-alive for that interval is skipped.
func (s *EQStream) keepAliveWorker() {
	defer s.wg.Done()

	ticker := time.NewTicker(s.config.KeepAliveTime)
	defer ticker.Stop()

	for {
		select {
		case <-s.ctx.Done():
			return

		case <-ticker.C:
			if s.GetState() != StreamStateConnected {
				continue
			}
			keepAlive := NewEQProtocolPacket(OPKeepAlive, nil)
			select {
			case s.sendQueue <- keepAlive:
			default:
				// Queue full, skip this keep-alive
			}
		}
	}
}

// Helper methods

// GetState returns the current state of the stream.
func (s *EQStream) GetState() StreamState {
	return StreamState(s.state.Load())
}

// compareAndSwapState atomically replaces the state with new if it is
// currently old, and reports whether the swap happened.
func (s *EQStream) compareAndSwapState(old, new StreamState) bool {
	return s.state.CompareAndSwap(int32(old), int32(new))
}

// createSessionRequest builds the session request packet. The 10-byte
// big-endian payload holds the protocol version (2), the session ID and the
// configured window size.
func (s *EQStream) createSessionRequest() *EQProtocolPacket {
	data := make([]byte, 10)
	binary.BigEndian.PutUint32(data[0:4], 2) // Protocol version
	binary.BigEndian.PutUint32(data[4:8], s.sessionID)
	binary.BigEndian.PutUint16(data[8:10], s.config.WindowSize)

	return NewEQProtocolPacket(OPSessionRequest, data)
}

// needsSequence reports whether packets with the given opcode are assigned
// a sequence number and tracked in the send window.
func (s *EQStream) needsSequence(opcode uint16) bool {
	switch opcode {
	case OPPacket, OPFragment:
		return true
	default:
		return false
	}
}

// sendPacket serializes packet, appends a CRC unless the opcode is exempt,
// writes it to the remote address and updates the send statistics. It
// returns an error if no connection is attached or the write fails.
func (s *EQStream) sendPacket(packet *EQProtocolPacket) error {
	// Check if connection is established
	if s.conn == nil {
		return errors.New("no connection")
	}

	// Prepare packet data
	data := packet.Serialize(0)

	// Add CRC if not exempt
	if !s.isCRCExempt(packet.Opcode) {
		data = AppendCRC(data, s.config.CRCKey)
	}

	// Send to network
	n, err := s.conn.WriteTo(data, s.remoteAddr)
	if err != nil {
		return err
	}

	// Update statistics
	s.stats.PacketsSent.Add(1)
	s.stats.BytesSent.Add(uint64(n))

	return nil
}

// sendPacketNow writes packet immediately and routes any failure to the
// onError callback instead of returning it.
func (s *EQStream) sendPacketNow(packet *EQProtocolPacket) {
	if err := s.sendPacket(packet); err != nil && s.onError != nil {
		s.onError(err)
	}
}

// sendCombined writes packets to the network. A single packet is sent as
// is; several packets are merged with combiner, and if merging yields nil
// each packet is sent on its own. An empty slice is a no-op.
func (s *EQStream) sendCombined(packets []*EQProtocolPacket, combiner *PacketCombiner) {
	switch len(packets) {
	case 0:
		return
	case 1:
		s.sendPacketNow(packets[0])
		return
	}

	if combined := combiner.CombineProtocolPackets(packets); combined != nil {
		s.sendPacketNow(combined)
		return
	}

	// Couldn't combine, send individually
	for _, p := range packets {
		s.sendPacketNow(p)
	}
}

// isCRCExempt reports whether packets with the given opcode are sent
// without a trailing CRC.
func (s *EQStream) isCRCExempt(opcode uint16) bool {
	switch opcode {
	case OPSessionRequest, OPSessionResponse, OPOutOfSession:
		return true
	default:
		return false
	}
}

// applicationToProtocol wraps a serialized application packet in an
// OPPacket protocol packet. When compression is enabled and the payload is
// larger than CompressionThreshold, the payload is replaced by its
// compressed form; if compression fails the uncompressed payload is kept.
func (s *EQStream) applicationToProtocol(app *EQApplicationPacket) *EQProtocolPacket {
	// Serialize application packet
	data := app.Serialize()

	// Create protocol packet
	proto := NewEQProtocolPacket(OPPacket, data)
	proto.CopyInfo(app.EQPacket)

	// Apply compression if enabled
	if s.config.CompressEnable && len(data) > CompressionThreshold {
		if compressed, err := CompressPacket(data); err == nil {
			proto.Buffer = compressed
			proto.Size = uint32(len(compressed))
			proto.Compressed = true
		}
	}

	return proto
}

// handleIncomingPacket is implemented in stream_packet_handler.go

// Close gracefully shuts down the stream.
//
// It only acts on a stream that is connecting or connected; in any other
// state it returns nil without doing anything. Otherwise it sends a
// disconnect packet, cancels the workers, waits up to five seconds for them
// to exit, marks the stream closed and invokes onDisconnect. Close always
// returns nil. It does not close the underlying net.PacketConn.
func (s *EQStream) Close() error {
	if !s.compareAndSwapState(StreamStateConnected, StreamStateDisconnecting) &&
		!s.compareAndSwapState(StreamStateConnecting, StreamStateDisconnecting) {
		return nil // Already closing or closed
	}

	// Send disconnect packet if we have a connection
	if s.conn != nil {
		disconnect := NewEQProtocolPacket(OPSessionDisconnect, nil)
		s.sendPacketNow(disconnect)
	}

	// Cancel context to stop workers
	s.cancel()

	// Wait for workers to finish
	done := make(chan struct{})
	go func() {
		s.wg.Wait()
		close(done)
	}()

	// The timer is stopped on return so it does not linger for the full five
	// seconds when the workers exit promptly.
	force := time.NewTimer(5 * time.Second)
	defer force.Stop()

	select {
	case <-done:
	case <-force.C:
		// Force close after timeout
	}

	s.state.Store(int32(StreamStateClosed))

	if s.onDisconnect != nil {
		s.onDisconnect("closed")
	}

	return nil
}

// GetStats returns a copy of the current statistics.
//
// Each counter is loaded individually, so the snapshot is not guaranteed to
// be mutually consistent while the stream is active. The result is built in
// a named return value and returned with a naked return because StreamStats
// holds atomic values that must not be copied through an expression.
func (s *EQStream) GetStats() (snapshot StreamStats) {
	snapshot.PacketsSent.Store(s.stats.PacketsSent.Load())
	snapshot.PacketsReceived.Store(s.stats.PacketsReceived.Load())
	snapshot.BytesSent.Store(s.stats.BytesSent.Load())
	snapshot.BytesReceived.Store(s.stats.BytesReceived.Load())
	snapshot.PacketsDropped.Store(s.stats.PacketsDropped.Load())
	snapshot.Retransmits.Store(s.stats.Retransmits.Load())
	snapshot.RTT.Store(s.stats.RTT.Load())
	snapshot.Bandwidth.Store(s.stats.Bandwidth.Load())
	return
}

// SetCallbacks sets the stream event callbacks. Any of them may be nil.
//
// The callbacks are stored without synchronization and are read by the
// worker goroutines, so SetCallbacks should be called before Connect.
func (s *EQStream) SetCallbacks(onConnect func(), onDisconnect func(string), onError func(error)) {
	s.onConnect = onConnect
	s.onDisconnect = onDisconnect
	s.onError = onError
}