new udp server attempt

This commit is contained in:
Sky Johnson 2025-07-21 18:29:05 -05:00
parent 9f190bcd25
commit 576d080f03
13 changed files with 886 additions and 1427 deletions

312
eq2_protocol_report.md Normal file
View File

@ -0,0 +1,312 @@
# EverQuest 2 Network Protocol Documentation
## Overview
The EverQuest 2 protocol is a custom UDP-based protocol that provides reliable delivery, encryption, compression, and session management. This document describes the protocol structure for reimplementation.
## 1. Protocol Architecture
### 1.1 Protocol Layers
```
Application Layer - Game logic packets (EQApplicationPacket)
Protocol Layer - Session management, reliability (EQProtocolPacket)
Transport Layer - UDP with custom reliability
Network Layer - Standard IP
```
### 1.2 Packet Types
- **EQProtocolPacket**: Low-level protocol control packets
- **EQApplicationPacket**: High-level game data packets
- **EQ2Packet**: EQ2-specific application packets with login opcodes
## 2. Session Management
### 2.1 Session Establishment
```
Client -> Server: OP_SessionRequest
Server -> Client: OP_SessionResponse
```
#### SessionRequest Structure
```c
struct SessionRequest {
uint32 UnknownA; // Usually 0
uint32 Session; // Proposed session ID
uint32 MaxLength; // Maximum packet length
};
```
#### SessionResponse Structure
```c
struct SessionResponse {
uint32 Session; // Confirmed session ID
uint32 Key; // Encryption key
uint8 UnknownA; // Usually 2
uint8 Format; // Flags: 0x01=compressed, 0x04=encoded
uint8 UnknownB; // Usually 0
uint32 MaxLength; // Maximum packet length
uint32 UnknownD; // Usually 0
};
```
### 2.2 Session Termination
```
Either -> Other: OP_SessionDisconnect
```
## 3. Protocol Opcodes
### 3.1 Core Protocol Opcodes
```c
#define OP_SessionRequest 0x01
#define OP_SessionResponse 0x02
#define OP_Combined 0x03
#define OP_SessionDisconnect 0x05
#define OP_KeepAlive 0x06
#define OP_ServerKeyRequest 0x07
#define OP_SessionStatResponse 0x08
#define OP_Packet 0x09
#define OP_Fragment 0x0D
#define OP_OutOfOrderAck 0x11
#define OP_Ack 0x15
#define OP_AppCombined 0x19
#define OP_OutOfSession 0x1D
```
## 4. Reliable Delivery System
### 4.1 Sequence Numbers
- 16-bit sequence numbers for ordered delivery
- Wrap-around handling at 65536
- Window-based flow control (default window size: 2048)
### 4.2 Acknowledgments
- **OP_Ack**: Acknowledges packets up to sequence number
- **OP_OutOfOrderAck**: Acknowledges specific out-of-order packet
- Retransmission on timeout (default: 500ms * 3.0 multiplier, max 5000ms)
### 4.3 Packet Structure for Sequenced Data
```
[2 bytes: Sequence Number][Payload Data]
```
## 5. Encryption System
### 5.1 Key Exchange
1. RSA key exchange during initial handshake
2. 8-byte encrypted key transmitted in packet
3. RC4 encryption initialized with exchanged key
### 5.2 RC4 Encryption
- Applied to packet payload after headers
- Separate encryption state per connection
- Encryption offset varies by packet type and compression
### 5.3 CRC Validation
- 16-bit CRC appended to most packets
- CRC calculated using session key
- Some packets (SessionRequest, SessionResponse, OutOfSession) not CRC'd
## 6. Compression System
### 6.1 zlib Compression
- Individual packets compressed using zlib deflate
- Compression applied when packet size > 128 bytes
- Compression markers:
- `0x5A`: zlib compressed data follows
- `0xA5`: uncompressed data (small packets)
### 6.2 Compression Process
```
1. Check if packet size > compression threshold
2. Apply zlib deflate compression
3. Prepend compression marker
4. If compressed size >= original, use uncompressed with 0xA5 marker
```
## 7. Packet Combination
### 7.1 Protocol-Level Combination (OP_Combined)
Multiple protocol packets combined into single UDP datagram:
```
[1 byte: Packet1 Size][Packet1 Data]
[1 byte: Packet2 Size][Packet2 Data]
...
```
If size >= 255:
```
[1 byte: 0xFF][2 bytes: Actual Size][Packet Data]
```
### 7.2 Application-Level Combination (OP_AppCombined)
Multiple application packets combined:
```
[1 byte: Packet1 Size][Packet1 Data without opcode header]
[1 byte: Packet2 Size][Packet2 Data without opcode header]
...
```
## 8. Fragmentation
### 8.1 Large Packet Handling
Packets larger than MaxLength are fragmented using OP_Fragment:
**First Fragment:**
```
[2 bytes: Sequence][4 bytes: Total Length][Payload Chunk]
```
**Subsequent Fragments:**
```
[2 bytes: Sequence][Payload Chunk]
```
### 8.2 Reassembly
1. Allocate buffer based on total length from first fragment
2. Collect fragments in sequence order
3. Reconstruct original packet when all fragments received
## 9. Data Structure System
### 9.1 Data Types
```c
#define DATA_STRUCT_INT8 1
#define DATA_STRUCT_INT16 2
#define DATA_STRUCT_INT32 3
#define DATA_STRUCT_INT64 4
#define DATA_STRUCT_FLOAT 5
#define DATA_STRUCT_DOUBLE 6
#define DATA_STRUCT_COLOR 7
#define DATA_STRUCT_SINT8 8
#define DATA_STRUCT_SINT16 9
#define DATA_STRUCT_SINT32 10
#define DATA_STRUCT_CHAR 11
#define DATA_STRUCT_EQ2_8BIT_STRING 12
#define DATA_STRUCT_EQ2_16BIT_STRING 13
#define DATA_STRUCT_EQ2_32BIT_STRING 14
#define DATA_STRUCT_EQUIPMENT 15
#define DATA_STRUCT_ARRAY 16
#define DATA_STRUCT_ITEM 17
#define DATA_STRUCT_SINT64 18
```
### 9.2 String Types
- **EQ2_8BitString**: [1 byte length][string data]
- **EQ2_16BitString**: [2 bytes length][string data]
- **EQ2_32BitString**: [4 bytes length][string data]
### 9.3 Color Structure
```c
struct EQ2_Color {
uint8 red;
uint8 green;
uint8 blue;
};
```
### 9.4 Equipment Structure
```c
struct EQ2_EquipmentItem {
uint16 type;
EQ2_Color color;
EQ2_Color highlight;
};
```
## 10. Application Opcodes
### 10.1 Opcode System
- Two-byte opcodes for game servers (WorldServer, ZoneServer)
- One-byte opcodes for login servers
- Version-specific opcode mappings stored in database
- Translation between internal EmuOpcodes and client opcodes
### 10.2 Key Application Opcodes
```c
// Login Operations
OP_LoginRequestMsg
OP_LoginReplyMsg
OP_AllCharactersDescRequestMsg
OP_AllCharactersDescReplyMsg
OP_CreateCharacterRequestMsg
OP_CreateCharacterReplyMsg
// World Operations
OP_ZoneInfoMsg
OP_UpdateCharacterSheetMsg
OP_UpdateInventoryMsg
OP_ClientCmdMsg
// Chat Operations
OP_ChatTellUserMsg
OP_ChatJoinChannelMsg
```
## 11. Implementation Guidelines
### 11.1 Connection State Machine
```
CLOSED -> SessionRequest -> ESTABLISHED
ESTABLISHED -> SessionDisconnect -> CLOSING -> CLOSED
```
### 11.2 Buffer Management
- Maintain separate inbound/outbound queues
- Implement sliding window for flow control
- Handle out-of-order packet storage
- Implement packet combining logic
### 11.3 Threading Considerations
- Separate reader/writer threads recommended
- Reader processes incoming UDP packets
- Writer sends outbound packets and handles retransmission
- Combine packet processor for optimization
### 11.4 Error Handling
- Validate CRC on all received packets
- Handle malformed packets gracefully
- Implement connection timeout detection
- Retry logic for failed transmissions
### 11.5 Performance Optimizations
- Packet combination to reduce UDP overhead
- Compression for large packets
- Rate limiting and congestion control
- Efficient data structure serialization
## 12. Stream Types
Different stream types have different characteristics:
```c
enum EQStreamType {
LoginStream, // 1-byte opcodes, no compression/encryption
WorldStream, // 2-byte opcodes, compression, no encryption
ZoneStream, // 2-byte opcodes, compression, no encryption
ChatStream, // 1-byte opcodes, no compression, encoding
EQ2Stream // 2-byte opcodes, no compression/encryption
};
```
## 13. Sample Packet Flow
### 13.1 Login Sequence
```
1. Client -> Server: OP_SessionRequest
2. Server -> Client: OP_SessionResponse (with key, compression flags)
3. Client -> Server: OP_Packet[OP_LoginRequestMsg] (with credentials)
4. Server -> Client: OP_Packet[OP_LoginReplyMsg] (success/failure)
5. Client -> Server: OP_Packet[OP_AllCharactersDescRequestMsg]
6. Server -> Client: OP_Packet[OP_AllCharactersDescReplyMsg] (character list)
```
### 13.2 Reliable Data Transfer
```
1. Sender: Assign sequence number, add to retransmit queue
2. Sender: Transmit OP_Packet[seq][data]
3. Receiver: Process packet, send OP_Ack[seq]
4. Sender: Receive ack, remove from retransmit queue
5. On timeout: Retransmit packet up to max attempts
```
This documentation provides the foundation for implementing the EQ2 protocol in any programming language while maintaining compatibility with the existing server and client implementations.

