// Protocol/stream_server.go — 2025-09-02 12:30:36 -05:00 — 264 lines, 5.9 KiB, Go

package eq2net
import (
"fmt"
"sync"
"time"
"github.com/panjf2000/gnet/v2"
)
// StreamServer manages multiple EQStream connections using gnet.
//
// It embeds gnet.BuiltinEventEngine and supplies its own OnBoot, OnShutdown,
// OnOpen, OnClose, OnTraffic and OnTick handlers. Streams are tracked in a map
// keyed by the remote address string of their connection.
type StreamServer struct {
gnet.BuiltinEventEngine
// streams holds the registered streams; entries are added in OnOpen and
// deleted in removeStream. Guarded by streamsMu.
streams map[string]*EQStream // "IP:Port" -> Stream mapping
streamsMu sync.RWMutex
// address and port are combined by Start into the "udp://address:port"
// listen address.
address string
port int
// streamType is copied onto every stream created in OnOpen.
streamType StreamType
// maxStreams is set to 10000 by NewStreamServer.
// NOTE(review): presumably the cap on concurrent streams — confirm where it
// is meant to be enforced.
maxStreams int
// Optional callbacks installed through the SetOn* methods; a nil callback
// is simply not invoked.
onNewStream func(*EQStream)
onStreamClosed func(*EQStream)
onAppPacket func(*EQStream, *AppPacket)
// totalConnections counts every stream ever registered by OnOpen;
// activeConnections tracks the streams currently registered. Both are
// modified while streamsMu is held.
totalConnections uint64
activeConnections uint32
// NOTE(review): the counters above are written under streamsMu, not
// statsMu — confirm which mutex is meant to own them.
statsMu sync.RWMutex
// shutdown is created by NewStreamServer and closed by Stop.
shutdown chan bool
}
// NewStreamServer builds a StreamServer for the given listen address, port and
// stream type.
//
// The returned server has an empty stream table, maxStreams set to 10000 and a
// fresh (open) shutdown channel. Nothing is bound until Start is called.
func NewStreamServer(address string, port int, streamType StreamType) *StreamServer {
	srv := new(StreamServer)

	// Listener configuration.
	srv.address = address
	srv.port = port
	srv.streamType = streamType
	srv.maxStreams = 10000

	// Runtime state.
	srv.streams = make(map[string]*EQStream)
	srv.shutdown = make(chan bool)

	return srv
}
// Start hands the server to gnet.Run on "udp://<address>:<port>" and returns
// whatever gnet.Run returns.
//
// The engine is configured with multicore, port reuse, the ticker and a
// five-minute TCP keep-alive.
func (s *StreamServer) Start() error {
	// Build the listen URL in one place: scheme + host:port.
	listenURL := "udp://" + fmt.Sprintf("%s:%d", s.address, s.port)

	return gnet.Run(
		s,
		listenURL,
		gnet.WithMulticore(true),
		gnet.WithReusePort(true),
		gnet.WithTicker(true),
		gnet.WithTCPKeepAlive(5*time.Minute),
	)
}
// Stop signals shutdown and closes every registered stream.
//
// It closes the shutdown channel (only once, so calling Stop repeatedly is
// safe), swaps the stream table for an empty one, resets the active-connection
// counter to match, and then closes the streams that were registered.
//
// Streams are closed after streamsMu has been released: the onDisconnect hook
// installed in OnOpen calls removeStream, which takes streamsMu itself, so
// closing while holding the lock could deadlock if Close fires that hook.
func (s *StreamServer) Stop() {
	s.streamsMu.Lock()

	// Closing an already-closed channel panics; the check is race-free
	// because it happens under streamsMu.
	select {
	case <-s.shutdown:
		// Already signalled by an earlier Stop.
	default:
		close(s.shutdown)
	}

	// Detach the current table so the streams can be closed without the lock.
	closing := s.streams
	s.streams = make(map[string]*EQStream)
	// The table is now empty; keep the counter consistent with it.
	s.activeConnections = 0

	s.streamsMu.Unlock()

	for _, stream := range closing {
		stream.Close()
	}
}
// OnBoot is the gnet boot hook. It prints a startup line containing the
// configured address and port to standard output and returns gnet.None.
// The engine handle is not used.
func (s *StreamServer) OnBoot(eng gnet.Engine) gnet.Action {
	host, port := s.address, s.port
	fmt.Printf("EQStream server started on %s:%d\n", host, port)

	return gnet.None
}
// OnShutdown is the gnet shutdown hook. It only prints a shutdown line to
// standard output; the engine handle is not used.
func (s *StreamServer) OnShutdown(eng gnet.Engine) {
	// No format verbs are involved, so Print emits the same bytes as Printf.
	fmt.Print("EQStream server shutdown\n")
}
// OnOpen creates and registers an EQStream for connection c.
//
// If the stream table already holds maxStreams entries (and c's remote address
// is not one of them) the connection is refused with gnet.Close and nothing is
// allocated. A maxStreams of zero or less disables the limit. The check is a
// soft limit: it is not atomic with the later insert, so concurrent opens can
// overshoot by a few entries.
//
// Otherwise the stream is created, wired to the server's callbacks, stored as
// the connection context, inserted into the table under the remote address,
// and announced through onNewStream (if set). No bytes are written back
// (out is always nil).
func (s *StreamServer) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) {
	key := s.getStreamKey(c)

	// Enforce the stream limit before allocating a stream for this peer.
	s.streamsMu.RLock()
	_, known := s.streams[key]
	full := !known && s.maxStreams > 0 && len(s.streams) >= s.maxStreams
	s.streamsMu.RUnlock()
	if full {
		return nil, gnet.Close
	}

	// Create new stream
	stream := NewEQStream(c)
	stream.streamType = s.streamType

	// Forward stream events to the server-level callbacks.
	stream.onAppPacket = func(p *AppPacket) {
		if s.onAppPacket != nil {
			s.onAppPacket(stream, p)
		}
	}
	stream.onDisconnect = func() {
		s.removeStream(stream)
		if s.onStreamClosed != nil {
			s.onStreamClosed(stream)
		}
	}

	// Store stream in connection context so later events can find it.
	c.SetContext(stream)

	// Add to stream map. activeConnections only grows when the key is new, so
	// it keeps matching the number of table entries even if an existing entry
	// is replaced (the replaced stream is only dropped from the table here).
	s.streamsMu.Lock()
	if _, replaced := s.streams[key]; !replaced {
		s.activeConnections++
	}
	s.streams[key] = stream
	s.totalConnections++
	s.streamsMu.Unlock()

	// Call new stream callback outside the lock.
	if s.onNewStream != nil {
		s.onNewStream(stream)
	}

	return nil, gnet.None
}
// OnClose is the gnet hook for a closed connection.
//
// When the connection context holds an *EQStream, that stream is closed,
// removed from the stream table and reported through onStreamClosed (if set).
// Connections without a stream context are ignored. The err argument is not
// inspected. Always returns gnet.None.
//
// NOTE(review): the onDisconnect hook installed in OnOpen also calls
// removeStream and onStreamClosed; if stream.Close triggers that hook the
// callback would run twice for the same stream — confirm against EQStream.
func (s *StreamServer) OnClose(c gnet.Conn, err error) (action gnet.Action) {
	stream, hasStream := c.Context().(*EQStream)
	if !hasStream {
		return gnet.None
	}

	stream.Close()
	s.removeStream(stream)

	if s.onStreamClosed != nil {
		s.onStreamClosed(stream)
	}

	return gnet.None
}
// OnTraffic is the gnet hook for inbound data on c.
//
// It resolves the EQStream for the connection (see streamForConn), then
// repeatedly peeks the whole inbound buffer, parses it as one protocol packet,
// hands it to the stream and discards the consumed bytes. The loop ends when
// Peek fails, fewer than two bytes remain, or the buffer cannot be advanced.
//
// Packets that fail to parse are dropped silently; errors from ProcessPacket
// are printed and processing continues. Returns gnet.Close when no stream can
// be resolved for the connection, gnet.None otherwise.
func (s *StreamServer) OnTraffic(c gnet.Conn) (action gnet.Action) {
	stream := s.streamForConn(c)
	if stream == nil {
		return gnet.Close
	}

	// Read all available data
	for {
		buf, err := c.Peek(-1)
		if err != nil || len(buf) < 2 {
			break
		}

		// For UDP, each datagram is a complete packet.
		packet, err := ParseProtocolPacket(buf)
		if err == nil {
			// Process BEFORE discarding: buf is a view into gnet's inbound
			// buffer and is only guaranteed valid until Discard is called.
			if err := stream.ProcessPacket(packet); err != nil {
				// Log error but continue processing
				fmt.Printf("Error processing packet: %v\n", err)
			}
		}

		// Consume the bytes whether or not they parsed. If the buffer cannot
		// be advanced, stop instead of spinning on the same data forever.
		if _, err := c.Discard(len(buf)); err != nil {
			break
		}
	}

	return gnet.None
}

