diff --git a/app_packet.go b/app_packet.go new file mode 100644 index 0000000..b892669 --- /dev/null +++ b/app_packet.go @@ -0,0 +1,286 @@ +package eq2net + +import ( + "encoding/binary" + "fmt" +) + +// AppPacket handles application-level game packets with opcode abstraction +// This layer sits above the protocol layer and handles game-specific opcodes +type AppPacket struct { + EQPacket + + EmuOpcode uint16 // Emulator opcode (internal representation) + OpcodeSize uint8 // Size of opcode in bytes (1 or 2) +} + +// Default opcode size for application packets +var DefaultAppOpcodeSize uint8 = 2 + +// NewAppPacket creates a new application packet with emulator opcode +func NewAppPacket(emuOpcode uint16, data []byte) *AppPacket { + p := &AppPacket{ + EQPacket: *NewEQPacket(0, data), // Network opcode will be set during conversion + EmuOpcode: emuOpcode, + OpcodeSize: DefaultAppOpcodeSize, + } + return p +} + +// NewAppPacketWithSize creates an application packet with specified opcode size +func NewAppPacketWithSize(emuOpcode uint16, data []byte, opcodeSize uint8) *AppPacket { + p := &AppPacket{ + EQPacket: *NewEQPacket(0, data), + EmuOpcode: emuOpcode, + OpcodeSize: opcodeSize, + } + return p +} + +// ParseAppPacket creates an application packet from raw data +// The data should NOT include the protocol header, just the app opcode + payload +func ParseAppPacket(data []byte, opcodeSize uint8) (*AppPacket, error) { + if len(data) < int(opcodeSize) { + return nil, fmt.Errorf("packet too small for opcode: need %d bytes, got %d", opcodeSize, len(data)) + } + + // Extract opcode + var opcode uint16 + if opcodeSize == 1 { + opcode = uint16(data[0]) + } else { + // Handle special encoding for 2-byte opcodes + if data[0] == 0x00 && len(data) > 2 { + // Special case: extra 0x00 prefix for opcodes with low byte = 0x00 + opcode = binary.BigEndian.Uint16(data[1:3]) + data = data[3:] // Skip the extra byte + } else { + opcode = binary.BigEndian.Uint16(data[0:2]) + data = data[2:] + } + } + + p := &AppPacket{ + EQPacket: *NewEQPacket(opcode, data[opcodeSize:]), + EmuOpcode: opcode, // Initially same as network opcode + OpcodeSize: opcodeSize, + } + + return p, nil +} + +// SetEmuOpcode sets the emulator opcode +// This is used when converting between emulator and network opcodes +func (p *AppPacket) SetEmuOpcode(opcode uint16) { + p.EmuOpcode = opcode +} + +// GetEmuOpcode returns the emulator opcode +func (p *AppPacket) GetEmuOpcode() uint16 { + return p.EmuOpcode +} + +// SerializeApp serializes the application packet with proper opcode encoding +func (p *AppPacket) SerializeApp() []byte { + opcodeBytes := p.OpcodeSize + extraBytes := 0 + + // Handle special encoding rules for 2-byte opcodes + if p.OpcodeSize == 2 && (p.Opcode&0x00FF) == 0 { + // Opcodes with low byte = 0x00 need an extra 0x00 prefix + extraBytes = 1 + } + + // Create buffer + buf := make([]byte, int(opcodeBytes)+extraBytes+len(p.Buffer)) + offset := 0 + + // Write opcode + if p.OpcodeSize == 1 { + buf[offset] = byte(p.Opcode) + offset++ + } else { + if extraBytes > 0 { + // Special encoding: add 0x00 prefix + buf[offset] = 0x00 + offset++ + binary.BigEndian.PutUint16(buf[offset:], p.Opcode) + offset += 2 + } else { + binary.BigEndian.PutUint16(buf[offset:], p.Opcode) + offset += 2 + } + } + + // Copy packet data + if len(p.Buffer) > 0 { + copy(buf[offset:], p.Buffer) + } + + return buf +} + +// Combine combines multiple application packets into a single packet +// Returns true if successful, false if size limit exceeded +func (p *AppPacket) Combine(other *AppPacket) bool { + // Create a new buffer with both packets + myData := p.SerializeApp() + otherData := other.SerializeApp() + + // Check size limit (application packets can be larger than protocol) + if len(myData)+len(otherData) > 8192 { // Reasonable max size + return false + } + + // Combine the data + newBuffer := make([]byte, len(myData)+len(otherData)) + copy(newBuffer, myData) + copy(newBuffer[len(myData):], otherData) + + // Update packet + p.Buffer = newBuffer + return true +} + +// Copy creates a deep copy of the application packet +func (p *AppPacket) Copy() *AppPacket { + newPacket := &AppPacket{ + EQPacket: *p.Clone(), + EmuOpcode: p.EmuOpcode, + OpcodeSize: p.OpcodeSize, + } + return newPacket +} + +// SetVersion sets the protocol version for opcode conversion +func (p *AppPacket) SetVersion(version int16) { + p.Version = version +} + +// ConvertToProtocolPacket wraps this application packet in a protocol packet +// This is used when sending application data over the network +func (p *AppPacket) ConvertToProtocolPacket() *ProtocolPacket { + // Serialize the application packet + appData := p.SerializeApp() + + // Create protocol packet with OP_Packet opcode + proto := &ProtocolPacket{ + EQPacket: *NewEQPacket(OP_Packet, appData), + } + + // Copy network info + proto.SrcIP = p.SrcIP + proto.SrcPort = p.SrcPort + proto.DstIP = p.DstIP + proto.DstPort = p.DstPort + proto.Timestamp = p.Timestamp + proto.Version = p.Version + + return proto +} + +// AppCombine performs application-level packet combining (EQ2-specific) +// This is different from protocol-level combining +func AppCombine(packets []*AppPacket) *AppPacket { + if len(packets) == 0 { + return nil + } + if len(packets) == 1 { + return packets[0] + } + + // Calculate total size needed + var totalSize int + for _, p := range packets { + data := p.SerializeApp() + if len(data) >= 255 { + totalSize += 3 + len(data) // 0xFF marker + 2-byte size + data + } else { + totalSize += 1 + len(data) // 1-byte size + data + } + } + + // Build combined buffer + buffer := make([]byte, totalSize) + offset := 0 + + for _, p := range packets { + data := p.SerializeApp() + size := len(data) + + if size >= 255 { + // Oversized packet: use 0xFF marker followed by 2-byte size + buffer[offset] = 0xFF + offset++ + binary.BigEndian.PutUint16(buffer[offset:], uint16(size)) + offset += 2 + } else { + // Normal packet: 1-byte size + buffer[offset] = byte(size) + offset++ + } + + // Copy packet data + copy(buffer[offset:], data) + offset += size + } + + // Create combined packet with OP_AppCombined + combined := &AppPacket{ + EQPacket: *NewEQPacket(OP_AppCombined, buffer), + OpcodeSize: DefaultAppOpcodeSize, + } + + return combined +} + +// ExtractAppPackets extracts individual packets from an app-combined packet +func ExtractAppPackets(combined *AppPacket) ([]*AppPacket, error) { + if combined.Opcode != OP_AppCombined { + return nil, fmt.Errorf("not an app-combined packet") + } + + var packets []*AppPacket + data := combined.Buffer + offset := 0 + + for offset < len(data) { + if offset >= len(data) { + break + } + + var size int + // Read size + if data[offset] == 0xFF { + // Oversized packet + if offset+3 > len(data) { + return nil, fmt.Errorf("invalid oversized packet header") + } + offset++ + size = int(binary.BigEndian.Uint16(data[offset:])) + offset += 2 + } else { + // Normal packet + size = int(data[offset]) + offset++ + } + + // Extract packet data + if offset+size > len(data) { + return nil, fmt.Errorf("packet size exceeds buffer") + } + + packetData := data[offset : offset+size] + offset += size + + // Parse the packet + app, err := ParseAppPacket(packetData, combined.OpcodeSize) + if err != nil { + return nil, fmt.Errorf("failed to parse app packet: %w", err) + } + + packets = append(packets, app) + } + + return packets, nil +} diff --git a/crypto/rc4.go b/crypto/rc4.go index 77a9853..ccc0a79 100644 --- a/crypto/rc4.go +++ b/crypto/rc4.go @@ -39,3 +39,17 @@ func NewCiphers(key int64) (*Ciphers, error) { server: serverCipher, }, nil } + +// Decrypt decrypts data received from the client +func (c *Ciphers) Decrypt(data []byte) { + if c.client != nil { + c.client.XORKeyStream(data, data) + } +} + +// Encrypt encrypts data to be sent to the client +func (c *Ciphers) Encrypt(data []byte) { + if c.server != nil { + c.server.XORKeyStream(data, data) + } +} diff --git a/opcodes.go b/opcodes.go new file mode 100644 index 0000000..b998232 --- /dev/null +++ b/opcodes.go @@ -0,0 +1,24 @@ +package eq2net + +// Login/chat use 1-byte opcodes +const AppOpcodeSize = 1 + +// The game uses 2-byte opcodes +const GameOpcodeSize = 2 + +// Protocol-level opcodes for the actual UDP stream +const ( + OP_SessionRequest = 0x0001 + OP_SessionResponse = 0x0002 + OP_Combined = 0x0003 + OP_SessionDisconnect = 0x0005 + OP_KeepAlive = 0x0006 + OP_SessionStatRequest = 0x0007 + OP_SessionStatResponse = 0x0008 + OP_Packet = 0x0009 + OP_Fragment = 0x000D + OP_OutOfOrderAck = 0x0011 + OP_Ack = 0x0015 + OP_AppCombined = 0x0019 + OP_OutOfSession = 0x001D +) diff --git a/packet.go b/packet.go new file mode 100644 index 0000000..89febc8 --- /dev/null +++ b/packet.go @@ -0,0 +1,179 @@ +package eq2net + +import ( + "encoding/binary" + "fmt" + "net" + "time" +) + +type EQPacket struct { + Buffer []byte // Raw packet data + Opcode uint16 // Packet opcode + + SrcIP net.IP // Source IP address + SrcPort uint16 // Source port + DstIP net.IP // Destination IP address + DstPort uint16 // Destination port + + Timestamp time.Time // When packet was created/received + Priority uint32 // Priority in processing + Version int16 // Protocol version +} + +func NewEQPacket(opcode uint16, data []byte) *EQPacket { + p := &EQPacket{ + Opcode: opcode, + Timestamp: time.Now(), + Priority: 0, + Version: 0, + } + + if len(data) > 0 { + p.Buffer = make([]byte, len(data)) + copy(p.Buffer, data) + } + + return p +} + +// Size returns the size of the packet data (excluding opcode) +func (p *EQPacket) Size() uint32 { + return uint32(len(p.Buffer)) +} + +// TotalSize returns the total size including opcode +func (p *EQPacket) TotalSize() uint32 { + opcodeSize := uint32(1) + if p.Opcode > 0xFF { + opcodeSize = 2 + } + return p.Size() + opcodeSize +} + +// GetRawOpcode returns the raw opcode value +func (p *EQPacket) GetRawOpcode() uint16 { + return p.Opcode +} + +// GetOpcodeName returns the string name of the opcode +func (p *EQPacket) GetOpcodeName() string { + switch p.Opcode { + case OP_SessionRequest: + return "OP_SessionRequest" + case OP_SessionResponse: + return "OP_SessionResponse" + case OP_Combined: + return "OP_Combined" + case OP_SessionDisconnect: + return "OP_SessionDisconnect" + case OP_KeepAlive: + return "OP_KeepAlive" + case OP_SessionStatRequest: + return "OP_SessionStatRequest" + case OP_SessionStatResponse: + return "OP_SessionStatResponse" + case OP_Packet: + return "OP_Packet" + case OP_Fragment: + return "OP_Fragment" + case OP_OutOfOrderAck: + return "OP_OutOfOrderAck" + case OP_Ack: + return "OP_Ack" + case OP_AppCombined: + return "OP_AppCombined" + case OP_OutOfSession: + return "OP_OutOfSession" + default: + return fmt.Sprintf("Unknown(0x%04X)", p.Opcode) + } +} + +// Serialize writes the packet to a byte buffer +func (p *EQPacket) Serialize() []byte { + // Determine opcode size + opcodeSize := 1 + if p.Opcode > 0xFF { + opcodeSize = 2 + } + + // Create buffer + buf := make([]byte, opcodeSize+len(p.Buffer)) + + // Write opcode + if opcodeSize == 2 { + binary.BigEndian.PutUint16(buf[0:2], p.Opcode) + } else { + buf[0] = 0x00 + buf[1] = byte(p.Opcode) + opcodeSize = 2 // Even 1-byte opcodes use 2 bytes on wire + } + + // Copy data + if len(p.Buffer) > 0 { + copy(buf[opcodeSize:], p.Buffer) + } + + return buf +} + +// SetNetworkInfo sets the network address info +func (p *EQPacket) SetNetworkInfo(srcIP net.IP, srcPort uint16, dstIP net.IP, dstPort uint16) { + p.SrcIP = srcIP + p.SrcPort = srcPort + p.DstIP = dstIP + p.DstPort = dstPort +} + +// Clone creates a deep copy of the packet +func (p *EQPacket) Clone() *EQPacket { + newPacket := &EQPacket{ + Opcode: p.Opcode, + SrcIP: make(net.IP, len(p.SrcIP)), + SrcPort: p.SrcPort, + DstIP: make(net.IP, len(p.DstIP)), + DstPort: p.DstPort, + Timestamp: p.Timestamp, + Priority: p.Priority, + Version: p.Version, + } + + if p.SrcIP != nil { + copy(newPacket.SrcIP, p.SrcIP) + } + if p.DstIP != nil { + copy(newPacket.DstIP, p.DstIP) + } + + if len(p.Buffer) > 0 { + newPacket.Buffer = make([]byte, len(p.Buffer)) + copy(newPacket.Buffer, p.Buffer) + } + + return newPacket +} + +// ParsePacket creates a packet from raw network data +func ParsePacket(data []byte) (*EQPacket, error) { + if len(data) < 2 { + return nil, fmt.Errorf("packet too small: %d bytes", len(data)) + } + + // Extract opcode (always 2 bytes on wire) + opcode := binary.BigEndian.Uint16(data[0:2]) + + // Create packet + p := &EQPacket{ + Opcode: opcode, + Timestamp: time.Now(), + } + + // Copy remaining data if any + if len(data) > 2 { + p.Buffer = make([]byte, len(data)-2) + copy(p.Buffer, data[2:]) + } + + return p, nil +} diff --git a/protocol_packet.go b/protocol_packet.go new file mode 100644 index 0000000..8cb41ec --- /dev/null +++ b/protocol_packet.go @@ -0,0 +1,361 @@ +package eq2net + +import ( + "bytes" + "compress/zlib" + "encoding/binary" + "fmt" + + "git.sharkk.net/EQ2/Protocol/crypto" +) + +// ProtocolPacket handles low-level protocol operations including +// compression, encryption, sequencing, and packet combining +type ProtocolPacket struct { + EQPacket + + Compressed bool + Encrypted bool + PacketPrepared bool + + Sequence uint16 + Acknowledged bool + SentTime int32 + AttemptCount int8 + + SubPackets []*ProtocolPacket +} + +// NewProtocolPacket creates a new protocol packet +func NewProtocolPacket(opcode uint16, data []byte) *ProtocolPacket { + p := &ProtocolPacket{ + EQPacket: *NewEQPacket(opcode, data), + } + return p +} + +// ParseProtocolPacket creates a protocol packet from raw network data +func ParseProtocolPacket(data []byte) (*ProtocolPacket, error) { + base, err := ParsePacket(data) + if err != nil { + return nil, err + } + + p := &ProtocolPacket{ + EQPacket: *base, + } + + // Check for compression flag + if len(p.Buffer) > 0 { + switch p.Buffer[0] { + case 0x5a: // Zlib compressed + p.Compressed = true + case 0xa5: // Simple encoding + p.Compressed = true + } + } + + return p, nil +} + +// Serialize writes the protocol packet to a byte buffer with optional offset +func (p *ProtocolPacket) SerializeWithOffset(offset int8) []byte { + opcodeSize := 2 // Protocol packets always use 2-byte opcodes + buf := make([]byte, opcodeSize+len(p.Buffer)-int(offset)) + + if p.Opcode > 0xff { + binary.BigEndian.PutUint16(buf[0:2], p.Opcode) + } else { + buf[0] = 0x00 + buf[1] = byte(p.Opcode) + } + + if len(p.Buffer) > int(offset) { + copy(buf[opcodeSize:], p.Buffer[offset:]) + } + + return buf +} + +// ValidateCRC checks if the packet has a valid CRC +// Returns true if CRC is valid or packet is exempt +func (p *ProtocolPacket) ValidateCRC(key uint32) bool { + // Session control packets are exempt from CRC + if p.Opcode == OP_SessionRequest || + p.Opcode == OP_SessionResponse || + p.Opcode == OP_OutOfSession { + return true + } + + // Need full packet data including opcode for CRC check + fullData := p.Serialize() + if len(fullData) < 2 { + return false + } + + // CRC is in last 2 bytes + if len(fullData) < 4 { // Minimum: 2 byte opcode + 2 byte CRC + return false + } + + dataLen := len(fullData) - 2 + calculatedCRC := crypto.CalculateCRC(fullData[:dataLen], key) + packetCRC := binary.BigEndian.Uint16(fullData[dataLen:]) + + // CRC of 0 means no CRC check required + return packetCRC == 0 || calculatedCRC == packetCRC +} + +// Compress compresses the packet data using zlib or simple encoding +func (p *ProtocolPacket) Compress() error { + if p.Compressed { + return fmt.Errorf("packet already compressed") + } + + // Only compress packets larger than 30 bytes + if len(p.Buffer) > 30 { + compressed, err := p.zlibCompress() + if err != nil { + return err + } + p.Buffer = compressed + p.Compressed = true + } else if len(p.Buffer) > 0 { + // Simple encoding for small packets + p.simpleEncode() + p.Compressed = true + } + + return nil +} + +// Decompress decompresses the packet data +func (p *ProtocolPacket) Decompress() error { + if !p.Compressed { + return fmt.Errorf("packet not compressed") + } + + if len(p.Buffer) == 0 { + return fmt.Errorf("no data to decompress") + } + + var decompressed []byte + var err error + + switch p.Buffer[0] { + case 0x5a: // Zlib compressed + decompressed, err = p.zlibDecompress() + if err != nil { + return err + } + case 0xa5: // Simple encoding + decompressed = p.simpleDecoded() + default: + return fmt.Errorf("unknown compression flag: 0x%02x", p.Buffer[0]) + } + + p.Buffer = decompressed + p.Compressed = false + return nil +} + +// zlibCompress performs zlib compression +func (p *ProtocolPacket) zlibCompress() ([]byte, error) { + var compressed bytes.Buffer + + // Write compression flag + compressed.WriteByte(0x5a) + + // Compress the data + w := zlib.NewWriter(&compressed) + _, err := w.Write(p.Buffer) + if err != nil { + return nil, err + } + err = w.Close() + if err != nil { + return nil, err + } + + return compressed.Bytes(), nil +} + +// zlibDecompress performs zlib decompression +func (p *ProtocolPacket) zlibDecompress() ([]byte, error) { + if len(p.Buffer) < 2 { + return nil, fmt.Errorf("compressed data too small") + } + + // Skip compression flag + r, err := zlib.NewReader(bytes.NewReader(p.Buffer[1:])) + if err != nil { + return nil, err + } + defer r.Close() + + var decompressed bytes.Buffer + _, err = decompressed.ReadFrom(r) + if err != nil { + return nil, err + } + + return decompressed.Bytes(), nil +} + +// simpleEncode adds simple encoding flag +func (p *ProtocolPacket) simpleEncode() { + // Add encoding flag at the beginning + newBuffer := make([]byte, len(p.Buffer)+1) + newBuffer[0] = 0xa5 + copy(newBuffer[1:], p.Buffer) + p.Buffer = newBuffer +} + +// simpleDecoded removes simple encoding flag +func (p *ProtocolPacket) simpleDecoded() []byte { + if len(p.Buffer) > 1 { + return p.Buffer[1:] + } + return []byte{} +} + +// Combine bundles multiple protocol packets into a single combined packet +func (p *ProtocolPacket) Combine(other *ProtocolPacket) bool { + // Check if this is already a combined packet + if p.Opcode != OP_Combined { + // Convert to combined packet + firstPacket := p.Clone() + p.Opcode = OP_Combined + p.SubPackets = []*ProtocolPacket{ + {EQPacket: *firstPacket}, + } + p.Buffer = p.buildCombinedBuffer() + } + + // Check size limit (max 255 bytes for combined packets) + totalSize := len(p.Buffer) + int(other.TotalSize()) + 1 // +1 for size byte + if totalSize > 255 { + return false + } + + // Add the new packet + p.SubPackets = append(p.SubPackets, other) + p.Buffer = p.buildCombinedBuffer() + return true +} + +// buildCombinedBuffer creates the buffer for a combined packet +func (p *ProtocolPacket) buildCombinedBuffer() []byte { + var buf bytes.Buffer + + for _, subPacket := range p.SubPackets { + serialized := subPacket.Serialize() + buf.WriteByte(byte(len(serialized))) // Size byte + buf.Write(serialized) + } + + return buf.Bytes() +} + +// ExtractSubPackets extracts individual packets from a combined packet +func (p *ProtocolPacket) ExtractSubPackets() ([]*ProtocolPacket, error) { + if p.Opcode != OP_Combined { + return nil, fmt.Errorf("not a combined packet") + } + + var packets []*ProtocolPacket + offset := 0 + data := p.Buffer + + for offset < len(data) { + // Read size byte + if offset >= len(data) { + break + } + size := int(data[offset]) + offset++ + + // Extract sub-packet data + if offset+size > len(data) { + return nil, fmt.Errorf("invalid combined packet: size exceeds buffer") + } + + subData := data[offset : offset+size] + offset += size + + // Parse sub-packet + subPacket, err := ParseProtocolPacket(subData) + if err != nil { + return nil, fmt.Errorf("failed to parse sub-packet: %w", err) + } + + packets = append(packets, subPacket) + } + + return packets, nil +} + +// MakeApplicationPacket converts this protocol packet to an application packet +// This is used when a protocol packet contains application-level data +func (p *ProtocolPacket) MakeApplicationPacket(opcodeSize uint8) *AppPacket { + if len(p.Buffer) < int(opcodeSize) { + return nil + } + + // Extract the application opcode from the buffer + var appOpcode uint16 + if opcodeSize == 1 { + appOpcode = uint16(p.Buffer[0]) + } else { + appOpcode = binary.BigEndian.Uint16(p.Buffer[0:2]) + } + + // Create application packet with remaining data + app := &AppPacket{ + EQPacket: *NewEQPacket(appOpcode, p.Buffer[opcodeSize:]), + OpcodeSize: opcodeSize, + } + + // Copy network info + app.SrcIP = p.SrcIP + app.SrcPort = p.SrcPort + app.DstIP = p.DstIP + app.DstPort = p.DstPort + app.Timestamp = p.Timestamp + app.Version = p.Version + + return app +} + +// ChatEncode applies XOR-based chat encryption +func (p *ProtocolPacket) ChatEncode(key uint32) { + if len(p.Buffer) <= 2 { + return // Skip opcode bytes + } + + data := p.Buffer[2:] // Skip first 2 bytes (opcode) + keyBytes := make([]byte, 4) + binary.LittleEndian.PutUint32(keyBytes, key) + + // Process 4-byte blocks with rolling key + i := 0 + for ; i+4 <= len(data); i += 4 { + // XOR with current key + block := binary.LittleEndian.Uint32(data[i : i+4]) + encrypted := block ^ key + binary.LittleEndian.PutUint32(data[i:i+4], encrypted) + // Update key with encrypted data + key = encrypted + } + + // Handle remaining bytes with last key byte + keyByte := byte(key & 0xFF) + for ; i < len(data); i++ { + data[i] ^= keyByte + } +} + +// ChatDecode applies XOR-based chat decryption (same as encode due to XOR properties) +func (p *ProtocolPacket) ChatDecode(key uint32) { + p.ChatEncode(key) // XOR encryption is symmetric +} diff --git a/stream.go b/stream.go new file mode 100644 index 0000000..9c1e3a3 --- /dev/null +++ b/stream.go @@ -0,0 +1,715 @@ +package eq2net + +import ( + "encoding/binary" + "fmt" + "net" + "sync" + "time" + + "git.sharkk.net/EQ2/Protocol/crypto" + "github.com/panjf2000/gnet/v2" +) + +// StreamState represents the current state of an EQStream connection +type StreamState int + +const ( + CLOSED StreamState = iota // No connection + CONNECTING // Session request sent, awaiting response + ESTABLISHED // Active connection ready for data + DISCONNECTING // Graceful disconnect in progress +) + +// StreamType identifies the type of EQ stream +type StreamType int + +const ( + UnknownStream StreamType = iota + LoginStream + WorldStream + ZoneStream + ChatStream + VoiceStream +) + +// Stream configuration constants +const ( + MaxPacketSize = 512 // Maximum packet size + MaxCombinedSize = 255 // Maximum combined packet size + RetransmitTimeoutMin = 500 // Minimum retransmit timeout (ms) + RetransmitTimeoutMax = 5000 // Maximum retransmit timeout (ms) + RetransmitMaxAttempts = 10 // Maximum retransmission attempts + KeepAliveInterval = 30000 // Keep-alive interval (ms) + SessionTimeout = 90000 // Session timeout (ms) + + // Packet flags + FLAG_COMPRESSED = 0x01 // Packet is compressed + FLAG_ENCODED = 0x04 // Packet is encoded/encrypted + + // Rate limiting + RATEBASE = 1048576 // Base rate: 1 MB + DECAYBASE = 78642 // Decay rate: RATEBASE/10 +) + +// RetransmitEntry tracks packets awaiting acknowledgment +type RetransmitEntry struct { + packet *ProtocolPacket + sentTime time.Time + attempts int +} + +// EQStream manages a single EverQuest network stream connection +type EQStream struct { + // Network connection + conn gnet.Conn + remoteIP net.IP + remotePort uint16 + + // Session identification + sessionID uint32 + streamType StreamType + + // State management + state StreamState + stateMu sync.RWMutex + + // Encryption + crypto *crypto.Ciphers + cryptoKey uint32 + + // Compression settings + compressed bool + encoded bool + + // Sequence numbers + nextInSeq uint16 + nextOutSeq uint16 + lastAckSeq uint16 + seqMu sync.Mutex + + // Packet queues + outbound chan *ProtocolPacket + inbound chan *AppPacket + retransmit map[uint16]*RetransmitEntry + retransmitMu sync.RWMutex + futurePackets map[uint16]*ProtocolPacket + futureMu sync.Mutex + + // Combined packet handling + combinedApp *AppPacket + combinedMu sync.Mutex + combineTimer *time.Timer + + // Flow control + currentRate uint32 + rateThreshold uint32 + rateMu sync.Mutex + + // Timing + lastActivity time.Time + lastPing time.Time + avgDelta time.Duration + timingMu sync.RWMutex + + // Statistics + packetsReceived uint64 + packetsSent uint64 + bytesReceived uint64 + bytesSent uint64 + statsMu sync.RWMutex + + // Configuration + opcodeSize uint8 + maxPacketSize uint32 + + // Callbacks + onAppPacket func(*AppPacket) // Called when app packet is ready + onDisconnect func() // Called on disconnect +} + +// NewEQStream creates a new EQ stream +func NewEQStream(conn gnet.Conn) *EQStream { + stream := &EQStream{ + conn: conn, + state: CLOSED, + sessionID: 0, + streamType: UnknownStream, + compressed: true, + encoded: false, + nextInSeq: 0, + nextOutSeq: 0, + lastAckSeq: 0, + outbound: make(chan *ProtocolPacket, 1024), + inbound: make(chan *AppPacket, 1024), + retransmit: make(map[uint16]*RetransmitEntry), + futurePackets: make(map[uint16]*ProtocolPacket), + currentRate: RATEBASE, + rateThreshold: DECAYBASE, + lastActivity: time.Now(), + opcodeSize: 2, + maxPacketSize: MaxPacketSize, + cryptoKey: 0x33624702, // Default CRC key + } + + // Parse remote address + if addr := conn.RemoteAddr(); addr != nil { + if tcpAddr, ok := addr.(*net.TCPAddr); ok { + stream.remoteIP = tcpAddr.IP + stream.remotePort = uint16(tcpAddr.Port) + } else if udpAddr, ok := addr.(*net.UDPAddr); ok { + stream.remoteIP = udpAddr.IP + stream.remotePort = uint16(udpAddr.Port) + } + } + + return stream +} + +// GetState returns the current stream state +func (s *EQStream) GetState() StreamState { + s.stateMu.RLock() + defer s.stateMu.RUnlock() + return s.state +} + +// SetState updates the stream state +func (s *EQStream) SetState(state StreamState) { + s.stateMu.Lock() + defer s.stateMu.Unlock() + s.state = state +} + +// IsActive returns true if the stream is established +func (s *EQStream) IsActive() bool { + return s.GetState() == ESTABLISHED +} + +// IsClosed returns true if the stream is closed +func (s *EQStream) IsClosed() bool { + return s.GetState() == CLOSED +} + +// ProcessPacket processes an incoming protocol packet +func (s *EQStream) ProcessPacket(p *ProtocolPacket) error { + s.updateActivity() + s.updateStats(true, uint64(p.TotalSize())) + + // Handle based on opcode + switch p.Opcode { + case OP_SessionRequest: + return s.handleSessionRequest(p) + case OP_SessionResponse: + return s.handleSessionResponse(p) + case OP_SessionDisconnect: + return s.handleDisconnect(p) + case OP_KeepAlive: + return s.handleKeepAlive(p) + case OP_Ack: + return s.handleAck(p) + case OP_OutOfOrderAck: + return s.handleOutOfOrderAck(p) + case OP_Packet: + return s.handleDataPacket(p) + case OP_Fragment: + return s.handleFragment(p) + case OP_Combined: + return s.handleCombined(p) + case OP_AppCombined: + return s.handleAppCombined(p) + default: + return fmt.Errorf("unknown opcode: 0x%04X", p.Opcode) + } +} + +// handleSessionRequest processes a session request packet +func (s *EQStream) handleSessionRequest(p *ProtocolPacket) error { + if len(p.Buffer) < 4 { + return fmt.Errorf("session request too small") + } + + // Parse session ID + s.sessionID = binary.BigEndian.Uint32(p.Buffer[0:4]) + + // Send session response + s.sendSessionResponse() + + // Update state + s.SetState(ESTABLISHED) + + return nil +} + +// handleSessionResponse processes a session response packet +func (s *EQStream) handleSessionResponse(p *ProtocolPacket) error { + if len(p.Buffer) < 4 { + return fmt.Errorf("session response too small") + } + + // Parse session parameters + s.sessionID = binary.BigEndian.Uint32(p.Buffer[0:4]) + + // Parse compression/encoding flags if present + if len(p.Buffer) >= 5 { + flags := p.Buffer[4] + s.compressed = (flags & FLAG_COMPRESSED) != 0 + s.encoded = (flags & FLAG_ENCODED) != 0 + } + + // Initialize encryption if needed + if s.encoded && s.crypto == nil { + // This would be initialized with proper key exchange + // For now, using default key + var err error + s.crypto, err = crypto.NewCiphers(int64(s.cryptoKey)) + if err != nil { + return fmt.Errorf("failed to initialize encryption: %w", err) + } + } + + // Update state + s.SetState(ESTABLISHED) + + // Send keep alive to confirm + s.sendKeepAlive() + + return nil +} + +// handleDisconnect processes a disconnect packet +func (s *EQStream) handleDisconnect(p *ProtocolPacket) error { + s.SetState(CLOSED) + + // Call disconnect callback if set + if s.onDisconnect != nil { + s.onDisconnect() + } + + return nil +} + +// handleKeepAlive processes a keep-alive packet +func (s *EQStream) handleKeepAlive(p *ProtocolPacket) error { + // Update activity timestamp + s.updateActivity() + + // Send keep-alive response if needed + // Some implementations echo the keep-alive + + return nil +} + +// handleAck processes an acknowledgment packet +func (s *EQStream) handleAck(p *ProtocolPacket) error { + if len(p.Buffer) < 2 { + return fmt.Errorf("ack packet too small") + } + + // Parse acknowledged sequence number + ackSeq := binary.BigEndian.Uint16(p.Buffer[0:2]) + + // Remove from retransmit queue + s.retransmitMu.Lock() + delete(s.retransmit, ackSeq) + s.retransmitMu.Unlock() + + // Update last acknowledged sequence + s.seqMu.Lock() + if s.sequenceGreaterThan(ackSeq, s.lastAckSeq) { + s.lastAckSeq = ackSeq + } + s.seqMu.Unlock() + + return nil +} + +// handleOutOfOrderAck processes an out-of-order acknowledgment +func (s *EQStream) handleOutOfOrderAck(p *ProtocolPacket) error { + // Similar to handleAck but indicates out-of-order reception + return s.handleAck(p) +} + +// handleDataPacket processes a data packet +func (s *EQStream) handleDataPacket(p *ProtocolPacket) error { + // Check sequence number + seq := p.Sequence + + s.seqMu.Lock() + expectedSeq := s.nextInSeq + s.seqMu.Unlock() + + seqOrder := s.compareSequence(expectedSeq, seq) + + switch seqOrder { + case SeqInOrder: + // Process packet immediately + s.seqMu.Lock() + s.nextInSeq++ + s.seqMu.Unlock() + + // Send ACK + s.sendAck(seq) + + // Convert to application packet and queue + if appPacket := p.MakeApplicationPacket(s.opcodeSize); appPacket != nil { + select { + case s.inbound <- appPacket: + default: + // Queue full, drop packet + } + + // Call callback if set + if s.onAppPacket != nil { + s.onAppPacket(appPacket) + } + } + + // Check for queued future packets + s.processFuturePackets() + + case SeqFuture: + // Queue for later processing + s.futureMu.Lock() + s.futurePackets[seq] = p + s.futureMu.Unlock() + + // Send out-of-order ACK + s.sendOutOfOrderAck(seq) + + case SeqPast: + // Duplicate packet, just ACK it + s.sendAck(seq) + } + + return nil +} + +// handleFragment processes a fragmented packet +func (s *EQStream) handleFragment(p *ProtocolPacket) error { + // TODO: Implement fragment reassembly + // This requires tracking fragment sequences and reassembling + return fmt.Errorf("fragment handling not yet implemented") +} + +// handleCombined processes a combined packet +func (s *EQStream) handleCombined(p *ProtocolPacket) error { + // Extract sub-packets + subPackets, err := p.ExtractSubPackets() + if err != nil { + return fmt.Errorf("failed to extract sub-packets: %w", err) + } + + // Process each sub-packet + for _, subPacket := range subPackets { + if err := s.ProcessPacket(subPacket); err != nil { + // Log error but continue processing other packets + continue + } + } + + return nil +} + +// handleAppCombined processes an application-level combined packet +func (s *EQStream) handleAppCombined(p *ProtocolPacket) error { + // Convert to app packet first + appPacket := p.MakeApplicationPacket(s.opcodeSize) + if appPacket == nil { + return fmt.Errorf("failed to convert to app packet") + } + + // Extract application packets + appPackets, err := ExtractAppPackets(appPacket) + if err != nil { + return fmt.Errorf("failed to extract app packets: %w", err) + } + + // Queue each application packet + for _, app := range appPackets { + select { + case s.inbound <- app: + default: + // Queue full + } + + if s.onAppPacket != nil { + s.onAppPacket(app) + } + } + + return nil +} + +// QueuePacket queues an outbound packet for transmission +func (s *EQStream) QueuePacket(p *ProtocolPacket, reliable bool) error { + if s.GetState() != ESTABLISHED { + return fmt.Errorf("stream not established") + } + + // Add sequence number for reliable packets + if reliable { + s.seqMu.Lock() + p.Sequence = s.nextOutSeq + s.nextOutSeq++ + s.seqMu.Unlock() + + // Add to retransmit queue + s.retransmitMu.Lock() + s.retransmit[p.Sequence] = &RetransmitEntry{ + packet: p, + sentTime: time.Now(), + attempts: 0, + } + s.retransmitMu.Unlock() + } + + // Add CRC if needed + if p.Opcode != OP_SessionRequest && p.Opcode != OP_SessionResponse { + s.addCRC(p) + } + + // Compress if beneficial + if s.compressed && len(p.Buffer) > 30 { + p.Compress() + } + + // Encrypt if enabled + if s.encoded && s.crypto != nil { + s.encryptPacket(p) + } + + // Send packet + return s.sendPacket(p) +} + +// Helper methods + +func (s *EQStream) updateActivity() { + s.timingMu.Lock() + s.lastActivity = time.Now() + s.timingMu.Unlock() +} + +func (s *EQStream) updateStats(received bool, bytes uint64) { + s.statsMu.Lock() + if received { + s.packetsReceived++ + s.bytesReceived += bytes + } else { + s.packetsSent++ + s.bytesSent += bytes + } + s.statsMu.Unlock() +} + +func (s *EQStream) sendPacket(p *ProtocolPacket) error { + data := p.Serialize() + s.updateStats(false, uint64(len(data))) + return s.conn.AsyncWrite(data, nil) +} + +func (s *EQStream) sendSessionResponse() { + resp := NewProtocolPacket(OP_SessionResponse, nil) + + // Build response data + data := make([]byte, 5) + binary.BigEndian.PutUint32(data[0:4], s.sessionID) + + // Set flags + flags := byte(0) + if s.compressed { + flags |= FLAG_COMPRESSED + } + if s.encoded { + flags |= FLAG_ENCODED + } + data[4] = flags + + resp.Buffer = data + s.sendPacket(resp) +} + +func (s *EQStream) sendKeepAlive() { + ka := NewProtocolPacket(OP_KeepAlive, nil) + s.sendPacket(ka) + + s.timingMu.Lock() + s.lastPing = time.Now() + s.timingMu.Unlock() +} + +func (s *EQStream) sendAck(seq uint16) { + ack := NewProtocolPacket(OP_Ack, nil) + ack.Buffer = make([]byte, 2) + binary.BigEndian.PutUint16(ack.Buffer, seq) + s.sendPacket(ack) +} + +func (s *EQStream) sendOutOfOrderAck(seq uint16) { + ack := NewProtocolPacket(OP_OutOfOrderAck, nil) + ack.Buffer = make([]byte, 2) + binary.BigEndian.PutUint16(ack.Buffer, seq) + s.sendPacket(ack) +} + +func (s *EQStream) addCRC(p *ProtocolPacket) { + // Serialize packet without CRC + data := p.Serialize() + + // Calculate CRC + crc := crypto.CalculateCRC(data, s.cryptoKey) + + // Append CRC to buffer + crcBytes := make([]byte, 2) + binary.BigEndian.PutUint16(crcBytes, crc) + p.Buffer = append(p.Buffer, crcBytes...) +} + +func (s *EQStream) encryptPacket(p *ProtocolPacket) { + if s.crypto == nil { + return + } + + // Encrypt the buffer (skip opcode) + if len(p.Buffer) > 2 { + s.crypto.Encrypt(p.Buffer[2:]) + p.Encrypted = true + } +} + +func (s *EQStream) processFuturePackets() { + s.futureMu.Lock() + defer s.futureMu.Unlock() + + s.seqMu.Lock() + expectedSeq := s.nextInSeq + s.seqMu.Unlock() + + // Check if we have the next expected packet + if p, ok := s.futurePackets[expectedSeq]; ok { + delete(s.futurePackets, expectedSeq) + + // Process it (this will recursively check for more) + go s.ProcessPacket(p) + } +} + +// Sequence number comparison + +type SeqOrder int + +const ( + SeqPast SeqOrder = iota // Sequence is from the past + SeqInOrder // Sequence is expected + SeqFuture // Sequence is from the future +) + +func (s *EQStream) compareSequence(expected, received uint16) SeqOrder { + if received == expected { + return SeqInOrder + } + + // Handle wraparound + if s.sequenceGreaterThan(received, expected) { + return SeqFuture + } + + return SeqPast +} + +func (s *EQStream) sequenceGreaterThan(a, b uint16) bool { + // Handle sequence number wraparound + // If the difference is more than half the sequence space, + // assume wraparound occurred + diff := int32(a) - int32(b) + if diff < 0 { + diff = -diff + } + + if diff > 32768 { // Half of uint16 max + return a < b + } + + return a > b +} + +// CheckRetransmissions checks for packets that need retransmission +func (s *EQStream) CheckRetransmissions() { + now := time.Now() + + s.retransmitMu.Lock() + defer s.retransmitMu.Unlock() + + for seq, entry := range s.retransmit { + // Calculate timeout based on average delta and attempts + timeout := time.Duration(RetransmitTimeoutMin) * time.Millisecond + if s.avgDelta > 0 { + timeout = s.avgDelta * 3 + } + + // Exponential backoff + for i := 0; i < entry.attempts; i++ { + timeout *= 2 + if timeout > time.Duration(RetransmitTimeoutMax)*time.Millisecond { + timeout = time.Duration(RetransmitTimeoutMax) * time.Millisecond + break + } + } + + if now.Sub(entry.sentTime) > timeout { + if entry.attempts >= RetransmitMaxAttempts { + // Give up on this packet + delete(s.retransmit, seq) + continue + } + + // Retransmit + entry.attempts++ + entry.sentTime = now + s.sendPacket(entry.packet) + } + } +} + +// CheckTimeout checks if the stream has timed out +func (s *EQStream) CheckTimeout() bool { + s.timingMu.RLock() + lastActivity := s.lastActivity + s.timingMu.RUnlock() + + if time.Since(lastActivity) > time.Duration(SessionTimeout)*time.Millisecond { + s.SetState(CLOSED) + if s.onDisconnect != nil { + s.onDisconnect() + } + return true + } + + // Send keep-alive if needed + if time.Since(lastActivity) > time.Duration(KeepAliveInterval)*time.Millisecond { + s.sendKeepAlive() + } + + return false +} + +// SendDisconnect sends a disconnect packet +func (s *EQStream) SendDisconnect() { + disc := NewProtocolPacket(OP_SessionDisconnect, nil) + s.sendPacket(disc) + s.SetState(DISCONNECTING) +} + +// Close closes the stream +func (s *EQStream) Close() { + s.SendDisconnect() + s.SetState(CLOSED) + + // Clean up resources + close(s.outbound) + close(s.inbound) + + if s.combineTimer != nil { + s.combineTimer.Stop() + } +} diff --git a/stream_factory.go b/stream_factory.go new file mode 100644 index 0000000..ac002a5 --- /dev/null +++ b/stream_factory.go @@ -0,0 +1,54 @@ +package eq2net + +import ( + "fmt" + "sync" +) + +// StreamFactory provides a factory pattern for creating streams +type StreamFactory struct { + servers map[StreamType]*StreamServer + serversMu sync.RWMutex +} + +// NewStreamFactory creates a new stream factory +func NewStreamFactory() *StreamFactory { + return &StreamFactory{ + servers: make(map[StreamType]*StreamServer), + } +} + +// CreateServer creates a new stream server +func (f *StreamFactory) CreateServer(streamType StreamType, address string, port int) (*StreamServer, error) { + f.serversMu.Lock() + defer f.serversMu.Unlock() + + // Check if server already exists for this type + if _, exists := f.servers[streamType]; exists { + return nil, fmt.Errorf("server already exists for stream type %d", streamType) + } + + // Create new server + server := NewStreamServer(address, port, streamType) + f.servers[streamType] = server + + return server, nil +} + +// GetServer returns a server by stream type +func (f *StreamFactory) GetServer(streamType StreamType) *StreamServer { + f.serversMu.RLock() + defer f.serversMu.RUnlock() + return f.servers[streamType] +} + +// StopAll stops all servers +func (f *StreamFactory) StopAll() { + f.serversMu.Lock() + defer f.serversMu.Unlock() + + for _, server := range f.servers { + server.Stop() + } + f.servers = make(map[StreamType]*StreamServer) +} diff --git a/stream_server.go b/stream_server.go new file mode 100644 index 0000000..22c9f94 --- /dev/null +++ b/stream_server.go @@ -0,0 +1,263 @@ +package eq2net + +import ( + "fmt" + "sync" + "time" + + "github.com/panjf2000/gnet/v2" +) + +// StreamServer manages multiple EQStream connections using gnet +type StreamServer struct { + gnet.BuiltinEventEngine + + streams map[string]*EQStream // "IP:Port" -> Stream mapping + streamsMu sync.RWMutex + + address string + port int + streamType StreamType + maxStreams int + + onNewStream func(*EQStream) + onStreamClosed func(*EQStream) + onAppPacket func(*EQStream, *AppPacket) + + totalConnections uint64 + activeConnections uint32 + statsMu sync.RWMutex + + shutdown chan bool +} + +// NewStreamServer creates a new EQ stream server +func NewStreamServer(address string, port int, streamType StreamType) *StreamServer { + return &StreamServer{ + streams: make(map[string]*EQStream), + address: address, + port: port, + streamType: streamType, + maxStreams: 10000, + shutdown: make(chan bool), + } +} + +// Start starts the stream server +func (s *StreamServer) Start() error { + addr := fmt.Sprintf("%s:%d", s.address, s.port) + + // Configure gnet options + options := []gnet.Option{ + gnet.WithMulticore(true), + gnet.WithReusePort(true), + gnet.WithTicker(true), + gnet.WithTCPKeepAlive(time.Minute * 5), + } + + // Start gnet server + return gnet.Run(s, fmt.Sprintf("udp://%s", addr), options...) +} + +// Stop stops the stream server +func (s *StreamServer) Stop() { + close(s.shutdown) + + // Close all active streams + s.streamsMu.Lock() + for _, stream := range s.streams { + stream.Close() + } + s.streams = make(map[string]*EQStream) + s.streamsMu.Unlock() +} + +// OnBoot is called when the server starts +func (s *StreamServer) OnBoot(eng gnet.Engine) gnet.Action { + fmt.Printf("EQStream server started on %s:%d\n", s.address, s.port) + return gnet.None +} + +// OnShutdown is called when the server stops +func (s *StreamServer) OnShutdown(eng gnet.Engine) { + fmt.Printf("EQStream server shutdown\n") +} + +// OnOpen is called when a new connection is established +func (s *StreamServer) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) { + // Create new stream + stream := NewEQStream(c) + stream.streamType = s.streamType + + // Set up callbacks + stream.onAppPacket = func(p *AppPacket) { + if s.onAppPacket != nil { + s.onAppPacket(stream, p) + } + } + + stream.onDisconnect = func() { + s.removeStream(stream) + if s.onStreamClosed != nil { + s.onStreamClosed(stream) + } + } + + // Store stream in connection context + c.SetContext(stream) + + // Add to stream map + key := s.getStreamKey(c) + s.streamsMu.Lock() + s.streams[key] = stream + s.activeConnections++ + s.totalConnections++ + s.streamsMu.Unlock() + + // Call new stream callback + if s.onNewStream != nil { + s.onNewStream(stream) + } + + return nil, gnet.None +} + +// OnClose is called when a connection is closed +func (s *StreamServer) OnClose(c gnet.Conn, err error) (action gnet.Action) { + if stream, ok := c.Context().(*EQStream); ok { + stream.Close() + s.removeStream(stream) + + if s.onStreamClosed != nil { + s.onStreamClosed(stream) + } + } + + return gnet.None +} + +// OnTraffic is called when data is received +func (s *StreamServer) OnTraffic(c gnet.Conn) (action gnet.Action) { + stream, ok := c.Context().(*EQStream) + if !ok { + return gnet.Close + } + + // Read all available data + for { + // Peek at the data to determine packet size + buf, err := c.Peek(-1) + if err != nil || len(buf) < 2 { + break + } + + // Parse packet length from opcode and data + // For UDP, each datagram is a complete packet + packet, err := ParseProtocolPacket(buf) + if err != nil { + // Invalid packet, skip it + c.Discard(len(buf)) + continue + } + + // Consume the packet data + c.Discard(len(buf)) + + // Process packet + if err := stream.ProcessPacket(packet); err != nil { + // Log error but continue processing + fmt.Printf("Error processing packet: %v\n", err) + } + } + + return gnet.None +} + +// OnTick is called periodically for maintenance tasks +func (s *StreamServer) OnTick() (delay time.Duration, action gnet.Action) { + // Check for timeouts and retransmissions + s.streamsMu.RLock() + streams := make([]*EQStream, 0, len(s.streams)) + for _, stream := range s.streams { + streams = append(streams, stream) + } + s.streamsMu.RUnlock() + + // Process each stream + for _, stream := range streams { + // Check for timeout + if stream.CheckTimeout() { + s.removeStream(stream) + continue + } + + // Check for retransmissions + stream.CheckRetransmissions() + } + + // Return delay until next tick (100ms) + return time.Millisecond * 100, gnet.None +} + +// Helper methods + +func (s *StreamServer) getStreamKey(c gnet.Conn) string { + if addr := c.RemoteAddr(); addr != nil { + return addr.String() + } + return "" +} + +func (s *StreamServer) removeStream(stream *EQStream) { + // Find and remove stream from map + s.streamsMu.Lock() + for key, str := range s.streams { + if str == stream { + delete(s.streams, key) + s.activeConnections-- + break + } + } + s.streamsMu.Unlock() +} + +// SetOnNewStream sets the callback for new streams +func (s *StreamServer) SetOnNewStream(callback func(*EQStream)) { + s.onNewStream = callback +} + +// SetOnStreamClosed sets the callback for closed streams +func (s *StreamServer) SetOnStreamClosed(callback func(*EQStream)) { + s.onStreamClosed = callback +} + +// SetOnAppPacket sets the callback for application packets +func (s *StreamServer) SetOnAppPacket(callback func(*EQStream, *AppPacket)) { + s.onAppPacket = callback +} + +// GetStream returns a stream by remote address +func (s *StreamServer) GetStream(address string) *EQStream { + s.streamsMu.RLock() + defer s.streamsMu.RUnlock() + return s.streams[address] +} + +// GetActiveStreams returns all active streams +func (s *StreamServer) GetActiveStreams() []*EQStream { + s.streamsMu.RLock() + defer s.streamsMu.RUnlock() + + streams := make([]*EQStream, 0, len(s.streams)) + for _, stream := range s.streams { + streams = append(streams, stream) + } + return streams +} + +// GetStats returns server statistics +func (s *StreamServer) GetStats() (total uint64, active uint32) { + s.statsMu.RLock() + defer s.statsMu.RUnlock() + return s.totalConnections, s.activeConnections +}