View File

@ -0,0 +1,60 @@
package udp
import (
"bytes"
"compress/zlib"
"io"
)
func Compress(data []byte) ([]byte, error) {
var buf bytes.Buffer
// Write compression marker
buf.WriteByte(0x5A)
writer := zlib.NewWriter(&buf)
_, err := writer.Write(data)
if err != nil {
return nil, err
}
err = writer.Close()
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}
func Decompress(data []byte) ([]byte, error) {
if len(data) == 0 {
return data, nil
}
// Check compression marker
if data[0] == 0xA5 {
// Uncompressed data
return data[1:], nil
}
if data[0] != 0x5A {
// No compression marker, return as-is
return data, nil
}
// Decompress zlib data
reader := bytes.NewReader(data[1:])
zlibReader, err := zlib.NewReader(reader)
if err != nil {
return nil, err
}
defer zlibReader.Close()
var buf bytes.Buffer
_, err = io.Copy(&buf, zlibReader)
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}

View File

@ -1,31 +0,0 @@
package udp
import "time"
// Configuration constants
const (
DefaultMTU = 1400
DefaultWindowSize = 256
DefaultRetryAttempts = 5
DefaultTimeout = 30 * time.Second
RetransmitTimeout = 3 * time.Second
KeepAliveInterval = 10 * time.Second
)
// Config holds configuration for reliable UDP connections
type Config struct {
MTU int
WindowSize uint16
RetryAttempts int
Timeout time.Duration
}
// DefaultConfig returns a default configuration
func DefaultConfig() *Config {
return &Config{
MTU: DefaultMTU,
WindowSize: DefaultWindowSize,
RetryAttempts: DefaultRetryAttempts,
Timeout: DefaultTimeout,
}
}

279
internal/udp/connection.go Normal file
View File