// streamForConn returns the EQStream that should handle traffic from c, or nil
// if none can be obtained.
//
// Lookup order: the connection context, then the stream table keyed by the
// remote address, and finally registration of a new stream through OnOpen.
// The fallbacks matter because the server listens on UDP: per gnet's
// documentation OnOpen is not fired for UDP sockets, so datagram connections
// arrive here without a context.
//
// NOTE(review): a stream created through this path keeps the gnet.Conn that
// was passed to NewEQStream — confirm that EQStream does not rely on that
// connection staying valid after the callback returns.
func (s *StreamServer) streamForConn(c gnet.Conn) *EQStream {
	if stream, ok := c.Context().(*EQStream); ok {
		return stream
	}

	s.streamsMu.RLock()
	stream := s.streams[s.getStreamKey(c)]
	s.streamsMu.RUnlock()
	if stream != nil {
		return stream
	}

	// First traffic from this peer: register it the same way OnOpen would.
	if _, act := s.OnOpen(c); act != gnet.None {
		return nil
	}
	stream, _ = c.Context().(*EQStream)
	return stream
}
// OnTick is the gnet ticker hook; it asks to be called again after 100ms.
//
// If Stop has closed the shutdown channel, OnTick returns gnet.Shutdown so
// that the engine started by Start actually terminates. Otherwise it takes a
// snapshot of the stream table and, for every stream, either drops it from
// the table when CheckTimeout reports a timeout or runs CheckRetransmissions.
//
// NOTE(review): a timed-out stream is only removed from the table here; it is
// not closed and onStreamClosed is not invoked — confirm CheckTimeout covers
// that.
func (s *StreamServer) OnTick() (delay time.Duration, action gnet.Action) {
	const tickInterval = 100 * time.Millisecond

	// Non-blocking check of the shutdown signal. A nil or still-open channel
	// falls through to the default case.
	select {
	case <-s.shutdown:
		return tickInterval, gnet.Shutdown
	default:
	}

	// Snapshot under the read lock so stream processing (which may call
	// removeStream and take the write lock) runs without holding it.
	s.streamsMu.RLock()
	streams := make([]*EQStream, 0, len(s.streams))
	for _, stream := range s.streams {
		streams = append(streams, stream)
	}
	s.streamsMu.RUnlock()

	// Process each stream
	for _, stream := range streams {
		// Check for timeout
		if stream.CheckTimeout() {
			s.removeStream(stream)
			continue
		}
		// Check for retransmissions
		stream.CheckRetransmissions()
	}

	return tickInterval, gnet.None
}
// Helper methods

