From 7a7ff3f6bae89af5c8c595883dacd67b914d0fc9 Mon Sep 17 00:00:00 2001 From: Sky Johnson Date: Wed, 3 Sep 2025 11:54:26 -0500 Subject: [PATCH] start stream --- stream/factory.go | 267 ++++++++++++++++++++++++ stream/stream.go | 512 ++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 779 insertions(+) create mode 100644 stream/factory.go create mode 100644 stream/stream.go diff --git a/stream/factory.go b/stream/factory.go new file mode 100644 index 0000000..e0e8fdb --- /dev/null +++ b/stream/factory.go @@ -0,0 +1,267 @@ +package stream + +import ( + "database/sql" + "log" + "sync" + "time" + + "git.sharkk.net/EQ2/Protocol/opcodes" + "git.sharkk.net/EQ2/Protocol/packets" + "github.com/panjf2000/gnet/v2" +) + +// StreamFactory manages EQStream instances using gnet +type StreamFactory struct { + gnet.BuiltinEventEngine + + mu sync.RWMutex + streams map[gnet.Conn]*Stream + + // Configuration + crcKey uint32 + maxPacketSize uint16 + encodeKey int + decodeKey int + opcodeSize uint8 + clientVersion int16 + + // Database for opcodes + db *sql.DB + opcodeTable string + opcodeManager opcodes.Manager + + // Callbacks + onNewStream func(*Stream) + onStreamClosed func(*Stream) + onPacket func(*Stream, *packets.AppPacket) + + // Stats + totalStreams uint64 + activeStreams uint64 +} + +// FactoryConfig holds factory configuration +type FactoryConfig struct { + CRCKey uint32 + MaxPacketSize uint16 + EncodeKey int + DecodeKey int + OpcodeSize uint8 + ClientVersion int16 + Database *sql.DB + OpcodeTable string +} + +// NewStreamFactory creates a new factory +func NewStreamFactory(cfg *FactoryConfig) *StreamFactory { + f := &StreamFactory{ + streams: make(map[gnet.Conn]*Stream), + crcKey: cfg.CRCKey, + maxPacketSize: cfg.MaxPacketSize, + encodeKey: cfg.EncodeKey, + decodeKey: cfg.DecodeKey, + opcodeSize: cfg.OpcodeSize, + clientVersion: cfg.ClientVersion, + db: cfg.Database, + opcodeTable: cfg.OpcodeTable, + } + + if f.maxPacketSize == 0 { + f.maxPacketSize = packets.DefaultMTU + } + if f.opcodeSize == 0 { + f.opcodeSize = packets.DefaultOpcodeSize + } + if f.opcodeTable == "" { + f.opcodeTable = "opcodes" + } + + // Initialize opcode manager + if cfg.ClientVersion > 0 && cfg.Database != nil { + f.opcodeManager = opcodes.GetManager(cfg.ClientVersion, cfg.Database, f.opcodeTable) + } + + return f +} + +// OnBoot called when server starts +func (f *StreamFactory) OnBoot(eng gnet.Engine) gnet.Action { + log.Printf("StreamFactory started") + return gnet.None +} + +// OnOpen called when new connection opens +func (f *StreamFactory) OnOpen(c gnet.Conn) ([]byte, gnet.Action) { + // Create stream configuration + streamCfg := &Config{ + SessionID: uint32(time.Now().UnixNano()), // Generate session ID + CRCKey: f.crcKey, + MaxPacketSize: f.maxPacketSize, + EncodeKey: f.encodeKey, + DecodeKey: f.decodeKey, + OpcodeManager: f.opcodeManager, + OpcodeSize: f.opcodeSize, + AckThreshold: 200 * time.Millisecond, + KeepaliveTime: 10 * time.Second, + TimeoutDuration: 30 * time.Second, + } + + // Create new stream + stream := NewStream(c, streamCfg) + + // Set packet callback + stream.SetPacketCallback(func(app *packets.AppPacket) { + if f.onPacket != nil { + f.onPacket(stream, app) + } + }) + + // Store stream + f.mu.Lock() + f.streams[c] = stream + f.activeStreams++ + f.totalStreams++ + f.mu.Unlock() + + // Notify callback + if f.onNewStream != nil { + f.onNewStream(stream) + } + + c.SetContext(stream) + log.Printf("New stream from %s", c.RemoteAddr()) + + return nil, gnet.None +} + +// OnClose called when connection closes +func (f *StreamFactory) OnClose(c gnet.Conn, err error) gnet.Action { + f.mu.Lock() + stream, exists := f.streams[c] + if exists { + delete(f.streams, c) + f.activeStreams-- + } + f.mu.Unlock() + + if stream != nil { + stream.Close() + + // Notify callback + if f.onStreamClosed != nil { + f.onStreamClosed(stream) + } + + log.Printf("Stream closed: %s", c.RemoteAddr()) + } + + return gnet.None +} + +// OnTraffic handles incoming data +func (f *StreamFactory) OnTraffic(c gnet.Conn) gnet.Action { + stream := c.Context().(*Stream) + if stream == nil { + return gnet.Close + } + + // Read all available data + buf, err := c.Next(-1) + if err != nil { + return gnet.Close + } + + // Process packet + if err := stream.Process(buf); err != nil { + log.Printf("Stream process error: %v", err) + return gnet.Close + } + + return gnet.None +} + +// OnTick called periodically +func (f *StreamFactory) OnTick() (time.Duration, gnet.Action) { + f.mu.RLock() + defer f.mu.RUnlock() + + // Process retransmissions for all streams + for _, stream := range f.streams { + stream.processQueues() + } + + return 100 * time.Millisecond, gnet.None +} + +// GetStream returns stream for a connection +func (f *StreamFactory) GetStream(c gnet.Conn) *Stream { + f.mu.RLock() + defer f.mu.RUnlock() + return f.streams[c] +} + +// GetAllStreams returns all active streams +func (f *StreamFactory) GetAllStreams() []*Stream { + f.mu.RLock() + defer f.mu.RUnlock() + + result := make([]*Stream, 0, len(f.streams)) + for _, stream := range f.streams { + result = append(result, stream) + } + return result +} + +// SetNewStreamCallback sets callback for new streams +func (f *StreamFactory) SetNewStreamCallback(fn func(*Stream)) { + f.onNewStream = fn +} + +// SetStreamClosedCallback sets callback for closed streams +func (f *StreamFactory) SetStreamClosedCallback(fn func(*Stream)) { + f.onStreamClosed = fn +} + +// SetPacketCallback sets global packet callback +func (f *StreamFactory) SetPacketCallback(fn func(*Stream, *packets.AppPacket)) { + f.onPacket = fn +} + +// UpdateOpcodeManager updates opcodes for a specific version +func (f *StreamFactory) UpdateOpcodeManager(version int16) { + if f.db != nil { + f.opcodeManager = opcodes.GetManager(version, f.db, f.opcodeTable) + log.Printf("Updated opcode manager to version %d", version) + } +} + +// Stats returns factory statistics +func (f *StreamFactory) Stats() map[string]uint64 { + f.mu.RLock() + defer f.mu.RUnlock() + + return map[string]uint64{ + "total_streams": f.totalStreams, + "active_streams": f.activeStreams, + } +} + +// Run starts the factory as a UDP server +func Run(addr string, factory *StreamFactory) error { + return gnet.Run(factory, addr, + gnet.WithMulticore(true), + gnet.WithReuseAddr(true), + gnet.WithReusePort(true), + gnet.WithTicker(true), + gnet.WithTCPKeepAlive(0), // Disable for UDP + gnet.WithSocketRecvBuffer(1024*1024), + gnet.WithSocketSendBuffer(1024*1024), + ) +} + +// RunTLS starts the factory with TLS (future enhancement) +func RunTLS(addr string, factory *StreamFactory, certFile, keyFile string) error { + // TLS configuration would go here if needed + return Run(addr, factory) +} diff --git a/stream/stream.go b/stream/stream.go new file mode 100644 index 0000000..1553708 --- /dev/null +++ b/stream/stream.go @@ -0,0 +1,512 @@ +package stream + +import ( + "encoding/binary" + "sync" + "sync/atomic" + "time" + + "git.sharkk.net/EQ2/Protocol/crypto" + "git.sharkk.net/EQ2/Protocol/opcodes" + "git.sharkk.net/EQ2/Protocol/packets" + "github.com/panjf2000/gnet/v2" +) + +// Stream implements EQ2's reliable UDP protocol +type Stream struct { + conn gnet.Conn + mu sync.RWMutex + + // Connection state + state atomic.Value // StreamState + sessionID uint32 + crcKey uint32 + maxLen uint16 + encodeKey int + decodeKey int + + // Sequence management + seqOut uint16 + seqIn uint16 + seqLastAck uint16 + + // Acknowledgment tracking + pendingAcks map[uint16]*pendingPacket + ackTimer *time.Timer + lastAckSent time.Time + ackThreshold time.Duration + + // Fragment assembly + fragments map[uint16]*fragmentBuffer + nextFragID uint16 + + // Packet queues + reliableQueue []*packets.ProtoPacket + unreliableQueue []*packets.ProtoPacket + resendQueue []*pendingPacket + + // Opcode management + opcodeManager opcodes.Manager + opcodeSize uint8 + + // Cipher for encryption + cipher *crypto.Ciphers + + // Timers + keepAliveTimer *time.Timer + timeoutTimer *time.Timer + + // Stats + packetsOut uint64 + packetsIn uint64 + bytesOut uint64 + bytesIn uint64 + + // Callbacks + onPacket func(*packets.AppPacket) + onDisconnect func() +} + +type pendingPacket struct { + packet *packets.ProtoPacket + seq uint16 + sentTime time.Time + attempts int + nextRetry time.Time +} + +type fragmentBuffer struct { + totalSize uint32 + chunks map[uint16][]byte + received uint32 + startTime time.Time +} + +type StreamState int + +const ( + StateDisconnected StreamState = iota + StateConnecting + StateEstablished + StateClosing + StateClosed +) + +// Config holds stream configuration +type Config struct { + SessionID uint32 + CRCKey uint32 + MaxPacketSize uint16 + EncodeKey int + DecodeKey int + OpcodeManager opcodes.Manager + OpcodeSize uint8 + AckThreshold time.Duration + KeepaliveTime time.Duration + TimeoutDuration time.Duration +} + +// NewStream creates a new EQStream instance +func NewStream(conn gnet.Conn, cfg *Config) *Stream { + s := &Stream{ + conn: conn, + sessionID: cfg.SessionID, + crcKey: cfg.CRCKey, + maxLen: cfg.MaxPacketSize, + encodeKey: cfg.EncodeKey, + decodeKey: cfg.DecodeKey, + opcodeManager: cfg.OpcodeManager, + opcodeSize: cfg.OpcodeSize, + ackThreshold: cfg.AckThreshold, + pendingAcks: make(map[uint16]*pendingPacket), + fragments: make(map[uint16]*fragmentBuffer), + seqOut: 0, + seqIn: 0, + } + + if s.maxLen == 0 { + s.maxLen = packets.DefaultMTU + } + if s.ackThreshold == 0 { + s.ackThreshold = 200 * time.Millisecond + } + + s.state.Store(StateDisconnected) + + // Initialize cipher if keys provided + if cfg.EncodeKey != 0 || cfg.DecodeKey != 0 { + cipher, _ := crypto.NewCiphers(int64(cfg.EncodeKey)) + s.cipher = cipher + } + + // Start keepalive timer + if cfg.KeepaliveTime > 0 { + s.keepAliveTimer = time.AfterFunc(cfg.KeepaliveTime, s.sendKeepalive) + } + + return s +} + +// Process handles incoming data from gnet +func (s *Stream) Process(data []byte) error { + // Validate CRC + if len(data) < 2 { + return nil + } + + // Check for CRC (last 2 bytes) + if len(data) > 2 { + providedCRC := binary.BigEndian.Uint16(data[len(data)-2:]) + calculatedCRC := crypto.CalculateCRC(data[:len(data)-2], s.crcKey) + if providedCRC != calculatedCRC { + return nil // Drop packet with bad CRC + } + data = data[:len(data)-2] // Strip CRC + } + + // Decrypt if needed + if s.cipher != nil { + s.cipher.Decrypt(data) + } + + // Parse protocol opcode + if len(data) < 2 { + return nil + } + opcode := binary.BigEndian.Uint16(data[:2]) + + switch opcode { + case opcodes.OP_SessionRequest: + return s.handleSessionRequest(data[2:]) + case opcodes.OP_SessionResponse: + return s.handleSessionResponse(data[2:]) + case opcodes.OP_Packet: + return s.handlePacket(data[2:]) + case opcodes.OP_Fragment: + return s.handleFragment(data[2:]) + case opcodes.OP_Ack: + return s.handleAck(data[2:]) + case opcodes.OP_Combined: + return s.handleCombined(data[2:]) + case opcodes.OP_AppCombined: + return s.handleAppCombined(data[2:]) + case opcodes.OP_KeepAlive: + s.resetTimeout() + return nil + case opcodes.OP_SessionDisconnect: + return s.handleDisconnect() + } + + atomic.AddUint64(&s.packetsIn, 1) + atomic.AddUint64(&s.bytesIn, uint64(len(data))) + + return nil +} + +// handlePacket processes reliable packets +func (s *Stream) handlePacket(data []byte) error { + if len(data) < 2 { + return nil + } + + seq := binary.BigEndian.Uint16(data[:2]) + data = data[2:] + + s.mu.Lock() + defer s.mu.Unlock() + + // Check sequence + if seq != s.seqIn { + // Out of order - send immediate ACK + s.sendAckImmediate(seq) + return nil + } + + // Update sequence + s.seqIn++ + + // Schedule ACK + s.scheduleAck(seq) + + // Process packet data + return s.processPacketData(data) +} + +// handleFragment assembles fragmented packets +func (s *Stream) handleFragment(data []byte) error { + if len(data) < 8 { + return nil + } + + seq := binary.BigEndian.Uint16(data[:2]) + fragSeq := binary.BigEndian.Uint16(data[2:4]) + fragTotal := binary.BigEndian.Uint16(data[4:6]) + fragCur := binary.BigEndian.Uint16(data[6:8]) + data = data[8:] + + s.mu.Lock() + defer s.mu.Unlock() + + // Get or create fragment buffer + frag, exists := s.fragments[fragSeq] + if !exists { + frag = &fragmentBuffer{ + totalSize: uint32(fragTotal), + chunks: make(map[uint16][]byte), + startTime: time.Now(), + } + s.fragments[fragSeq] = frag + } + + // Store chunk + frag.chunks[fragCur] = append([]byte(nil), data...) + frag.received++ + + // Check if complete + if frag.received == uint32(fragTotal) { + // Reassemble + complete := make([]byte, 0) + for i := uint16(0); i < fragTotal; i++ { + if chunk, ok := frag.chunks[i]; ok { + complete = append(complete, chunk...) + } + } + delete(s.fragments, fragSeq) + + // Process reassembled packet + return s.processPacketData(complete) + } + + // Schedule ACK for fragment + s.scheduleAck(seq) + + return nil +} + +// handleAck processes acknowledgments +func (s *Stream) handleAck(data []byte) error { + if len(data) < 2 { + return nil + } + + ackSeq := binary.BigEndian.Uint16(data[:2]) + + s.mu.Lock() + defer s.mu.Unlock() + + // Remove from pending + if pending, exists := s.pendingAcks[ackSeq]; exists { + delete(s.pendingAcks, ackSeq) + // Update RTT stats here if needed + _ = pending + } + + // Update last ACK seen + if ackSeq > s.seqLastAck { + s.seqLastAck = ackSeq + } + + return nil +} + +// SendPacket sends an application packet +func (s *Stream) SendPacket(app *packets.AppPacket) error { + // Convert to protocol packet + proto := s.appToProto(app) + + // Check if needs fragmentation + if proto.Size() > uint32(s.maxLen) { + return s.sendFragmented(proto) + } + + // Add to appropriate queue + if app.Priority > packets.PriorityNormal { + s.mu.Lock() + s.reliableQueue = append(s.reliableQueue, proto) + s.mu.Unlock() + } else { + s.mu.Lock() + s.unreliableQueue = append(s.unreliableQueue, proto) + s.mu.Unlock() + } + + // Process queues + return s.processQueues() +} + +// sendReliable sends a reliable packet with sequence number +func (s *Stream) sendReliable(proto *packets.ProtoPacket) error { + s.mu.Lock() + seq := s.seqOut + s.seqOut++ + s.mu.Unlock() + + // Build packet with sequence + data := make([]byte, proto.Size()+2) + binary.BigEndian.PutUint16(data[:2], seq) + proto.Serialize(data[2:], 0) + + // Track for retransmission + pending := &pendingPacket{ + packet: proto, + seq: seq, + sentTime: time.Now(), + attempts: 1, + nextRetry: time.Now().Add(time.Second), + } + + s.mu.Lock() + s.pendingAcks[seq] = pending + s.mu.Unlock() + + // Send with protocol header + return s.sendRaw(opcodes.OP_Packet, data) +} + +// sendRaw sends raw data with protocol opcode +func (s *Stream) sendRaw(opcode uint16, data []byte) error { + // Build packet: opcode + data + CRC + packet := make([]byte, 2+len(data)+2) + binary.BigEndian.PutUint16(packet[:2], opcode) + copy(packet[2:], data) + + // Add CRC + crc := crypto.CalculateCRC(packet[:len(packet)-2], s.crcKey) + binary.BigEndian.PutUint16(packet[len(packet)-2:], crc) + + // Encrypt if needed + if s.cipher != nil { + s.cipher.Encrypt(packet) + } + + // Send via gnet + atomic.AddUint64(&s.packetsOut, 1) + atomic.AddUint64(&s.bytesOut, uint64(len(packet))) + + return s.conn.AsyncWrite(packet, nil) +} + +// Helper methods + +func (s *Stream) processPacketData(data []byte) error { + // Create ProtoPacket from raw data + proto := packets.NewProtoPacketFromRaw(data, -1, s.opcodeManager) + + // Decompress if needed + proto.DecompressPacket() + + // Convert to AppPacket + app := proto.MakeApplicationPacket(s.opcodeSize) + + // Deliver to callback + if s.onPacket != nil { + s.onPacket(app) + } + + return nil +} + +func (s *Stream) scheduleAck(seq uint16) { + if s.ackTimer == nil { + s.ackTimer = time.AfterFunc(s.ackThreshold, func() { + s.sendAckImmediate(seq) + }) + } +} + +func (s *Stream) sendAckImmediate(seq uint16) error { + data := make([]byte, 2) + binary.BigEndian.PutUint16(data, seq) + return s.sendRaw(opcodes.OP_Ack, data) +} + +func (s *Stream) sendKeepalive() { + s.sendRaw(opcodes.OP_KeepAlive, nil) + // Reschedule + if s.keepAliveTimer != nil { + s.keepAliveTimer.Reset(10 * time.Second) + } +} + +func (s *Stream) resetTimeout() { + if s.timeoutTimer != nil { + s.timeoutTimer.Reset(30 * time.Second) + } +} + +func (s *Stream) appToProto(app *packets.AppPacket) *packets.ProtoPacket { + proto := packets.NewProtoPacket(app.Opcode, app.Buffer, s.opcodeManager) + proto.CopyInfo(app.Packet) + return proto +} + +func (s *Stream) processQueues() error { + // Process reliable queue first + s.mu.Lock() + for len(s.reliableQueue) > 0 { + proto := s.reliableQueue[0] + s.reliableQueue = s.reliableQueue[1:] + s.mu.Unlock() + if err := s.sendReliable(proto); err != nil { + return err + } + s.mu.Lock() + } + s.mu.Unlock() + + return nil +} + +func (s *Stream) sendFragmented(proto *packets.ProtoPacket) error { + // Fragment implementation + return nil // TODO +} + +func (s *Stream) handleSessionRequest(data []byte) error { + // Session setup + s.state.Store(StateConnecting) + return nil +} + +func (s *Stream) handleSessionResponse(data []byte) error { + s.state.Store(StateEstablished) + return nil +} + +func (s *Stream) handleCombined(data []byte) error { + // Process combined packets + return nil +} + +func (s *Stream) handleAppCombined(data []byte) error { + // Process app-combined packets + return nil +} + +func (s *Stream) handleDisconnect() error { + s.state.Store(StateClosed) + if s.onDisconnect != nil { + s.onDisconnect() + } + return nil +} + +// SetPacketCallback sets the callback for received packets +func (s *Stream) SetPacketCallback(fn func(*packets.AppPacket)) { + s.onPacket = fn +} + +// Close closes the stream +func (s *Stream) Close() error { + s.state.Store(StateClosed) + // Send disconnect + s.sendRaw(opcodes.OP_SessionDisconnect, nil) + // Clean up timers + if s.keepAliveTimer != nil { + s.keepAliveTimer.Stop() + } + if s.ackTimer != nil { + s.ackTimer.Stop() + } + return nil +}