@ -0,0 +1,279 @@
package udp
import (
"crypto/rand"
"encoding/binary"
"net"
"sync"
"time"
)
type ConnectionState int
const (
StateClosed ConnectionState = iota
StateEstablished
StateClosing
)
type Connection struct {
addr *net.UDPAddr
conn *net.UDPConn
handler PacketHandler
state ConnectionState
mutex sync.RWMutex
// Session data
sessionID uint32
key uint32
compressed bool
encoded bool
maxLength uint32
// Sequence tracking
nextInSeq uint16
nextOutSeq uint16
// Queues
inboundQueue []*ApplicationPacket
outboundQueue []*ProtocolPacket
ackQueue []uint16
// Timing
lastPacketTime time.Time
// Crypto
crypto *Crypto
}
func NewConnection(addr *net.UDPAddr, conn *net.UDPConn, handler PacketHandler) *Connection {
return &Connection{
addr: addr,
conn: conn,
handler: handler,
state: StateClosed,
maxLength: 512,
lastPacketTime: time.Now(),
crypto: NewCrypto(),
}
}
func (c *Connection) ProcessPacket(data []byte) {
c.lastPacketTime = time.Now()
packet, err := ParseProtocolPacket(data)
if err != nil {
return
}
switch packet.Opcode {
case OpSessionRequest:
c.handleSessionRequest(packet)
case OpSessionResponse:
c.handleSessionResponse(packet)
case OpPacket:
c.handleDataPacket(packet)
case OpAck:
c.handleAck(packet)
case OpKeepAlive:
c.sendKeepAlive()
case OpSessionDisconnect:
c.Close()
}
}
func (c *Connection) handleSessionRequest(packet *ProtocolPacket) {
if len(packet.Data) < 12 {
return
}
// Parse session request
c.sessionID = binary.LittleEndian.Uint32(packet.Data[4:8])
requestedMaxLen := binary.LittleEndian.Uint32(packet.Data[8:12])
if requestedMaxLen > 0 {
c.maxLength = requestedMaxLen
}
// Generate encryption key
keyBytes := make([]byte, 4)
rand.Read(keyBytes)
c.key = binary.LittleEndian.Uint32(keyBytes)
// Send session response
c.sendSessionResponse()
c.state = StateEstablished
}
func (c *Connection) handleSessionResponse(packet *ProtocolPacket) {
// Client-side session response handling
if len(packet.Data) < 20 {
return
}
c.sessionID = binary.LittleEndian.Uint32(packet.Data[0:4])
c.key = binary.LittleEndian.Uint32(packet.Data[4:8])
format := packet.Data[9]
c.compressed = (format & 0x01) != 0
c.encoded = (format & 0x04) != 0
c.maxLength = binary.LittleEndian.Uint32(packet.Data[12:16])
c.state = StateEstablished
}
func (c *Connection) handleDataPacket(packet *ProtocolPacket) {
if len(packet.Data) < 2 {
return
}
seq := binary.BigEndian.Uint16(packet.Data[0:2])
payload := packet.Data[2:]
// Simple in-order processing for now
if seq == c.nextInSeq {
c.nextInSeq++
c.sendAck(seq)
// Process application packet
if appPacket, err := c.processApplicationData(payload); err == nil {
c.handler.HandlePacket(c, appPacket)
}
}
}
func (c *Connection) handleAck(packet *ProtocolPacket) {
if len(packet.Data) < 2 {
return
}
seq := binary.BigEndian.Uint16(packet.Data[0:2])
// Remove acknowledged packets from retransmit queue
_ = seq // TODO: implement retransmit queue
}
func (c *Connection) processApplicationData(data []byte) (*ApplicationPacket, error) {
// Decrypt if needed
if c.crypto.IsEncrypted() {
data = c.crypto.Decrypt(data)
}
// Decompress if needed
if c.compressed && len(data) > 0 {
var err error
data, err = Decompress(data)
if err != nil {
return nil, err
}
}
return ParseApplicationPacket(data)
}
func (c *Connection) SendPacket(packet *ApplicationPacket) {
c.mutex.Lock()
defer c.mutex.Unlock()
data := packet.Serialize()
// Compress if needed
if c.compressed && len(data) > 128 {
if compressed, err := Compress(data); err == nil {
data = compressed
}
}
// Encrypt if needed
if c.crypto.IsEncrypted() {
data = c.crypto.Encrypt(data)
}
// Create protocol packet
protocolData := make([]byte, 2+len(data))
binary.BigEndian.PutUint16(protocolData[0:2], c.nextOutSeq)
copy(protocolData[2:], data)
c.nextOutSeq++
protocolPacket := &ProtocolPacket{
Opcode: OpPacket,
Data: protocolData,
}
c.sendProtocolPacket(protocolPacket)
}
func (c *Connection) sendSessionResponse() {
data := make([]byte, 20)
binary.LittleEndian.PutUint32(data[0:4], c.sessionID)
binary.LittleEndian.PutUint32(data[4:8], c.key)
data[8] = 2 // UnknownA
var format uint8
if c.compressed {
format |= 0x01
}
if c.encoded {
format |= 0x04
}
data[9] = format
data[10] = 0 // UnknownB
binary.LittleEndian.PutUint32(data[12:16], c.maxLength)
binary.LittleEndian.PutUint32(data[16:20], 0) // UnknownD
packet := &ProtocolPacket{
Opcode: OpSessionResponse,
Data: data,
}
c.sendProtocolPacket(packet)
}
func (c *Connection) sendAck(seq uint16) {
data := make([]byte, 2)
binary.BigEndian.PutUint16(data, seq)
packet := &ProtocolPacket{
Opcode: OpAck,
Data: data,
}
c.sendProtocolPacket(packet)
}
func (c *Connection) sendKeepAlive() {
packet := &ProtocolPacket{
Opcode: OpKeepAlive,
Data: []byte{},
}
c.sendProtocolPacket(packet)
}
func (c *Connection) sendProtocolPacket(packet *ProtocolPacket) {
data := packet.Serialize()
c.conn.WriteToUDP(data, c.addr)
}
func (c *Connection) Close() {
c.mutex.Lock()
defer c.mutex.Unlock()
if c.state == StateEstablished {
c.state = StateClosing
// Send disconnect
disconnectData := make([]byte, 6)
binary.LittleEndian.PutUint32(disconnectData[0:4], c.sessionID)
disconnectData[4] = 0
disconnectData[5] = 6
packet := &ProtocolPacket{
Opcode: OpSessionDisconnect,
Data: disconnectData,
}
c.sendProtocolPacket(packet)
}
c.state = StateClosed
}

57
internal/udp/crypto.go Normal file
View File

@ -0,0 +1,57 @@
package udp
import (
"crypto/rc4"
)
type Crypto struct {
cipher *rc4.Cipher
key []byte
encrypted bool
}
func NewCrypto() *Crypto {
return &Crypto{
encrypted: false,
}
}
func (c *Crypto) SetKey(key []byte) error {
cipher, err := rc4.NewCipher(key)
if err != nil {
return err
}
c.cipher = cipher
c.key = make([]byte, len(key))
copy(c.key, key)
c.encrypted = true
return nil
}
func (c *Crypto) IsEncrypted() bool {
return c.encrypted
}
func (c *Crypto) Encrypt(data []byte) []byte {
if !c.encrypted {
return data
}
encrypted := make([]byte, len(data))
copy(encrypted, data)
c.cipher.XORKeyStream(encrypted, encrypted)
return encrypted
}
func (c *Crypto) Decrypt(data []byte) []byte {
if !c.encrypted {
return data
}
decrypted := make([]byte, len(data))
copy(decrypted, data)
c.cipher.XORKeyStream(decrypted, decrypted)
return decrypted
}

View File

@ -0,0 +1,40 @@
package udp
import (
"fmt"
"testing"
"time"
)
type TestHandler struct{}
func (h *TestHandler) HandlePacket(conn *Connection, packet *ApplicationPacket) {
fmt.Printf("Received packet - Opcode: 0x%04X, Data length: %d\n",
packet.Opcode, len(packet.Data))
// Echo back a response
response := &ApplicationPacket{
Opcode: OpLoginReplyMsg,
Data: []byte("Hello from server"),
}
conn.SendPacket(response)
}
func TestServer(t *testing.T) {
handler := &TestHandler{}
server, err := NewServer(":9999", handler)
if err != nil {
t.Fatalf("Failed to create server: %v", err)
}
go func() {
if err := server.Start(); err != nil {
t.Errorf("Server error: %v", err)
}
}()
// Let it run for a bit
time.Sleep(100 * time.Millisecond)
server.Stop()
}

View File