// getStreamKey returns the stream-table key for c: the string form of its
// remote address, or the empty string when the connection has no remote
// address.
func (s *StreamServer) getStreamKey(c gnet.Conn) string {
	remote := c.RemoteAddr()
	if remote == nil {
		return ""
	}
	return remote.String()
}
// removeStream deletes the table entry whose value is stream and decrements
// activeConnections. It scans the table by value (the key is not known to the
// caller) and stops at the first match; if stream is not registered nothing
// changes. The scan runs under the streamsMu write lock.
func (s *StreamServer) removeStream(stream *EQStream) {
	s.streamsMu.Lock()
	defer s.streamsMu.Unlock()

	for addr, candidate := range s.streams {
		if candidate != stream {
			continue
		}
		delete(s.streams, addr)
		s.activeConnections--
		return
	}
}
// SetOnNewStream installs the function OnOpen invokes after a stream has been
// registered. Passing nil removes the callback.
//
// NOTE(review): the field is written without synchronisation — presumably
// meant to be called before Start; confirm.
func (s *StreamServer) SetOnNewStream(callback func(*EQStream)) {
	s.onNewStream = callback
}
// SetOnStreamClosed installs the function invoked when a stream goes away,
// both from OnClose and from the per-stream onDisconnect hook set up in
// OnOpen. Passing nil removes the callback.
//
// NOTE(review): the field is written without synchronisation — presumably
// meant to be called before Start; confirm.
func (s *StreamServer) SetOnStreamClosed(callback func(*EQStream)) {
	s.onStreamClosed = callback
}
// SetOnAppPacket installs the function that receives application packets;
// each stream's onAppPacket hook (wired in OnOpen) forwards to it together
// with the originating stream. Passing nil removes the callback.
//
// NOTE(review): the field is written without synchronisation — presumably
// meant to be called before Start; confirm.
func (s *StreamServer) SetOnAppPacket(callback func(*EQStream, *AppPacket)) {
	s.onAppPacket = callback
}
// GetStream looks up the stream registered under the given remote address
// string (the same key OnOpen stores it under). It returns nil when no stream
// is registered for that address. The lookup runs under the streamsMu read
// lock.
func (s *StreamServer) GetStream(address string) *EQStream {
	s.streamsMu.RLock()
	found := s.streams[address]
	s.streamsMu.RUnlock()

	return found
}
// GetActiveStreams returns a newly allocated slice holding every stream
// currently in the stream table. The order is unspecified (map iteration
// order). The result is never nil; it is empty when no stream is registered.
// The copy is taken under the streamsMu read lock.
func (s *StreamServer) GetActiveStreams() []*EQStream {
	s.streamsMu.RLock()
	defer s.streamsMu.RUnlock()

	snapshot := make([]*EQStream, len(s.streams))
	next := 0
	for _, stream := range s.streams {
		snapshot[next] = stream
		next++
	}
	return snapshot
}
// GetStats returns the total number of streams ever registered and the number
// currently registered.
//
// The counters are read under streamsMu because that is the mutex held by the
// code that modifies them (OnOpen and removeStream); reading under a
// different mutex would race with those writers.
func (s *StreamServer) GetStats() (total uint64, active uint32) {
	s.streamsMu.RLock()
	defer s.streamsMu.RUnlock()

	return s.totalConnections, s.activeConnections
}