package eq2net

import (
	"fmt"
	"sync"
	"time"

	"github.com/panjf2000/gnet/v2"
)

// StreamServer manages multiple EQStream connections on top of a gnet event
// engine. Streams are tracked in a map keyed by the peer's remote address.
type StreamServer struct {
	gnet.BuiltinEventEngine

	// streams maps "IP:Port" to the stream for that peer; guarded by streamsMu.
	streams   map[string]*EQStream
	streamsMu sync.RWMutex

	address    string
	port       int
	streamType StreamType
	// maxStreams caps the number of tracked streams; values <= 0 disable the cap.
	maxStreams int

	onNewStream    func(*EQStream)
	onStreamClosed func(*EQStream)
	onEmuPacket    func(*EQStream, *EQPacket)

	// Connection counters; guarded by statsMu. When both locks are needed,
	// streamsMu is always acquired before statsMu.
	totalConnections  uint64
	activeConnections uint32
	statsMu           sync.RWMutex

	// shutdown is closed by Stop and polled by OnTick to stop the engine.
	shutdown chan bool
	// stopOnce ensures shutdown is closed only once even if Stop is called
	// repeatedly.
	stopOnce sync.Once
}

// NewStreamServer creates a stream server that will listen on address:port
// and assign streamType to every stream it creates. The stream limit
// defaults to 10000.
func NewStreamServer(address string, port int, streamType StreamType) *StreamServer {
	return &StreamServer{
		streams:    make(map[string]*EQStream),
		address:    address,
		port:       port,
		streamType: streamType,
		maxStreams: 10000,
		shutdown:   make(chan bool),
	}
}

// Start runs the gnet engine on udp://address:port with this server as the
// event handler. It blocks until the engine exits and returns gnet.Run's
// error.
func (s *StreamServer) Start() error {
	addr := fmt.Sprintf("%s:%d", s.address, s.port)

	// Configure gnet options. The ticker must be enabled for OnTick to run.
	options := []gnet.Option{
		gnet.WithMulticore(true),
		gnet.WithReusePort(true),
		gnet.WithTicker(true),
		gnet.WithTCPKeepAlive(time.Minute * 5),
	}

	return gnet.Run(s, fmt.Sprintf("udp://%s", addr), options...)
}

// Stop signals the engine to shut down and closes every tracked stream.
// It is safe to call more than once. The engine itself stops on the next
// OnTick after the shutdown channel has been closed.
func (s *StreamServer) Stop() {
	s.stopOnce.Do(func() { close(s.shutdown) })

	// Detach the current map while holding the lock, then close the streams
	// after releasing it: the onDisconnect callback installed by newStream
	// calls removeStream, which takes streamsMu itself.
	s.streamsMu.Lock()
	closing := s.streams
	s.streams = make(map[string]*EQStream)
	s.statsMu.Lock()
	s.activeConnections = 0
	s.statsMu.Unlock()
	s.streamsMu.Unlock()

	for _, stream := range closing {
		stream.Close()
	}
}

// OnBoot is called by gnet when the engine is ready; it only logs the
// listening address.
func (s *StreamServer) OnBoot(eng gnet.Engine) gnet.Action {
	fmt.Printf("EQStream server started on %s:%d\n", s.address, s.port)
	return gnet.None
}

// OnShutdown is called by gnet when the engine stops; it only logs.
func (s *StreamServer) OnShutdown(eng gnet.Engine) {
	fmt.Printf("EQStream server shutdown\n")
}

// OnOpen is called by gnet when a new connection is established. It creates
// a stream for the connection, stores it as the connection context, and
// registers it under the connection's remote address. The connection is
// closed if the stream limit has been reached.
func (s *StreamServer) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) {
	// replace=true: a fresh connection supersedes any entry left under the
	// same remote address.
	stream, added := s.addStream(c, s.getStreamKey(c), true)
	if stream == nil {
		return nil, gnet.Close
	}

	// Store stream in connection context so OnTraffic/OnClose can find it.
	c.SetContext(stream)

	if added && s.onNewStream != nil {
		s.onNewStream(stream)
	}

	return nil, gnet.None
}