@ -1,140 +0,0 @@
package udp
import (
"net"
"sync"
"time"
)
// Middleware interface for processing packets
type Middleware interface {
ProcessOutbound(data []byte, next func([]byte) (int, error)) (int, error)
ProcessInbound(data []byte, next func([]byte) (int, error)) (int, error)
Close() error
}
// Builder for fluent middleware configuration
type Builder struct {
address string
config *Config
middlewares []Middleware
}
// NewBuilder creates a new connection builder
func NewBuilder() *Builder {
return &Builder{
config: DefaultConfig(),
}
}
// Address sets the connection address
func (b *Builder) Address(addr string) *Builder {
b.address = addr
return b
}
// Config sets the UDP configuration
func (b *Builder) Config(config *Config) *Builder {
b.config = config
return b
}
// Use adds middleware to the stack
func (b *Builder) Use(middleware Middleware) *Builder {
b.middlewares = append(b.middlewares, middleware)
return b
}
// Listen creates a listener with middleware
func (b *Builder) Listen() (Listener, error) {
listener, err := Listen(b.address, b.config)
if err != nil {
return nil, err
}
return &middlewareListener{listener, b.middlewares}, nil
}
// Dial creates a client connection with middleware
func (b *Builder) Dial() (Conn, error) {
conn, err := Dial(b.address, b.config)
if err != nil {
return nil, err
}
return newMiddlewareConn(conn, b.middlewares), nil
}
// middlewareConn wraps a connection with middleware stack
type middlewareConn struct {
conn Conn
middlewares []Middleware
closeOnce sync.Once
}
func newMiddlewareConn(conn Conn, middlewares []Middleware) *middlewareConn {
return &middlewareConn{
conn: conn,
middlewares: middlewares,
}
}
func (m *middlewareConn) Write(data []byte) (int, error) {
return m.processOutbound(0, data)
}
func (m *middlewareConn) Read(data []byte) (int, error) {
n, err := m.conn.Read(data)
if err != nil {
return n, err
}
return m.processInbound(len(m.middlewares)-1, data[:n])
}
func (m *middlewareConn) processOutbound(index int, data []byte) (int, error) {
if index >= len(m.middlewares) {
return m.conn.Write(data)
}
return m.middlewares[index].ProcessOutbound(data, func(processed []byte) (int, error) {
return m.processOutbound(index+1, processed)
})
}
func (m *middlewareConn) processInbound(index int, data []byte) (int, error) {
if index < 0 {
return len(data), nil
}
return m.middlewares[index].ProcessInbound(data, func(processed []byte) (int, error) {
return m.processInbound(index-1, processed)
})
}
func (m *middlewareConn) Close() error {
m.closeOnce.Do(func() {
for _, middleware := range m.middlewares {
middleware.Close()
}
})
return m.conn.Close()
}
func (m *middlewareConn) LocalAddr() net.Addr { return m.conn.LocalAddr() }
func (m *middlewareConn) RemoteAddr() net.Addr { return m.conn.RemoteAddr() }
func (m *middlewareConn) SetReadDeadline(t time.Time) error { return m.conn.SetReadDeadline(t) }
func (m *middlewareConn) SetWriteDeadline(t time.Time) error { return m.conn.SetWriteDeadline(t) }
type middlewareListener struct {
listener Listener
middlewares []Middleware
}
func (l *middlewareListener) Accept() (Conn, error) {
conn, err := l.listener.Accept()
if err != nil {
return nil, err
}
return newMiddlewareConn(conn, l.middlewares), nil
}
func (l *middlewareListener) Close() error { return l.listener.Close() }
func (l *middlewareListener) Addr() net.Addr { return l.listener.Addr() }

View File

@ -1,244 +0,0 @@
package middleware
import (
"encoding/binary"
"net"
"sync"
"time"
)
// CombinerConfig holds configuration for packet combination
type CombinerConfig struct {
MaxCombinedSize int
FlushInterval time.Duration
MaxQueuedPackets int
}
// DefaultCombinerConfig returns default combiner configuration
func DefaultCombinerConfig() *CombinerConfig {
return &CombinerConfig{
MaxCombinedSize: 1200,
FlushInterval: 250 * time.Millisecond,
MaxQueuedPackets: 16,
}
}
type queuedPacket struct {
data []byte
timestamp time.Time
callback func([]byte) (int, error)
result chan combineResult
}
type combineResult struct {
n int
err error
}
// Combiner implements packet combination middleware
type Combiner struct {
config *CombinerConfig
queue []*queuedPacket
queueMux sync.Mutex
flushChan chan struct{}
done chan struct{}
closeOnce sync.Once
}
// NewCombiner creates a new packet combining middleware
func NewCombiner(config *CombinerConfig) *Combiner {
if config == nil {
config = DefaultCombinerConfig()
}
c := &Combiner{
config: config,
flushChan: make(chan struct{}, 1),
done: make(chan struct{}),
}
go c.flushLoop()
return c
}
// ProcessOutbound implements Middleware.ProcessOutbound
func (c *Combiner) ProcessOutbound(data []byte, next func([]byte) (int, error)) (int, error) {
if len(data) == 0 {
return 0, nil
}
// Large packets bypass combination
if len(data) > c.config.MaxCombinedSize/2 {
return next(data)
}
c.queueMux.Lock()
defer c.queueMux.Unlock()
result := make(chan combineResult, 1)
c.queue = append(c.queue, &queuedPacket{
data: append([]byte(nil), data...),
timestamp: time.Now(),
callback: next,
result: result,
})
shouldFlush := len(c.queue) >= c.config.MaxQueuedPackets
if !shouldFlush {
totalSize := c.calculateCombinedSize()
shouldFlush = totalSize > c.config.MaxCombinedSize
}
if shouldFlush {
c.flushQueueLocked()
} else {
select {
case c.flushChan <- struct{}{}:
default:
}
}
select {
case res := <-result:
return res.n, res.err
case <-c.done:
return 0, net.ErrClosed
}
}
// ProcessInbound implements Middleware.ProcessInbound
func (c *Combiner) ProcessInbound(data []byte, next func([]byte) (int, error)) (int, error) {
if len(data) < 2 {
return next(data)
}
packetCount := binary.BigEndian.Uint16(data[0:2])
// Single packet or invalid format
if packetCount == 1 {
if len(data) < 4 {
return next(data)
}
firstLen := binary.BigEndian.Uint16(data[2:4])
if int(firstLen)+4 == len(data) {
return next(data[4 : 4+firstLen])
}
return next(data)
}
// Multiple packets - return first one
if packetCount > 1 && len(data) >= 4 {
firstLen := binary.BigEndian.Uint16(data[2:4])
if len(data) >= 4+int(firstLen) {
return next(data[4 : 4+firstLen])
}
}
return next(data)
}
func (c *Combiner) calculateCombinedSize() int {
size := 2 // count field
for _, pkt := range c.queue {
size += 2 + len(pkt.data)
}
return size
}
func (c *Combiner) flushLoop() {
ticker := time.NewTicker(c.config.FlushInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
c.flushIfNeeded()
case <-c.flushChan:
c.flushIfNeeded()
case <-c.done:
c.queueMux.Lock()
c.flushQueueLocked()
c.queueMux.Unlock()
return
}
}
}
func (c *Combiner) flushIfNeeded() {
c.queueMux.Lock()
defer c.queueMux.Unlock()
if len(c.queue) == 0 {
return
}
now := time.Now()
oldestAge := now.Sub(c.queue[0].timestamp)
if oldestAge >= c.config.FlushInterval || len(c.queue) >= c.config.MaxQueuedPackets {
c.flushQueueLocked()
}
}
func (c *Combiner) flushQueueLocked() {
if len(c.queue) == 0 {
return
}
queue := c.queue
c.queue = nil
go c.processQueue(queue)
}
func (c *Combiner) processQueue(queue []*queuedPacket) {
if len(queue) == 1 {
pkt := queue[0]
n, err := pkt.callback(pkt.data)
pkt.result <- combineResult{n, err}
return
}
// Combine multiple packets
combined := c.combinePackets(queue)
_, err := queue[0].callback(combined)
// Distribute result to all packets
for _, pkt := range queue {
pkt.result <- combineResult{len(pkt.data), err}
}
}
func (c *Combiner) combinePackets(packets []*queuedPacket) []byte {
totalSize := 2 // count
for _, pkt := range packets {
totalSize += 2 + len(pkt.data)
}
combined := make([]byte, totalSize)
offset := 0
// Write packet count
binary.BigEndian.PutUint16(combined[offset:], uint16(len(packets)))
offset += 2
// Write each packet
for _, pkt := range packets {
binary.BigEndian.PutUint16(combined[offset:], uint16(len(pkt.data)))
offset += 2
copy(combined[offset:], pkt.data)
offset += len(pkt.data)
}
return combined
}
// Close implements Middleware.Close
func (c *Combiner) Close() error {
c.closeOnce.Do(func() {
close(c.done)
})
return nil
}

View File

