eq2go/internal/udp/connection.go

559 lines
15 KiB
Go

package udp
import (
"crypto/rand"
"encoding/binary"
"eq2emu/internal/common/opcodes"
"errors"
"net"
"sync"
"time"
)
// ConnectionState represents the current state of a UDP connection
type ConnectionState int
const (
StateClosed ConnectionState = iota // Connection is closed
StateEstablished // Connection is active and ready
StateClosing // Connection is being closed
StateWaitClose // Waiting for close confirmation
)
// Common connection errors
var (
ErrSessionClosed = errors.New("session closed")
)
// Config holds all UDP server and connection configuration
type Config struct {
// Server settings
MaxConnections int // Maximum concurrent connections
Timeout time.Duration // Connection timeout duration
BufferSize int // UDP socket buffer size
// Protocol settings
MaxPacketSize uint32 // Maximum packet size before fragmentation
WindowSize uint16 // Sliding window size for flow control
RetransmitBase time.Duration // Base retransmission timeout
RetransmitMax time.Duration // Maximum retransmission timeout
RetransmitAttempts int // Maximum retransmission attempts
CombineThreshold int // Packet combining size threshold
// Features
EnableCompression bool // Enable zlib compression
EnableEncryption bool // Enable RC4 encryption
}
// DefaultConfig returns sensible defaults for EQ2EMu protocol
func DefaultConfig() Config {
return Config{
MaxConnections: 1000,
Timeout: 45 * time.Second,
BufferSize: 8192,
MaxPacketSize: 512,
WindowSize: 2048,
RetransmitBase: 500 * time.Millisecond,
RetransmitMax: 5 * time.Second,
RetransmitAttempts: 5,
CombineThreshold: 256,
EnableCompression: true,
EnableEncryption: true,
}
}
// PacketHandler processes application-level packets
type PacketHandler func(*Connection, *ApplicationPacket)
// Connection manages a single client connection over UDP with reliability features
type Connection struct {
// Network details
addr *net.UDPAddr // Client's UDP address
conn *net.UDPConn // Shared UDP socket
handler PacketHandler
state ConnectionState
mutex sync.RWMutex // Protects connection state
// Session parameters
sessionID uint32 // Unique session identifier
key uint32 // Encryption key
compressed bool // Whether compression is enabled
encoded bool // Whether encoding is enabled
maxLength uint32 // Maximum packet length
// Sequence tracking for reliable delivery
nextInSeq uint16 // Next expected incoming sequence number
nextOutSeq uint16 // Next outgoing sequence number
// Protocol components
retransmitQueue *RetransmitQueue // Handles packet retransmission
fragmentMgr *FragmentManager // Manages packet fragmentation
combiner *PacketCombiner // Combines small packets
outOfOrderMap map[uint16]*ProtocolPacket // Stores out-of-order packets
crypto *Crypto // Handles encryption/decryption
// Connection timing
lastActivity time.Time // Last activity timestamp
// Configuration (embedded from server)
config Config
}
// NewConnection creates a new connection instance with server configuration
func NewConnection(addr *net.UDPAddr, conn *net.UDPConn, handler PacketHandler, config Config) *Connection {
return &Connection{
addr: addr,
conn: conn,
handler: handler,
state: StateClosed,
maxLength: config.MaxPacketSize,
lastActivity: time.Now(),
config: config,
// Initialize components with config values
retransmitQueue: NewRetransmitQueue(
config.RetransmitBase,
config.RetransmitMax,
config.RetransmitAttempts,
),
fragmentMgr: NewFragmentManager(config.MaxPacketSize),
combiner: NewPacketCombiner(config.CombineThreshold),
crypto: NewCrypto(),
outOfOrderMap: make(map[uint16]*ProtocolPacket),
}
}
// ProcessPacket handles incoming UDP packets and routes them based on opcode
func (c *Connection) ProcessPacket(data []byte) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.lastActivity = time.Now()
packet, err := ParseProtocolPacket(data)
if err != nil {
return // Silently drop malformed packets
}
// Route packet based on opcode
switch packet.Opcode {
case opcodes.OpSessionRequest:
c.handleSessionRequest(packet)
case opcodes.OpSessionResponse:
c.handleSessionResponse(packet)
case opcodes.OpPacket:
c.handleDataPacket(packet)
case opcodes.OpFragment:
c.handleFragment(packet)
case opcodes.OpCombined:
c.handleCombinedPacket(packet)
case opcodes.OpAck:
c.handleAck(packet)
case opcodes.OpOutOfOrderAck:
c.handleOutOfOrderAck(packet)
case opcodes.OpKeepAlive:
c.sendKeepAlive()
case opcodes.OpSessionDisconnect:
c.handleDisconnect()
}
}
// handleSessionRequest processes client session initiation
func (c *Connection) handleSessionRequest(packet *ProtocolPacket) {
if len(packet.Data) < 12 {
return
}
// Extract session parameters from request
c.sessionID = binary.LittleEndian.Uint32(packet.Data[4:8])
requestedMaxLen := binary.LittleEndian.Uint32(packet.Data[8:12])
// Set maximum packet length if reasonable
if requestedMaxLen > 0 && requestedMaxLen <= 8192 {
c.maxLength = requestedMaxLen
c.fragmentMgr = NewFragmentManager(requestedMaxLen)
}
// Generate random encryption key
keyBytes := make([]byte, 4)
rand.Read(keyBytes)
c.key = binary.LittleEndian.Uint32(keyBytes)
// Initialize encryption
c.crypto.SetKey(keyBytes)
c.sendSessionResponse()
c.state = StateEstablished
}
// handleSessionResponse processes server session response (client-side)
func (c *Connection) handleSessionResponse(packet *ProtocolPacket) {
if len(packet.Data) < 20 {
return
}
// Extract session parameters
c.sessionID = binary.LittleEndian.Uint32(packet.Data[0:4])
c.key = binary.LittleEndian.Uint32(packet.Data[4:8])
format := packet.Data[9]
c.compressed = (format & 0x01) != 0
c.encoded = (format & 0x04) != 0
c.maxLength = binary.LittleEndian.Uint32(packet.Data[12:16])
// Initialize encryption with received key
keyBytes := make([]byte, 4)
binary.LittleEndian.PutUint32(keyBytes, c.key)
c.crypto.SetKey(keyBytes)
c.state = StateEstablished
}
// handleDataPacket processes reliable data packets with sequence numbers
func (c *Connection) handleDataPacket(packet *ProtocolPacket) {
if len(packet.Data) < 2 {
return
}
seq := binary.BigEndian.Uint16(packet.Data[0:2])
payload := packet.Data[2:]
if seq == c.nextInSeq {
// In-order packet - process immediately
c.processInOrderPacket(seq, payload)
} else if seq > c.nextInSeq {
// Out-of-order packet - store for later processing
c.outOfOrderMap[seq] = packet
c.sendOutOfOrderAck(seq)
}
// Drop packets with seq < nextInSeq (duplicates/old packets)
}
// processInOrderPacket handles packets received in correct sequence order
func (c *Connection) processInOrderPacket(seq uint16, payload []byte) {
c.nextInSeq++
c.sendAck(seq)
// Process application data
if appPacket, err := c.processApplicationData(payload); err == nil {
c.handler(c, appPacket)
}
// Check for queued out-of-order packets that can now be processed
c.processQueuedPackets()
}
// processQueuedPackets processes any out-of-order packets that are now in sequence
func (c *Connection) processQueuedPackets() {
for {
packet, exists := c.outOfOrderMap[c.nextInSeq]
if !exists {
break
}
delete(c.outOfOrderMap, c.nextInSeq)
if len(packet.Data) >= 2 {
payload := packet.Data[2:]
seq := binary.BigEndian.Uint16(packet.Data[0:2])
c.nextInSeq++
c.sendAck(seq)
if appPacket, err := c.processApplicationData(payload); err == nil {
c.handler(c, appPacket)
}
}
}
}
// handleFragment processes fragmented packets and reassembles them
func (c *Connection) handleFragment(packet *ProtocolPacket) {
if data, complete, err := c.fragmentMgr.ProcessFragment(packet); err == nil && complete {
if appPacket, err := c.processApplicationData(data); err == nil {
c.handler(c, appPacket)
}
}
}
// handleCombinedPacket splits combined packets into individual packets
func (c *Connection) handleCombinedPacket(packet *ProtocolPacket) {
if packets, err := ParseCombinedPacket(packet.Data); err == nil {
for _, p := range packets {
c.ProcessPacket(p.Raw)
}
}
}
// handleAck processes acknowledgment packets
func (c *Connection) handleAck(packet *ProtocolPacket) {
if len(packet.Data) < 2 {
return
}
seq := binary.BigEndian.Uint16(packet.Data[0:2])
c.retransmitQueue.Acknowledge(seq)
}
// handleOutOfOrderAck processes out-of-order acknowledgments
func (c *Connection) handleOutOfOrderAck(packet *ProtocolPacket) {
if len(packet.Data) < 2 {
return
}
seq := binary.BigEndian.Uint16(packet.Data[0:2])
c.retransmitQueue.Acknowledge(seq)
}
// SendPacket sends an application packet with fragmentation and reliability
func (c *Connection) SendPacket(packet *ApplicationPacket) {
c.mutex.Lock()
defer c.mutex.Unlock()
data := packet.Serialize()
// Handle large packets with fragmentation
if fragments := c.fragmentMgr.FragmentPacket(data, c.nextOutSeq); fragments != nil {
for _, frag := range fragments {
c.sendProtocolPacketWithRetransmit(frag)
}
return
}
// Process outbound data (compression, encryption)
processedData := c.processOutboundData(data)
// Create protocol packet with sequence number
protocolData := make([]byte, 2+len(processedData))
binary.BigEndian.PutUint16(protocolData[0:2], c.nextOutSeq)
copy(protocolData[2:], processedData)
protocolPacket := &ProtocolPacket{
Opcode: opcodes.OpPacket,
Data: protocolData,
}
c.sendProtocolPacketWithRetransmit(protocolPacket)
}
// processOutboundData applies compression and encryption to outgoing data
func (c *Connection) processOutboundData(data []byte) []byte {
// Compress large packets if compression is enabled
if c.config.EnableCompression && c.compressed && len(data) > 128 {
if compressed, err := Compress(data); err == nil {
data = compressed
}
}
// Encrypt data if encryption is enabled
if c.config.EnableEncryption && c.crypto.IsEncrypted() {
data = c.crypto.Encrypt(data)
}
return data
}
// processApplicationData decrypts and decompresses incoming application data
func (c *Connection) processApplicationData(data []byte) (*ApplicationPacket, error) {
// Decrypt if encryption is enabled
if c.config.EnableEncryption && c.crypto.IsEncrypted() {
data = c.crypto.Decrypt(data)
}
// Decompress if compression is enabled
if c.config.EnableCompression && c.compressed && len(data) > 0 {
var err error
data, err = Decompress(data)
if err != nil {
return nil, err
}
}
return ParseApplicationPacket(data)
}
// sendProtocolPacketWithRetransmit sends a packet and adds it to retransmit queue if needed
func (c *Connection) sendProtocolPacketWithRetransmit(packet *ProtocolPacket) {
// Add reliable packets to retransmit queue
if packet.Opcode == opcodes.OpPacket || packet.Opcode == opcodes.OpFragment {
c.retransmitQueue.Add(packet, c.nextOutSeq)
c.nextOutSeq++
}
c.sendProtocolPacket(packet)
}
// sendSessionResponse sends session establishment response to client
func (c *Connection) sendSessionResponse() {
data := make([]byte, 20)
binary.LittleEndian.PutUint32(data[0:4], c.sessionID)
binary.LittleEndian.PutUint32(data[4:8], c.key)
data[8] = 2 // UnknownA
var format uint8
if c.compressed {
format |= 0x01
}
if c.encoded {
format |= 0x04
}
data[9] = format
data[10] = 0 // UnknownB
binary.LittleEndian.PutUint32(data[12:16], c.maxLength)
binary.LittleEndian.PutUint32(data[16:20], 0) // UnknownD
packet := &ProtocolPacket{
Opcode: opcodes.OpSessionResponse,
Data: data,
}
c.sendProtocolPacket(packet)
}
// sendAck sends acknowledgment for received packet
func (c *Connection) sendAck(seq uint16) {
data := make([]byte, 2)
binary.BigEndian.PutUint16(data, seq)
packet := &ProtocolPacket{
Opcode: opcodes.OpAck,
Data: data,
}
c.sendProtocolPacket(packet)
}
// sendOutOfOrderAck sends acknowledgment for out-of-order packet
func (c *Connection) sendOutOfOrderAck(seq uint16) {
data := make([]byte, 2)
binary.BigEndian.PutUint16(data, seq)
packet := &ProtocolPacket{
Opcode: opcodes.OpOutOfOrderAck,
Data: data,
}
c.sendProtocolPacket(packet)
}
// sendKeepAlive sends keep-alive packet to maintain connection
func (c *Connection) sendKeepAlive() {
packet := &ProtocolPacket{
Opcode: opcodes.OpKeepAlive,
Data: []byte{},
}
c.sendProtocolPacket(packet)
}
// sendProtocolPacket sends a protocol packet over UDP
func (c *Connection) sendProtocolPacket(packet *ProtocolPacket) {
data := packet.Serialize()
c.conn.WriteToUDP(data, c.addr)
}
// handleDisconnect processes disconnection request
func (c *Connection) handleDisconnect() {
c.state = StateClosing
}
// Close gracefully closes the connection
func (c *Connection) Close() {
c.mutex.Lock()
defer c.mutex.Unlock()
if c.state == StateEstablished {
c.state = StateClosing
// Send disconnect packet
disconnectData := make([]byte, 6)
binary.LittleEndian.PutUint32(disconnectData[0:4], c.sessionID)
disconnectData[4] = 0
disconnectData[5] = 6
packet := &ProtocolPacket{
Opcode: opcodes.OpSessionDisconnect,
Data: disconnectData,
}
c.sendProtocolPacket(packet)
}
c.state = StateClosed
c.retransmitQueue.Clear()
}
// StartRetransmitLoop begins the retransmission management goroutine
func (c *Connection) StartRetransmitLoop() {
go func() {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for range ticker.C {
if c.state == StateClosed {
return
}
// Retransmit expired packets
for _, entry := range c.retransmitQueue.GetExpired() {
c.sendProtocolPacket(entry.Packet)
}
}
}()
}
// Stats returns comprehensive connection statistics
type Stats struct {
// Connection info
State ConnectionState
SessionID uint32
LastActivity time.Time
// Queue stats
PendingRetransmits int
PendingFragments int
PendingCombined int
OutOfOrderCount int
}
// GetStats returns unified statistics
func (c *Connection) GetStats() Stats {
c.mutex.RLock()
defer c.mutex.RUnlock()
return Stats{
State: c.state,
SessionID: c.sessionID,
LastActivity: c.lastActivity,
PendingRetransmits: c.retransmitQueue.Size(),
PendingFragments: len(c.fragmentMgr.fragments),
PendingCombined: len(c.combiner.PendingPackets),
OutOfOrderCount: len(c.outOfOrderMap),
}
}
// GetState returns the current connection state (thread-safe)
func (c *Connection) GetState() ConnectionState {
c.mutex.RLock()
defer c.mutex.RUnlock()
return c.state
}
// GetSessionID returns the session ID (thread-safe)
func (c *Connection) GetSessionID() uint32 {
c.mutex.RLock()
defer c.mutex.RUnlock()
return c.sessionID
}
// IsTimedOut checks if connection has timed out
func (c *Connection) IsTimedOut() bool {
c.mutex.RLock()
defer c.mutex.RUnlock()
return time.Since(c.lastActivity) > c.config.Timeout
}
// GetClientAddr returns the client's UDP address
func (c *Connection) GetClientAddr() *net.UDPAddr {
return c.addr
}