// 1
// 0
// EQ2Emu/internal/packets/reliability.go
//
// 299 lines
// 5.5 KiB
// Go

package packets
import (
"sync"
"time"
"eq2emu/internal/opcodes"
)
// ConnectionState identifies the lifecycle phase of a reliable session.
type ConnectionState uint8

// Lifecycle phases, numbered from zero in declaration order.
const (
	// StateConnecting is the state a new ReliabilityManager starts in.
	StateConnecting ConnectionState = iota
	// StateEstablished follows StateConnecting in the enumeration.
	StateEstablished
	// StateClosing follows StateEstablished in the enumeration.
	StateClosing
	// StateClosed is the state set by ReliabilityManager.Close.
	StateClosed
)
// ReliablePacket is one outbound packet that is held until it is acknowledged
// or abandoned.
type ReliablePacket struct {
	// Sequence is the sequence number assigned when the packet was sent.
	Sequence uint16
	// Data is the manager's private copy of the payload, reused for resends.
	Data []byte
	// SentTime is when the packet was last transmitted.
	SentTime time.Time
	// Attempts counts transmissions so far, including the first send.
	Attempts int
	// Acked is set once an acknowledgement for Sequence has been processed.
	Acked bool
}
// ReliabilityManager keeps the sequencing, acknowledgement and retransmission
// state for a single session and hands packets to user-supplied callbacks.
type ReliabilityManager struct {
	// sessionID is the identifier passed to NewReliabilityManager.
	sessionID uint32
	// state is the lifecycle phase exposed through SetState and GetState.
	state ConnectionState
	// nextSeq is the sequence number SendReliable gives to the next packet.
	nextSeq uint16
	// lastAcked is the newest acknowledged sequence recorded by handleAck.
	lastAcked uint16

	// Outbound reliability.

	// resendQueue holds sent but not yet acknowledged packets, keyed by sequence.
	resendQueue map[uint16]*ReliablePacket
	// resendTimer is the timer that periodically triggers checkResends.
	resendTimer *time.Timer

	// Inbound reliability.

	// expectedSeq is the next inbound sequence that may be delivered.
	expectedSeq uint16
	// outOfOrder buffers inbound packets that arrived ahead of expectedSeq.
	outOfOrder map[uint16][]byte

	// Configuration.

	// maxRetries is the attempt count at which checkResends abandons a packet.
	maxRetries int
	// retryDelay is both the resend timer period and the minimum age a packet
	// must reach before it is retransmitted.
	retryDelay time.Duration
	// ackDelay is initialised by the constructor; no code in this file reads it.
	ackDelay time.Duration

	// mu is taken by the manager's methods around access to the fields above
	// and to the callbacks below.
	mu sync.RWMutex
	// onSend transmits a raw packet to the peer; may be nil.
	onSend func([]byte)
	// onReceive hands an inbound packet to the application; may be nil.
	onReceive func([]byte)
}
// NewReliabilityManager returns a manager for sessionID in StateConnecting,
// with empty outbound and inbound queues, default retry settings, and the
// resend timer already started.
func NewReliabilityManager(sessionID uint32) *ReliabilityManager {
	const (
		defaultMaxRetries = 5
		defaultRetryDelay = 500 * time.Millisecond
		defaultAckDelay   = 200 * time.Millisecond
	)

	manager := new(ReliabilityManager)
	manager.sessionID = sessionID
	manager.state = StateConnecting
	manager.resendQueue = map[uint16]*ReliablePacket{}
	manager.outOfOrder = map[uint16][]byte{}
	manager.maxRetries = defaultMaxRetries
	manager.retryDelay = defaultRetryDelay
	manager.ackDelay = defaultAckDelay

	manager.startResendTimer()
	return manager
}
// SetCallbacks installs the transmit (onSend) and delivery (onReceive)
// callbacks, replacing any previously set. Either may be nil, in which case
// the corresponding operation is skipped.
func (rm *ReliabilityManager) SetCallbacks(onSend, onReceive func([]byte)) {
	rm.mu.Lock()
	rm.onSend, rm.onReceive = onSend, onReceive
	rm.mu.Unlock()
}
// SetState records state as the session's current lifecycle phase.
func (rm *ReliabilityManager) SetState(state ConnectionState) {
	rm.mu.Lock()
	rm.state = state
	rm.mu.Unlock()
}
// GetState returns the session's current lifecycle phase.
func (rm *ReliabilityManager) GetState() ConnectionState {
	rm.mu.RLock()
	current := rm.state
	rm.mu.RUnlock()
	return current
}
// SendReliable transmits data and keeps a copy for retransmission until it is
// acknowledged or abandoned by the resend check.
//
// The payload is copied into the resend queue under the next outbound
// sequence number, so the caller may reuse data after the call returns. The
// onSend callback, if set, is then invoked once with the caller's slice.
//
// The callback runs after rm.mu has been released, matching SendUnreliable.
// A callback that calls back into the manager therefore cannot deadlock, and
// a slow transport does not stall ACK processing or the resend timer.
//
// It returns the sequence number assigned to the packet. Sequence numbers are
// uint16 and wrap to 0 after 65535.
func (rm *ReliabilityManager) SendReliable(data []byte) uint16 {
	payload := make([]byte, len(data))
	copy(payload, data)

	rm.mu.Lock()
	seq := rm.nextSeq
	rm.nextSeq++
	rm.resendQueue[seq] = &ReliablePacket{
		Sequence: seq,
		Data:     payload,
		SentTime: time.Now(),
		Attempts: 1,
	}
	send := rm.onSend
	rm.mu.Unlock()

	if send != nil {
		send(data)
	}
	return seq
}
// SendUnreliable passes data straight to the onSend callback without
// assigning a sequence number or queuing it for retransmission. It does
// nothing when no callback is installed. The callback is invoked after the
// lock has been released.
func (rm *ReliabilityManager) SendUnreliable(data []byte) {
	rm.mu.RLock()
	send := rm.onSend
	rm.mu.RUnlock()

	if send == nil {
		return
	}
	send(data)
}
// ProcessIncoming routes one inbound packet according to the protocol opcode
// in its first byte. Acknowledgements go to handleAck, OP_Packet and
// OP_Combined go to handleSequencedPacket, and everything else is treated as
// unreliable and handed directly to the receive callback. Empty input is
// ignored.
func (rm *ReliabilityManager) ProcessIncoming(data []byte) {
	if len(data) == 0 {
		return
	}

	switch opcodes.ProtocolOpcode(data[0]) {
	case opcodes.OP_Ack:
		rm.handleAck(data)
	case opcodes.OP_Packet, opcodes.OP_Combined:
		rm.handleSequencedPacket(data)
	default:
		// Unreliable packet: no sequencing, deliver as-is.
		rm.deliverPacket(data)
	}
}
// handleAck processes an acknowledgement packet.
//
// data must be at least three bytes: the opcode followed by the acknowledged
// sequence number in little-endian order. Shorter input is ignored. If the
// sequence is still pending it is marked acked and removed from the resend
// queue; acknowledgements for unknown sequences are ignored.
func (rm *ReliabilityManager) handleAck(data []byte) {
	if len(data) < 3 {
		return
	}
	seq := uint16(data[1]) | uint16(data[2])<<8

	rm.mu.Lock()
	defer rm.mu.Unlock()

	packet, pending := rm.resendQueue[seq]
	if !pending {
		return
	}
	packet.Acked = true
	delete(rm.resendQueue, seq)

	// Sequence numbers wrap at 65536, so "newer" is decided by the signed
	// 16-bit distance rather than a plain unsigned compare; otherwise
	// lastAcked would stay pinned near 65535 after the first wrap.
	if int16(seq-rm.lastAcked) > 0 {
		rm.lastAcked = seq
	}
}
// handleSequencedPacket processes an inbound packet that carries a sequence
// number.
//
// data must be at least four bytes: the opcode, then the sequence number in
// little-endian order in bytes 1-2. Shorter input is ignored. Every accepted
// packet is acknowledged before it is classified, so duplicates are ACKed too
// (presumably so a peer whose earlier ACK was lost stops retransmitting).
//
// Classification uses the signed 16-bit distance from expectedSeq so that it
// stays correct when sequence numbers wrap from 65535 to 0:
//   - distance 0: deliver now, advance, then flush any buffered successors;
//   - distance > 0: the packet is early, buffer a private copy;
//   - distance < 0: old or duplicate, drop.
//
// A plain unsigned comparison would, around the wrap, acknowledge and then
// discard packets just past the wrap, and buffer stale duplicates from before
// it as though they were future packets.
func (rm *ReliabilityManager) handleSequencedPacket(data []byte) {
	if len(data) < 4 {
		return
	}
	seq := uint16(data[1]) | uint16(data[2])<<8

	rm.mu.Lock()
	defer rm.mu.Unlock()

	rm.sendAck(seq)

	ahead := int16(seq - rm.expectedSeq)
	switch {
	case ahead == 0:
		rm.deliverPacketLocked(data)
		rm.expectedSeq++
		rm.processBufferedPackets()
	case ahead > 0:
		// The caller may reuse data after we return, so keep our own copy.
		buffered := make([]byte, len(data))
		copy(buffered, data)
		rm.outOfOrder[seq] = buffered
	}
}
// processBufferedPackets drains the out-of-order buffer for as long as it
// holds the packet with the next expected sequence, delivering each one and
// advancing expectedSeq. It takes no lock itself; its caller in this file
// already holds rm.mu.
func (rm *ReliabilityManager) processBufferedPackets() {
	for {
		next := rm.outOfOrder[rm.expectedSeq]
		if next == nil {
			return
		}
		delete(rm.outOfOrder, rm.expectedSeq)
		rm.deliverPacketLocked(next)
		rm.expectedSeq++
	}
}
// sendAck transmits an acknowledgement for seq: the OP_Ack opcode followed by
// the sequence number in little-endian order. The send callback is invoked on
// a new goroutine, so this never blocks on the transport. It reads rm.onSend
// without locking; its caller in this file already holds rm.mu. Nothing is
// sent when no callback is installed.
func (rm *ReliabilityManager) sendAck(seq uint16) {
	send := rm.onSend
	if send == nil {
		return
	}
	lo, hi := byte(seq), byte(seq>>8)
	go send([]byte{byte(opcodes.OP_Ack), lo, hi})
}
// deliverPacket hands data to the receive callback synchronously on the
// calling goroutine. The callback is looked up under the read lock and
// invoked after the lock is released. It does nothing when no callback is
// installed.
func (rm *ReliabilityManager) deliverPacket(data []byte) {
	rm.mu.RLock()
	receive := rm.onReceive
	rm.mu.RUnlock()

	if receive == nil {
		return
	}
	receive(data)
}
// deliverPacketLocked hands a copy of data to the receive callback on a new
// goroutine. It takes no lock and reads rm.onReceive directly; its callers in
// this file already hold rm.mu. The copy lets the caller keep or reuse data
// while the callback runs. It does nothing when no callback is installed.
//
// NOTE(review): every packet gets its own goroutine, so the order in which
// the callback observes packets is not guaranteed to match the order of
// these calls.
func (rm *ReliabilityManager) deliverPacketLocked(data []byte) {
	receive := rm.onReceive
	if receive == nil {
		return
	}
	snapshot := make([]byte, len(data))
	copy(snapshot, data)
	go receive(snapshot)
}
// startResendTimer arms a one-shot timer that runs checkResends after
// retryDelay and then re-arms itself, giving a periodic resend check.
//
// The timer is not armed once the manager is in StateClosed. Without that
// check, a callback already running when Close is called would create a new
// timer after Close had stopped the old one, and the manager would keep
// ticking forever. rm.resendTimer is written under rm.mu because Close reads
// it under the same lock.
//
// Must be called without rm.mu held.
func (rm *ReliabilityManager) startResendTimer() {
	rm.mu.Lock()
	defer rm.mu.Unlock()

	if rm.state == StateClosed {
		return
	}
	rm.resendTimer = time.AfterFunc(rm.retryDelay, func() {
		// Runs on the timer's own goroutine with no lock held; both calls
		// below take rm.mu themselves.
		rm.checkResends()
		rm.startResendTimer()
	})
}
// checkResends retransmits every unacknowledged packet whose last
// transmission is older than retryDelay.
//
// A due packet that has already been sent maxRetries times is dropped from
// the resend queue instead. Otherwise its attempt count and send time are
// updated and its payload is scheduled for retransmission.
//
// The send callback and the payloads are captured while rm.mu is held, and
// the transmissions then run on a separate goroutine. The goroutine does not
// touch rm at all, so it cannot race with SetCallbacks or call a callback
// that was cleared after the nil check.
func (rm *ReliabilityManager) checkResends() {
	rm.mu.Lock()
	defer rm.mu.Unlock()

	now := time.Now()
	var due [][]byte
	for seq, packet := range rm.resendQueue {
		if packet.Acked || now.Sub(packet.SentTime) <= rm.retryDelay {
			continue
		}
		if packet.Attempts >= rm.maxRetries {
			// Give up on this packet. Deleting during range is safe in Go.
			delete(rm.resendQueue, seq)
			continue
		}
		packet.Attempts++
		packet.SentTime = now
		due = append(due, packet.Data)
	}

	send := rm.onSend
	if len(due) == 0 || send == nil {
		return
	}
	// Send outside of the lock so a slow transport cannot stall the manager.
	go func() {
		for _, payload := range due {
			send(payload)
		}
	}()
}
// Close shuts the manager down: it stops the resend timer if one exists,
// replaces both the resend queue and the out-of-order buffer with empty maps,
// and moves the session to StateClosed.
func (rm *ReliabilityManager) Close() {
	rm.mu.Lock()
	defer rm.mu.Unlock()

	if timer := rm.resendTimer; timer != nil {
		timer.Stop()
	}
	rm.resendQueue = map[uint16]*ReliablePacket{}
	rm.outOfOrder = map[uint16][]byte{}
	rm.state = StateClosed
}
// GetStats returns a snapshot for monitoring, in this order: the number of
// packets awaiting acknowledgement, the number of inbound packets buffered
// out of order, and the next outbound sequence number.
func (rm *ReliabilityManager) GetStats() (int, int, int) {
	rm.mu.RLock()
	defer rm.mu.RUnlock()

	return len(rm.resendQueue), len(rm.outOfOrder), int(rm.nextSeq)
}