@ -1,166 +0,0 @@
package middleware
import (
"bytes"
"compress/flate"
"io"
"sync"
)
// CompressorConfig holds configuration for compression
type CompressorConfig struct {
Level int // Compression level (1-9, flate.BestSpeed to flate.BestCompression)
MinSize int // Minimum packet size to compress
CompressionMarker byte // Byte marker to identify compressed packets
}
// DefaultCompressorConfig returns default compressor configuration
func DefaultCompressorConfig() *CompressorConfig {
return &CompressorConfig{
Level: flate.DefaultCompression,
MinSize: 128,
CompressionMarker: 0x5A, // Same as original EQ2 implementation
}
}
// Compressor implements compression middleware using DEFLATE
type Compressor struct {
config *CompressorConfig
writerPool sync.Pool
readerPool sync.Pool
bufferPool sync.Pool
closeOnce sync.Once
}
// NewCompressor creates a new compression middleware
func NewCompressor(config *CompressorConfig) *Compressor {
if config == nil {
config = DefaultCompressorConfig()
}
c := &Compressor{
config: config,
}
// Initialize writer pool
c.writerPool.New = func() interface{} {
writer, _ := flate.NewWriter(nil, config.Level)
return writer
}
// Initialize reader pool
c.readerPool.New = func() interface{} {
return flate.NewReader(nil)
}
// Initialize buffer pool
c.bufferPool.New = func() interface{} {
return &bytes.Buffer{}
}
return c
}
// ProcessOutbound implements Middleware.ProcessOutbound
func (c *Compressor) ProcessOutbound(data []byte, next func([]byte) (int, error)) (int, error) {
if len(data) < c.config.MinSize {
return next(data)
}
compressed, err := c.compress(data)
if err != nil || len(compressed) >= len(data) {
// Compression failed or didn't help - send original
return next(data)
}
return next(compressed)
}
// ProcessInbound implements Middleware.ProcessInbound
func (c *Compressor) ProcessInbound(data []byte, next func([]byte) (int, error)) (int, error) {
if len(data) < 2 {
return next(data)
}
// Check for compression marker
if data[0] != c.config.CompressionMarker {
return next(data)
}
decompressed, err := c.decompress(data[1:])
if err != nil {
// Decompression failed - pass through original
return next(data)
}
return next(decompressed)
}
func (c *Compressor) compress(data []byte) ([]byte, error) {
// Get buffer from pool
buf := c.bufferPool.Get().(*bytes.Buffer)
defer c.bufferPool.Put(buf)
buf.Reset()
// Write compression marker
buf.WriteByte(c.config.CompressionMarker)
// Get writer from pool
writer := c.writerPool.Get().(*flate.Writer)
defer c.writerPool.Put(writer)
writer.Reset(buf)
// Compress data
if _, err := writer.Write(data); err != nil {
return nil, err
}
if err := writer.Close(); err != nil {
return nil, err
}
// Return copy of compressed data
result := make([]byte, buf.Len())
copy(result, buf.Bytes())
return result, nil
}
func (c *Compressor) decompress(data []byte) ([]byte, error) {
// Get reader from pool
reader := c.readerPool.Get().(io.ReadCloser)
defer c.readerPool.Put(reader)
// Reset reader with new data
if resetter, ok := reader.(flate.Resetter); ok {
resetter.Reset(bytes.NewReader(data), nil)
} else {
reader.Close()
reader = flate.NewReader(bytes.NewReader(data))
}
defer reader.Close()
// Get buffer from pool
buf := c.bufferPool.Get().(*bytes.Buffer)
defer c.bufferPool.Put(buf)
buf.Reset()
// Decompress data
if _, err := io.Copy(buf, reader); err != nil {
return nil, err
}
// Return copy of decompressed data
result := make([]byte, buf.Len())
copy(result, buf.Bytes())
return result, nil
}
// Close implements Middleware.Close
func (c *Compressor) Close() error {
c.closeOnce.Do(func() {
// Close any active readers/writers in pools
// Note: flate writers/readers don't need explicit cleanup
})
return nil
}

View File

@ -1,235 +0,0 @@
package middleware
import (
"crypto/rand"
"crypto/rc4"
"crypto/rsa"
"crypto/x509"
"encoding/binary"
"sync"
)
// EncryptorConfig holds configuration for encryption
type EncryptorConfig struct {
RSAKeySize int // RSA key size in bits
KeyExchangeOp byte // Opcode for key exchange packets
MinSize int // Minimum packet size to encrypt
}
// DefaultEncryptorConfig returns default encryptor configuration
func DefaultEncryptorConfig() *EncryptorConfig {
return &EncryptorConfig{
RSAKeySize: 1024,
KeyExchangeOp: 0x21, // OP_WSLoginRequestMsg equivalent
MinSize: 8,
}
}
// Encryptor implements RC4 + RSA encryption middleware
type Encryptor struct {
config *EncryptorConfig
rsaKey *rsa.PrivateKey
rc4Key []byte
cipher *rc4.Cipher
cipherMux sync.RWMutex
keySet bool
closeOnce sync.Once
}
// NewEncryptor creates a new encryption middleware
func NewEncryptor(config *EncryptorConfig) *Encryptor {
if config == nil {
config = DefaultEncryptorConfig()
}
// Generate RSA key pair
rsaKey, err := rsa.GenerateKey(rand.Reader, config.RSAKeySize)
if err != nil {
panic(err) // Should handle this better in production
}
return &Encryptor{
config: config,
rsaKey: rsaKey,
}
}
// ProcessOutbound implements Middleware.ProcessOutbound
func (e *Encryptor) ProcessOutbound(data []byte, next func([]byte) (int, error)) (int, error) {
// Check if this is a key exchange request
if len(data) > 4 && data[0] == 0 && data[1] == e.config.KeyExchangeOp {
return e.handleKeyExchange(data, next)
}
// Skip encryption for small packets or if no key is set
if len(data) < e.config.MinSize || !e.isKeySet() {
return next(data)
}
encrypted, err := e.encrypt(data)
if err != nil {
return next(data) // Fallback to unencrypted
}
return next(encrypted)
}
// ProcessInbound implements Middleware.ProcessInbound
func (e *Encryptor) ProcessInbound(data []byte, next func([]byte) (int, error)) (int, error) {
// Check for RSA encrypted key at end of packet
if len(data) >= 8 && e.isRSAEncryptedKey(data) {
return e.processRSAKey(data, next)
}
// Skip decryption if no key is set
if !e.isKeySet() {
return next(data)
}
decrypted, err := e.decrypt(data)
if err != nil {
return next(data) // Fallback to unencrypted
}
return next(decrypted)
}
func (e *Encryptor) handleKeyExchange(data []byte, next func([]byte) (int, error)) (int, error) {
// Extract key size from packet
if len(data) < 8 {
return next(data)
}
keySize := binary.LittleEndian.Uint32(data[4:8])
if keySize != 60 { // Expected key size
return next(data)
}
// Create key exchange response with RSA public key
response := make([]byte, len(data))
copy(response, data)
// Fill with dummy key data (in real implementation, would use proper key)
for i := 8; i < len(response)-8; i++ {
response[i] = 0xFF
}
// Add termination markers
response[len(response)-5] = 1
response[len(response)-1] = 1
return next(response)
}
func (e *Encryptor) processRSAKey(data []byte, next func([]byte) (int, error)) (int, error) {
// Extract and decrypt RSA key from end of packet
encryptedKey := data[len(data)-8:]
// In real implementation, would decrypt with RSA private key
// For now, use a simple XOR pattern
rc4Key := make([]byte, 8)
for i := 0; i < 8; i++ {
rc4Key[i] = encryptedKey[i] ^ 0x55 // Simple pattern
}
e.setRC4Key(rc4Key)
// Pass the packet without the RSA key
return next(data[:len(data)-8])
}
func (e *Encryptor) isRSAEncryptedKey(data []byte) bool {
// Simple heuristic - check if last 8 bytes look like encrypted data
if len(data) < 8 {
return false
}
// Check for non-zero data in last 8 bytes
lastBytes := data[len(data)-8:]
nonZero := 0
for _, b := range lastBytes {
if b != 0 {
nonZero++
}
}
return nonZero > 4 // Heuristic: encrypted data should have some non-zero bytes
}
func (e *Encryptor) setRC4Key(key []byte) {
e.cipherMux.Lock()
defer e.cipherMux.Unlock()
e.rc4Key = make([]byte, len(key))
copy(e.rc4Key, key)
cipher, err := rc4.NewCipher(key)
if err == nil {
e.cipher = cipher
e.keySet = true
}
}
func (e *Encryptor) isKeySet() bool {
e.cipherMux.RLock()
defer e.cipherMux.RUnlock()
return e.keySet
}
func (e *Encryptor) encrypt(data []byte) ([]byte, error) {
e.cipherMux.Lock()
defer e.cipherMux.Unlock()
if e.cipher == nil {
return data, nil
}
// Create new cipher for this operation (RC4 is stateful)
cipher, err := rc4.NewCipher(e.rc4Key)
if err != nil {
return nil, err
}
encrypted := make([]byte, len(data))
cipher.XORKeyStream(encrypted, data)
return encrypted, nil
}
func (e *Encryptor) decrypt(data []byte) ([]byte, error) {
e.cipherMux.Lock()
defer e.cipherMux.Unlock()
if e.cipher == nil {
return data, nil
}
// Create new cipher for this operation (RC4 is stateful)
cipher, err := rc4.NewCipher(e.rc4Key)
if err != nil {
return nil, err
}
decrypted := make([]byte, len(data))
cipher.XORKeyStream(decrypted, data)
return decrypted, nil
}
// GetPublicKey returns the RSA public key for key exchange
func (e *Encryptor) GetPublicKey() []byte {
pubKeyBytes, err := x509.MarshalPKIXPublicKey(&e.rsaKey.PublicKey)
if err != nil {
return nil
}
return pubKeyBytes
}
// Close implements Middleware.Close
func (e *Encryptor) Close() error {
e.closeOnce.Do(func() {
e.cipherMux.Lock()
e.cipher = nil
e.rc4Key = nil
e.keySet = false
e.cipherMux.Unlock()
})
return nil
}