// OnClose is called by gnet when a connection is closed. If the connection
// carries a stream in its context, the stream is closed, removed from the
// map, and reported through the stream-closed callback.
func (s *StreamServer) OnClose(c gnet.Conn, err error) (action gnet.Action) {
	if stream, ok := c.Context().(*EQStream); ok {
		stream.Close()
		s.removeStream(stream)

		if s.onStreamClosed != nil {
			s.onStreamClosed(stream)
		}
	}
	return gnet.None
}

// OnTraffic is called by gnet when data is received. It resolves the stream
// for the sender, parses the buffered data as a protocol packet, and hands
// it to the stream. Unparseable data is discarded; processing errors are
// logged and do not stop the loop.
func (s *StreamServer) OnTraffic(c gnet.Conn) (action gnet.Action) {
	stream := s.streamFor(c)
	if stream == nil {
		return gnet.Close
	}

	for {
		// Peek(-1) exposes everything currently buffered. For UDP, each
		// datagram is a complete packet.
		buf, err := c.Peek(-1)
		if err != nil || len(buf) < 2 {
			break
		}

		packet, parseErr := ParseProtocolPacket(buf)

		// Consume the data whether or not it parsed. Bail out if the
		// discard fails so the loop cannot spin on the same bytes.
		if _, err := c.Discard(len(buf)); err != nil {
			break
		}

		if parseErr != nil {
			// Invalid packet, skip it.
			continue
		}

		if err := stream.ProcessPacket(packet); err != nil {
			// Log error but continue processing.
			fmt.Printf("Error processing packet: %v\n", err)
		}
	}

	return gnet.None
}

// OnTick is called periodically by gnet. It shuts the engine down once Stop
// has been called; otherwise it drops timed-out streams from the map, runs
// retransmission checks on the rest, and asks to be called again in 100ms.
func (s *StreamServer) OnTick() (delay time.Duration, action gnet.Action) {
	// A closed shutdown channel is always ready to receive; a nil or open
	// one falls through to default.
	select {
	case <-s.shutdown:
		return 0, gnet.Shutdown
	default:
	}

	// Snapshot the streams so the per-stream checks run without the lock
	// (removeStream needs to take it).
	s.streamsMu.RLock()
	streams := make([]*EQStream, 0, len(s.streams))
	for _, stream := range s.streams {
		streams = append(streams, stream)
	}
	s.streamsMu.RUnlock()

	for _, stream := range streams {
		if stream.CheckTimeout() {
			s.removeStream(stream)
			continue
		}

		stream.CheckRetransmissions()
	}

	// Return delay until next tick (100ms).
	return time.Millisecond * 100, gnet.None
}

// Helper methods

// getStreamKey returns the connection's remote address as a string, or ""
// when the connection has no remote address.
func (s *StreamServer) getStreamKey(c gnet.Conn) string {
	if addr := c.RemoteAddr(); addr != nil {
		return addr.String()
	}
	return ""
}

// newStream builds an EQStream for c and wires the server's packet and
// disconnect callbacks into it. It does not register the stream.
func (s *StreamServer) newStream(c gnet.Conn) *EQStream {
	stream := NewEQStream(c)
	stream.streamType = s.streamType

	stream.onEmuPacket = func(p *EQPacket) {
		if s.onEmuPacket != nil {
			s.onEmuPacket(stream, p)
		}
	}

	stream.onDisconnect = func() {
		s.removeStream(stream)
		if s.onStreamClosed != nil {
			s.onStreamClosed(stream)
		}
	}

	return stream
}

