From 87acbb402a1eb752686deaec902f16334710e7df Mon Sep 17 00:00:00 2001 From: Sky Johnson Date: Thu, 20 Mar 2025 14:12:03 -0500 Subject: [PATCH] optimizations 1 --- core/http/server.go | 66 +++++++++----------- core/logger/logger.go | 119 ++++++++++++++++++++++++++++++++----- core/logger/logger_test.go | 106 ++++++++++++++++++++++++--------- moonshark.go | 6 +- 4 files changed, 214 insertions(+), 83 deletions(-) diff --git a/core/http/server.go b/core/http/server.go index 8d8b873..a89cfe5 100644 --- a/core/http/server.go +++ b/core/http/server.go @@ -28,29 +28,15 @@ func New(luaRouter *routers.LuaRouter, staticRouter *routers.StaticRouter, runne staticRouter: staticRouter, luaRunner: runner, logger: log, - httpServer: &http.Server{ - // Connection timeouts - ReadTimeout: 30 * time.Second, - WriteTimeout: 30 * time.Second, - IdleTimeout: 120 * time.Second, - ReadHeaderTimeout: 10 * time.Second, - - // Improved connection handling - MaxHeaderBytes: 1 << 16, // 64KB - }, + httpServer: &http.Server{}, } server.httpServer.Handler = server - // Set TCP keep-alive settings for the underlying TCP connections + // Set TCP keep-alive for connections server.httpServer.ConnState = func(conn net.Conn, state http.ConnState) { if state == http.StateNew { if tcpConn, ok := conn.(*net.TCPConn); ok { - // Enable TCP keep-alive tcpConn.SetKeepAlive(true) - tcpConn.SetKeepAlivePeriod(30 * time.Second) - - // Set TCP_NODELAY (disable Nagle's algorithm) - tcpConn.SetNoDelay(true) } } } @@ -124,7 +110,17 @@ func (s *Server) handleLuaRoute(w http.ResponseWriter, r *http.Request, bytecode ctx.Set("method", r.Method) ctx.Set("path", r.URL.Path) ctx.Set("host", r.Host) - ctx.Set("headers", makeHeaderMap(r.Header)) + + // Inline the header conversion (previously makeHeaderMap) + headerMap := make(map[string]any, len(r.Header)) + for name, values := range r.Header { + if len(values) == 1 { + headerMap[name] = values[0] + } else { + headerMap[name] = values + } + } + ctx.Set("headers", headerMap) // Add URL parameters if params.Count > 0 { @@ -135,12 +131,11 @@ func (s *Server) handleLuaRoute(w http.ResponseWriter, r *http.Request, bytecode ctx.Set("params", paramMap) } - // Add query parameters - if queryParams := QueryToLua(r); queryParams != nil { - ctx.Set("query", queryParams) - } + // Query parameters will be parsed lazily via metatable in Lua + // Instead of parsing for every request, we'll pass the raw URL + ctx.Set("rawQuery", r.URL.RawQuery) - // Add form data + // Add form data for POST/PUT/PATCH only when needed if r.Method == http.MethodPost || r.Method == http.MethodPut || r.Method == http.MethodPatch { if formData, err := ParseForm(r); err == nil && len(formData) > 0 { ctx.Set("form", formData) @@ -158,18 +153,11 @@ func (s *Server) handleLuaRoute(w http.ResponseWriter, r *http.Request, bytecode writeResponse(w, result, s.logger) } -// makeHeaderMap converts HTTP headers to a map -func makeHeaderMap(header http.Header) map[string]any { - result := make(map[string]any, len(header)) - for name, values := range header { - if len(values) == 1 { - result[name] = values[0] - } else { - result[name] = values - } - } - return result -} +// Content types for responses +const ( + contentTypeJSON = "application/json" + contentTypePlain = "text/plain" +) // writeResponse writes the Lua result to the HTTP response func writeResponse(w http.ResponseWriter, result any, log *logger.Logger) { @@ -181,12 +169,12 @@ func writeResponse(w http.ResponseWriter, result any, log *logger.Logger) { switch res := result.(type) { case string: // String result - w.Header().Set("Content-Type", "text/plain") + w.Header().Set("Content-Type", contentTypePlain) w.Write([]byte(res)) - case map[string]any: - // Table result - convert to JSON - w.Header().Set("Content-Type", "application/json") + case map[string]any, []any: + // Table or array result - convert to JSON + w.Header().Set("Content-Type", contentTypeJSON) data, err := json.Marshal(res) if err != nil { log.Error("Failed to marshal response: %v", err) @@ -197,7 +185,7 @@ func writeResponse(w http.ResponseWriter, result any, log *logger.Logger) { default: // Other result types - convert to JSON - w.Header().Set("Content-Type", "application/json") + w.Header().Set("Content-Type", contentTypeJSON) data, err := json.Marshal(result) if err != nil { log.Error("Failed to marshal response: %v", err) diff --git a/core/logger/logger.go b/core/logger/logger.go index 247907c..16e214d 100644 --- a/core/logger/logger.go +++ b/core/logger/logger.go @@ -5,6 +5,7 @@ import ( "io" "os" "sync" + "sync/atomic" "time" ) @@ -47,6 +48,12 @@ var levelProps = map[int]struct { // Time format for log messages const timeFormat = "15:04:05" +// Default rate limiting settings +const ( + defaultMaxLogs = 1000 // Max logs per second before rate limiting + defaultRateLimitTime = 10 * time.Second // How long to pause during rate limiting +) + // Logger handles logging operations type Logger struct { writer io.Writer @@ -54,16 +61,39 @@ type Logger struct { useColors bool timeFormat string mu sync.Mutex // Mutex for thread-safe writing + + // Simple rate limiting + logCount atomic.Int64 // Number of logs in current window + logCountStart atomic.Int64 // Start time of current counting window + rateLimited atomic.Bool // Whether we're currently rate limited + rateLimitUntil atomic.Int64 // Timestamp when rate limiting ends + maxLogsPerSec int64 // Maximum logs per second before limiting + limitDuration time.Duration // How long to pause logging when rate limited } // New creates a new logger func New(minLevel int, useColors bool) *Logger { - return &Logger{ - writer: os.Stdout, - level: minLevel, - useColors: useColors, - timeFormat: timeFormat, + logger := &Logger{ + writer: os.Stdout, + level: minLevel, + useColors: useColors, + timeFormat: timeFormat, + maxLogsPerSec: defaultMaxLogs, + limitDuration: defaultRateLimitTime, } + + // Initialize counters + logger.resetCounters() + + return logger +} + +// resetCounters resets the rate limiting counters +func (l *Logger) resetCounters() { + l.logCount.Store(0) + l.logCountStart.Store(time.Now().Unix()) + l.rateLimited.Store(false) + l.rateLimitUntil.Store(0) } // SetOutput changes the output destination @@ -119,29 +149,85 @@ func (l *Logger) writeMessage(level int, message string, rawMode bool) { } } - // Asynchronously write the log message - go func(w io.Writer, data string) { - l.mu.Lock() - _, _ = fmt.Fprint(w, data) - l.mu.Unlock() - }(l.writer, logLine) + // Synchronously write the log message + l.mu.Lock() + _, _ = fmt.Fprint(l.writer, logLine) - // For fatal errors, ensure we sync immediately in the current goroutine + // For fatal errors, ensure we sync immediately if level == LevelFatal { - l.mu.Lock() if f, ok := l.writer.(*os.File); ok { _ = f.Sync() } - l.mu.Unlock() } + l.mu.Unlock() +} + +// checkRateLimit checks if we should rate limit logging +// Returns true if the message should be logged, false if it should be dropped +func (l *Logger) checkRateLimit(level int) bool { + // High priority messages are never rate limited + if level >= LevelWarning { + return true + } + + // Check if we're currently in a rate-limited period + if l.rateLimited.Load() { + now := time.Now().Unix() + limitUntil := l.rateLimitUntil.Load() + + if now >= limitUntil { + // Rate limiting period is over + l.rateLimited.Store(false) + l.resetCounters() + } else { + // Still in rate limiting period, drop the message + return false + } + } + + // If not rate limited, check if we should start rate limiting + count := l.logCount.Add(1) + + // Check if we need to reset the counter for a new second + now := time.Now().Unix() + start := l.logCountStart.Load() + if now > start { + // New second, reset counter + l.logCount.Store(1) // Count this message + l.logCountStart.Store(now) + return true + } + + // Check if we've exceeded our threshold + if count > l.maxLogsPerSec { + // Start rate limiting + l.rateLimited.Store(true) + l.rateLimitUntil.Store(now + int64(l.limitDuration.Seconds())) + + // Log a warning about rate limiting + l.writeMessage(LevelServer, + fmt.Sprintf("Rate limiting logger temporarily due to high demand (%d logs/sec exceeded)", count), + false) + + return false + } + + return true } // log handles the core logging logic with level filtering func (l *Logger) log(level int, format string, args ...any) { + // First check normal level filtering if level < l.level { return } + // Check rate limiting - always log high priority messages + if !l.checkRateLimit(level) { + return + } + + // Format message var message string if len(args) > 0 { message = fmt.Sprintf(format, args...) @@ -164,6 +250,11 @@ func (l *Logger) LogRaw(format string, args ...any) { return } + // Check rate limiting + if !l.checkRateLimit(LevelInfo) { + return + } + var message string if len(args) > 0 { message = fmt.Sprintf(format, args...) diff --git a/core/logger/logger_test.go b/core/logger/logger_test.go index 4b944f3..e41dffa 100644 --- a/core/logger/logger_test.go +++ b/core/logger/logger_test.go @@ -2,7 +2,6 @@ package logger import ( "bytes" - "strconv" "strings" "sync" "testing" @@ -16,36 +15,31 @@ func TestLoggerLevels(t *testing.T) { // Debug should be below threshold logger.Debug("This should not appear") - time.Sleep(10 * time.Millisecond) // Wait for processing if buf.Len() > 0 { t.Error("Debug message appeared when it should be filtered") } // Info and above should appear logger.Info("Info message") - time.Sleep(10 * time.Millisecond) // Wait for processing - if !strings.Contains(buf.String(), "[INF]") { + if !strings.Contains(buf.String(), "INFO") { t.Errorf("Info message not logged, got: %q", buf.String()) } buf.Reset() logger.Warning("Warning message") - time.Sleep(10 * time.Millisecond) // Wait for processing - if !strings.Contains(buf.String(), "[WRN]") { + if !strings.Contains(buf.String(), "WARN") { t.Errorf("Warning message not logged, got: %q", buf.String()) } buf.Reset() logger.Error("Error message") - time.Sleep(10 * time.Millisecond) // Wait for processing - if !strings.Contains(buf.String(), "[ERR]") { + if !strings.Contains(buf.String(), "ERROR") { t.Errorf("Error message not logged, got: %q", buf.String()) } buf.Reset() // Test format strings logger.Info("Count: %d", 42) - time.Sleep(10 * time.Millisecond) // Wait for processing if !strings.Contains(buf.String(), "Count: 42") { t.Errorf("Formatted message not logged correctly, got: %q", buf.String()) } @@ -60,17 +54,53 @@ func TestLoggerLevels(t *testing.T) { } logger.Error("Error should appear") - time.Sleep(10 * time.Millisecond) // Wait for processing - if !strings.Contains(buf.String(), "[ERR]") { + if !strings.Contains(buf.String(), "ERROR") { t.Errorf("Error message not logged after level change, got: %q", buf.String()) } } +func TestLoggerRateLimit(t *testing.T) { + var buf bytes.Buffer + logger := New(LevelDebug, false) + logger.SetOutput(&buf) + + // Override max logs per second to something small for testing + logger.maxLogsPerSec = 5 + logger.limitDuration = 1 * time.Second + + // Send debug messages (should get limited) + for i := 0; i < 20; i++ { + logger.Debug("Debug message %d", i) + } + + // Error messages should always go through + logger.Error("Error message should appear") + + content := buf.String() + + // We should see some debug messages, then a warning about rate limiting, + // and finally the error message + if !strings.Contains(content, "Debug message 0") { + t.Error("First debug message should appear") + } + + if !strings.Contains(content, "Rate limiting logger") { + t.Error("Rate limiting message should appear") + } + + if !strings.Contains(content, "ERROR") { + t.Error("Error message should always appear despite rate limiting") + } +} + func TestLoggerConcurrency(t *testing.T) { var buf bytes.Buffer logger := New(LevelDebug, false) logger.SetOutput(&buf) + // Increase log threshold for this test + logger.maxLogsPerSec = 1000 + // Log a bunch of messages concurrently var wg sync.WaitGroup for i := 0; i < 100; i++ { @@ -82,17 +112,10 @@ func TestLoggerConcurrency(t *testing.T) { } wg.Wait() - // Wait for processing - time.Sleep(10 * time.Millisecond) - - // Check all messages were logged + // Check logs were processed content := buf.String() - for i := 0; i < 100; i++ { - msg := "Concurrent message " + strconv.Itoa(i) - if !strings.Contains(content, msg) && !strings.Contains(content, "Concurrent message") { - t.Errorf("Missing concurrent messages") - break - } + if !strings.Contains(content, "Concurrent message") { + t.Error("Concurrent messages should appear") } } @@ -103,7 +126,6 @@ func TestLoggerColors(t *testing.T) { // Test with color logger.Info("Colored message") - time.Sleep(10 * time.Millisecond) // Wait for processing content := buf.String() t.Logf("Colored output: %q", content) // Print actual output for diagnosis @@ -114,7 +136,6 @@ func TestLoggerColors(t *testing.T) { buf.Reset() logger.DisableColors() logger.Info("Non-colored message") - time.Sleep(10 * time.Millisecond) // Wait for processing content = buf.String() if strings.Contains(content, "\033[") { @@ -127,10 +148,9 @@ func TestDefaultLogger(t *testing.T) { SetOutput(&buf) Info("Test default logger") - time.Sleep(10 * time.Millisecond) // Wait for processing content := buf.String() - if !strings.Contains(content, "[INF]") { + if !strings.Contains(content, "INFO") { t.Errorf("Default logger not working, got: %q", content) } } @@ -140,23 +160,55 @@ func BenchmarkLogger(b *testing.B) { logger := New(LevelInfo, false) logger.SetOutput(&buf) + // Set very high threshold to avoid rate limiting during benchmark + logger.maxLogsPerSec = int64(b.N + 1) + b.ResetTimer() for i := 0; i < b.N; i++ { logger.Info("Benchmark message %d", i) } } +func BenchmarkLoggerWithRateLimit(b *testing.B) { + var buf bytes.Buffer + logger := New(LevelDebug, false) + logger.SetOutput(&buf) + + // Set threshold to allow about 10% of messages through + logger.maxLogsPerSec = int64(b.N / 10) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + logger.Debug("Benchmark message %d", i) + } +} + func BenchmarkLoggerParallel(b *testing.B) { var buf bytes.Buffer - logger := New(LevelInfo, false) + logger := New(LevelDebug, false) logger.SetOutput(&buf) + // Set very high threshold to avoid rate limiting during benchmark + logger.maxLogsPerSec = int64(b.N + 1) + b.ResetTimer() b.RunParallel(func(pb *testing.PB) { i := 0 for pb.Next() { - logger.Info("Parallel benchmark message %d", i) + logger.Debug("Parallel benchmark message %d", i) i++ } }) } + +func BenchmarkProductionLevels(b *testing.B) { + var buf bytes.Buffer + logger := New(LevelWarning, false) // Only log warnings and above + logger.SetOutput(&buf) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + // This should be filtered out before any processing + logger.Debug("Debug message that won't be logged %d", i) + } +} diff --git a/moonshark.go b/moonshark.go index 60d23c9..bf00de4 100644 --- a/moonshark.go +++ b/moonshark.go @@ -85,7 +85,7 @@ func main() { log.Fatal("Router initialization failed: %v", err) } - if cfg.GetBool("watchers", false) { + if cfg.GetBool("watchers", true) { // Set up file watchers for automatic reloading luaWatcher, err := watchers.WatchLuaRouter(luaRouter, routesDir, log) if err != nil { @@ -104,10 +104,10 @@ func main() { } } - // Get buffer size from config or use default (used to be worker pool size) + // Get buffer size from config or use default bufferSize := cfg.GetInt("buffer_size", 20) - // Initialize Lua runner (replacing worker pool) + // Initialize Lua runner runner, err := runner.NewRunner( runner.WithBufferSize(bufferSize), runner.WithLibDirs("./libs"),