1
0
Protocol/stream/stream.go

1171 lines
27 KiB
Go

package stream
import (
"encoding/binary"
"fmt"
"sync"
"sync/atomic"
"time"
"git.sharkk.net/EQ2/Protocol/crypto"
"git.sharkk.net/EQ2/Protocol/opcodes"
"git.sharkk.net/EQ2/Protocol/packets"
"github.com/panjf2000/gnet/v2"
)
// Stream implements EQ2's reliable UDP protocol (matches EQStream)
type Stream struct {
conn gnet.Conn
mu sync.RWMutex
// Connection state
state atomic.Value // StreamState
sessionID uint32
crcKey uint32
maxLen uint16
encodeKey int
decodeKey int
// Sequence management with wraparound handling
seqOut uint16
seqIn uint16
seqLastAck uint16
seqExpected uint16
// Acknowledgment tracking
pendingAcks map[uint16]*pendingPacket
ackTimer *time.Timer
lastAckSent time.Time
ackThreshold time.Duration
ackQueue []uint16
duplicateAckCnt map[uint16]int
// Fragment assembly with expiry
fragments map[uint16]*fragmentBuffer
currentFragment *fragmentBuffer
// Out of order handling with expiry
outOfOrder map[uint16]*outOfOrderPacket
// Combined packet for efficient sending
combinedPacket *packets.ProtoPacket
// Packet queues
reliableQueue []*packets.ProtoPacket
unreliableQueue []*packets.ProtoPacket
resendQueue []*pendingPacket
// Opcode management
opcodeManager opcodes.Manager
opcodeSize uint8
clientVersion int16
// Cipher for encryption
cipher *crypto.Ciphers
// Timers
keepAliveTimer *time.Timer
timeoutTimer *time.Timer
retransmitTimer *time.Timer
cleanupTimer *time.Timer
combineTimer *time.Timer
// Retransmission settings
rtt time.Duration
rttVar time.Duration
rto time.Duration
minRTO time.Duration
maxRTO time.Duration
maxRetries int
// Stats
packetsOut uint64
packetsIn uint64
bytesOut uint64
bytesIn uint64
retransmits uint64
// Callbacks
onPacket func(*packets.AppPacket)
onDisconnect func()
}
type pendingPacket struct {
packet *packets.ProtoPacket
seq uint16
sentTime time.Time
attempts int
nextRetry time.Time
}
type fragmentBuffer struct {
totalSize uint32
chunks map[uint16][]byte
received uint32
startTime time.Time
}
type outOfOrderPacket struct {
data []byte
timestamp time.Time
}
type StreamState int
const (
StateDisconnected StreamState = iota
StateConnecting
StateEstablished
StateClosing
StateClosed
)
const (
retransmitInterval = 100 * time.Millisecond
cleanupInterval = 5 * time.Second
fragmentTimeout = 30 * time.Second
outOfOrderTimeout = 10 * time.Second
duplicateAckThreshold = 3
combineFlushInterval = 10 * time.Millisecond
)
// Config holds stream configuration
type Config struct {
SessionID uint32
CRCKey uint32
MaxPacketSize uint16
EncodeKey int
DecodeKey int
OpcodeManager opcodes.Manager
OpcodeSize uint8
AckThreshold time.Duration
KeepaliveTime time.Duration
TimeoutDuration time.Duration
}
// NewStream creates a new EQStream instance
func NewStream(conn gnet.Conn, cfg *Config) *Stream {
s := &Stream{
conn: conn,
sessionID: cfg.SessionID,
crcKey: cfg.CRCKey,
maxLen: cfg.MaxPacketSize,
encodeKey: cfg.EncodeKey,
decodeKey: cfg.DecodeKey,
opcodeManager: cfg.OpcodeManager,
opcodeSize: cfg.OpcodeSize,
ackThreshold: cfg.AckThreshold,
pendingAcks: make(map[uint16]*pendingPacket),
duplicateAckCnt: make(map[uint16]int),
fragments: make(map[uint16]*fragmentBuffer),
outOfOrder: make(map[uint16]*outOfOrderPacket),
ackQueue: make([]uint16, 0),
seqOut: 0,
seqIn: 0,
seqExpected: 0,
rtt: time.Second,
rttVar: 500 * time.Millisecond,
rto: time.Second,
minRTO: 200 * time.Millisecond,
maxRTO: 10 * time.Second,
maxRetries: 10,
}
if s.maxLen == 0 {
s.maxLen = packets.DefaultMTU
}
if s.ackThreshold == 0 {
s.ackThreshold = 200 * time.Millisecond
}
s.state.Store(StateDisconnected)
if cfg.EncodeKey != 0 || cfg.DecodeKey != 0 {
cipher, _ := crypto.NewCiphers(int64(cfg.EncodeKey))
s.cipher = cipher
}
// Start timers
if cfg.KeepaliveTime > 0 {
s.keepAliveTimer = time.AfterFunc(cfg.KeepaliveTime, s.sendKeepalive)
}
if cfg.TimeoutDuration > 0 {
s.timeoutTimer = time.AfterFunc(cfg.TimeoutDuration, s.handleTimeout)
}
s.retransmitTimer = time.AfterFunc(retransmitInterval, s.processRetransmits)
s.cleanupTimer = time.AfterFunc(cleanupInterval, s.cleanup)
s.combineTimer = time.AfterFunc(combineFlushInterval, s.flushCombined)
return s
}
// Process handles incoming data with proper CRC validation
func (s *Stream) Process(data []byte) error {
if len(data) < 2 {
return nil
}
// Validate and strip CRC16
if len(data) > 2 {
if !packets.ValidateCRC(data, s.crcKey) {
return nil
}
data = packets.StripCRC(data)
}
// Decrypt if needed
if s.cipher != nil {
s.cipher.Decrypt(data)
}
if len(data) < 2 {
return nil
}
opcode := binary.BigEndian.Uint16(data[:2])
switch opcode {
case opcodes.OP_SessionRequest:
return s.handleSessionRequest(data[2:])
case opcodes.OP_SessionResponse:
return s.handleSessionResponse(data[2:])
case opcodes.OP_Packet:
return s.handlePacket(data[2:])
case opcodes.OP_Fragment:
return s.handleFragment(data[2:])
case opcodes.OP_Ack:
return s.handleAck(data[2:])
case opcodes.OP_Combined:
return s.handleCombined(data[2:])
case opcodes.OP_AppCombined:
return s.handleAppCombined(data[2:])
case opcodes.OP_KeepAlive:
s.resetTimeout()
return nil
case opcodes.OP_SessionDisconnect:
return s.handleDisconnect()
case opcodes.OP_OutOfOrderAck:
return s.handleOutOfOrderAck(data[2:])
}
atomic.AddUint64(&s.packetsIn, 1)
atomic.AddUint64(&s.bytesIn, uint64(len(data)))
return nil
}
// SendPacket sends an application packet with proper preparation
func (s *Stream) SendPacket(app *packets.AppPacket) error {
if s.state.Load() != StateEstablished {
return fmt.Errorf("stream not established")
}
// Convert to EQ2 protocol packet
proto := packets.NewEQ2Packet(app.GetOpcode(), app.Buffer, s.opcodeManager)
proto.SetVersion(s.clientVersion)
// Check if packet needs fragmentation BEFORE preparation
// C++ checks size > (MaxLen - 8) for: proto-op(2), seq(2), app-op(2) ... data ... crc(2)
if len(app.Buffer) > int(s.maxLen-8) {
return s.sendFragmented(proto)
}
// Try to combine with pending combined packet
s.mu.Lock()
defer s.mu.Unlock()
if s.combinedPacket != nil {
if s.combinedPacket.AppCombine(proto) {
if s.combinedPacket.Size() > uint32(s.maxLen/2) {
s.flushCombinedPacketLocked()
}
return nil
}
s.flushCombinedPacketLocked()
}
// Queue based on reliability
isUnreliable := s.isUnreliableOpcode(app.GetOpcode())
if isUnreliable {
s.unreliableQueue = append(s.unreliableQueue, proto)
} else {
// Start new combined packet if small enough
if proto.Size() < uint32(s.maxLen/4) {
s.combinedPacket = proto
s.combineTimer.Reset(combineFlushInterval)
} else {
s.reliableQueue = append(s.reliableQueue, proto)
}
}
return s.processQueues()
}
// flushCombined timer callback
func (s *Stream) flushCombined() {
s.mu.Lock()
s.flushCombinedPacketLocked()
s.mu.Unlock()
s.processQueues()
s.combineTimer.Reset(combineFlushInterval)
}
// flushCombinedPacketLocked sends any pending combined packet (must hold lock)
func (s *Stream) flushCombinedPacketLocked() {
if s.combinedPacket != nil {
s.reliableQueue = append(s.reliableQueue, s.combinedPacket)
s.combinedPacket = nil
}
}
// processQueues processes all packet queues
func (s *Stream) processQueues() error {
s.mu.Lock()
defer s.mu.Unlock()
// Process resends first
for len(s.resendQueue) > 0 {
pending := s.resendQueue[0]
s.resendQueue = s.resendQueue[1:]
if time.Now().After(pending.nextRetry) {
// Resend already prepared packet
proto := pending.packet
var encryptOffset int8
if proto.LoginOp != opcodes.OP_Unknown {
encryptOffset = proto.PreparePacket(s.maxLen)
}
data := make([]byte, len(proto.Buffer)+2)
binary.BigEndian.PutUint16(data[:2], pending.seq)
copy(data[2:], proto.Buffer)
// Apply encryption if needed (BEFORE CRC, with proper offset)
if s.cipher != nil && len(data) > 2 {
if proto.IsCompressed() {
// Compressed packet: encrypt from offset 3
if len(data) > 3 {
s.cipher.Encrypt(data[3:])
}
} else {
// Uncompressed packet: encrypt from 2 + offset
offset := 2 + int(encryptOffset)
if len(data) > offset {
s.cipher.Encrypt(data[offset:])
}
}
}
pending.attempts++
pending.sentTime = time.Now()
pending.nextRetry = time.Now().Add(s.rto * time.Duration(pending.attempts))
if pending.attempts > s.maxRetries {
delete(s.pendingAcks, pending.seq)
go s.handleTimeout()
return fmt.Errorf("max retransmissions exceeded")
}
s.mu.Unlock()
atomic.AddUint64(&s.retransmits, 1)
s.sendRawWithCRC(opcodes.OP_Packet, data)
s.mu.Lock()
} else {
s.resendQueue = append(s.resendQueue, pending)
}
}
// Process reliable queue
for len(s.reliableQueue) > 0 {
proto := s.reliableQueue[0]
s.reliableQueue = s.reliableQueue[1:]
s.mu.Unlock()
if err := s.sendReliable(proto); err != nil {
return err
}
s.mu.Lock()
}
// Process unreliable queue
for len(s.unreliableQueue) > 0 {
proto := s.unreliableQueue[0]
s.unreliableQueue = s.unreliableQueue[1:]
// Prepare unreliable packets too
var encryptOffset int8
if proto.LoginOp != opcodes.OP_Unknown {
encryptOffset = proto.PreparePacket(s.maxLen)
}
data := make([]byte, len(proto.Buffer))
copy(data, proto.Buffer)
// Apply encryption if needed (BEFORE CRC)
if s.cipher != nil && len(data) > 0 {
if proto.IsCompressed() {
// Compressed: encrypt from offset 1 (skip compress flag)
if len(data) > 1 {
s.cipher.Encrypt(data[1:])
}
} else if encryptOffset > 0 {
// Uncompressed: encrypt from offset
if len(data) > int(encryptOffset) {
s.cipher.Encrypt(data[encryptOffset:])
}
}
}
s.mu.Unlock()
s.sendRawWithCRC(proto.Opcode, data)
s.mu.Lock()
}
s.mu.Unlock()
s.sendPendingAcks()
return nil
}
// sendFragmented sends a fragmented packet (matches EQStream::SendPacket)
func (s *Stream) sendFragmented(proto *packets.ProtoPacket) error {
// Prepare packet first
encryptOffset := proto.PreparePacket(s.maxLen)
if encryptOffset < 0 {
return fmt.Errorf("failed to prepare packet")
}
// Encrypt the prepared packet data BEFORE fragmentation
data := proto.Buffer
if s.cipher != nil && len(data) > 0 {
if proto.IsCompressed() {
// Compressed: encrypt from offset 1 (skip compress flag)
if len(data) > 1 {
s.cipher.Encrypt(data[1:])
}
} else if encryptOffset > 0 {
// Uncompressed: encrypt from offset
if len(data) > int(encryptOffset) {
s.cipher.Encrypt(data[encryptOffset:])
}
}
}
length := uint32(len(data))
// First fragment with total length
out := packets.NewProtoPacket(opcodes.OP_Fragment, nil, s.opcodeManager)
out.Buffer = make([]byte, s.maxLen-4)
binary.BigEndian.PutUint32(out.Buffer[:4], length)
used := copy(out.Buffer[4:], data)
out.Buffer = out.Buffer[:4+used]
// Fragment packets are sent without additional encryption
if err := s.sendReliableFragment(out); err != nil {
return err
}
// Send remaining fragments
pos := used
for pos < int(length) {
chunkSize := int(length) - pos
if chunkSize > int(s.maxLen)-4 {
chunkSize = int(s.maxLen) - 4
}
frag := packets.NewProtoPacket(opcodes.OP_Fragment, data[pos:pos+chunkSize], s.opcodeManager)
if err := s.sendReliableFragment(frag); err != nil {
return err
}
pos += chunkSize
}
return nil
}
// sendReliableFragment sends a fragment packet (already encrypted data)
func (s *Stream) sendReliableFragment(proto *packets.ProtoPacket) error {
s.mu.Lock()
seq := s.seqOut
s.seqOut = s.incrementSequence(s.seqOut)
s.mu.Unlock()
// Build packet with sequence (data is already encrypted)
data := make([]byte, len(proto.Buffer)+2)
binary.BigEndian.PutUint16(data[:2], seq)
copy(data[2:], proto.Buffer)
pending := &pendingPacket{
packet: proto.Copy(),
seq: seq,
sentTime: time.Now(),
attempts: 1,
nextRetry: time.Now().Add(s.rto),
}
s.mu.Lock()
s.pendingAcks[seq] = pending
s.mu.Unlock()
return s.sendRawWithCRC(opcodes.OP_Packet, data)
}
// sendReliable sends a packet reliably with sequencing (matches C++ flow)
func (s *Stream) sendReliable(proto *packets.ProtoPacket) error {
// Prepare packet (matches C++ PreparePacket)
if !proto.IsPrepared() {
// C++ checks (app->PreparePacket(MaxLen) == 255) but PreparePacket returns int8(-1)
// Since -1 != 255 as int8, this check would never trigger in C++
// For 1:1 compatibility, we don't check the return value
proto.PreparePacket(s.maxLen)
}
// Compress if needed (matches C++ EQ2_Compress)
compressedOffset := int8(0)
if !proto.IsCompressed() && proto.Size() > 128 {
compressedOffset = proto.EQ2Compress(3) // Default offset 3
}
// Encrypt if needed (matches C++ EncryptPacket)
if s.cipher != nil && !proto.IsEncrypted() {
proto.EncryptPacket(s.cipher, compressedOffset, 0)
}
// Assign sequence number (matches C++ SequencedPush)
s.mu.Lock()
seq := s.seqOut
// Set sequence in packet buffer (first 2 bytes)
binary.BigEndian.PutUint16(proto.Buffer[:2], seq)
proto.SetSequence(int32(seq))
s.seqOut = s.incrementSequence(s.seqOut)
pending := &pendingPacket{
packet: proto.Copy(),
seq: seq,
sentTime: time.Now(),
attempts: 1,
nextRetry: time.Now().Add(s.rto),
}
s.pendingAcks[seq] = pending
s.mu.Unlock()
// Send via WritePacket equivalent
return s.writePacket(proto)
}
// writePacket sends a protocol packet (matches C++ WritePacket)
func (s *Stream) writePacket(p *packets.ProtoPacket) error {
// Serialize packet
data := make([]byte, p.Size()+2)
length := p.Serialize(data, 0)
// Add CRC for non-session packets
if p.Opcode != opcodes.OP_SessionRequest && p.Opcode != opcodes.OP_SessionResponse {
data = data[:length]
data = packets.AppendCRC(data, s.crcKey)
}
atomic.AddUint64(&s.packetsOut, 1)
atomic.AddUint64(&s.bytesOut, uint64(len(data)))
return s.conn.AsyncWrite(data, nil)
}
// Helper methods
func (s *Stream) isUnreliableOpcode(emuOp opcodes.EmuOpcode) bool {
switch emuOp {
case opcodes.OP_UpdatePositionMsg,
opcodes.OP_WorldPingMsg:
return true
default:
return false
}
}
func (s *Stream) incrementSequence(seq uint16) uint16 {
if seq == 0xFFFF {
return 0
}
return seq + 1
}
func (s *Stream) isSequenceAhead(seq, base uint16) bool {
diff := seq - base
return diff > 0 && diff < 0x8000
}
// Protocol handlers
func (s *Stream) handleSessionRequest(data []byte) error {
if len(data) < 10 {
return fmt.Errorf("session request too small")
}
version := binary.BigEndian.Uint32(data[:4])
sessionID := binary.BigEndian.Uint32(data[4:8])
maxLen := binary.BigEndian.Uint16(data[8:10])
s.mu.Lock()
s.sessionID = sessionID
if maxLen < s.maxLen {
s.maxLen = maxLen
}
s.mu.Unlock()
response := make([]byte, 15)
binary.BigEndian.PutUint32(response[0:4], sessionID)
binary.BigEndian.PutUint32(response[4:8], s.crcKey)
response[8] = 2
binary.BigEndian.PutUint16(response[9:11], s.maxLen)
binary.BigEndian.PutUint32(response[11:15], version)
s.state.Store(StateEstablished)
// Session packets don't have CRC or encryption (matches C++)
return s.sendRaw(opcodes.OP_SessionResponse, response)
}
func (s *Stream) handleSessionResponse(data []byte) error {
if len(data) < 14 {
return fmt.Errorf("session response too small")
}
sessionID := binary.BigEndian.Uint32(data[0:4])
crcKey := binary.BigEndian.Uint32(data[4:8])
maxLen := binary.BigEndian.Uint16(data[9:11])
s.mu.Lock()
s.sessionID = sessionID
s.crcKey = crcKey
if maxLen < s.maxLen {
s.maxLen = maxLen
}
s.mu.Unlock()
s.state.Store(StateEstablished)
return nil
}
func (s *Stream) handlePacket(data []byte) error {
if len(data) < 2 {
return nil
}
seq := binary.BigEndian.Uint16(data[:2])
data = data[2:]
s.mu.Lock()
defer s.mu.Unlock()
if seq == s.seqExpected {
s.seqExpected = s.incrementSequence(s.seqExpected)
s.ackQueue = append(s.ackQueue, seq)
if err := s.processPacketData(data); err != nil {
return err
}
// Process any buffered out-of-order packets
for {
if buffered, exists := s.outOfOrder[s.seqExpected]; exists {
delete(s.outOfOrder, s.seqExpected)
s.ackQueue = append(s.ackQueue, s.seqExpected)
s.seqExpected = s.incrementSequence(s.seqExpected)
if err := s.processPacketData(buffered.data); err != nil {
return err
}
} else {
break
}
}
} else if s.isSequenceAhead(seq, s.seqExpected) {
// Out of order - buffer it
s.outOfOrder[seq] = &outOfOrderPacket{
data: append([]byte(nil), data...),
timestamp: time.Now(),
}
go s.sendOutOfOrderAck(seq)
} else {
// Duplicate packet - just ack it
go s.sendAckImmediate(seq)
}
s.scheduleAck()
return nil
}
func (s *Stream) handleFragment(data []byte) error {
if len(data) < 2 {
return nil
}
seq := binary.BigEndian.Uint16(data[:2])
data = data[2:]
s.mu.Lock()
defer s.mu.Unlock()
// Check if this is start of new fragment stream
if s.currentFragment == nil && len(data) >= 4 {
totalLen := binary.BigEndian.Uint32(data[:4])
s.currentFragment = &fragmentBuffer{
totalSize: totalLen,
chunks: make(map[uint16][]byte),
startTime: time.Now(),
}
if len(data) > 4 {
s.currentFragment.chunks[0] = append([]byte(nil), data[4:]...)
s.currentFragment.received = uint32(len(data) - 4)
}
} else if s.currentFragment != nil {
// Continuation fragment
chunkNum := uint16(len(s.currentFragment.chunks))
s.currentFragment.chunks[chunkNum] = append([]byte(nil), data...)
s.currentFragment.received += uint32(len(data))
// Check if complete
if s.currentFragment.received >= s.currentFragment.totalSize {
complete := make([]byte, 0, s.currentFragment.totalSize)
for i := uint16(0); i < uint16(len(s.currentFragment.chunks)); i++ {
if chunk, ok := s.currentFragment.chunks[i]; ok {
complete = append(complete, chunk...)
}
}
s.currentFragment = nil
return s.processPacketData(complete)
}
}
s.ackQueue = append(s.ackQueue, seq)
s.scheduleAck()
return nil
}
func (s *Stream) handleAck(data []byte) error {
if len(data) < 2 {
return nil
}
ackSeq := binary.BigEndian.Uint16(data[:2])
s.mu.Lock()
defer s.mu.Unlock()
if pending, exists := s.pendingAcks[ackSeq]; exists {
delete(s.pendingAcks, ackSeq)
delete(s.duplicateAckCnt, ackSeq)
// Update RTT
sample := time.Since(pending.sentTime)
if s.rtt == 0 {
s.rtt = sample
s.rttVar = sample / 2
} else {
alpha := 0.125
beta := 0.25
s.rttVar = time.Duration((1-beta)*float64(s.rttVar) + beta*float64(absTime(s.rtt-sample)))
s.rtt = time.Duration((1-alpha)*float64(s.rtt) + alpha*float64(sample))
}
s.rto = s.rtt + 4*s.rttVar
if s.rto < s.minRTO {
s.rto = s.minRTO
}
if s.rto > s.maxRTO {
s.rto = s.maxRTO
}
} else {
// Duplicate ACK
s.duplicateAckCnt[ackSeq]++
if s.duplicateAckCnt[ackSeq] >= duplicateAckThreshold {
// Fast retransmit
for seq, pending := range s.pendingAcks {
if s.isSequenceAhead(seq, ackSeq) {
s.resendQueue = append(s.resendQueue, pending)
}
}
delete(s.duplicateAckCnt, ackSeq)
}
}
if s.isSequenceAhead(ackSeq, s.seqLastAck) {
s.seqLastAck = ackSeq
}
return nil
}
func (s *Stream) handleCombined(data []byte) error {
pos := 0
for pos < len(data) {
if pos+1 > len(data) {
break
}
size := uint16(data[pos])
pos++
if size == 0xff {
if pos+2 > len(data) {
break
}
size = binary.BigEndian.Uint16(data[pos : pos+2])
pos += 2
}
if pos+int(size) > len(data) {
break
}
packet := data[pos : pos+int(size)]
pos += int(size)
if err := s.Process(packet); err != nil {
return err
}
}
return nil
}
func (s *Stream) handleAppCombined(data []byte) error {
pos := 0
for pos < len(data) {
if pos+1 > len(data) {
break
}
size := uint16(data[pos])
pos++
if size == 0xff {
if pos+2 > len(data) {
break
}
size = binary.BigEndian.Uint16(data[pos : pos+2])
pos += 2
}
if pos+int(size) > len(data) {
break
}
packet := data[pos : pos+int(size)]
pos += int(size)
if err := s.processPacketData(packet); err != nil {
return err
}
}
return nil
}
func (s *Stream) handleOutOfOrderAck(data []byte) error {
if len(data) < 2 {
return nil
}
seq := binary.BigEndian.Uint16(data[:2])
s.mu.Lock()
defer s.mu.Unlock()
if pending, exists := s.pendingAcks[seq]; exists {
s.resendQueue = append(s.resendQueue, pending)
}
return nil
}
func (s *Stream) handleDisconnect() error {
s.state.Store(StateClosed)
if s.onDisconnect != nil {
go s.onDisconnect()
}
return nil
}
// processPacketData processes received packet data
func (s *Stream) processPacketData(data []byte) error {
proto := packets.NewProtoPacketFromRaw(data, -1, s.opcodeManager)
app := proto.MakeApplicationPacket(s.opcodeSize)
if s.onPacket != nil {
go s.onPacket(app)
}
return nil
}
// Timer handlers
func (s *Stream) processRetransmits() {
if s.state.Load() != StateEstablished {
return
}
s.mu.Lock()
now := time.Now()
for seq, pending := range s.pendingAcks {
if now.After(pending.nextRetry) {
newPending := &pendingPacket{
packet: pending.packet.Copy(),
seq: seq,
sentTime: pending.sentTime,
attempts: pending.attempts,
nextRetry: pending.nextRetry,
}
s.resendQueue = append(s.resendQueue, newPending)
}
}
s.mu.Unlock()
if len(s.resendQueue) > 0 {
s.processQueues()
}
if s.retransmitTimer != nil {
s.retransmitTimer.Reset(retransmitInterval)
}
}
func (s *Stream) cleanup() {
if s.state.Load() != StateEstablished {
return
}
s.mu.Lock()
now := time.Now()
// Clean up old fragments
for id, frag := range s.fragments {
if now.Sub(frag.startTime) > fragmentTimeout {
delete(s.fragments, id)
}
}
// Clean up old out-of-order packets
for seq, oop := range s.outOfOrder {
if now.Sub(oop.timestamp) > outOfOrderTimeout {
delete(s.outOfOrder, seq)
}
}
// Clean up duplicate ACK counts
for seq := range s.duplicateAckCnt {
if _, exists := s.pendingAcks[seq]; !exists {
delete(s.duplicateAckCnt, seq)
}
}
s.mu.Unlock()
if s.cleanupTimer != nil {
s.cleanupTimer.Reset(cleanupInterval)
}
}
func (s *Stream) scheduleAck() {
if s.ackTimer == nil {
s.ackTimer = time.AfterFunc(s.ackThreshold, func() {
s.sendPendingAcks()
})
}
}
func (s *Stream) sendPendingAcks() {
s.mu.Lock()
if len(s.ackQueue) == 0 {
s.mu.Unlock()
return
}
for _, seq := range s.ackQueue {
data := make([]byte, 2)
binary.BigEndian.PutUint16(data, seq)
s.sendRawWithCRC(opcodes.OP_Ack, data)
}
s.ackQueue = s.ackQueue[:0]
s.lastAckSent = time.Now()
s.mu.Unlock()
}
func (s *Stream) sendAckImmediate(seq uint16) error {
data := make([]byte, 2)
binary.BigEndian.PutUint16(data, seq)
return s.sendRawWithCRC(opcodes.OP_Ack, data)
}
func (s *Stream) sendOutOfOrderAck(seq uint16) error {
data := make([]byte, 2)
binary.BigEndian.PutUint16(data, seq)
return s.sendRawWithCRC(opcodes.OP_OutOfOrderAck, data)
}
func (s *Stream) sendKeepalive() {
if s.state.Load() == StateEstablished {
s.sendRawWithCRC(opcodes.OP_KeepAlive, nil)
}
if s.keepAliveTimer != nil {
s.keepAliveTimer.Reset(10 * time.Second)
}
}
func (s *Stream) resetTimeout() {
if s.timeoutTimer != nil {
s.timeoutTimer.Reset(30 * time.Second)
}
}
func (s *Stream) handleTimeout() {
s.state.Store(StateClosed)
if s.onDisconnect != nil {
s.onDisconnect()
}
}
// sendRaw sends raw packet WITHOUT encryption or CRC (for special packets)
func (s *Stream) sendRaw(opcode uint16, data []byte) error {
packet := make([]byte, 2+len(data))
binary.BigEndian.PutUint16(packet[:2], opcode)
copy(packet[2:], data)
atomic.AddUint64(&s.packetsOut, 1)
atomic.AddUint64(&s.bytesOut, uint64(len(packet)))
return s.conn.AsyncWrite(packet, nil)
}
// sendRawWithCRC sends raw packet with CRC16 (matches C++ WritePacket)
func (s *Stream) sendRawWithCRC(opcode uint16, data []byte) error {
packet := make([]byte, 2+len(data))
binary.BigEndian.PutUint16(packet[:2], opcode)
copy(packet[2:], data)
// Add CRC16 AFTER encryption (matches C++ order)
packet = packets.AppendCRC(packet, s.crcKey)
atomic.AddUint64(&s.packetsOut, 1)
atomic.AddUint64(&s.bytesOut, uint64(len(packet)))
return s.conn.AsyncWrite(packet, nil)
}
// Public methods
func (s *Stream) SetPacketCallback(fn func(*packets.AppPacket)) {
s.onPacket = fn
}
func (s *Stream) SetDisconnectCallback(fn func()) {
s.onDisconnect = fn
}
func (s *Stream) GetState() StreamState {
return s.state.Load().(StreamState)
}
func (s *Stream) IsConnected() bool {
return s.GetState() == StateEstablished
}
func (s *Stream) SendSessionRequest() error {
data := make([]byte, 10)
binary.BigEndian.PutUint32(data[0:4], 2) // protocol version
binary.BigEndian.PutUint32(data[4:8], s.sessionID)
binary.BigEndian.PutUint16(data[8:10], s.maxLen)
s.state.Store(StateConnecting)
// Session packets don't have CRC or encryption (matches C++)
return s.sendRaw(opcodes.OP_SessionRequest, data)
}
func (s *Stream) GetSessionID() uint32 {
s.mu.RLock()
defer s.mu.RUnlock()
return s.sessionID
}
func (s *Stream) SetSessionID(id uint32) {
s.mu.Lock()
s.sessionID = id
s.mu.Unlock()
}
func (s *Stream) GetRemoteAddr() string {
if s.conn != nil {
return s.conn.RemoteAddr().String()
}
return ""
}
func (s *Stream) Close() error {
if s.state.Load() == StateClosed {
return nil
}
s.state.Store(StateClosed)
s.sendRawWithCRC(opcodes.OP_SessionDisconnect, nil)
// Stop all timers
if s.keepAliveTimer != nil {
s.keepAliveTimer.Stop()
}
if s.ackTimer != nil {
s.ackTimer.Stop()
}
if s.timeoutTimer != nil {
s.timeoutTimer.Stop()
}
if s.retransmitTimer != nil {
s.retransmitTimer.Stop()
}
if s.cleanupTimer != nil {
s.cleanupTimer.Stop()
}
if s.combineTimer != nil {
s.combineTimer.Stop()
}
// Clear queues
s.mu.Lock()
s.reliableQueue = nil
s.unreliableQueue = nil
s.resendQueue = nil
s.pendingAcks = nil
s.fragments = nil
s.outOfOrder = nil
s.combinedPacket = nil
s.currentFragment = nil
s.mu.Unlock()
return nil
}
// GetStats returns stream statistics
func (s *Stream) GetStats() map[string]interface{} {
s.mu.RLock()
defer s.mu.RUnlock()
return map[string]interface{}{
"packets_out": atomic.LoadUint64(&s.packetsOut),
"packets_in": atomic.LoadUint64(&s.packetsIn),
"bytes_out": atomic.LoadUint64(&s.bytesOut),
"bytes_in": atomic.LoadUint64(&s.bytesIn),
"retransmits": atomic.LoadUint64(&s.retransmits),
"pending_acks": len(s.pendingAcks),
"out_of_order": len(s.outOfOrder),
"fragments": len(s.fragments),
"rtt": s.rtt,
"rto": s.rto,
}
}
func absTime(d time.Duration) time.Duration {
if d < 0 {
return -d
}
return d
}