// addStream registers a stream for c under key and updates the counters.
//
// If key is already present and replace is false, the existing stream is
// returned with added == false. If key is already present and replace is
// true, the entry is overwritten (the previous stream is not closed) and the
// active count is left unchanged because the map did not grow. If key is new
// and the maxStreams limit has been reached, it returns (nil, false).
func (s *StreamServer) addStream(c gnet.Conn, key string, replace bool) (stream *EQStream, added bool) {
	s.streamsMu.Lock()
	defer s.streamsMu.Unlock()

	existing, exists := s.streams[key]
	if exists && !replace {
		return existing, false
	}
	if !exists && s.maxStreams > 0 && len(s.streams) >= s.maxStreams {
		return nil, false
	}

	stream = s.newStream(c)
	s.streams[key] = stream

	s.statsMu.Lock()
	if !exists {
		s.activeConnections++
	}
	s.totalConnections++
	s.statsMu.Unlock()

	return stream, true
}

// streamFor resolves the stream that should handle traffic arriving on c.
//
// It prefers the stream stored in the connection context. When there is
// none, it falls back to the stream map keyed by remote address and creates
// a stream on first contact. The fallback is needed because gnet delivers
// datagrams on a UDP listener directly to OnTraffic without calling OnOpen.
// It returns nil when no stream can be resolved (no remote address, or the
// stream limit has been reached).
//
// NOTE(review): a stream created here keeps the gnet.Conn it was built with;
// confirm that EQStream does not rely on that connection staying valid
// across datagrams.
func (s *StreamServer) streamFor(c gnet.Conn) *EQStream {
	if stream, ok := c.Context().(*EQStream); ok {
		return stream
	}

	key := s.getStreamKey(c)
	if key == "" {
		return nil
	}

	// Fast path: the peer is already known.
	s.streamsMu.RLock()
	stream := s.streams[key]
	s.streamsMu.RUnlock()
	if stream != nil {
		return stream
	}

	// Slow path: addStream re-checks under the write lock, so a concurrent
	// registration of the same key is returned instead of duplicated.
	stream, added := s.addStream(c, key, false)
	if added && s.onNewStream != nil {
		s.onNewStream(stream)
	}
	return stream
}

// removeStream deletes stream from the map, if present, and decrements the
// active connection count. Removing a stream that is not tracked is a no-op.
func (s *StreamServer) removeStream(stream *EQStream) {
	s.streamsMu.Lock()
	defer s.streamsMu.Unlock()

	// The map is keyed by address, so locate the entry by pointer identity.
	for key, str := range s.streams {
		if str == stream {
			delete(s.streams, key)

			s.statsMu.Lock()
			s.activeConnections--
			s.statsMu.Unlock()
			break
		}
	}
}

// SetOnNewStream sets the callback invoked when a new stream is registered.
func (s *StreamServer) SetOnNewStream(callback func(*EQStream)) {
	s.onNewStream = callback
}

// SetOnStreamClosed sets the callback invoked when a stream is closed.
func (s *StreamServer) SetOnStreamClosed(callback func(*EQStream)) {
	s.onStreamClosed = callback
}

// SetOnEmuPacket sets the callback invoked for emulator packets delivered by
// a stream.
func (s *StreamServer) SetOnEmuPacket(callback func(*EQStream, *EQPacket)) {
	s.onEmuPacket = callback
}

// GetStream returns the stream registered under the given remote address,
// or nil if there is none.
func (s *StreamServer) GetStream(address string) *EQStream {
	s.streamsMu.RLock()
	defer s.streamsMu.RUnlock()
	return s.streams[address]
}

// GetActiveStreams returns a snapshot slice of all currently tracked streams.
func (s *StreamServer) GetActiveStreams() []*EQStream {
	s.streamsMu.RLock()
	defer s.streamsMu.RUnlock()

	streams := make([]*EQStream, 0, len(s.streams))
	for _, stream := range s.streams {
		streams = append(streams, stream)
	}
	return streams
}

// GetStats returns the total number of streams ever registered and the
// number currently active.
func (s *StreamServer) GetStats() (total uint64, active uint32) {
	s.statsMu.RLock()
	defer s.statsMu.RUnlock()
	return s.totalConnections, s.activeConnections
}