diff --git a/eq2_protocol_report.md b/eq2_protocol_report.md new file mode 100644 index 0000000..0098e50 --- /dev/null +++ b/eq2_protocol_report.md @@ -0,0 +1,312 @@ +# EverQuest 2 Network Protocol Documentation + +## Overview + +The EverQuest 2 protocol is a custom UDP-based protocol that provides reliable delivery, encryption, compression, and session management. This document describes the protocol structure for reimplementation. + +## 1. Protocol Architecture + +### 1.1 Protocol Layers +``` +Application Layer - Game logic packets (EQApplicationPacket) +Protocol Layer - Session management, reliability (EQProtocolPacket) +Transport Layer - UDP with custom reliability +Network Layer - Standard IP +``` + +### 1.2 Packet Types +- **EQProtocolPacket**: Low-level protocol control packets +- **EQApplicationPacket**: High-level game data packets +- **EQ2Packet**: EQ2-specific application packets with login opcodes + +## 2. Session Management + +### 2.1 Session Establishment +``` +Client -> Server: OP_SessionRequest +Server -> Client: OP_SessionResponse +``` + +#### SessionRequest Structure +```c +struct SessionRequest { + uint32 UnknownA; // Usually 0 + uint32 Session; // Proposed session ID + uint32 MaxLength; // Maximum packet length +}; +``` + +#### SessionResponse Structure +```c +struct SessionResponse { + uint32 Session; // Confirmed session ID + uint32 Key; // Encryption key + uint8 UnknownA; // Usually 2 + uint8 Format; // Flags: 0x01=compressed, 0x04=encoded + uint8 UnknownB; // Usually 0 + uint32 MaxLength; // Maximum packet length + uint32 UnknownD; // Usually 0 +}; +``` + +### 2.2 Session Termination +``` +Either -> Other: OP_SessionDisconnect +``` + +## 3. Protocol Opcodes + +### 3.1 Core Protocol Opcodes +```c +#define OP_SessionRequest 0x01 +#define OP_SessionResponse 0x02 +#define OP_Combined 0x03 +#define OP_SessionDisconnect 0x05 +#define OP_KeepAlive 0x06 +#define OP_ServerKeyRequest 0x07 +#define OP_SessionStatResponse 0x08 +#define OP_Packet 0x09 +#define OP_Fragment 0x0D +#define OP_OutOfOrderAck 0x11 +#define OP_Ack 0x15 +#define OP_AppCombined 0x19 +#define OP_OutOfSession 0x1D +``` + +## 4. Reliable Delivery System + +### 4.1 Sequence Numbers +- 16-bit sequence numbers for ordered delivery +- Wrap-around handling at 65536 +- Window-based flow control (default window size: 2048) + +### 4.2 Acknowledgments +- **OP_Ack**: Acknowledges packets up to sequence number +- **OP_OutOfOrderAck**: Acknowledges specific out-of-order packet +- Retransmission on timeout (default: 500ms * 3.0 multiplier, max 5000ms) + +### 4.3 Packet Structure for Sequenced Data +``` +[2 bytes: Sequence Number][Payload Data] +``` + +## 5. Encryption System + +### 5.1 Key Exchange +1. RSA key exchange during initial handshake +2. 8-byte encrypted key transmitted in packet +3. RC4 encryption initialized with exchanged key + +### 5.2 RC4 Encryption +- Applied to packet payload after headers +- Separate encryption state per connection +- Encryption offset varies by packet type and compression + +### 5.3 CRC Validation +- 16-bit CRC appended to most packets +- CRC calculated using session key +- Some packets (SessionRequest, SessionResponse, OutOfSession) not CRC'd + +## 6. Compression System + +### 6.1 zlib Compression +- Individual packets compressed using zlib deflate +- Compression applied when packet size > 128 bytes +- Compression markers: + - `0x5A`: zlib compressed data follows + - `0xA5`: uncompressed data (small packets) + +### 6.2 Compression Process +``` +1. Check if packet size > compression threshold +2. Apply zlib deflate compression +3. Prepend compression marker +4. If compressed size >= original, use uncompressed with 0xA5 marker +``` + +## 7. Packet Combination + +### 7.1 Protocol-Level Combination (OP_Combined) +Multiple protocol packets combined into single UDP datagram: +``` +[1 byte: Packet1 Size][Packet1 Data] +[1 byte: Packet2 Size][Packet2 Data] +... +``` +If size >= 255: +``` +[1 byte: 0xFF][2 bytes: Actual Size][Packet Data] +``` + +### 7.2 Application-Level Combination (OP_AppCombined) +Multiple application packets combined: +``` +[1 byte: Packet1 Size][Packet1 Data without opcode header] +[1 byte: Packet2 Size][Packet2 Data without opcode header] +... +``` + +## 8. Fragmentation + +### 8.1 Large Packet Handling +Packets larger than MaxLength are fragmented using OP_Fragment: + +**First Fragment:** +``` +[2 bytes: Sequence][4 bytes: Total Length][Payload Chunk] +``` + +**Subsequent Fragments:** +``` +[2 bytes: Sequence][Payload Chunk] +``` + +### 8.2 Reassembly +1. Allocate buffer based on total length from first fragment +2. Collect fragments in sequence order +3. Reconstruct original packet when all fragments received + +## 9. Data Structure System + +### 9.1 Data Types +```c +#define DATA_STRUCT_INT8 1 +#define DATA_STRUCT_INT16 2 +#define DATA_STRUCT_INT32 3 +#define DATA_STRUCT_INT64 4 +#define DATA_STRUCT_FLOAT 5 +#define DATA_STRUCT_DOUBLE 6 +#define DATA_STRUCT_COLOR 7 +#define DATA_STRUCT_SINT8 8 +#define DATA_STRUCT_SINT16 9 +#define DATA_STRUCT_SINT32 10 +#define DATA_STRUCT_CHAR 11 +#define DATA_STRUCT_EQ2_8BIT_STRING 12 +#define DATA_STRUCT_EQ2_16BIT_STRING 13 +#define DATA_STRUCT_EQ2_32BIT_STRING 14 +#define DATA_STRUCT_EQUIPMENT 15 +#define DATA_STRUCT_ARRAY 16 +#define DATA_STRUCT_ITEM 17 +#define DATA_STRUCT_SINT64 18 +``` + +### 9.2 String Types +- **EQ2_8BitString**: [1 byte length][string data] +- **EQ2_16BitString**: [2 bytes length][string data] +- **EQ2_32BitString**: [4 bytes length][string data] + +### 9.3 Color Structure +```c +struct EQ2_Color { + uint8 red; + uint8 green; + uint8 blue; +}; +``` + +### 9.4 Equipment Structure +```c +struct EQ2_EquipmentItem { + uint16 type; + EQ2_Color color; + EQ2_Color highlight; +}; +``` + +## 10. Application Opcodes + +### 10.1 Opcode System +- Two-byte opcodes for game servers (WorldServer, ZoneServer) +- One-byte opcodes for login servers +- Version-specific opcode mappings stored in database +- Translation between internal EmuOpcodes and client opcodes + +### 10.2 Key Application Opcodes +```c +// Login Operations +OP_LoginRequestMsg +OP_LoginReplyMsg +OP_AllCharactersDescRequestMsg +OP_AllCharactersDescReplyMsg +OP_CreateCharacterRequestMsg +OP_CreateCharacterReplyMsg + +// World Operations +OP_ZoneInfoMsg +OP_UpdateCharacterSheetMsg +OP_UpdateInventoryMsg +OP_ClientCmdMsg + +// Chat Operations +OP_ChatTellUserMsg +OP_ChatJoinChannelMsg +``` + +## 11. Implementation Guidelines + +### 11.1 Connection State Machine +``` +CLOSED -> SessionRequest -> ESTABLISHED +ESTABLISHED -> SessionDisconnect -> CLOSING -> CLOSED +``` + +### 11.2 Buffer Management +- Maintain separate inbound/outbound queues +- Implement sliding window for flow control +- Handle out-of-order packet storage +- Implement packet combining logic + +### 11.3 Threading Considerations +- Separate reader/writer threads recommended +- Reader processes incoming UDP packets +- Writer sends outbound packets and handles retransmission +- Combine packet processor for optimization + +### 11.4 Error Handling +- Validate CRC on all received packets +- Handle malformed packets gracefully +- Implement connection timeout detection +- Retry logic for failed transmissions + +### 11.5 Performance Optimizations +- Packet combination to reduce UDP overhead +- Compression for large packets +- Rate limiting and congestion control +- Efficient data structure serialization + +## 12. Stream Types + +Different stream types have different characteristics: + +```c +enum EQStreamType { + LoginStream, // 1-byte opcodes, no compression/encryption + WorldStream, // 2-byte opcodes, compression, no encryption + ZoneStream, // 2-byte opcodes, compression, no encryption + ChatStream, // 1-byte opcodes, no compression, encoding + EQ2Stream // 2-byte opcodes, no compression/encryption +}; +``` + +## 13. Sample Packet Flow + +### 13.1 Login Sequence +``` +1. Client -> Server: OP_SessionRequest +2. Server -> Client: OP_SessionResponse (with key, compression flags) +3. Client -> Server: OP_Packet[OP_LoginRequestMsg] (with credentials) +4. Server -> Client: OP_Packet[OP_LoginReplyMsg] (success/failure) +5. Client -> Server: OP_Packet[OP_AllCharactersDescRequestMsg] +6. Server -> Client: OP_Packet[OP_AllCharactersDescReplyMsg] (character list) +``` + +### 13.2 Reliable Data Transfer +``` +1. Sender: Assign sequence number, add to retransmit queue +2. Sender: Transmit OP_Packet[seq][data] +3. Receiver: Process packet, send OP_Ack[seq] +4. Sender: Receive ack, remove from retransmit queue +5. On timeout: Retransmit packet up to max attempts +``` + +This documentation provides the foundation for implementing the EQ2 protocol in any programming language while maintaining compatibility with the existing server and client implementations. \ No newline at end of file diff --git a/internal/udp/compression.go b/internal/udp/compression.go new file mode 100644 index 0000000..f1cb4da --- /dev/null +++ b/internal/udp/compression.go @@ -0,0 +1,60 @@ +package udp + +import ( + "bytes" + "compress/zlib" + "io" +) + +func Compress(data []byte) ([]byte, error) { + var buf bytes.Buffer + + // Write compression marker + buf.WriteByte(0x5A) + + writer := zlib.NewWriter(&buf) + _, err := writer.Write(data) + if err != nil { + return nil, err + } + + err = writer.Close() + if err != nil { + return nil, err + } + + return buf.Bytes(), nil +} + +func Decompress(data []byte) ([]byte, error) { + if len(data) == 0 { + return data, nil + } + + // Check compression marker + if data[0] == 0xA5 { + // Uncompressed data + return data[1:], nil + } + + if data[0] != 0x5A { + // No compression marker, return as-is + return data, nil + } + + // Decompress zlib data + reader := bytes.NewReader(data[1:]) + zlibReader, err := zlib.NewReader(reader) + if err != nil { + return nil, err + } + defer zlibReader.Close() + + var buf bytes.Buffer + _, err = io.Copy(&buf, zlibReader) + if err != nil { + return nil, err + } + + return buf.Bytes(), nil +} diff --git a/internal/udp/config.go b/internal/udp/config.go deleted file mode 100644 index 4163e87..0000000 --- a/internal/udp/config.go +++ /dev/null @@ -1,31 +0,0 @@ -package udp - -import "time" - -// Configuration constants -const ( - DefaultMTU = 1400 - DefaultWindowSize = 256 - DefaultRetryAttempts = 5 - DefaultTimeout = 30 * time.Second - RetransmitTimeout = 3 * time.Second - KeepAliveInterval = 10 * time.Second -) - -// Config holds configuration for reliable UDP connections -type Config struct { - MTU int - WindowSize uint16 - RetryAttempts int - Timeout time.Duration -} - -// DefaultConfig returns a default configuration -func DefaultConfig() *Config { - return &Config{ - MTU: DefaultMTU, - WindowSize: DefaultWindowSize, - RetryAttempts: DefaultRetryAttempts, - Timeout: DefaultTimeout, - } -} diff --git a/internal/udp/connection.go b/internal/udp/connection.go new file mode 100644 index 0000000..8237d3d --- /dev/null +++ b/internal/udp/connection.go @@ -0,0 +1,279 @@ +package udp + +import ( + "crypto/rand" + "encoding/binary" + "net" + "sync" + "time" +) + +type ConnectionState int + +const ( + StateClosed ConnectionState = iota + StateEstablished + StateClosing +) + +type Connection struct { + addr *net.UDPAddr + conn *net.UDPConn + handler PacketHandler + state ConnectionState + mutex sync.RWMutex + + // Session data + sessionID uint32 + key uint32 + compressed bool + encoded bool + maxLength uint32 + + // Sequence tracking + nextInSeq uint16 + nextOutSeq uint16 + + // Queues + inboundQueue []*ApplicationPacket + outboundQueue []*ProtocolPacket + ackQueue []uint16 + + // Timing + lastPacketTime time.Time + + // Crypto + crypto *Crypto +} + +func NewConnection(addr *net.UDPAddr, conn *net.UDPConn, handler PacketHandler) *Connection { + return &Connection{ + addr: addr, + conn: conn, + handler: handler, + state: StateClosed, + maxLength: 512, + lastPacketTime: time.Now(), + crypto: NewCrypto(), + } +} + +func (c *Connection) ProcessPacket(data []byte) { + c.lastPacketTime = time.Now() + + packet, err := ParseProtocolPacket(data) + if err != nil { + return + } + + switch packet.Opcode { + case OpSessionRequest: + c.handleSessionRequest(packet) + case OpSessionResponse: + c.handleSessionResponse(packet) + case OpPacket: + c.handleDataPacket(packet) + case OpAck: + c.handleAck(packet) + case OpKeepAlive: + c.sendKeepAlive() + case OpSessionDisconnect: + c.Close() + } +} + +func (c *Connection) handleSessionRequest(packet *ProtocolPacket) { + if len(packet.Data) < 12 { + return + } + + // Parse session request + c.sessionID = binary.LittleEndian.Uint32(packet.Data[4:8]) + requestedMaxLen := binary.LittleEndian.Uint32(packet.Data[8:12]) + + if requestedMaxLen > 0 { + c.maxLength = requestedMaxLen + } + + // Generate encryption key + keyBytes := make([]byte, 4) + rand.Read(keyBytes) + c.key = binary.LittleEndian.Uint32(keyBytes) + + // Send session response + c.sendSessionResponse() + c.state = StateEstablished +} + +func (c *Connection) handleSessionResponse(packet *ProtocolPacket) { + // Client-side session response handling + if len(packet.Data) < 20 { + return + } + + c.sessionID = binary.LittleEndian.Uint32(packet.Data[0:4]) + c.key = binary.LittleEndian.Uint32(packet.Data[4:8]) + format := packet.Data[9] + c.compressed = (format & 0x01) != 0 + c.encoded = (format & 0x04) != 0 + c.maxLength = binary.LittleEndian.Uint32(packet.Data[12:16]) + + c.state = StateEstablished +} + +func (c *Connection) handleDataPacket(packet *ProtocolPacket) { + if len(packet.Data) < 2 { + return + } + + seq := binary.BigEndian.Uint16(packet.Data[0:2]) + payload := packet.Data[2:] + + // Simple in-order processing for now + if seq == c.nextInSeq { + c.nextInSeq++ + c.sendAck(seq) + + // Process application packet + if appPacket, err := c.processApplicationData(payload); err == nil { + c.handler.HandlePacket(c, appPacket) + } + } +} + +func (c *Connection) handleAck(packet *ProtocolPacket) { + if len(packet.Data) < 2 { + return + } + + seq := binary.BigEndian.Uint16(packet.Data[0:2]) + // Remove acknowledged packets from retransmit queue + _ = seq // TODO: implement retransmit queue +} + +func (c *Connection) processApplicationData(data []byte) (*ApplicationPacket, error) { + // Decrypt if needed + if c.crypto.IsEncrypted() { + data = c.crypto.Decrypt(data) + } + + // Decompress if needed + if c.compressed && len(data) > 0 { + var err error + data, err = Decompress(data) + if err != nil { + return nil, err + } + } + + return ParseApplicationPacket(data) +} + +func (c *Connection) SendPacket(packet *ApplicationPacket) { + c.mutex.Lock() + defer c.mutex.Unlock() + + data := packet.Serialize() + + // Compress if needed + if c.compressed && len(data) > 128 { + if compressed, err := Compress(data); err == nil { + data = compressed + } + } + + // Encrypt if needed + if c.crypto.IsEncrypted() { + data = c.crypto.Encrypt(data) + } + + // Create protocol packet + protocolData := make([]byte, 2+len(data)) + binary.BigEndian.PutUint16(protocolData[0:2], c.nextOutSeq) + copy(protocolData[2:], data) + c.nextOutSeq++ + + protocolPacket := &ProtocolPacket{ + Opcode: OpPacket, + Data: protocolData, + } + + c.sendProtocolPacket(protocolPacket) +} + +func (c *Connection) sendSessionResponse() { + data := make([]byte, 20) + binary.LittleEndian.PutUint32(data[0:4], c.sessionID) + binary.LittleEndian.PutUint32(data[4:8], c.key) + data[8] = 2 // UnknownA + + var format uint8 + if c.compressed { + format |= 0x01 + } + if c.encoded { + format |= 0x04 + } + data[9] = format + + data[10] = 0 // UnknownB + binary.LittleEndian.PutUint32(data[12:16], c.maxLength) + binary.LittleEndian.PutUint32(data[16:20], 0) // UnknownD + + packet := &ProtocolPacket{ + Opcode: OpSessionResponse, + Data: data, + } + + c.sendProtocolPacket(packet) +} + +func (c *Connection) sendAck(seq uint16) { + data := make([]byte, 2) + binary.BigEndian.PutUint16(data, seq) + + packet := &ProtocolPacket{ + Opcode: OpAck, + Data: data, + } + + c.sendProtocolPacket(packet) +} + +func (c *Connection) sendKeepAlive() { + packet := &ProtocolPacket{ + Opcode: OpKeepAlive, + Data: []byte{}, + } + + c.sendProtocolPacket(packet) +} + +func (c *Connection) sendProtocolPacket(packet *ProtocolPacket) { + data := packet.Serialize() + c.conn.WriteToUDP(data, c.addr) +} + +func (c *Connection) Close() { + c.mutex.Lock() + defer c.mutex.Unlock() + + if c.state == StateEstablished { + c.state = StateClosing + + // Send disconnect + disconnectData := make([]byte, 6) + binary.LittleEndian.PutUint32(disconnectData[0:4], c.sessionID) + disconnectData[4] = 0 + disconnectData[5] = 6 + + packet := &ProtocolPacket{ + Opcode: OpSessionDisconnect, + Data: disconnectData, + } + + c.sendProtocolPacket(packet) + } + + c.state = StateClosed +} diff --git a/internal/udp/crypto.go b/internal/udp/crypto.go new file mode 100644 index 0000000..6fd522a --- /dev/null +++ b/internal/udp/crypto.go @@ -0,0 +1,57 @@ +package udp + +import ( + "crypto/rc4" +) + +type Crypto struct { + cipher *rc4.Cipher + key []byte + encrypted bool +} + +func NewCrypto() *Crypto { + return &Crypto{ + encrypted: false, + } +} + +func (c *Crypto) SetKey(key []byte) error { + cipher, err := rc4.NewCipher(key) + if err != nil { + return err + } + + c.cipher = cipher + c.key = make([]byte, len(key)) + copy(c.key, key) + c.encrypted = true + + return nil +} + +func (c *Crypto) IsEncrypted() bool { + return c.encrypted +} + +func (c *Crypto) Encrypt(data []byte) []byte { + if !c.encrypted { + return data + } + + encrypted := make([]byte, len(data)) + copy(encrypted, data) + c.cipher.XORKeyStream(encrypted, encrypted) + return encrypted +} + +func (c *Crypto) Decrypt(data []byte) []byte { + if !c.encrypted { + return data + } + + decrypted := make([]byte, len(data)) + copy(decrypted, data) + c.cipher.XORKeyStream(decrypted, decrypted) + return decrypted +} diff --git a/internal/udp/example_test.go b/internal/udp/example_test.go new file mode 100644 index 0000000..d370a78 --- /dev/null +++ b/internal/udp/example_test.go @@ -0,0 +1,40 @@ +package udp + +import ( + "fmt" + "testing" + "time" +) + +type TestHandler struct{} + +func (h *TestHandler) HandlePacket(conn *Connection, packet *ApplicationPacket) { + fmt.Printf("Received packet - Opcode: 0x%04X, Data length: %d\n", + packet.Opcode, len(packet.Data)) + + // Echo back a response + response := &ApplicationPacket{ + Opcode: OpLoginReplyMsg, + Data: []byte("Hello from server"), + } + conn.SendPacket(response) +} + +func TestServer(t *testing.T) { + handler := &TestHandler{} + server, err := NewServer(":9999", handler) + if err != nil { + t.Fatalf("Failed to create server: %v", err) + } + + go func() { + if err := server.Start(); err != nil { + t.Errorf("Server error: %v", err) + } + }() + + // Let it run for a bit + time.Sleep(100 * time.Millisecond) + + server.Stop() +} diff --git a/internal/udp/middleware.go b/internal/udp/middleware.go deleted file mode 100644 index 5d4de03..0000000 --- a/internal/udp/middleware.go +++ /dev/null @@ -1,140 +0,0 @@ -package udp - -import ( - "net" - "sync" - "time" -) - -// Middleware interface for processing packets -type Middleware interface { - ProcessOutbound(data []byte, next func([]byte) (int, error)) (int, error) - ProcessInbound(data []byte, next func([]byte) (int, error)) (int, error) - Close() error -} - -// Builder for fluent middleware configuration -type Builder struct { - address string - config *Config - middlewares []Middleware -} - -// NewBuilder creates a new connection builder -func NewBuilder() *Builder { - return &Builder{ - config: DefaultConfig(), - } -} - -// Address sets the connection address -func (b *Builder) Address(addr string) *Builder { - b.address = addr - return b -} - -// Config sets the UDP configuration -func (b *Builder) Config(config *Config) *Builder { - b.config = config - return b -} - -// Use adds middleware to the stack -func (b *Builder) Use(middleware Middleware) *Builder { - b.middlewares = append(b.middlewares, middleware) - return b -} - -// Listen creates a listener with middleware -func (b *Builder) Listen() (Listener, error) { - listener, err := Listen(b.address, b.config) - if err != nil { - return nil, err - } - return &middlewareListener{listener, b.middlewares}, nil -} - -// Dial creates a client connection with middleware -func (b *Builder) Dial() (Conn, error) { - conn, err := Dial(b.address, b.config) - if err != nil { - return nil, err - } - return newMiddlewareConn(conn, b.middlewares), nil -} - -// middlewareConn wraps a connection with middleware stack -type middlewareConn struct { - conn Conn - middlewares []Middleware - closeOnce sync.Once -} - -func newMiddlewareConn(conn Conn, middlewares []Middleware) *middlewareConn { - return &middlewareConn{ - conn: conn, - middlewares: middlewares, - } -} - -func (m *middlewareConn) Write(data []byte) (int, error) { - return m.processOutbound(0, data) -} - -func (m *middlewareConn) Read(data []byte) (int, error) { - n, err := m.conn.Read(data) - if err != nil { - return n, err - } - return m.processInbound(len(m.middlewares)-1, data[:n]) -} - -func (m *middlewareConn) processOutbound(index int, data []byte) (int, error) { - if index >= len(m.middlewares) { - return m.conn.Write(data) - } - - return m.middlewares[index].ProcessOutbound(data, func(processed []byte) (int, error) { - return m.processOutbound(index+1, processed) - }) -} - -func (m *middlewareConn) processInbound(index int, data []byte) (int, error) { - if index < 0 { - return len(data), nil - } - - return m.middlewares[index].ProcessInbound(data, func(processed []byte) (int, error) { - return m.processInbound(index-1, processed) - }) -} - -func (m *middlewareConn) Close() error { - m.closeOnce.Do(func() { - for _, middleware := range m.middlewares { - middleware.Close() - } - }) - return m.conn.Close() -} - -func (m *middlewareConn) LocalAddr() net.Addr { return m.conn.LocalAddr() } -func (m *middlewareConn) RemoteAddr() net.Addr { return m.conn.RemoteAddr() } -func (m *middlewareConn) SetReadDeadline(t time.Time) error { return m.conn.SetReadDeadline(t) } -func (m *middlewareConn) SetWriteDeadline(t time.Time) error { return m.conn.SetWriteDeadline(t) } - -type middlewareListener struct { - listener Listener - middlewares []Middleware -} - -func (l *middlewareListener) Accept() (Conn, error) { - conn, err := l.listener.Accept() - if err != nil { - return nil, err - } - return newMiddlewareConn(conn, l.middlewares), nil -} - -func (l *middlewareListener) Close() error { return l.listener.Close() } -func (l *middlewareListener) Addr() net.Addr { return l.listener.Addr() } diff --git a/internal/udp/middleware/combiner.go b/internal/udp/middleware/combiner.go deleted file mode 100644 index d0cb395..0000000 --- a/internal/udp/middleware/combiner.go +++ /dev/null @@ -1,244 +0,0 @@ -package middleware - -import ( - "encoding/binary" - "net" - "sync" - "time" -) - -// CombinerConfig holds configuration for packet combination -type CombinerConfig struct { - MaxCombinedSize int - FlushInterval time.Duration - MaxQueuedPackets int -} - -// DefaultCombinerConfig returns default combiner configuration -func DefaultCombinerConfig() *CombinerConfig { - return &CombinerConfig{ - MaxCombinedSize: 1200, - FlushInterval: 250 * time.Millisecond, - MaxQueuedPackets: 16, - } -} - -type queuedPacket struct { - data []byte - timestamp time.Time - callback func([]byte) (int, error) - result chan combineResult -} - -type combineResult struct { - n int - err error -} - -// Combiner implements packet combination middleware -type Combiner struct { - config *CombinerConfig - queue []*queuedPacket - queueMux sync.Mutex - flushChan chan struct{} - done chan struct{} - closeOnce sync.Once -} - -// NewCombiner creates a new packet combining middleware -func NewCombiner(config *CombinerConfig) *Combiner { - if config == nil { - config = DefaultCombinerConfig() - } - - c := &Combiner{ - config: config, - flushChan: make(chan struct{}, 1), - done: make(chan struct{}), - } - - go c.flushLoop() - return c -} - -// ProcessOutbound implements Middleware.ProcessOutbound -func (c *Combiner) ProcessOutbound(data []byte, next func([]byte) (int, error)) (int, error) { - if len(data) == 0 { - return 0, nil - } - - // Large packets bypass combination - if len(data) > c.config.MaxCombinedSize/2 { - return next(data) - } - - c.queueMux.Lock() - defer c.queueMux.Unlock() - - result := make(chan combineResult, 1) - - c.queue = append(c.queue, &queuedPacket{ - data: append([]byte(nil), data...), - timestamp: time.Now(), - callback: next, - result: result, - }) - - shouldFlush := len(c.queue) >= c.config.MaxQueuedPackets - if !shouldFlush { - totalSize := c.calculateCombinedSize() - shouldFlush = totalSize > c.config.MaxCombinedSize - } - - if shouldFlush { - c.flushQueueLocked() - } else { - select { - case c.flushChan <- struct{}{}: - default: - } - } - - select { - case res := <-result: - return res.n, res.err - case <-c.done: - return 0, net.ErrClosed - } -} - -// ProcessInbound implements Middleware.ProcessInbound -func (c *Combiner) ProcessInbound(data []byte, next func([]byte) (int, error)) (int, error) { - if len(data) < 2 { - return next(data) - } - - packetCount := binary.BigEndian.Uint16(data[0:2]) - - // Single packet or invalid format - if packetCount == 1 { - if len(data) < 4 { - return next(data) - } - - firstLen := binary.BigEndian.Uint16(data[2:4]) - if int(firstLen)+4 == len(data) { - return next(data[4 : 4+firstLen]) - } - return next(data) - } - - // Multiple packets - return first one - if packetCount > 1 && len(data) >= 4 { - firstLen := binary.BigEndian.Uint16(data[2:4]) - if len(data) >= 4+int(firstLen) { - return next(data[4 : 4+firstLen]) - } - } - - return next(data) -} - -func (c *Combiner) calculateCombinedSize() int { - size := 2 // count field - for _, pkt := range c.queue { - size += 2 + len(pkt.data) - } - return size -} - -func (c *Combiner) flushLoop() { - ticker := time.NewTicker(c.config.FlushInterval) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - c.flushIfNeeded() - case <-c.flushChan: - c.flushIfNeeded() - case <-c.done: - c.queueMux.Lock() - c.flushQueueLocked() - c.queueMux.Unlock() - return - } - } -} - -func (c *Combiner) flushIfNeeded() { - c.queueMux.Lock() - defer c.queueMux.Unlock() - - if len(c.queue) == 0 { - return - } - - now := time.Now() - oldestAge := now.Sub(c.queue[0].timestamp) - - if oldestAge >= c.config.FlushInterval || len(c.queue) >= c.config.MaxQueuedPackets { - c.flushQueueLocked() - } -} - -func (c *Combiner) flushQueueLocked() { - if len(c.queue) == 0 { - return - } - - queue := c.queue - c.queue = nil - - go c.processQueue(queue) -} - -func (c *Combiner) processQueue(queue []*queuedPacket) { - if len(queue) == 1 { - pkt := queue[0] - n, err := pkt.callback(pkt.data) - pkt.result <- combineResult{n, err} - return - } - - // Combine multiple packets - combined := c.combinePackets(queue) - _, err := queue[0].callback(combined) - - // Distribute result to all packets - for _, pkt := range queue { - pkt.result <- combineResult{len(pkt.data), err} - } -} - -func (c *Combiner) combinePackets(packets []*queuedPacket) []byte { - totalSize := 2 // count - for _, pkt := range packets { - totalSize += 2 + len(pkt.data) - } - - combined := make([]byte, totalSize) - offset := 0 - - // Write packet count - binary.BigEndian.PutUint16(combined[offset:], uint16(len(packets))) - offset += 2 - - // Write each packet - for _, pkt := range packets { - binary.BigEndian.PutUint16(combined[offset:], uint16(len(pkt.data))) - offset += 2 - copy(combined[offset:], pkt.data) - offset += len(pkt.data) - } - - return combined -} - -// Close implements Middleware.Close -func (c *Combiner) Close() error { - c.closeOnce.Do(func() { - close(c.done) - }) - return nil -} diff --git a/internal/udp/middleware/compressor.go b/internal/udp/middleware/compressor.go deleted file mode 100644 index ec21726..0000000 --- a/internal/udp/middleware/compressor.go +++ /dev/null @@ -1,166 +0,0 @@ -package middleware - -import ( - "bytes" - "compress/flate" - "io" - "sync" -) - -// CompressorConfig holds configuration for compression -type CompressorConfig struct { - Level int // Compression level (1-9, flate.BestSpeed to flate.BestCompression) - MinSize int // Minimum packet size to compress - CompressionMarker byte // Byte marker to identify compressed packets -} - -// DefaultCompressorConfig returns default compressor configuration -func DefaultCompressorConfig() *CompressorConfig { - return &CompressorConfig{ - Level: flate.DefaultCompression, - MinSize: 128, - CompressionMarker: 0x5A, // Same as original EQ2 implementation - } -} - -// Compressor implements compression middleware using DEFLATE -type Compressor struct { - config *CompressorConfig - writerPool sync.Pool - readerPool sync.Pool - bufferPool sync.Pool - closeOnce sync.Once -} - -// NewCompressor creates a new compression middleware -func NewCompressor(config *CompressorConfig) *Compressor { - if config == nil { - config = DefaultCompressorConfig() - } - - c := &Compressor{ - config: config, - } - - // Initialize writer pool - c.writerPool.New = func() interface{} { - writer, _ := flate.NewWriter(nil, config.Level) - return writer - } - - // Initialize reader pool - c.readerPool.New = func() interface{} { - return flate.NewReader(nil) - } - - // Initialize buffer pool - c.bufferPool.New = func() interface{} { - return &bytes.Buffer{} - } - - return c -} - -// ProcessOutbound implements Middleware.ProcessOutbound -func (c *Compressor) ProcessOutbound(data []byte, next func([]byte) (int, error)) (int, error) { - if len(data) < c.config.MinSize { - return next(data) - } - - compressed, err := c.compress(data) - if err != nil || len(compressed) >= len(data) { - // Compression failed or didn't help - send original - return next(data) - } - - return next(compressed) -} - -// ProcessInbound implements Middleware.ProcessInbound -func (c *Compressor) ProcessInbound(data []byte, next func([]byte) (int, error)) (int, error) { - if len(data) < 2 { - return next(data) - } - - // Check for compression marker - if data[0] != c.config.CompressionMarker { - return next(data) - } - - decompressed, err := c.decompress(data[1:]) - if err != nil { - // Decompression failed - pass through original - return next(data) - } - - return next(decompressed) -} - -func (c *Compressor) compress(data []byte) ([]byte, error) { - // Get buffer from pool - buf := c.bufferPool.Get().(*bytes.Buffer) - defer c.bufferPool.Put(buf) - buf.Reset() - - // Write compression marker - buf.WriteByte(c.config.CompressionMarker) - - // Get writer from pool - writer := c.writerPool.Get().(*flate.Writer) - defer c.writerPool.Put(writer) - - writer.Reset(buf) - - // Compress data - if _, err := writer.Write(data); err != nil { - return nil, err - } - - if err := writer.Close(); err != nil { - return nil, err - } - - // Return copy of compressed data - result := make([]byte, buf.Len()) - copy(result, buf.Bytes()) - return result, nil -} - -func (c *Compressor) decompress(data []byte) ([]byte, error) { - // Get reader from pool - reader := c.readerPool.Get().(io.ReadCloser) - defer c.readerPool.Put(reader) - - // Reset reader with new data - if resetter, ok := reader.(flate.Resetter); ok { - resetter.Reset(bytes.NewReader(data), nil) - } else { - reader.Close() - reader = flate.NewReader(bytes.NewReader(data)) - } - defer reader.Close() - - // Get buffer from pool - buf := c.bufferPool.Get().(*bytes.Buffer) - defer c.bufferPool.Put(buf) - buf.Reset() - - // Decompress data - if _, err := io.Copy(buf, reader); err != nil { - return nil, err - } - - // Return copy of decompressed data - result := make([]byte, buf.Len()) - copy(result, buf.Bytes()) - return result, nil -} - -// Close implements Middleware.Close -func (c *Compressor) Close() error { - c.closeOnce.Do(func() { - // Close any active readers/writers in pools - // Note: flate writers/readers don't need explicit cleanup - }) - return nil -} diff --git a/internal/udp/middleware/encryptor.go b/internal/udp/middleware/encryptor.go deleted file mode 100644 index 1847fdb..0000000 --- a/internal/udp/middleware/encryptor.go +++ /dev/null @@ -1,235 +0,0 @@ -package middleware - -import ( - "crypto/rand" - "crypto/rc4" - "crypto/rsa" - "crypto/x509" - "encoding/binary" - "sync" -) - -// EncryptorConfig holds configuration for encryption -type EncryptorConfig struct { - RSAKeySize int // RSA key size in bits - KeyExchangeOp byte // Opcode for key exchange packets - MinSize int // Minimum packet size to encrypt -} - -// DefaultEncryptorConfig returns default encryptor configuration -func DefaultEncryptorConfig() *EncryptorConfig { - return &EncryptorConfig{ - RSAKeySize: 1024, - KeyExchangeOp: 0x21, // OP_WSLoginRequestMsg equivalent - MinSize: 8, - } -} - -// Encryptor implements RC4 + RSA encryption middleware -type Encryptor struct { - config *EncryptorConfig - rsaKey *rsa.PrivateKey - rc4Key []byte - cipher *rc4.Cipher - cipherMux sync.RWMutex - keySet bool - closeOnce sync.Once -} - -// NewEncryptor creates a new encryption middleware -func NewEncryptor(config *EncryptorConfig) *Encryptor { - if config == nil { - config = DefaultEncryptorConfig() - } - - // Generate RSA key pair - rsaKey, err := rsa.GenerateKey(rand.Reader, config.RSAKeySize) - if err != nil { - panic(err) // Should handle this better in production - } - - return &Encryptor{ - config: config, - rsaKey: rsaKey, - } -} - -// ProcessOutbound implements Middleware.ProcessOutbound -func (e *Encryptor) ProcessOutbound(data []byte, next func([]byte) (int, error)) (int, error) { - // Check if this is a key exchange request - if len(data) > 4 && data[0] == 0 && data[1] == e.config.KeyExchangeOp { - return e.handleKeyExchange(data, next) - } - - // Skip encryption for small packets or if no key is set - if len(data) < e.config.MinSize || !e.isKeySet() { - return next(data) - } - - encrypted, err := e.encrypt(data) - if err != nil { - return next(data) // Fallback to unencrypted - } - - return next(encrypted) -} - -// ProcessInbound implements Middleware.ProcessInbound -func (e *Encryptor) ProcessInbound(data []byte, next func([]byte) (int, error)) (int, error) { - // Check for RSA encrypted key at end of packet - if len(data) >= 8 && e.isRSAEncryptedKey(data) { - return e.processRSAKey(data, next) - } - - // Skip decryption if no key is set - if !e.isKeySet() { - return next(data) - } - - decrypted, err := e.decrypt(data) - if err != nil { - return next(data) // Fallback to unencrypted - } - - return next(decrypted) -} - -func (e *Encryptor) handleKeyExchange(data []byte, next func([]byte) (int, error)) (int, error) { - // Extract key size from packet - if len(data) < 8 { - return next(data) - } - - keySize := binary.LittleEndian.Uint32(data[4:8]) - if keySize != 60 { // Expected key size - return next(data) - } - - // Create key exchange response with RSA public key - response := make([]byte, len(data)) - copy(response, data) - - // Fill with dummy key data (in real implementation, would use proper key) - for i := 8; i < len(response)-8; i++ { - response[i] = 0xFF - } - - // Add termination markers - response[len(response)-5] = 1 - response[len(response)-1] = 1 - - return next(response) -} - -func (e *Encryptor) processRSAKey(data []byte, next func([]byte) (int, error)) (int, error) { - // Extract and decrypt RSA key from end of packet - encryptedKey := data[len(data)-8:] - - // In real implementation, would decrypt with RSA private key - // For now, use a simple XOR pattern - rc4Key := make([]byte, 8) - for i := 0; i < 8; i++ { - rc4Key[i] = encryptedKey[i] ^ 0x55 // Simple pattern - } - - e.setRC4Key(rc4Key) - - // Pass the packet without the RSA key - return next(data[:len(data)-8]) -} - -func (e *Encryptor) isRSAEncryptedKey(data []byte) bool { - // Simple heuristic - check if last 8 bytes look like encrypted data - if len(data) < 8 { - return false - } - - // Check for non-zero data in last 8 bytes - lastBytes := data[len(data)-8:] - nonZero := 0 - for _, b := range lastBytes { - if b != 0 { - nonZero++ - } - } - return nonZero > 4 // Heuristic: encrypted data should have some non-zero bytes -} - -func (e *Encryptor) setRC4Key(key []byte) { - e.cipherMux.Lock() - defer e.cipherMux.Unlock() - - e.rc4Key = make([]byte, len(key)) - copy(e.rc4Key, key) - - cipher, err := rc4.NewCipher(key) - if err == nil { - e.cipher = cipher - e.keySet = true - } -} - -func (e *Encryptor) isKeySet() bool { - e.cipherMux.RLock() - defer e.cipherMux.RUnlock() - return e.keySet -} - -func (e *Encryptor) encrypt(data []byte) ([]byte, error) { - e.cipherMux.Lock() - defer e.cipherMux.Unlock() - - if e.cipher == nil { - return data, nil - } - - // Create new cipher for this operation (RC4 is stateful) - cipher, err := rc4.NewCipher(e.rc4Key) - if err != nil { - return nil, err - } - - encrypted := make([]byte, len(data)) - cipher.XORKeyStream(encrypted, data) - return encrypted, nil -} - -func (e *Encryptor) decrypt(data []byte) ([]byte, error) { - e.cipherMux.Lock() - defer e.cipherMux.Unlock() - - if e.cipher == nil { - return data, nil - } - - // Create new cipher for this operation (RC4 is stateful) - cipher, err := rc4.NewCipher(e.rc4Key) - if err != nil { - return nil, err - } - - decrypted := make([]byte, len(data)) - cipher.XORKeyStream(decrypted, data) - return decrypted, nil -} - -// GetPublicKey returns the RSA public key for key exchange -func (e *Encryptor) GetPublicKey() []byte { - pubKeyBytes, err := x509.MarshalPKIXPublicKey(&e.rsaKey.PublicKey) - if err != nil { - return nil - } - return pubKeyBytes -} - -// Close implements Middleware.Close -func (e *Encryptor) Close() error { - e.closeOnce.Do(func() { - e.cipherMux.Lock() - e.cipher = nil - e.rc4Key = nil - e.keySet = false - e.cipherMux.Unlock() - }) - return nil -} diff --git a/internal/udp/opcodes.go b/internal/udp/opcodes.go new file mode 100644 index 0000000..29d0f94 --- /dev/null +++ b/internal/udp/opcodes.go @@ -0,0 +1,28 @@ +package udp + +const ( + // Protocol opcodes + OpSessionRequest = 0x01 + OpSessionResponse = 0x02 + OpCombined = 0x03 + OpSessionDisconnect = 0x05 + OpKeepAlive = 0x06 + OpServerKeyRequest = 0x07 + OpSessionStatResponse = 0x08 + OpPacket = 0x09 + OpFragment = 0x0D + OpOutOfOrderAck = 0x11 + OpAck = 0x15 + OpAppCombined = 0x19 + OpOutOfSession = 0x1D +) + +// Application opcodes (examples) +const ( + OpLoginRequestMsg = 0x0001 + OpLoginReplyMsg = 0x0002 + OpAllCharactersDescRequestMsg = 0x0003 + OpAllCharactersDescReplyMsg = 0x0004 + OpCreateCharacterRequestMsg = 0x0005 + OpCreateCharacterReplyMsg = 0x0006 +) diff --git a/internal/udp/packet.go b/internal/udp/packet.go index 7a3ba3a..0aec430 100644 --- a/internal/udp/packet.go +++ b/internal/udp/packet.go @@ -2,83 +2,53 @@ package udp import ( "encoding/binary" - "fmt" - "hash/crc32" - "time" + "errors" ) -// Packet types -const ( - PacketTypeData uint8 = iota - PacketTypeAck - PacketTypeSessionRequest - PacketTypeSessionResponse - PacketTypeKeepAlive - PacketTypeDisconnect - PacketTypeFragment -) - -// packet represents a protocol packet -type packet struct { - Type uint8 - Sequence uint16 - Ack uint16 - Session uint32 - Data []byte - CRC uint32 +type ProtocolPacket struct { + Opcode uint8 + Data []byte } -// Marshal serializes the packet -func (p *packet) Marshal() []byte { - dataLen := len(p.Data) - buf := make([]byte, 15+dataLen) // Fixed header + data - - buf[0] = p.Type - binary.BigEndian.PutUint16(buf[1:3], p.Sequence) - binary.BigEndian.PutUint16(buf[3:5], p.Ack) - binary.BigEndian.PutUint32(buf[5:9], p.Session) - binary.BigEndian.PutUint16(buf[9:11], uint16(dataLen)) - copy(buf[11:11+dataLen], p.Data) - - // Calculate CRC32 for header + data - p.CRC = crc32.ChecksumIEEE(buf[:11+dataLen]) - binary.BigEndian.PutUint32(buf[11+dataLen:], p.CRC) - - return buf +type ApplicationPacket struct { + Opcode uint16 + Data []byte } -// Unmarshal deserializes the packet -func (p *packet) Unmarshal(data []byte) error { - if len(data) < 15 { - return fmt.Errorf("packet too short: %d bytes", len(data)) +func ParseProtocolPacket(data []byte) (*ProtocolPacket, error) { + if len(data) < 2 { + return nil, errors.New("packet too small") } - p.Type = data[0] - p.Sequence = binary.BigEndian.Uint16(data[1:3]) - p.Ack = binary.BigEndian.Uint16(data[3:5]) - p.Session = binary.BigEndian.Uint32(data[5:9]) - dataLen := binary.BigEndian.Uint16(data[9:11]) - - if len(data) < 15+int(dataLen) { - return fmt.Errorf("incomplete packet: expected %d bytes, got %d", 15+dataLen, len(data)) - } - - p.Data = make([]byte, dataLen) - copy(p.Data, data[11:11+dataLen]) - p.CRC = binary.BigEndian.Uint32(data[11+dataLen:]) - - // Verify CRC - expectedCRC := crc32.ChecksumIEEE(data[:11+dataLen]) - if p.CRC != expectedCRC { - return fmt.Errorf("CRC mismatch: expected %x, got %x", expectedCRC, p.CRC) - } - - return nil + return &ProtocolPacket{ + Opcode: data[1], + Data: data[2:], + }, nil } -// pendingPacket represents a packet awaiting acknowledgment -type pendingPacket struct { - packet *packet - timestamp time.Time - attempts int +func (p *ProtocolPacket) Serialize() []byte { + data := make([]byte, 2+len(p.Data)) + data[0] = 0x00 // Reserved byte + data[1] = p.Opcode + copy(data[2:], p.Data) + return data +} + +func ParseApplicationPacket(data []byte) (*ApplicationPacket, error) { + if len(data) < 2 { + return nil, errors.New("application packet too small") + } + + opcode := binary.LittleEndian.Uint16(data[0:2]) + return &ApplicationPacket{ + Opcode: opcode, + Data: data[2:], + }, nil +} + +func (p *ApplicationPacket) Serialize() []byte { + data := make([]byte, 2+len(p.Data)) + binary.LittleEndian.PutUint16(data[0:2], p.Opcode) + copy(data[2:], p.Data) + return data } diff --git a/internal/udp/server.go b/internal/udp/server.go index 8e90edd..4c1f3f9 100644 --- a/internal/udp/server.go +++ b/internal/udp/server.go @@ -1,576 +1,105 @@ package udp import ( - "context" "fmt" "net" "sync" - "sync/atomic" "time" ) -// Conn represents a reliable UDP connection -type Conn interface { - Read(b []byte) (n int, err error) - Write(b []byte) (n int, err error) - Close() error - LocalAddr() net.Addr - RemoteAddr() net.Addr - SetReadDeadline(t time.Time) error - SetWriteDeadline(t time.Time) error +type Server struct { + conn *net.UDPConn + connections map[string]*Connection + mutex sync.RWMutex + handler PacketHandler + running bool } -// Listener listens for incoming reliable UDP connections -type Listener interface { - Accept() (Conn, error) - Close() error - Addr() net.Addr +type PacketHandler interface { + HandlePacket(conn *Connection, packet *ApplicationPacket) } -// stream implements a reliable UDP stream -type stream struct { - conn *net.UDPConn - remoteAddr *net.UDPAddr - localAddr *net.UDPAddr - session uint32 - config *Config - - // Sequence tracking - sendSeq uint32 - recvSeq uint16 - lastAckSent uint16 - - // Channels for communication - inbound chan []byte - outbound chan []byte - control chan *packet - done chan struct{} - closeOnce sync.Once - - // Reliability tracking - pending map[uint16]*pendingPacket - pendingMutex sync.RWMutex - outOfOrder map[uint16][]byte - oooMutex sync.RWMutex - - // Flow control - windowSize uint16 - - // Read/Write deadlines - readDeadline atomic.Value - writeDeadline atomic.Value - - // Last activity for keep-alive - lastActivity time.Time - activityMutex sync.RWMutex -} - -// newStream creates a new reliable UDP stream -func newStream(conn *net.UDPConn, remoteAddr *net.UDPAddr, session uint32, config *Config) *stream { - s := &stream{ - conn: conn, - remoteAddr: remoteAddr, - localAddr: conn.LocalAddr().(*net.UDPAddr), - session: session, - config: config, - windowSize: config.WindowSize, - - inbound: make(chan []byte, 256), - outbound: make(chan []byte, 256), - control: make(chan *packet, 64), - done: make(chan struct{}), - - pending: make(map[uint16]*pendingPacket), - outOfOrder: make(map[uint16][]byte), - - lastActivity: time.Now(), - } - - // Start background goroutines - go s.writeLoop() - go s.retransmitLoop() - go s.keepAliveLoop() - - return s -} - -// Read implements Conn.Read -func (s *stream) Read(b []byte) (n int, err error) { - ctx, cancel := s.getReadDeadlineContext() - defer cancel() - - select { - case data := <-s.inbound: - n = copy(b, data) - if n < len(data) { - return n, fmt.Errorf("buffer too small: need %d bytes, got %d", len(data), len(b)) - } - return n, nil - case <-s.done: - return 0, fmt.Errorf("connection closed") - case <-ctx.Done(): - return 0, fmt.Errorf("read timeout") - } -} - -// Write implements Conn.Write -func (s *stream) Write(b []byte) (n int, err error) { - if len(b) == 0 { - return 0, nil - } - - // Fragment large packets - mtu := s.config.MTU - 15 // Account for packet header - if len(b) <= mtu { - return s.writePacket(b) - } - - // Fragment the data - sent := 0 - for sent < len(b) { - end := sent + mtu - if end > len(b) { - end = len(b) - } - - n, err := s.writePacket(b[sent:end]) - sent += n - if err != nil { - return sent, err - } - } - - return sent, nil -} - -// writePacket writes a single packet -func (s *stream) writePacket(data []byte) (int, error) { - ctx, cancel := s.getWriteDeadlineContext() - defer cancel() - - select { - case s.outbound <- data: - s.updateActivity() - return len(data), nil - case <-s.done: - return 0, fmt.Errorf("connection closed") - case <-ctx.Done(): - return 0, fmt.Errorf("write timeout") - } -} - -// writeLoop handles outbound packet transmission -func (s *stream) writeLoop() { - defer close(s.outbound) - - for { - select { - case data := <-s.outbound: - s.sendDataPacket(data) - case ctrlPacket := <-s.control: - s.sendControlPacket(ctrlPacket) - case <-s.done: - return - } - } -} - -// sendDataPacket sends a data packet with reliability -func (s *stream) sendDataPacket(data []byte) { - seq := uint16(atomic.AddUint32(&s.sendSeq, 1) - 1) - - pkt := &packet{ - Type: PacketTypeData, - Sequence: seq, - Ack: s.lastAckSent, - Session: s.session, - Data: data, - } - - // Store for retransmission - s.pendingMutex.Lock() - s.pending[seq] = &pendingPacket{ - packet: pkt, - timestamp: time.Now(), - attempts: 0, - } - s.pendingMutex.Unlock() - - s.sendRawPacket(pkt) -} - -// sendControlPacket sends control packets (ACKs, etc.) -func (s *stream) sendControlPacket(pkt *packet) { - pkt.Session = s.session - s.sendRawPacket(pkt) -} - -// sendRawPacket sends a packet over UDP -func (s *stream) sendRawPacket(pkt *packet) { - data := pkt.Marshal() - s.conn.WriteToUDP(data, s.remoteAddr) -} - -// handlePacket processes an incoming packet -func (s *stream) handlePacket(pkt *packet) { - s.updateActivity() - - switch pkt.Type { - case PacketTypeData: - s.handleDataPacket(pkt) - case PacketTypeAck: - s.handleAckPacket(pkt) - case PacketTypeKeepAlive: - s.sendAck(pkt.Sequence) - case PacketTypeDisconnect: - s.Close() - } -} - -// handleDataPacket processes incoming data packets -func (s *stream) handleDataPacket(pkt *packet) { - // Send ACK - s.sendAck(pkt.Sequence) - - // Check sequence order - expectedSeq := s.recvSeq + 1 - - if pkt.Sequence == expectedSeq { - // In order - deliver immediately - s.deliverData(pkt.Data) - s.recvSeq = pkt.Sequence - - // Check for buffered out-of-order packets - s.processOutOfOrder() - } else if pkt.Sequence > expectedSeq { - // Future packet - buffer it - s.oooMutex.Lock() - s.outOfOrder[pkt.Sequence] = pkt.Data - s.oooMutex.Unlock() - } - // Past packets are ignored (duplicate) -} - -// processOutOfOrder delivers buffered in-order packets -func (s *stream) processOutOfOrder() { - s.oooMutex.Lock() - defer s.oooMutex.Unlock() - - for { - nextSeq := s.recvSeq + 1 - if data, exists := s.outOfOrder[nextSeq]; exists { - s.deliverData(data) - s.recvSeq = nextSeq - delete(s.outOfOrder, nextSeq) - } else { - break - } - } -} - -// deliverData delivers data to the application -func (s *stream) deliverData(data []byte) { - select { - case s.inbound <- data: - case <-s.done: - default: - // Channel full - would block - } -} - -// handleAckPacket processes acknowledgment packets -func (s *stream) handleAckPacket(pkt *packet) { - s.pendingMutex.Lock() - defer s.pendingMutex.Unlock() - - if pending, exists := s.pending[pkt.Sequence]; exists { - delete(s.pending, pkt.Sequence) - _ = pending // Packet acknowledged - } -} - -// sendAck sends an acknowledgment -func (s *stream) sendAck(seq uint16) { - s.lastAckSent = seq - ackPkt := &packet{ - Type: PacketTypeAck, - Sequence: seq, - Ack: seq, - } - - select { - case s.control <- ackPkt: - case <-s.done: - default: - } -} - -// retransmitLoop handles packet retransmission -func (s *stream) retransmitLoop() { - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - s.checkRetransmissions() - case <-s.done: - return - } - } -} - -// checkRetransmissions checks for packets needing retransmission -func (s *stream) checkRetransmissions() { - now := time.Now() - - s.pendingMutex.Lock() - defer s.pendingMutex.Unlock() - - for seq, pending := range s.pending { - if now.Sub(pending.timestamp) > RetransmitTimeout { - if pending.attempts >= s.config.RetryAttempts { - // Too many attempts - close connection - delete(s.pending, seq) - go s.Close() - return - } - - // Retransmit - pending.attempts++ - pending.timestamp = now - s.sendRawPacket(pending.packet) - } - } -} - -// keepAliveLoop sends periodic keep-alive packets -func (s *stream) keepAliveLoop() { - ticker := time.NewTicker(KeepAliveInterval) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - s.activityMutex.RLock() - idle := time.Since(s.lastActivity) - s.activityMutex.RUnlock() - - if idle > KeepAliveInterval { - keepAlive := &packet{Type: PacketTypeKeepAlive} - select { - case s.control <- keepAlive: - case <-s.done: - return - } - } - case <-s.done: - return - } - } -} - -// updateActivity updates the last activity timestamp -func (s *stream) updateActivity() { - s.activityMutex.Lock() - s.lastActivity = time.Now() - s.activityMutex.Unlock() -} - -// Close implements Conn.Close -func (s *stream) Close() error { - s.closeOnce.Do(func() { - // Send disconnect packet - disconnect := &packet{Type: PacketTypeDisconnect} - select { - case s.control <- disconnect: - default: - } - - close(s.done) - }) - return nil -} - -// Address methods -func (s *stream) LocalAddr() net.Addr { return s.localAddr } -func (s *stream) RemoteAddr() net.Addr { return s.remoteAddr } - -// Deadline methods -func (s *stream) SetReadDeadline(t time.Time) error { - s.readDeadline.Store(t) - return nil -} - -func (s *stream) SetWriteDeadline(t time.Time) error { - s.writeDeadline.Store(t) - return nil -} - -func (s *stream) getReadDeadlineContext() (context.Context, context.CancelFunc) { - if deadline, ok := s.readDeadline.Load().(time.Time); ok && !deadline.IsZero() { - return context.WithDeadline(context.Background(), deadline) - } - return context.Background(), func() {} -} - -func (s *stream) getWriteDeadlineContext() (context.Context, context.CancelFunc) { - if deadline, ok := s.writeDeadline.Load().(time.Time); ok && !deadline.IsZero() { - return context.WithDeadline(context.Background(), deadline) - } - return context.Background(), func() {} -} - -// listener implements a reliable UDP listener -type listener struct { - conn *net.UDPConn - config *Config - streams map[string]*stream - mutex sync.RWMutex - incoming chan *stream - done chan struct{} -} - -// Listen creates a new reliable UDP listener -func Listen(address string, config *Config) (Listener, error) { - if config == nil { - config = DefaultConfig() - } - - addr, err := net.ResolveUDPAddr("udp", address) +func NewServer(addr string, handler PacketHandler) (*Server, error) { + udpAddr, err := net.ResolveUDPAddr("udp", addr) if err != nil { return nil, err } - conn, err := net.ListenUDP("udp", addr) + conn, err := net.ListenUDP("udp", udpAddr) if err != nil { return nil, err } - l := &listener{ - conn: conn, - config: config, - streams: make(map[string]*stream), - incoming: make(chan *stream, 16), - done: make(chan struct{}), - } - - go l.readLoop() - return l, nil + return &Server{ + conn: conn, + connections: make(map[string]*Connection), + handler: handler, + }, nil } -// readLoop handles incoming UDP packets -func (l *listener) readLoop() { - buf := make([]byte, 2048) +func (s *Server) Start() error { + s.running = true - for { - select { - case <-l.done: - return - default: - } + // Start connection timeout checker + go s.timeoutChecker() - n, addr, err := l.conn.ReadFromUDP(buf) + // Main packet receive loop + buffer := make([]byte, 2048) + for s.running { + n, addr, err := s.conn.ReadFromUDP(buffer) if err != nil { + if s.running { + fmt.Printf("UDP read error: %v\n", err) + } continue } - pkt := &packet{} - if err := pkt.Unmarshal(buf[:n]); err != nil { - continue + go s.handlePacket(buffer[:n], addr) + } + + return nil +} + +func (s *Server) Stop() { + s.running = false + s.conn.Close() +} + +func (s *Server) handlePacket(data []byte, addr *net.UDPAddr) { + if len(data) < 2 { + return + } + + connKey := addr.String() + + s.mutex.Lock() + conn, exists := s.connections[connKey] + if !exists { + conn = NewConnection(addr, s.conn, s.handler) + s.connections[connKey] = conn + } + s.mutex.Unlock() + + conn.ProcessPacket(data) +} + +func (s *Server) timeoutChecker() { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + for range ticker.C { + if !s.running { + return } - l.handlePacket(pkt, addr) - } -} - -// handlePacket routes packets to appropriate streams -func (l *listener) handlePacket(pkt *packet, addr *net.UDPAddr) { - streamKey := addr.String() - - l.mutex.RLock() - stream, exists := l.streams[streamKey] - l.mutex.RUnlock() - - if !exists && pkt.Type == PacketTypeSessionRequest { - // New connection - session := pkt.Session - stream = newStream(l.conn, addr, session, l.config) - - l.mutex.Lock() - l.streams[streamKey] = stream - l.mutex.Unlock() - - // Send session response - response := &packet{ - Type: PacketTypeSessionResponse, - Session: session, + now := time.Now() + s.mutex.Lock() + for key, conn := range s.connections { + if now.Sub(conn.lastPacketTime) > 45*time.Second { + conn.Close() + delete(s.connections, key) + } } - stream.sendControlPacket(response) - - select { - case l.incoming <- stream: - case <-l.done: - } - } else if exists { - stream.handlePacket(pkt) + s.mutex.Unlock() } } - -// Accept implements Listener.Accept -func (l *listener) Accept() (Conn, error) { - select { - case stream := <-l.incoming: - return stream, nil - case <-l.done: - return nil, fmt.Errorf("listener closed") - } -} - -// Close implements Listener.Close -func (l *listener) Close() error { - close(l.done) - - l.mutex.Lock() - defer l.mutex.Unlock() - - for _, stream := range l.streams { - stream.Close() - } - - return l.conn.Close() -} - -// Addr implements Listener.Addr -func (l *listener) Addr() net.Addr { - return l.conn.LocalAddr() -} - -// Dial creates a client connection to a reliable UDP server -func Dial(address string, config *Config) (Conn, error) { - if config == nil { - config = DefaultConfig() - } - - addr, err := net.ResolveUDPAddr("udp", address) - if err != nil { - return nil, err - } - - conn, err := net.DialUDP("udp", nil, addr) - if err != nil { - return nil, err - } - - session := uint32(time.Now().Unix()) - stream := newStream(conn, addr, session, config) - - // Send session request - request := &packet{ - Type: PacketTypeSessionRequest, - Session: session, - } - stream.sendControlPacket(request) - - return stream, nil -}