eq2go/internal/udp/connection.go
2025-07-21 23:18:39 -05:00

482 lines
13 KiB
Go

package udp
import (
"crypto/rand"
"encoding/binary"
"eq2emu/internal/opcodes"
"net"
"sync"
"time"
)
// ConnectionState represents the current state of a UDP connection
type ConnectionState int
const (
StateClosed ConnectionState = iota // Connection is closed
StateEstablished // Connection is active and ready
StateClosing // Connection is being closed
StateWaitClose // Waiting for close confirmation
)
const (
DefaultWindowSize = 2048 // Default sliding window size for flow control
MaxPacketSize = 512 // Maximum packet size before fragmentation
)
// Connection manages a single client connection over UDP with reliability features
type Connection struct {
// Network details
addr *net.UDPAddr // Client's UDP address
conn *net.UDPConn // Shared UDP socket
handler PacketHandler
state ConnectionState
mutex sync.RWMutex // Protects connection state
// Session parameters
sessionID uint32 // Unique session identifier
key uint32 // Encryption key
compressed bool // Whether compression is enabled
encoded bool // Whether encoding is enabled
maxLength uint32 // Maximum packet length
// Sequence tracking for reliable delivery
nextInSeq uint16 // Next expected incoming sequence number
nextOutSeq uint16 // Next outgoing sequence number
windowSize uint16 // Flow control window size
// Protocol components
retransmitQueue *RetransmitQueue // Handles packet retransmission
fragmentMgr *FragmentManager // Manages packet fragmentation
combiner *PacketCombiner // Combines small packets
outOfOrderMap map[uint16]*ProtocolPacket // Stores out-of-order packets
crypto *Crypto // Handles encryption/decryption
// Connection timing
lastPacketTime time.Time // Last received packet timestamp
lastAckTime time.Time // Last acknowledgment timestamp
// Flow control
sendWindow []bool // Sliding window for sent packets
windowStart uint16 // Start of the sliding window
}
// NewConnection creates a new connection instance with default settings
func NewConnection(addr *net.UDPAddr, conn *net.UDPConn, handler PacketHandler) *Connection {
return &Connection{
addr: addr,
conn: conn,
handler: handler,
state: StateClosed,
maxLength: MaxPacketSize,
windowSize: DefaultWindowSize,
lastPacketTime: time.Now(),
crypto: NewCrypto(),
retransmitQueue: NewRetransmitQueue(),
fragmentMgr: NewFragmentManager(MaxPacketSize),
combiner: NewPacketCombiner(),
outOfOrderMap: make(map[uint16]*ProtocolPacket),
sendWindow: make([]bool, DefaultWindowSize),
}
}
// ProcessPacket handles incoming UDP packets and routes them based on opcode
func (c *Connection) ProcessPacket(data []byte) {
c.mutex.Lock()
defer c.mutex.Unlock()
c.lastPacketTime = time.Now()
packet, err := ParseProtocolPacket(data)
if err != nil {
return // Silently drop malformed packets
}
// Route packet based on opcode
switch packet.Opcode {
case opcodes.OpSessionRequest:
c.handleSessionRequest(packet)
case opcodes.OpSessionResponse:
c.handleSessionResponse(packet)
case opcodes.OpPacket:
c.handleDataPacket(packet)
case opcodes.OpFragment:
c.handleFragment(packet)
case opcodes.OpCombined:
c.handleCombinedPacket(packet)
case opcodes.OpAck:
c.handleAck(packet)
case opcodes.OpOutOfOrderAck:
c.handleOutOfOrderAck(packet)
case opcodes.OpKeepAlive:
c.sendKeepAlive()
case opcodes.OpSessionDisconnect:
c.handleDisconnect()
}
}
// handleSessionRequest processes client session initiation
func (c *Connection) handleSessionRequest(packet *ProtocolPacket) {
if len(packet.Data) < 12 {
return
}
// Extract session parameters from request
c.sessionID = binary.LittleEndian.Uint32(packet.Data[4:8])
requestedMaxLen := binary.LittleEndian.Uint32(packet.Data[8:12])
// Set maximum packet length if reasonable
if requestedMaxLen > 0 && requestedMaxLen <= 8192 {
c.maxLength = requestedMaxLen
c.fragmentMgr = NewFragmentManager(requestedMaxLen)
}
// Generate random encryption key
keyBytes := make([]byte, 4)
rand.Read(keyBytes)
c.key = binary.LittleEndian.Uint32(keyBytes)
// Initialize encryption
c.crypto.SetKey(keyBytes)
c.sendSessionResponse()
c.state = StateEstablished
}
// handleSessionResponse processes server session response (client-side)
func (c *Connection) handleSessionResponse(packet *ProtocolPacket) {
if len(packet.Data) < 20 {
return
}
// Extract session parameters
c.sessionID = binary.LittleEndian.Uint32(packet.Data[0:4])
c.key = binary.LittleEndian.Uint32(packet.Data[4:8])
format := packet.Data[9]
c.compressed = (format & 0x01) != 0
c.encoded = (format & 0x04) != 0
c.maxLength = binary.LittleEndian.Uint32(packet.Data[12:16])
// Initialize encryption with received key
keyBytes := make([]byte, 4)
binary.LittleEndian.PutUint32(keyBytes, c.key)
c.crypto.SetKey(keyBytes)
c.state = StateEstablished
}
// handleDataPacket processes reliable data packets with sequence numbers
func (c *Connection) handleDataPacket(packet *ProtocolPacket) {
if len(packet.Data) < 2 {
return
}
seq := binary.BigEndian.Uint16(packet.Data[0:2])
payload := packet.Data[2:]
if seq == c.nextInSeq {
// In-order packet - process immediately
c.processInOrderPacket(seq, payload)
} else if seq > c.nextInSeq {
// Out-of-order packet - store for later processing
c.outOfOrderMap[seq] = packet
c.sendOutOfOrderAck(seq)
}
// Drop packets with seq < nextInSeq (duplicates/old packets)
}
// processInOrderPacket handles packets received in correct sequence order
func (c *Connection) processInOrderPacket(seq uint16, payload []byte) {
c.nextInSeq++
c.sendAck(seq)
// Process application data
if appPacket, err := c.processApplicationData(payload); err == nil {
c.handler.HandlePacket(c, appPacket)
}
// Check for queued out-of-order packets that can now be processed
c.processQueuedPackets()
}
// processQueuedPackets processes any out-of-order packets that are now in sequence
func (c *Connection) processQueuedPackets() {
for {
packet, exists := c.outOfOrderMap[c.nextInSeq]
if !exists {
break
}
delete(c.outOfOrderMap, c.nextInSeq)
if len(packet.Data) >= 2 {
payload := packet.Data[2:]
seq := binary.BigEndian.Uint16(packet.Data[0:2])
c.nextInSeq++
c.sendAck(seq)
if appPacket, err := c.processApplicationData(payload); err == nil {
c.handler.HandlePacket(c, appPacket)
}
}
}
}
// handleFragment processes fragmented packets and reassembles them
func (c *Connection) handleFragment(packet *ProtocolPacket) {
if data, complete, err := c.fragmentMgr.ProcessFragment(packet); err == nil && complete {
if appPacket, err := c.processApplicationData(data); err == nil {
c.handler.HandlePacket(c, appPacket)
}
}
}
// handleCombinedPacket splits combined packets into individual packets
func (c *Connection) handleCombinedPacket(packet *ProtocolPacket) {
if packets, err := ParseCombinedPacket(packet.Data); err == nil {
for _, p := range packets {
c.ProcessPacket(p.Raw)
}
}
}
// handleAck processes acknowledgment packets
func (c *Connection) handleAck(packet *ProtocolPacket) {
if len(packet.Data) < 2 {
return
}
seq := binary.BigEndian.Uint16(packet.Data[0:2])
c.retransmitQueue.Acknowledge(seq)
c.lastAckTime = time.Now()
}
// handleOutOfOrderAck processes out-of-order acknowledgments
func (c *Connection) handleOutOfOrderAck(packet *ProtocolPacket) {
if len(packet.Data) < 2 {
return
}
seq := binary.BigEndian.Uint16(packet.Data[0:2])
c.retransmitQueue.Acknowledge(seq)
}
// SendPacket sends an application packet with fragmentation and reliability
func (c *Connection) SendPacket(packet *ApplicationPacket) {
c.mutex.Lock()
defer c.mutex.Unlock()
data := packet.Serialize()
// Handle large packets with fragmentation
if fragments := c.fragmentMgr.FragmentPacket(data, c.nextOutSeq); fragments != nil {
for _, frag := range fragments {
c.sendProtocolPacketWithRetransmit(frag)
}
return
}
// Process outbound data (compression, encryption)
processedData := c.processOutboundData(data)
// Create protocol packet with sequence number
protocolData := make([]byte, 2+len(processedData))
binary.BigEndian.PutUint16(protocolData[0:2], c.nextOutSeq)
copy(protocolData[2:], processedData)
protocolPacket := &ProtocolPacket{
Opcode: opcodes.OpPacket,
Data: protocolData,
}
c.sendProtocolPacketWithRetransmit(protocolPacket)
}
// processOutboundData applies compression and encryption to outgoing data
func (c *Connection) processOutboundData(data []byte) []byte {
// Compress large packets if compression is enabled
if c.compressed && len(data) > 128 {
if compressed, err := Compress(data); err == nil {
data = compressed
}
}
// Encrypt data if encryption is enabled
if c.crypto.IsEncrypted() {
data = c.crypto.Encrypt(data)
}
return data
}
// processApplicationData decrypts and decompresses incoming application data
func (c *Connection) processApplicationData(data []byte) (*ApplicationPacket, error) {
// Decrypt if encryption is enabled
if c.crypto.IsEncrypted() {
data = c.crypto.Decrypt(data)
}
// Decompress if compression is enabled
if c.compressed && len(data) > 0 {
var err error
data, err = Decompress(data)
if err != nil {
return nil, err
}
}
return ParseApplicationPacket(data)
}
// sendProtocolPacketWithRetransmit sends a packet and adds it to retransmit queue if needed
func (c *Connection) sendProtocolPacketWithRetransmit(packet *ProtocolPacket) {
// Add reliable packets to retransmit queue
if packet.Opcode == opcodes.OpPacket || packet.Opcode == opcodes.OpFragment {
c.retransmitQueue.Add(packet, c.nextOutSeq)
c.nextOutSeq++
}
c.sendProtocolPacket(packet)
}
// sendSessionResponse sends session establishment response to client
func (c *Connection) sendSessionResponse() {
data := make([]byte, 20)
binary.LittleEndian.PutUint32(data[0:4], c.sessionID)
binary.LittleEndian.PutUint32(data[4:8], c.key)
data[8] = 2 // UnknownA
var format uint8
if c.compressed {
format |= 0x01
}
if c.encoded {
format |= 0x04
}
data[9] = format
data[10] = 0 // UnknownB
binary.LittleEndian.PutUint32(data[12:16], c.maxLength)
binary.LittleEndian.PutUint32(data[16:20], 0) // UnknownD
packet := &ProtocolPacket{
Opcode: opcodes.OpSessionResponse,
Data: data,
}
c.sendProtocolPacket(packet)
}
// sendAck sends acknowledgment for received packet
func (c *Connection) sendAck(seq uint16) {
data := make([]byte, 2)
binary.BigEndian.PutUint16(data, seq)
packet := &ProtocolPacket{
Opcode: opcodes.OpAck,
Data: data,
}
c.sendProtocolPacket(packet)
}
// sendOutOfOrderAck sends acknowledgment for out-of-order packet
func (c *Connection) sendOutOfOrderAck(seq uint16) {
data := make([]byte, 2)
binary.BigEndian.PutUint16(data, seq)
packet := &ProtocolPacket{
Opcode: opcodes.OpOutOfOrderAck,
Data: data,
}
c.sendProtocolPacket(packet)
}
// sendKeepAlive sends keep-alive packet to maintain connection
func (c *Connection) sendKeepAlive() {
packet := &ProtocolPacket{
Opcode: opcodes.OpKeepAlive,
Data: []byte{},
}
c.sendProtocolPacket(packet)
}
// sendProtocolPacket sends a protocol packet over UDP
func (c *Connection) sendProtocolPacket(packet *ProtocolPacket) {
data := packet.Serialize()
c.conn.WriteToUDP(data, c.addr)
}
// handleDisconnect processes disconnection request
func (c *Connection) handleDisconnect() {
c.state = StateClosing
}
// Close gracefully closes the connection
func (c *Connection) Close() {
c.mutex.Lock()
defer c.mutex.Unlock()
if c.state == StateEstablished {
c.state = StateClosing
// Send disconnect packet
disconnectData := make([]byte, 6)
binary.LittleEndian.PutUint32(disconnectData[0:4], c.sessionID)
disconnectData[4] = 0
disconnectData[5] = 6
packet := &ProtocolPacket{
Opcode: opcodes.OpSessionDisconnect,
Data: disconnectData,
}
c.sendProtocolPacket(packet)
}
c.state = StateClosed
c.retransmitQueue.Clear()
}
// StartRetransmitLoop begins the retransmission management goroutine
func (c *Connection) StartRetransmitLoop() {
go func() {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for range ticker.C {
if c.state == StateClosed {
return
}
// Retransmit expired packets
for _, entry := range c.retransmitQueue.GetExpired() {
c.sendProtocolPacket(entry.Packet)
}
}
}()
}
// GetState returns the current connection state (thread-safe)
func (c *Connection) GetState() ConnectionState {
c.mutex.RLock()
defer c.mutex.RUnlock()
return c.state
}
// GetSessionID returns the session ID (thread-safe)
func (c *Connection) GetSessionID() uint32 {
c.mutex.RLock()
defer c.mutex.RUnlock()
return c.sessionID
}
// IsTimedOut checks if connection has timed out
func (c *Connection) IsTimedOut(timeout time.Duration) bool {
c.mutex.RLock()
defer c.mutex.RUnlock()
return time.Since(c.lastPacketTime) > timeout
}