package packets import ( "sync" "time" "eq2emu/internal/opcodes" ) type ConnectionState uint8 const ( StateConnecting ConnectionState = iota StateEstablished StateClosing StateClosed ) type ReliablePacket struct { Sequence uint16 Data []byte SentTime time.Time Attempts int Acked bool } type ReliabilityManager struct { sessionID uint32 state ConnectionState nextSeq uint16 lastAcked uint16 // Outbound reliability resendQueue map[uint16]*ReliablePacket resendTimer *time.Timer // Inbound reliability expectedSeq uint16 outOfOrder map[uint16][]byte // Configuration maxRetries int retryDelay time.Duration ackDelay time.Duration mu sync.RWMutex onSend func([]byte) onReceive func([]byte) } func NewReliabilityManager(sessionID uint32) *ReliabilityManager { rm := &ReliabilityManager{ sessionID: sessionID, state: StateConnecting, resendQueue: make(map[uint16]*ReliablePacket), outOfOrder: make(map[uint16][]byte), maxRetries: 5, retryDelay: time.Millisecond * 500, ackDelay: time.Millisecond * 200, } rm.startResendTimer() return rm } func (rm *ReliabilityManager) SetCallbacks(onSend, onReceive func([]byte)) { rm.mu.Lock() defer rm.mu.Unlock() rm.onSend = onSend rm.onReceive = onReceive } func (rm *ReliabilityManager) SetState(state ConnectionState) { rm.mu.Lock() defer rm.mu.Unlock() rm.state = state } func (rm *ReliabilityManager) GetState() ConnectionState { rm.mu.RLock() defer rm.mu.RUnlock() return rm.state } // Send packet with reliability func (rm *ReliabilityManager) SendReliable(data []byte) uint16 { rm.mu.Lock() defer rm.mu.Unlock() seq := rm.nextSeq rm.nextSeq++ packet := &ReliablePacket{ Sequence: seq, Data: make([]byte, len(data)), SentTime: time.Now(), Attempts: 1, } copy(packet.Data, data) rm.resendQueue[seq] = packet if rm.onSend != nil { rm.onSend(data) } return seq } // Send unreliable packet func (rm *ReliabilityManager) SendUnreliable(data []byte) { rm.mu.RLock() onSend := rm.onSend rm.mu.RUnlock() if onSend != nil { onSend(data) } } // Process incoming packet func (rm *ReliabilityManager) ProcessIncoming(data []byte) { if len(data) < 1 { return } opcode := opcodes.ProtocolOpcode(data[0]) switch opcode { case opcodes.OP_Ack: rm.handleAck(data) case opcodes.OP_Packet: rm.handleSequencedPacket(data) case opcodes.OP_Combined: rm.handleSequencedPacket(data) default: // Unreliable packet rm.deliverPacket(data) } } func (rm *ReliabilityManager) handleAck(data []byte) { if len(data) < 3 { return } seq := uint16(data[1]) | (uint16(data[2]) << 8) rm.mu.Lock() defer rm.mu.Unlock() if packet, exists := rm.resendQueue[seq]; exists { packet.Acked = true delete(rm.resendQueue, seq) if seq > rm.lastAcked { rm.lastAcked = seq } } } func (rm *ReliabilityManager) handleSequencedPacket(data []byte) { if len(data) < 4 { return } // Extract sequence (bytes 1-2) seq := uint16(data[1]) | (uint16(data[2]) << 8) rm.mu.Lock() defer rm.mu.Unlock() // Send ACK rm.sendAck(seq) if seq == rm.expectedSeq { // In order - deliver immediately rm.deliverPacketLocked(data) rm.expectedSeq++ // Check for buffered packets rm.processBufferedPackets() } else if seq > rm.expectedSeq { // Out of order - buffer it buffered := make([]byte, len(data)) copy(buffered, data) rm.outOfOrder[seq] = buffered } // Ignore old/duplicate packets (seq < expectedSeq) } func (rm *ReliabilityManager) processBufferedPackets() { for rm.outOfOrder[rm.expectedSeq] != nil { data := rm.outOfOrder[rm.expectedSeq] delete(rm.outOfOrder, rm.expectedSeq) rm.deliverPacketLocked(data) rm.expectedSeq++ } } func (rm *ReliabilityManager) sendAck(seq uint16) { ack := []byte{ byte(opcodes.OP_Ack), byte(seq & 0xFF), byte((seq >> 8) & 0xFF), } if rm.onSend != nil { go rm.onSend(ack) } } func (rm *ReliabilityManager) deliverPacket(data []byte) { rm.mu.RLock() onReceive := rm.onReceive rm.mu.RUnlock() if onReceive != nil { onReceive(data) } } func (rm *ReliabilityManager) deliverPacketLocked(data []byte) { if rm.onReceive != nil { // Make copy since we're calling from locked context delivered := make([]byte, len(data)) copy(delivered, data) go rm.onReceive(delivered) } } func (rm *ReliabilityManager) startResendTimer() { rm.resendTimer = time.AfterFunc(rm.retryDelay, func() { rm.checkResends() rm.startResendTimer() }) } func (rm *ReliabilityManager) checkResends() { rm.mu.Lock() defer rm.mu.Unlock() now := time.Now() var toResend []*ReliablePacket for seq, packet := range rm.resendQueue { if packet.Acked { continue } if now.Sub(packet.SentTime) > rm.retryDelay { if packet.Attempts >= rm.maxRetries { // Give up on this packet delete(rm.resendQueue, seq) continue } packet.Attempts++ packet.SentTime = now toResend = append(toResend, packet) } } // Send outside of lock if len(toResend) > 0 && rm.onSend != nil { go func() { for _, packet := range toResend { rm.onSend(packet.Data) } }() } } func (rm *ReliabilityManager) Close() { rm.mu.Lock() defer rm.mu.Unlock() if rm.resendTimer != nil { rm.resendTimer.Stop() } rm.state = StateClosed rm.resendQueue = make(map[uint16]*ReliablePacket) rm.outOfOrder = make(map[uint16][]byte) } // Get stats for monitoring func (rm *ReliabilityManager) GetStats() (int, int, int) { rm.mu.RLock() defer rm.mu.RUnlock() pending := len(rm.resendQueue) buffered := len(rm.outOfOrder) nextSeq := int(rm.nextSeq) return pending, buffered, nextSeq }