optimizations 1
This commit is contained in:
parent
fc57a03a8e
commit
87acbb402a
|
@ -28,29 +28,15 @@ func New(luaRouter *routers.LuaRouter, staticRouter *routers.StaticRouter, runne
|
|||
staticRouter: staticRouter,
|
||||
luaRunner: runner,
|
||||
logger: log,
|
||||
httpServer: &http.Server{
|
||||
// Connection timeouts
|
||||
ReadTimeout: 30 * time.Second,
|
||||
WriteTimeout: 30 * time.Second,
|
||||
IdleTimeout: 120 * time.Second,
|
||||
ReadHeaderTimeout: 10 * time.Second,
|
||||
|
||||
// Improved connection handling
|
||||
MaxHeaderBytes: 1 << 16, // 64KB
|
||||
},
|
||||
httpServer: &http.Server{},
|
||||
}
|
||||
server.httpServer.Handler = server
|
||||
|
||||
// Set TCP keep-alive settings for the underlying TCP connections
|
||||
// Set TCP keep-alive for connections
|
||||
server.httpServer.ConnState = func(conn net.Conn, state http.ConnState) {
|
||||
if state == http.StateNew {
|
||||
if tcpConn, ok := conn.(*net.TCPConn); ok {
|
||||
// Enable TCP keep-alive
|
||||
tcpConn.SetKeepAlive(true)
|
||||
tcpConn.SetKeepAlivePeriod(30 * time.Second)
|
||||
|
||||
// Set TCP_NODELAY (disable Nagle's algorithm)
|
||||
tcpConn.SetNoDelay(true)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -124,7 +110,17 @@ func (s *Server) handleLuaRoute(w http.ResponseWriter, r *http.Request, bytecode
|
|||
ctx.Set("method", r.Method)
|
||||
ctx.Set("path", r.URL.Path)
|
||||
ctx.Set("host", r.Host)
|
||||
ctx.Set("headers", makeHeaderMap(r.Header))
|
||||
|
||||
// Inline the header conversion (previously makeHeaderMap)
|
||||
headerMap := make(map[string]any, len(r.Header))
|
||||
for name, values := range r.Header {
|
||||
if len(values) == 1 {
|
||||
headerMap[name] = values[0]
|
||||
} else {
|
||||
headerMap[name] = values
|
||||
}
|
||||
}
|
||||
ctx.Set("headers", headerMap)
|
||||
|
||||
// Add URL parameters
|
||||
if params.Count > 0 {
|
||||
|
@ -135,12 +131,11 @@ func (s *Server) handleLuaRoute(w http.ResponseWriter, r *http.Request, bytecode
|
|||
ctx.Set("params", paramMap)
|
||||
}
|
||||
|
||||
// Add query parameters
|
||||
if queryParams := QueryToLua(r); queryParams != nil {
|
||||
ctx.Set("query", queryParams)
|
||||
}
|
||||
// Query parameters will be parsed lazily via metatable in Lua
|
||||
// Instead of parsing for every request, we'll pass the raw URL
|
||||
ctx.Set("rawQuery", r.URL.RawQuery)
|
||||
|
||||
// Add form data
|
||||
// Add form data for POST/PUT/PATCH only when needed
|
||||
if r.Method == http.MethodPost || r.Method == http.MethodPut || r.Method == http.MethodPatch {
|
||||
if formData, err := ParseForm(r); err == nil && len(formData) > 0 {
|
||||
ctx.Set("form", formData)
|
||||
|
@ -158,18 +153,11 @@ func (s *Server) handleLuaRoute(w http.ResponseWriter, r *http.Request, bytecode
|
|||
writeResponse(w, result, s.logger)
|
||||
}
|
||||
|
||||
// makeHeaderMap converts HTTP headers to a map
|
||||
func makeHeaderMap(header http.Header) map[string]any {
|
||||
result := make(map[string]any, len(header))
|
||||
for name, values := range header {
|
||||
if len(values) == 1 {
|
||||
result[name] = values[0]
|
||||
} else {
|
||||
result[name] = values
|
||||
}
|
||||
}
|
||||
return result
|
||||
}
|
||||
// Content types for responses
|
||||
const (
|
||||
contentTypeJSON = "application/json"
|
||||
contentTypePlain = "text/plain"
|
||||
)
|
||||
|
||||
// writeResponse writes the Lua result to the HTTP response
|
||||
func writeResponse(w http.ResponseWriter, result any, log *logger.Logger) {
|
||||
|
@ -181,12 +169,12 @@ func writeResponse(w http.ResponseWriter, result any, log *logger.Logger) {
|
|||
switch res := result.(type) {
|
||||
case string:
|
||||
// String result
|
||||
w.Header().Set("Content-Type", "text/plain")
|
||||
w.Header().Set("Content-Type", contentTypePlain)
|
||||
w.Write([]byte(res))
|
||||
|
||||
case map[string]any:
|
||||
// Table result - convert to JSON
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
case map[string]any, []any:
|
||||
// Table or array result - convert to JSON
|
||||
w.Header().Set("Content-Type", contentTypeJSON)
|
||||
data, err := json.Marshal(res)
|
||||
if err != nil {
|
||||
log.Error("Failed to marshal response: %v", err)
|
||||
|
@ -197,7 +185,7 @@ func writeResponse(w http.ResponseWriter, result any, log *logger.Logger) {
|
|||
|
||||
default:
|
||||
// Other result types - convert to JSON
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Header().Set("Content-Type", contentTypeJSON)
|
||||
data, err := json.Marshal(result)
|
||||
if err != nil {
|
||||
log.Error("Failed to marshal response: %v", err)
|
||||
|
|
|
@ -5,6 +5,7 @@ import (
|
|||
"io"
|
||||
"os"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
|
@ -47,6 +48,12 @@ var levelProps = map[int]struct {
|
|||
// Time format for log messages
|
||||
const timeFormat = "15:04:05"
|
||||
|
||||
// Default rate limiting settings
|
||||
const (
|
||||
defaultMaxLogs = 1000 // Max logs per second before rate limiting
|
||||
defaultRateLimitTime = 10 * time.Second // How long to pause during rate limiting
|
||||
)
|
||||
|
||||
// Logger handles logging operations
|
||||
type Logger struct {
|
||||
writer io.Writer
|
||||
|
@ -54,16 +61,39 @@ type Logger struct {
|
|||
useColors bool
|
||||
timeFormat string
|
||||
mu sync.Mutex // Mutex for thread-safe writing
|
||||
|
||||
// Simple rate limiting
|
||||
logCount atomic.Int64 // Number of logs in current window
|
||||
logCountStart atomic.Int64 // Start time of current counting window
|
||||
rateLimited atomic.Bool // Whether we're currently rate limited
|
||||
rateLimitUntil atomic.Int64 // Timestamp when rate limiting ends
|
||||
maxLogsPerSec int64 // Maximum logs per second before limiting
|
||||
limitDuration time.Duration // How long to pause logging when rate limited
|
||||
}
|
||||
|
||||
// New creates a new logger
|
||||
func New(minLevel int, useColors bool) *Logger {
|
||||
return &Logger{
|
||||
logger := &Logger{
|
||||
writer: os.Stdout,
|
||||
level: minLevel,
|
||||
useColors: useColors,
|
||||
timeFormat: timeFormat,
|
||||
maxLogsPerSec: defaultMaxLogs,
|
||||
limitDuration: defaultRateLimitTime,
|
||||
}
|
||||
|
||||
// Initialize counters
|
||||
logger.resetCounters()
|
||||
|
||||
return logger
|
||||
}
|
||||
|
||||
// resetCounters resets the rate limiting counters
|
||||
func (l *Logger) resetCounters() {
|
||||
l.logCount.Store(0)
|
||||
l.logCountStart.Store(time.Now().Unix())
|
||||
l.rateLimited.Store(false)
|
||||
l.rateLimitUntil.Store(0)
|
||||
}
|
||||
|
||||
// SetOutput changes the output destination
|
||||
|
@ -119,29 +149,85 @@ func (l *Logger) writeMessage(level int, message string, rawMode bool) {
|
|||
}
|
||||
}
|
||||
|
||||
// Asynchronously write the log message
|
||||
go func(w io.Writer, data string) {
|
||||
// Synchronously write the log message
|
||||
l.mu.Lock()
|
||||
_, _ = fmt.Fprint(w, data)
|
||||
l.mu.Unlock()
|
||||
}(l.writer, logLine)
|
||||
_, _ = fmt.Fprint(l.writer, logLine)
|
||||
|
||||
// For fatal errors, ensure we sync immediately in the current goroutine
|
||||
// For fatal errors, ensure we sync immediately
|
||||
if level == LevelFatal {
|
||||
l.mu.Lock()
|
||||
if f, ok := l.writer.(*os.File); ok {
|
||||
_ = f.Sync()
|
||||
}
|
||||
l.mu.Unlock()
|
||||
}
|
||||
l.mu.Unlock()
|
||||
}
|
||||
|
||||
// checkRateLimit checks if we should rate limit logging
|
||||
// Returns true if the message should be logged, false if it should be dropped
|
||||
func (l *Logger) checkRateLimit(level int) bool {
|
||||
// High priority messages are never rate limited
|
||||
if level >= LevelWarning {
|
||||
return true
|
||||
}
|
||||
|
||||
// Check if we're currently in a rate-limited period
|
||||
if l.rateLimited.Load() {
|
||||
now := time.Now().Unix()
|
||||
limitUntil := l.rateLimitUntil.Load()
|
||||
|
||||
if now >= limitUntil {
|
||||
// Rate limiting period is over
|
||||
l.rateLimited.Store(false)
|
||||
l.resetCounters()
|
||||
} else {
|
||||
// Still in rate limiting period, drop the message
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// If not rate limited, check if we should start rate limiting
|
||||
count := l.logCount.Add(1)
|
||||
|
||||
// Check if we need to reset the counter for a new second
|
||||
now := time.Now().Unix()
|
||||
start := l.logCountStart.Load()
|
||||
if now > start {
|
||||
// New second, reset counter
|
||||
l.logCount.Store(1) // Count this message
|
||||
l.logCountStart.Store(now)
|
||||
return true
|
||||
}
|
||||
|
||||
// Check if we've exceeded our threshold
|
||||
if count > l.maxLogsPerSec {
|
||||
// Start rate limiting
|
||||
l.rateLimited.Store(true)
|
||||
l.rateLimitUntil.Store(now + int64(l.limitDuration.Seconds()))
|
||||
|
||||
// Log a warning about rate limiting
|
||||
l.writeMessage(LevelServer,
|
||||
fmt.Sprintf("Rate limiting logger temporarily due to high demand (%d logs/sec exceeded)", count),
|
||||
false)
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
// log handles the core logging logic with level filtering
|
||||
func (l *Logger) log(level int, format string, args ...any) {
|
||||
// First check normal level filtering
|
||||
if level < l.level {
|
||||
return
|
||||
}
|
||||
|
||||
// Check rate limiting - always log high priority messages
|
||||
if !l.checkRateLimit(level) {
|
||||
return
|
||||
}
|
||||
|
||||
// Format message
|
||||
var message string
|
||||
if len(args) > 0 {
|
||||
message = fmt.Sprintf(format, args...)
|
||||
|
@ -164,6 +250,11 @@ func (l *Logger) LogRaw(format string, args ...any) {
|
|||
return
|
||||
}
|
||||
|
||||
// Check rate limiting
|
||||
if !l.checkRateLimit(LevelInfo) {
|
||||
return
|
||||
}
|
||||
|
||||
var message string
|
||||
if len(args) > 0 {
|
||||
message = fmt.Sprintf(format, args...)
|
||||
|
|
|
@ -2,7 +2,6 @@ package logger
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
|
@ -16,36 +15,31 @@ func TestLoggerLevels(t *testing.T) {
|
|||
|
||||
// Debug should be below threshold
|
||||
logger.Debug("This should not appear")
|
||||
time.Sleep(10 * time.Millisecond) // Wait for processing
|
||||
if buf.Len() > 0 {
|
||||
t.Error("Debug message appeared when it should be filtered")
|
||||
}
|
||||
|
||||
// Info and above should appear
|
||||
logger.Info("Info message")
|
||||
time.Sleep(10 * time.Millisecond) // Wait for processing
|
||||
if !strings.Contains(buf.String(), "[INF]") {
|
||||
if !strings.Contains(buf.String(), "INFO") {
|
||||
t.Errorf("Info message not logged, got: %q", buf.String())
|
||||
}
|
||||
buf.Reset()
|
||||
|
||||
logger.Warning("Warning message")
|
||||
time.Sleep(10 * time.Millisecond) // Wait for processing
|
||||
if !strings.Contains(buf.String(), "[WRN]") {
|
||||
if !strings.Contains(buf.String(), "WARN") {
|
||||
t.Errorf("Warning message not logged, got: %q", buf.String())
|
||||
}
|
||||
buf.Reset()
|
||||
|
||||
logger.Error("Error message")
|
||||
time.Sleep(10 * time.Millisecond) // Wait for processing
|
||||
if !strings.Contains(buf.String(), "[ERR]") {
|
||||
if !strings.Contains(buf.String(), "ERROR") {
|
||||
t.Errorf("Error message not logged, got: %q", buf.String())
|
||||
}
|
||||
buf.Reset()
|
||||
|
||||
// Test format strings
|
||||
logger.Info("Count: %d", 42)
|
||||
time.Sleep(10 * time.Millisecond) // Wait for processing
|
||||
if !strings.Contains(buf.String(), "Count: 42") {
|
||||
t.Errorf("Formatted message not logged correctly, got: %q", buf.String())
|
||||
}
|
||||
|
@ -60,17 +54,53 @@ func TestLoggerLevels(t *testing.T) {
|
|||
}
|
||||
|
||||
logger.Error("Error should appear")
|
||||
time.Sleep(10 * time.Millisecond) // Wait for processing
|
||||
if !strings.Contains(buf.String(), "[ERR]") {
|
||||
if !strings.Contains(buf.String(), "ERROR") {
|
||||
t.Errorf("Error message not logged after level change, got: %q", buf.String())
|
||||
}
|
||||
}
|
||||
|
||||
func TestLoggerRateLimit(t *testing.T) {
|
||||
var buf bytes.Buffer
|
||||
logger := New(LevelDebug, false)
|
||||
logger.SetOutput(&buf)
|
||||
|
||||
// Override max logs per second to something small for testing
|
||||
logger.maxLogsPerSec = 5
|
||||
logger.limitDuration = 1 * time.Second
|
||||
|
||||
// Send debug messages (should get limited)
|
||||
for i := 0; i < 20; i++ {
|
||||
logger.Debug("Debug message %d", i)
|
||||
}
|
||||
|
||||
// Error messages should always go through
|
||||
logger.Error("Error message should appear")
|
||||
|
||||
content := buf.String()
|
||||
|
||||
// We should see some debug messages, then a warning about rate limiting,
|
||||
// and finally the error message
|
||||
if !strings.Contains(content, "Debug message 0") {
|
||||
t.Error("First debug message should appear")
|
||||
}
|
||||
|
||||
if !strings.Contains(content, "Rate limiting logger") {
|
||||
t.Error("Rate limiting message should appear")
|
||||
}
|
||||
|
||||
if !strings.Contains(content, "ERROR") {
|
||||
t.Error("Error message should always appear despite rate limiting")
|
||||
}
|
||||
}
|
||||
|
||||
func TestLoggerConcurrency(t *testing.T) {
|
||||
var buf bytes.Buffer
|
||||
logger := New(LevelDebug, false)
|
||||
logger.SetOutput(&buf)
|
||||
|
||||
// Increase log threshold for this test
|
||||
logger.maxLogsPerSec = 1000
|
||||
|
||||
// Log a bunch of messages concurrently
|
||||
var wg sync.WaitGroup
|
||||
for i := 0; i < 100; i++ {
|
||||
|
@ -82,17 +112,10 @@ func TestLoggerConcurrency(t *testing.T) {
|
|||
}
|
||||
wg.Wait()
|
||||
|
||||
// Wait for processing
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
|
||||
// Check all messages were logged
|
||||
// Check logs were processed
|
||||
content := buf.String()
|
||||
for i := 0; i < 100; i++ {
|
||||
msg := "Concurrent message " + strconv.Itoa(i)
|
||||
if !strings.Contains(content, msg) && !strings.Contains(content, "Concurrent message") {
|
||||
t.Errorf("Missing concurrent messages")
|
||||
break
|
||||
}
|
||||
if !strings.Contains(content, "Concurrent message") {
|
||||
t.Error("Concurrent messages should appear")
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -103,7 +126,6 @@ func TestLoggerColors(t *testing.T) {
|
|||
|
||||
// Test with color
|
||||
logger.Info("Colored message")
|
||||
time.Sleep(10 * time.Millisecond) // Wait for processing
|
||||
|
||||
content := buf.String()
|
||||
t.Logf("Colored output: %q", content) // Print actual output for diagnosis
|
||||
|
@ -114,7 +136,6 @@ func TestLoggerColors(t *testing.T) {
|
|||
buf.Reset()
|
||||
logger.DisableColors()
|
||||
logger.Info("Non-colored message")
|
||||
time.Sleep(10 * time.Millisecond) // Wait for processing
|
||||
|
||||
content = buf.String()
|
||||
if strings.Contains(content, "\033[") {
|
||||
|
@ -127,10 +148,9 @@ func TestDefaultLogger(t *testing.T) {
|
|||
SetOutput(&buf)
|
||||
|
||||
Info("Test default logger")
|
||||
time.Sleep(10 * time.Millisecond) // Wait for processing
|
||||
|
||||
content := buf.String()
|
||||
if !strings.Contains(content, "[INF]") {
|
||||
if !strings.Contains(content, "INFO") {
|
||||
t.Errorf("Default logger not working, got: %q", content)
|
||||
}
|
||||
}
|
||||
|
@ -140,23 +160,55 @@ func BenchmarkLogger(b *testing.B) {
|
|||
logger := New(LevelInfo, false)
|
||||
logger.SetOutput(&buf)
|
||||
|
||||
// Set very high threshold to avoid rate limiting during benchmark
|
||||
logger.maxLogsPerSec = int64(b.N + 1)
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
logger.Info("Benchmark message %d", i)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkLoggerWithRateLimit(b *testing.B) {
|
||||
var buf bytes.Buffer
|
||||
logger := New(LevelDebug, false)
|
||||
logger.SetOutput(&buf)
|
||||
|
||||
// Set threshold to allow about 10% of messages through
|
||||
logger.maxLogsPerSec = int64(b.N / 10)
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
logger.Debug("Benchmark message %d", i)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkLoggerParallel(b *testing.B) {
|
||||
var buf bytes.Buffer
|
||||
logger := New(LevelInfo, false)
|
||||
logger := New(LevelDebug, false)
|
||||
logger.SetOutput(&buf)
|
||||
|
||||
// Set very high threshold to avoid rate limiting during benchmark
|
||||
logger.maxLogsPerSec = int64(b.N + 1)
|
||||
|
||||
b.ResetTimer()
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
i := 0
|
||||
for pb.Next() {
|
||||
logger.Info("Parallel benchmark message %d", i)
|
||||
logger.Debug("Parallel benchmark message %d", i)
|
||||
i++
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func BenchmarkProductionLevels(b *testing.B) {
|
||||
var buf bytes.Buffer
|
||||
logger := New(LevelWarning, false) // Only log warnings and above
|
||||
logger.SetOutput(&buf)
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
// This should be filtered out before any processing
|
||||
logger.Debug("Debug message that won't be logged %d", i)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -85,7 +85,7 @@ func main() {
|
|||
log.Fatal("Router initialization failed: %v", err)
|
||||
}
|
||||
|
||||
if cfg.GetBool("watchers", false) {
|
||||
if cfg.GetBool("watchers", true) {
|
||||
// Set up file watchers for automatic reloading
|
||||
luaWatcher, err := watchers.WatchLuaRouter(luaRouter, routesDir, log)
|
||||
if err != nil {
|
||||
|
@ -104,10 +104,10 @@ func main() {
|
|||
}
|
||||
}
|
||||
|
||||
// Get buffer size from config or use default (used to be worker pool size)
|
||||
// Get buffer size from config or use default
|
||||
bufferSize := cfg.GetInt("buffer_size", 20)
|
||||
|
||||
// Initialize Lua runner (replacing worker pool)
|
||||
// Initialize Lua runner
|
||||
runner, err := runner.NewRunner(
|
||||
runner.WithBufferSize(bufferSize),
|
||||
runner.WithLibDirs("./libs"),
|
||||
|
|
Loading…
Reference in New Issue
Block a user