1
0
Protocol/stream.go
2025-09-01 13:56:01 -05:00

567 lines
14 KiB
Go

package eq2net
import (
"context"
"encoding/binary"
"fmt"
"net"
"sync"
"sync/atomic"
"time"
)
// StreamState represents the state of an EQStream connection
type StreamState int32
const (
StreamStateDisconnected StreamState = iota
StreamStateConnecting
StreamStateConnected
StreamStateDisconnecting
StreamStateClosed
)
// StreamConfig contains configuration for an EQStream
type StreamConfig struct {
// Network settings
MaxPacketSize int // Maximum packet size (default: 512)
WindowSize uint16 // Sliding window size for flow control (default: 2048)
RetransmitTimeMs int64 // Initial retransmit time in milliseconds (default: 500)
MaxRetransmits int // Maximum retransmission attempts (default: 5)
ConnectTimeout time.Duration // Connection timeout (default: 30s)
KeepAliveTime time.Duration // Keep-alive interval (default: 5s)
// Session settings
SessionID uint32 // Session identifier
MaxBandwidth uint32 // Maximum bandwidth in bytes/sec (0 = unlimited)
CRCKey uint32 // CRC key for packet validation
EncodeKey uint32 // Encryption key for chat packets
DecodeKey uint32 // Decryption key for chat packets
CompressEnable bool // Enable packet compression
// Performance settings
SendBufferSize int // Size of send buffer (default: 1024)
RecvBufferSize int // Size of receive buffer (default: 1024)
}
// DefaultStreamConfig returns a default configuration
func DefaultStreamConfig() *StreamConfig {
return &StreamConfig{
MaxPacketSize: 512,
WindowSize: 2048,
RetransmitTimeMs: 500,
MaxRetransmits: 5,
ConnectTimeout: 30 * time.Second,
KeepAliveTime: 5 * time.Second,
SendBufferSize: 1024,
RecvBufferSize: 1024,
CompressEnable: true,
}
}
// StreamStats tracks stream statistics
type StreamStats struct {
PacketsSent atomic.Uint64
PacketsReceived atomic.Uint64
BytesSent atomic.Uint64
BytesReceived atomic.Uint64
PacketsDropped atomic.Uint64
Retransmits atomic.Uint64
RTT atomic.Int64 // Round-trip time in microseconds
Bandwidth atomic.Uint64
}
// EQStream implements reliable UDP communication for EQ2
type EQStream struct {
// Configuration
config *StreamConfig
// Network
conn net.PacketConn
remoteAddr net.Addr
localAddr net.Addr
// State management
state atomic.Int32 // StreamState
sessionID uint32
nextSeqOut atomic.Uint32
nextSeqIn atomic.Uint32
lastAckSeq atomic.Uint32
// Packet queues - using channels for lock-free operations
sendQueue chan *EQProtocolPacket
recvQueue chan *EQApplicationPacket
ackQueue chan uint32
resendQueue chan *EQProtocolPacket
fragmentQueue map[uint32][]*EQProtocolPacket // Fragments being assembled
// Sliding window for flow control
sendWindow map[uint32]*sendPacket
sendWindowMu sync.RWMutex
recvWindow map[uint32]*EQProtocolPacket
recvWindowMu sync.RWMutex
// Retransmission management
rtt atomic.Int64 // Smoothed RTT in microseconds
rttVar atomic.Int64 // RTT variance
rto atomic.Int64 // Retransmission timeout
// Statistics
stats *StreamStats
// Lifecycle management
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
// Callbacks
onConnect func()
onDisconnect func(reason string)
onError func(error)
}
// sendPacket tracks packets awaiting acknowledgment
type sendPacket struct {
packet *EQProtocolPacket
sentTime time.Time
attempts int
nextRetry time.Time
}
// NewEQStream creates a new EQ2 stream
func NewEQStream(config *StreamConfig) *EQStream {
if config == nil {
config = DefaultStreamConfig()
}
ctx, cancel := context.WithCancel(context.Background())
s := &EQStream{
config: config,
sendQueue: make(chan *EQProtocolPacket, config.SendBufferSize),
recvQueue: make(chan *EQApplicationPacket, config.RecvBufferSize),
ackQueue: make(chan uint32, 256),
resendQueue: make(chan *EQProtocolPacket, 256),
fragmentQueue: make(map[uint32][]*EQProtocolPacket),
sendWindow: make(map[uint32]*sendPacket),
recvWindow: make(map[uint32]*EQProtocolPacket),
stats: &StreamStats{},
ctx: ctx,
cancel: cancel,
}
// Initialize state
s.state.Store(int32(StreamStateDisconnected))
s.sessionID = config.SessionID
// Set initial RTO
s.rto.Store(config.RetransmitTimeMs * 1000) // Convert to microseconds
return s
}
// Connect establishes a connection to the remote endpoint
func (s *EQStream) Connect(conn net.PacketConn, remoteAddr net.Addr) error {
// Check state
if !s.compareAndSwapState(StreamStateDisconnected, StreamStateConnecting) {
return fmt.Errorf("stream not in disconnected state")
}
s.conn = conn
s.remoteAddr = remoteAddr
s.localAddr = conn.LocalAddr()
// Start workers
s.wg.Add(4)
go s.sendWorker()
go s.recvWorker()
go s.retransmitWorker()
go s.keepAliveWorker()
// Send session request
sessionReq := s.createSessionRequest()
if err := s.sendPacket(sessionReq); err != nil {
s.Close()
return fmt.Errorf("failed to send session request: %w", err)
}
// Wait for connection or timeout
timer := time.NewTimer(s.config.ConnectTimeout)
defer timer.Stop()
for {
select {
case <-timer.C:
s.Close()
return fmt.Errorf("connection timeout")
case <-s.ctx.Done():
return fmt.Errorf("connection cancelled")
default:
if s.GetState() == StreamStateConnected {
if s.onConnect != nil {
s.onConnect()
}
return nil
}
time.Sleep(10 * time.Millisecond)
}
}
}
// Send queues an application packet for transmission
func (s *EQStream) Send(packet *EQApplicationPacket) error {
if s.GetState() != StreamStateConnected {
return fmt.Errorf("stream not connected")
}
// Convert to protocol packet
protoPacket := s.applicationToProtocol(packet)
select {
case s.sendQueue <- protoPacket:
return nil
case <-s.ctx.Done():
return fmt.Errorf("stream closed")
default:
return fmt.Errorf("send queue full")
}
}
// Receive gets the next application packet from the receive queue
func (s *EQStream) Receive() (*EQApplicationPacket, error) {
select {
case packet := <-s.recvQueue:
return packet, nil
case <-s.ctx.Done():
return nil, fmt.Errorf("stream closed")
default:
return nil, nil // Non-blocking
}
}
// sendWorker handles outgoing packets
func (s *EQStream) sendWorker() {
defer s.wg.Done()
combiner := NewPacketCombiner(s.config.MaxPacketSize - 10) // Leave room for headers/CRC
combineTimer := time.NewTicker(1 * time.Millisecond)
defer combineTimer.Stop()
var pendingPackets []*EQProtocolPacket
for {
select {
case <-s.ctx.Done():
return
case packet := <-s.sendQueue:
// Add sequence number if needed
if s.needsSequence(packet.Opcode) {
packet.Sequence = s.nextSeqOut.Add(1)
// Track in send window
s.sendWindowMu.Lock()
s.sendWindow[packet.Sequence] = &sendPacket{
packet: packet.Copy(),
sentTime: time.Now(),
attempts: 1,
nextRetry: time.Now().Add(time.Duration(s.rto.Load()) * time.Microsecond),
}
s.sendWindowMu.Unlock()
}
pendingPackets = append(pendingPackets, packet)
case packet := <-s.resendQueue:
// Priority resend
s.sendPacketNow(packet)
case <-combineTimer.C:
// Send any pending combined packets
if len(pendingPackets) > 0 {
s.sendCombined(pendingPackets, combiner)
pendingPackets = nil
}
}
// Try to combine and send if we have enough packets
if len(pendingPackets) >= 3 {
s.sendCombined(pendingPackets, combiner)
pendingPackets = nil
}
}
}
// recvWorker handles incoming packets from the network
func (s *EQStream) recvWorker() {
defer s.wg.Done()
buffer := make([]byte, s.config.MaxPacketSize)
for {
select {
case <-s.ctx.Done():
return
default:
}
// Read from network with timeout
s.conn.SetReadDeadline(time.Now().Add(100 * time.Millisecond))
n, addr, err := s.conn.ReadFrom(buffer)
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
continue
}
if s.onError != nil {
s.onError(err)
}
continue
}
// Verify source address
if addr.String() != s.remoteAddr.String() {
continue // Ignore packets from other sources
}
// Process packet
s.handleIncomingPacket(buffer[:n])
}
}
// retransmitWorker handles packet retransmissions
func (s *EQStream) retransmitWorker() {
defer s.wg.Done()
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-s.ctx.Done():
return
case <-ticker.C:
now := time.Now()
s.sendWindowMu.Lock()
for seq, sp := range s.sendWindow {
if now.After(sp.nextRetry) {
if sp.attempts >= s.config.MaxRetransmits {
// Max retransmits reached, connection is dead
delete(s.sendWindow, seq)
s.stats.PacketsDropped.Add(1)
continue
}
// Retransmit
sp.attempts++
sp.nextRetry = now.Add(time.Duration(s.rto.Load()) * time.Microsecond * time.Duration(sp.attempts))
s.stats.Retransmits.Add(1)
// Queue for immediate send
select {
case s.resendQueue <- sp.packet:
default:
// Resend queue full, try next time
}
}
}
s.sendWindowMu.Unlock()
}
}
}
// keepAliveWorker sends periodic keep-alive packets
func (s *EQStream) keepAliveWorker() {
defer s.wg.Done()
ticker := time.NewTicker(s.config.KeepAliveTime)
defer ticker.Stop()
for {
select {
case <-s.ctx.Done():
return
case <-ticker.C:
if s.GetState() == StreamStateConnected {
keepAlive := NewEQProtocolPacket(OPKeepAlive, nil)
select {
case s.sendQueue <- keepAlive:
default:
// Queue full, skip this keep-alive
}
}
}
}
}
// Helper methods
func (s *EQStream) GetState() StreamState {
return StreamState(s.state.Load())
}
func (s *EQStream) compareAndSwapState(old, new StreamState) bool {
return s.state.CompareAndSwap(int32(old), int32(new))
}
func (s *EQStream) createSessionRequest() *EQProtocolPacket {
data := make([]byte, 10)
binary.BigEndian.PutUint32(data[0:4], 2) // Protocol version
binary.BigEndian.PutUint32(data[4:8], s.sessionID)
binary.BigEndian.PutUint16(data[8:10], s.config.WindowSize)
return NewEQProtocolPacket(OPSessionRequest, data)
}
func (s *EQStream) needsSequence(opcode uint16) bool {
switch opcode {
case OPPacket, OPFragment:
return true
default:
return false
}
}
func (s *EQStream) sendPacket(packet *EQProtocolPacket) error {
// Check if connection is established
if s.conn == nil {
return fmt.Errorf("no connection")
}
// Prepare packet data
data := packet.Serialize(0)
// Add CRC if not exempt
if !s.isCRCExempt(packet.Opcode) {
data = AppendCRC(data, s.config.CRCKey)
}
// Send to network
n, err := s.conn.WriteTo(data, s.remoteAddr)
if err != nil {
return err
}
// Update statistics
s.stats.PacketsSent.Add(1)
s.stats.BytesSent.Add(uint64(n))
return nil
}
func (s *EQStream) sendPacketNow(packet *EQProtocolPacket) {
if err := s.sendPacket(packet); err != nil && s.onError != nil {
s.onError(err)
}
}
func (s *EQStream) sendCombined(packets []*EQProtocolPacket, combiner *PacketCombiner) {
if len(packets) == 1 {
s.sendPacketNow(packets[0])
return
}
combined := combiner.CombineProtocolPackets(packets)
if combined != nil {
s.sendPacketNow(combined)
} else {
// Couldn't combine, send individually
for _, p := range packets {
s.sendPacketNow(p)
}
}
}
func (s *EQStream) isCRCExempt(opcode uint16) bool {
switch opcode {
case OPSessionRequest, OPSessionResponse, OPOutOfSession:
return true
default:
return false
}
}
func (s *EQStream) applicationToProtocol(app *EQApplicationPacket) *EQProtocolPacket {
// Serialize application packet
data := app.Serialize()
// Create protocol packet
proto := NewEQProtocolPacket(OPPacket, data)
proto.CopyInfo(app.EQPacket)
// Apply compression if enabled
if s.config.CompressEnable && len(data) > CompressionThreshold {
if compressed, err := CompressPacket(data); err == nil {
proto.Buffer = compressed
proto.Size = uint32(len(compressed))
proto.Compressed = true
}
}
return proto
}
// handleIncomingPacket is implemented in stream_packet_handler.go
// Close gracefully shuts down the stream
func (s *EQStream) Close() error {
if !s.compareAndSwapState(StreamStateConnected, StreamStateDisconnecting) &&
!s.compareAndSwapState(StreamStateConnecting, StreamStateDisconnecting) {
return nil // Already closing or closed
}
// Send disconnect packet if we have a connection
if s.conn != nil {
disconnect := NewEQProtocolPacket(OPSessionDisconnect, nil)
s.sendPacketNow(disconnect)
}
// Cancel context to stop workers
s.cancel()
// Wait for workers to finish
done := make(chan struct{})
go func() {
s.wg.Wait()
close(done)
}()
select {
case <-done:
case <-time.After(5 * time.Second):
// Force close after timeout
}
s.state.Store(int32(StreamStateClosed))
if s.onDisconnect != nil {
s.onDisconnect("closed")
}
return nil
}
// GetStats returns a copy of the current statistics
func (s *EQStream) GetStats() StreamStats {
return StreamStats{
PacketsSent: atomic.Uint64{},
PacketsReceived: atomic.Uint64{},
BytesSent: atomic.Uint64{},
BytesReceived: atomic.Uint64{},
PacketsDropped: atomic.Uint64{},
Retransmits: atomic.Uint64{},
RTT: atomic.Int64{},
Bandwidth: atomic.Uint64{},
}
}
// SetCallbacks sets the stream event callbacks
func (s *EQStream) SetCallbacks(onConnect func(), onDisconnect func(string), onError func(error)) {
s.onConnect = onConnect
s.onDisconnect = onDisconnect
s.onError = onError
}