28
internal/udp/opcodes.go Normal file
View File

@ -0,0 +1,28 @@
package udp
const (
// Protocol opcodes
OpSessionRequest = 0x01
OpSessionResponse = 0x02
OpCombined = 0x03
OpSessionDisconnect = 0x05
OpKeepAlive = 0x06
OpServerKeyRequest = 0x07
OpSessionStatResponse = 0x08
OpPacket = 0x09
OpFragment = 0x0D
OpOutOfOrderAck = 0x11
OpAck = 0x15
OpAppCombined = 0x19
OpOutOfSession = 0x1D
)
// Application opcodes (examples)
const (
OpLoginRequestMsg = 0x0001
OpLoginReplyMsg = 0x0002
OpAllCharactersDescRequestMsg = 0x0003
OpAllCharactersDescReplyMsg = 0x0004
OpCreateCharacterRequestMsg = 0x0005
OpCreateCharacterReplyMsg = 0x0006
)

View File

@ -2,83 +2,53 @@ package udp
import (
"encoding/binary"
"fmt"
"hash/crc32"
"time"
"errors"
)
// Packet types
const (
PacketTypeData uint8 = iota
PacketTypeAck
PacketTypeSessionRequest
PacketTypeSessionResponse
PacketTypeKeepAlive
PacketTypeDisconnect
PacketTypeFragment
)
// packet represents a protocol packet
type packet struct {
Type uint8
Sequence uint16
Ack uint16
Session uint32
Data []byte
CRC uint32
type ProtocolPacket struct {
Opcode uint8
Data []byte
}
// Marshal serializes the packet
func (p *packet) Marshal() []byte {
dataLen := len(p.Data)
buf := make([]byte, 15+dataLen) // Fixed header + data
buf[0] = p.Type
binary.BigEndian.PutUint16(buf[1:3], p.Sequence)
binary.BigEndian.PutUint16(buf[3:5], p.Ack)
binary.BigEndian.PutUint32(buf[5:9], p.Session)
binary.BigEndian.PutUint16(buf[9:11], uint16(dataLen))
copy(buf[11:11+dataLen], p.Data)
// Calculate CRC32 for header + data
p.CRC = crc32.ChecksumIEEE(buf[:11+dataLen])
binary.BigEndian.PutUint32(buf[11+dataLen:], p.CRC)
return buf
type ApplicationPacket struct {
Opcode uint16
Data []byte
}
// Unmarshal deserializes the packet
func (p *packet) Unmarshal(data []byte) error {
if len(data) < 15 {
return fmt.Errorf("packet too short: %d bytes", len(data))
func ParseProtocolPacket(data []byte) (*ProtocolPacket, error) {
if len(data) < 2 {
return nil, errors.New("packet too small")
}
p.Type = data[0]
p.Sequence = binary.BigEndian.Uint16(data[1:3])
p.Ack = binary.BigEndian.Uint16(data[3:5])
p.Session = binary.BigEndian.Uint32(data[5:9])
dataLen := binary.BigEndian.Uint16(data[9:11])
if len(data) < 15+int(dataLen) {
return fmt.Errorf("incomplete packet: expected %d bytes, got %d", 15+dataLen, len(data))
}
p.Data = make([]byte, dataLen)
copy(p.Data, data[11:11+dataLen])
p.CRC = binary.BigEndian.Uint32(data[11+dataLen:])
// Verify CRC
expectedCRC := crc32.ChecksumIEEE(data[:11+dataLen])
if p.CRC != expectedCRC {
return fmt.Errorf("CRC mismatch: expected %x, got %x", expectedCRC, p.CRC)
}
return nil
return &ProtocolPacket{
Opcode: data[1],
Data: data[2:],
}, nil
}
// pendingPacket represents a packet awaiting acknowledgment
type pendingPacket struct {
packet *packet
timestamp time.Time
attempts int
func (p *ProtocolPacket) Serialize() []byte {
data := make([]byte, 2+len(p.Data))
data[0] = 0x00 // Reserved byte
data[1] = p.Opcode
copy(data[2:], p.Data)
return data
}
func ParseApplicationPacket(data []byte) (*ApplicationPacket, error) {
if len(data) < 2 {
return nil, errors.New("application packet too small")
}
opcode := binary.LittleEndian.Uint16(data[0:2])
return &ApplicationPacket{
Opcode: opcode,
Data: data[2:],
}, nil
}
func (p *ApplicationPacket) Serialize() []byte {
data := make([]byte, 2+len(p.Data))
binary.LittleEndian.PutUint16(data[0:2], p.Opcode)
copy(data[2:], p.Data)
return data
}

View File

@ -1,576 +1,105 @@
package udp
import (
"context"
"fmt"
"net"
"sync"
"sync/atomic"
"time"
)
// Conn represents a reliable UDP connection
type Conn interface {
Read(b []byte) (n int, err error)
Write(b []byte) (n int, err error)
Close() error
LocalAddr() net.Addr
RemoteAddr() net.Addr
SetReadDeadline(t time.Time) error
SetWriteDeadline(t time.Time) error
type Server struct {
conn *net.UDPConn
connections map[string]*Connection
mutex sync.RWMutex
handler PacketHandler
running bool
}
// Listener listens for incoming reliable UDP connections
type Listener interface {
Accept() (Conn, error)
Close() error
Addr() net.Addr
type PacketHandler interface {
HandlePacket(conn *Connection, packet *ApplicationPacket)
}
// stream implements a reliable UDP stream
type stream struct {
conn *net.UDPConn
remoteAddr *net.UDPAddr
localAddr *net.UDPAddr
session uint32
config *Config
// Sequence tracking
sendSeq uint32
recvSeq uint16
lastAckSent uint16
// Channels for communication
inbound chan []byte
outbound chan []byte
control chan *packet
done chan struct{}
closeOnce sync.Once
// Reliability tracking
pending map[uint16]*pendingPacket
pendingMutex sync.RWMutex
outOfOrder map[uint16][]byte
oooMutex sync.RWMutex
// Flow control
windowSize uint16
// Read/Write deadlines
readDeadline atomic.Value
writeDeadline atomic.Value
// Last activity for keep-alive
lastActivity time.Time
activityMutex sync.RWMutex
}
// newStream creates a new reliable UDP stream
func newStream(conn *net.UDPConn, remoteAddr *net.UDPAddr, session uint32, config *Config) *stream {
s := &stream{
conn: conn,
remoteAddr: remoteAddr,
localAddr: conn.LocalAddr().(*net.UDPAddr),
session: session,
config: config,
windowSize: config.WindowSize,
inbound: make(chan []byte, 256),
outbound: make(chan []byte, 256),
control: make(chan *packet, 64),
done: make(chan struct{}),
pending: make(map[uint16]*pendingPacket),
outOfOrder: make(map[uint16][]byte),
lastActivity: time.Now(),
}
// Start background goroutines
go s.writeLoop()
go s.retransmitLoop()
go s.keepAliveLoop()
return s
}
// Read implements Conn.Read
func (s *stream) Read(b []byte) (n int, err error) {
ctx, cancel := s.getReadDeadlineContext()
defer cancel()
select {
case data := <-s.inbound:
n = copy(b, data)
if n < len(data) {
return n, fmt.Errorf("buffer too small: need %d bytes, got %d", len(data), len(b))
}
return n, nil
case <-s.done:
return 0, fmt.Errorf("connection closed")
case <-ctx.Done():
return 0, fmt.Errorf("read timeout")
}
}
// Write implements Conn.Write
func (s *stream) Write(b []byte) (n int, err error) {
if len(b) == 0 {
return 0, nil
}
// Fragment large packets
mtu := s.config.MTU - 15 // Account for packet header
if len(b) <= mtu {
return s.writePacket(b)
}
// Fragment the data
sent := 0
for sent < len(b) {
end := sent + mtu
if end > len(b) {
end = len(b)
}
n, err := s.writePacket(b[sent:end])
sent += n
if err != nil {
return sent, err
}
}
return sent, nil
}
// writePacket writes a single packet
func (s *stream) writePacket(data []byte) (int, error) {
ctx, cancel := s.getWriteDeadlineContext()
defer cancel()
select {
case s.outbound <- data:
s.updateActivity()
return len(data), nil
case <-s.done:
return 0, fmt.Errorf("connection closed")
case <-ctx.Done():
return 0, fmt.Errorf("write timeout")
}
}
// writeLoop handles outbound packet transmission
func (s *stream) writeLoop() {
defer close(s.outbound)
for {
select {
case data := <-s.outbound:
s.sendDataPacket(data)
case ctrlPacket := <-s.control:
s.sendControlPacket(ctrlPacket)
case <-s.done:
return
}
}
}
// sendDataPacket sends a data packet with reliability
func (s *stream) sendDataPacket(data []byte) {
seq := uint16(atomic.AddUint32(&s.sendSeq, 1) - 1)
pkt := &packet{
Type: PacketTypeData,
Sequence: seq,
Ack: s.lastAckSent,
Session: s.session,
Data: data,
}
// Store for retransmission
s.pendingMutex.Lock()
s.pending[seq] = &pendingPacket{
packet: pkt,
timestamp: time.Now(),
attempts: 0,
}
s.pendingMutex.Unlock()
s.sendRawPacket(pkt)
}
// sendControlPacket sends control packets (ACKs, etc.)
func (s *stream) sendControlPacket(pkt *packet) {
pkt.Session = s.session
s.sendRawPacket(pkt)
}
// sendRawPacket sends a packet over UDP
func (s *stream) sendRawPacket(pkt *packet) {
data := pkt.Marshal()
s.conn.WriteToUDP(data, s.remoteAddr)
}
// handlePacket processes an incoming packet
func (s *stream) handlePacket(pkt *packet) {
s.updateActivity()
switch pkt.Type {
case PacketTypeData:
s.handleDataPacket(pkt)
case PacketTypeAck:
s.handleAckPacket(pkt)
case PacketTypeKeepAlive:
s.sendAck(pkt.Sequence)
case PacketTypeDisconnect:
s.Close()
}
}
// handleDataPacket processes incoming data packets
func (s *stream) handleDataPacket(pkt *packet) {
// Send ACK
s.sendAck(pkt.Sequence)
// Check sequence order
expectedSeq := s.recvSeq + 1
if pkt.Sequence == expectedSeq {
// In order - deliver immediately
s.deliverData(pkt.Data)
s.recvSeq = pkt.Sequence
// Check for buffered out-of-order packets
s.processOutOfOrder()
} else if pkt.Sequence > expectedSeq {
// Future packet - buffer it
s.oooMutex.Lock()
s.outOfOrder[pkt.Sequence] = pkt.Data
s.oooMutex.Unlock()
}
// Past packets are ignored (duplicate)
}
// processOutOfOrder delivers buffered in-order packets
func (s *stream) processOutOfOrder() {
s.oooMutex.Lock()
defer s.oooMutex.Unlock()
for {
nextSeq := s.recvSeq + 1
if data, exists := s.outOfOrder[nextSeq]; exists {
s.deliverData(data)
s.recvSeq = nextSeq
delete(s.outOfOrder, nextSeq)
} else {
break
}
}
}
// deliverData delivers data to the application
func (s *stream) deliverData(data []byte) {
select {
case s.inbound <- data:
case <-s.done:
default:
// Channel full - would block
}
}
// handleAckPacket processes acknowledgment packets
func (s *stream) handleAckPacket(pkt *packet) {
s.pendingMutex.Lock()
defer s.pendingMutex.Unlock()
if pending, exists := s.pending[pkt.Sequence]; exists {
delete(s.pending, pkt.Sequence)
_ = pending // Packet acknowledged
}
}
// sendAck sends an acknowledgment
func (s *stream) sendAck(seq uint16) {
s.lastAckSent = seq
ackPkt := &packet{
Type: PacketTypeAck,
Sequence: seq,
Ack: seq,
}
select {
case s.control <- ackPkt:
case <-s.done:
default:
}
}
// retransmitLoop handles packet retransmission
func (s *stream) retransmitLoop() {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
s.checkRetransmissions()
case <-s.done:
return
}
}
}
// checkRetransmissions checks for packets needing retransmission
func (s *stream) checkRetransmissions() {
now := time.Now()
s.pendingMutex.Lock()
defer s.pendingMutex.Unlock()
for seq, pending := range s.pending {
if now.Sub(pending.timestamp) > RetransmitTimeout {
if pending.attempts >= s.config.RetryAttempts {
// Too many attempts - close connection
delete(s.pending, seq)
go s.Close()
return
}
// Retransmit
pending.attempts++
pending.timestamp = now
s.sendRawPacket(pending.packet)
}
}
}
// keepAliveLoop sends periodic keep-alive packets
func (s *stream) keepAliveLoop() {
ticker := time.NewTicker(KeepAliveInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
s.activityMutex.RLock()
idle := time.Since(s.lastActivity)
s.activityMutex.RUnlock()
if idle > KeepAliveInterval {
keepAlive := &packet{Type: PacketTypeKeepAlive}
select {
case s.control <- keepAlive:
case <-s.done:
return
}
}
case <-s.done:
return
}
}
}
// updateActivity updates the last activity timestamp
func (s *stream) updateActivity() {
s.activityMutex.Lock()
s.lastActivity = time.Now()
s.activityMutex.Unlock()
}
// Close implements Conn.Close
func (s *stream) Close() error {
s.closeOnce.Do(func() {
// Send disconnect packet
disconnect := &packet{Type: PacketTypeDisconnect}
select {
case s.control <- disconnect:
default:
}
close(s.done)
})
return nil
}
// Address methods
func (s *stream) LocalAddr() net.Addr { return s.localAddr }
func (s *stream) RemoteAddr() net.Addr { return s.remoteAddr }
// Deadline methods
func (s *stream) SetReadDeadline(t time.Time) error {
s.readDeadline.Store(t)
return nil
}
func (s *stream) SetWriteDeadline(t time.Time) error {
s.writeDeadline.Store(t)
return nil
}
func (s *stream) getReadDeadlineContext() (context.Context, context.CancelFunc) {
if deadline, ok := s.readDeadline.Load().(time.Time); ok && !deadline.IsZero() {
return context.WithDeadline(context.Background(), deadline)
}
return context.Background(), func() {}
}
func (s *stream) getWriteDeadlineContext() (context.Context, context.CancelFunc) {
if deadline, ok := s.writeDeadline.Load().(time.Time); ok && !deadline.IsZero() {
return context.WithDeadline(context.Background(), deadline)
}
return context.Background(), func() {}
}
// listener implements a reliable UDP listener
type listener struct {
conn *net.UDPConn
config *Config
streams map[string]*stream
mutex sync.RWMutex
incoming chan *stream
done chan struct{}
}
// Listen creates a new reliable UDP listener
func Listen(address string, config *Config) (Listener, error) {
if config == nil {
config = DefaultConfig()
}
addr, err := net.ResolveUDPAddr("udp", address)
func NewServer(addr string, handler PacketHandler) (*Server, error) {
udpAddr, err := net.ResolveUDPAddr("udp", addr)
if err != nil {
return nil, err
}
conn, err := net.ListenUDP("udp", addr)
conn, err := net.ListenUDP("udp", udpAddr)
if err != nil {
return nil, err
}
l := &listener{
conn: conn,
config: config,
streams: make(map[string]*stream),
incoming: make(chan *stream, 16),
done: make(chan struct{}),
}
go l.readLoop()
return l, nil
return &Server{
conn: conn,
connections: make(map[string]*Connection),
handler: handler,
}, nil
}
// readLoop handles incoming UDP packets
func (l *listener) readLoop() {
buf := make([]byte, 2048)
func (s *Server) Start() error {
s.running = true
for {
select {
case <-l.done:
return
default:
}
// Start connection timeout checker
go s.timeoutChecker()
n, addr, err := l.conn.ReadFromUDP(buf)
// Main packet receive loop
buffer := make([]byte, 2048)
for s.running {
n, addr, err := s.conn.ReadFromUDP(buffer)
if err != nil {
if s.running {
fmt.Printf("UDP read error: %v\n", err)
}
continue
}
pkt := &packet{}
if err := pkt.Unmarshal(buf[:n]); err != nil {
continue
go s.handlePacket(buffer[:n], addr)
}
return nil
}
func (s *Server) Stop() {
s.running = false
s.conn.Close()
}
func (s *Server) handlePacket(data []byte, addr *net.UDPAddr) {
if len(data) < 2 {
return
}
connKey := addr.String()
s.mutex.Lock()
conn, exists := s.connections[connKey]
if !exists {
conn = NewConnection(addr, s.conn, s.handler)
s.connections[connKey] = conn
}
s.mutex.Unlock()
conn.ProcessPacket(data)
}
func (s *Server) timeoutChecker() {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for range ticker.C {
if !s.running {
return
}
l.handlePacket(pkt, addr)
}
}
// handlePacket routes packets to appropriate streams
func (l *listener) handlePacket(pkt *packet, addr *net.UDPAddr) {
streamKey := addr.String()
l.mutex.RLock()
stream, exists := l.streams[streamKey]
l.mutex.RUnlock()
if !exists && pkt.Type == PacketTypeSessionRequest {
// New connection
session := pkt.Session
stream = newStream(l.conn, addr, session, l.config)
l.mutex.Lock()
l.streams[streamKey] = stream
l.mutex.Unlock()
// Send session response
response := &packet{
Type: PacketTypeSessionResponse,
Session: session,
now := time.Now()
s.mutex.Lock()
for key, conn := range s.connections {
if now.Sub(conn.lastPacketTime) > 45*time.Second {
conn.Close()
delete(s.connections, key)
}
}
stream.sendControlPacket(response)
select {
case l.incoming <- stream:
case <-l.done:
}
} else if exists {
stream.handlePacket(pkt)
s.mutex.Unlock()
}
}
// Accept implements Listener.Accept
func (l *listener) Accept() (Conn, error) {
select {
case stream := <-l.incoming:
return stream, nil
case <-l.done:
return nil, fmt.Errorf("listener closed")
}
}
// Close implements Listener.Close
func (l *listener) Close() error {
close(l.done)
l.mutex.Lock()
defer l.mutex.Unlock()
for _, stream := range l.streams {
stream.Close()
}
return l.conn.Close()
}
// Addr implements Listener.Addr
func (l *listener) Addr() net.Addr {
return l.conn.LocalAddr()
}
// Dial creates a client connection to a reliable UDP server
func Dial(address string, config *Config) (Conn, error) {
if config == nil {
config = DefaultConfig()
}
addr, err := net.ResolveUDPAddr("udp", address)
if err != nil {
return nil, err
}
conn, err := net.DialUDP("udp", nil, addr)
if err != nil {
return nil, err
}
session := uint32(time.Now().Unix())
stream := newStream(conn, addr, session, config)
// Send session request
request := &packet{
Type: PacketTypeSessionRequest,
Session: session,
}
stream.sendControlPacket(request)
return stream, nil
}