diff --git a/Moonshark.go b/Moonshark.go index ef2236b..2bea636 100644 --- a/Moonshark.go +++ b/Moonshark.go @@ -6,6 +6,7 @@ import ( "fmt" "os" "os/signal" + "runtime" "syscall" "time" @@ -151,7 +152,7 @@ func main() { routesDir := cfg.GetString("routes_dir", "./routes") staticDir := cfg.GetString("static_dir", "./static") overrideDir := cfg.GetString("override_dir", "./override") - bufferSize := cfg.GetInt("buffer_size", 20) + poolSize := cfg.GetInt("pool_size", runtime.NumCPU()) if err := utils.EnsureDir(overrideDir); err != nil { logger.Warning("Override directory doesn't exist, and could not create it: %v", err) @@ -181,16 +182,23 @@ func main() { logger.Fatal("Router initialization failed: %v", err) } - // Initialize Lua runner - luaRunner, err := runner.NewRunner( - runner.WithBufferSize(bufferSize), + // Initialize Lua runner with options + runnerOpts := []runner.RunnerOption{ + runner.WithPoolSize(poolSize), runner.WithLibDirs(libDirs...), - runner.WithDebugEnabled(), - ) + } + + // Add debug option conditionally + if debugMode { + runnerOpts = append(runnerOpts, runner.WithDebugEnabled()) + logger.Debug("Debug logging enabled for Lua runner") + } + + luaRunner, err := runner.NewRunner(runnerOpts...) if err != nil { logger.Fatal("Failed to initialize Lua runner: %v", err) } - logger.Server("Lua runner initialized with buffer size %d", bufferSize) + logger.Server("Lua runner initialized with pool size %d", poolSize) defer luaRunner.Close() // Set up file watchers if enabled diff --git a/core/config/Config.go b/core/config/Config.go index 50e67a2..8059130 100644 --- a/core/config/Config.go +++ b/core/config/Config.go @@ -3,6 +3,7 @@ package config import ( "errors" "fmt" + "runtime" luajit "git.sharkk.net/Sky/LuaJIT-to-Go" ) @@ -22,6 +23,7 @@ type Config struct { // Performance settings BufferSize int + PoolSize int // Number of Lua states in the pool // Feature flags HTTPLoggingEnabled bool @@ -46,7 +48,7 @@ func New() *Config { LibDirs: []string{"./libs"}, // Performance defaults - BufferSize: 20, + PoolSize: runtime.NumCPU(), // Feature flag defaults HTTPLoggingEnabled: true, @@ -233,9 +235,9 @@ func processConfigValue(state *luajit.State, config *Config, key string, valueTy if strVal, ok := value.(string); ok { config.OverrideDir = strVal } - case "buffer_size": + case "pool_size": if numVal, ok := value.(float64); ok { - config.BufferSize = int(numVal) + config.PoolSize = int(numVal) } case "http_logging_enabled": if boolVal, ok := value.(bool); ok { @@ -357,6 +359,8 @@ func (c *Config) GetInt(key string, defaultValue int) int { return c.Port case "buffer_size": return c.BufferSize + case "pool_size": + return c.PoolSize } // Fall back to values map for other keys @@ -608,11 +612,11 @@ func (c *Config) Set(key string, value any) { if strVal, ok := value.(string); ok { c.OverrideDir = strVal } - case "buffer_size": + case "pool_size": if numVal, ok := value.(float64); ok { - c.BufferSize = int(numVal) + c.PoolSize = int(numVal) } else if intVal, ok := value.(int); ok { - c.BufferSize = intVal + c.PoolSize = intVal } case "http_logging_enabled": if boolVal, ok := value.(bool); ok { diff --git a/core/runner/Job.go b/core/runner/Job.go deleted file mode 100644 index 2040016..0000000 --- a/core/runner/Job.go +++ /dev/null @@ -1,15 +0,0 @@ -package runner - -// JobResult represents the result of a Lua script execution -type JobResult struct { - Value any // Return value from Lua - Error error // Error if any -} - -// job represents a Lua script execution request -type job struct { - Bytecode []byte // Compiled LuaJIT bytecode - Context *Context // Execution context - ScriptPath string // Path to the original script (for require resolution) - Result chan<- JobResult // Channel to send result back -} diff --git a/core/runner/LuaRunner.go b/core/runner/LuaRunner.go index a632133..f478d46 100644 --- a/core/runner/LuaRunner.go +++ b/core/runner/LuaRunner.go @@ -24,34 +24,36 @@ type StateInitFunc func(*luajit.State) error // RunnerOption defines a functional option for configuring the LuaRunner type RunnerOption func(*LuaRunner) -// Result channel pool to reduce allocations -var resultChanPool = sync.Pool{ - New: func() interface{} { - return make(chan JobResult, 1) - }, +// JobResult represents the result of a Lua script execution +type JobResult struct { + Value any // Return value from Lua + Error error // Error if any } -// LuaRunner runs Lua scripts using multiple Lua states in a round-robin fashion +// StateWrapper wraps a Lua state with its sandbox +type StateWrapper struct { + state *luajit.State // The Lua state + sandbox *Sandbox // Associated sandbox + index int // Index for debugging +} + +// LuaRunner runs Lua scripts using a pool of Lua states type LuaRunner struct { - states []*luajit.State // Multiple Lua states for parallel execution - jobQueues []chan job // Each state has its own job queue - workerCount int // Number of worker states (default 4) - nextWorker int32 // Atomic counter for round-robin distribution + states []*StateWrapper // Pool of Lua states + stateSem chan int // Semaphore with state indexes + poolSize int // Size of the state pool + initFunc StateInitFunc // Optional function to initialize Lua states + moduleLoader *NativeModuleLoader // Native module loader for require isRunning atomic.Bool // Flag indicating if the runner is active mu sync.RWMutex // Mutex for thread safety - wg sync.WaitGroup // WaitGroup for clean shutdown - initFunc StateInitFunc // Optional function to initialize Lua states - bufferSize int // Size of each job queue buffer - moduleLoader *NativeModuleLoader // Native module loader for require - sandboxes []*Sandbox // Sandbox for each state debug bool // Enable debug logging } -// WithBufferSize sets the job queue buffer size -func WithBufferSize(size int) RunnerOption { +// WithPoolSize sets the state pool size +func WithPoolSize(size int) RunnerOption { return func(r *LuaRunner) { if size > 0 { - r.bufferSize = size + r.poolSize = size } } } @@ -83,22 +85,12 @@ func WithDebugEnabled() RunnerOption { } } -// WithWorkerCount sets the number of worker states (min 1) -func WithWorkerCount(count int) RunnerOption { - return func(r *LuaRunner) { - if count > 0 { - r.workerCount = count - } - } -} - -// NewRunner creates a new LuaRunner with multiple worker states +// NewRunner creates a new LuaRunner with a pool of states func NewRunner(options ...RunnerOption) (*LuaRunner, error) { // Default configuration runner := &LuaRunner{ - bufferSize: 10, - workerCount: runtime.GOMAXPROCS(0), - debug: false, + poolSize: runtime.GOMAXPROCS(0), // Default pool size to number of CPUs + debug: false, } // Apply options @@ -106,11 +98,6 @@ func NewRunner(options ...RunnerOption) (*LuaRunner, error) { opt(runner) } - // Initialize states and job queues - runner.states = make([]*luajit.State, runner.workerCount) - runner.jobQueues = make([]chan job, runner.workerCount) - runner.sandboxes = make([]*Sandbox, runner.workerCount) - // Set up module loader if not already initialized if runner.moduleLoader == nil { requireConfig := &RequireConfig{ @@ -120,28 +107,23 @@ func NewRunner(options ...RunnerOption) (*LuaRunner, error) { runner.moduleLoader = NewNativeModuleLoader(requireConfig) } - // Create job queues and initialize states - for i := 0; i < runner.workerCount; i++ { - // Create job queue - runner.jobQueues[i] = make(chan job, runner.bufferSize) + // Initialize states and semaphore + runner.states = make([]*StateWrapper, runner.poolSize) + runner.stateSem = make(chan int, runner.poolSize) - // Create sandbox - runner.sandboxes[i] = NewSandbox() - - // Initialize state - if err := runner.initState(i, true); err != nil { - // Clean up if initialization fails - runner.Close() + // Create and initialize all states + for i := 0; i < runner.poolSize; i++ { + wrapper, err := runner.initState(i) + if err != nil { + runner.Close() // Clean up already created states return nil, err } - // Start worker goroutine - runner.wg.Add(1) - go runner.processJobs(i) + runner.states[i] = wrapper + runner.stateSem <- i // Add index to semaphore } runner.isRunning.Store(true) - runner.nextWorker = 0 return runner, nil } @@ -153,222 +135,158 @@ func (r *LuaRunner) debugLog(format string, args ...interface{}) { } } -// initState initializes or reinitializes a specific Lua state -func (r *LuaRunner) initState(workerIndex int, initial bool) error { - r.debugLog("Initializing Lua state %d (initial=%v)", workerIndex, initial) +// initState creates and initializes a new state +func (r *LuaRunner) initState(index int) (*StateWrapper, error) { + r.debugLog("Initializing Lua state %d", index) - // Clean up existing state if there is one - if r.states[workerIndex] != nil { - r.debugLog("Cleaning up existing state %d", workerIndex) - // Always call Cleanup before Close to properly free function pointers - r.states[workerIndex].Cleanup() - r.states[workerIndex].Close() - r.states[workerIndex] = nil - } - - // Create fresh state + // Create a new state state := luajit.New() if state == nil { - return errors.New("failed to create Lua state") + return nil, errors.New("failed to create Lua state") + } + r.debugLog("Created new Lua state %d", index) + + // Create sandbox + sandbox := NewSandbox() + if r.debug { + sandbox.EnableDebug() } - r.debugLog("Created new Lua state %d", workerIndex) // Set up require paths and mechanism if err := r.moduleLoader.SetupRequire(state); err != nil { - r.debugLog("Failed to set up require for state %d: %v", workerIndex, err) + r.debugLog("Failed to set up require for state %d: %v", index, err) state.Cleanup() state.Close() - return ErrInitFailed + return nil, ErrInitFailed } - r.debugLog("Require system initialized for state %d", workerIndex) + r.debugLog("Require system initialized for state %d", index) // Initialize all core modules from the registry if err := GlobalRegistry.Initialize(state); err != nil { - r.debugLog("Failed to initialize core modules for state %d: %v", workerIndex, err) + r.debugLog("Failed to initialize core modules for state %d: %v", index, err) state.Cleanup() state.Close() - return ErrInitFailed + return nil, ErrInitFailed } - r.debugLog("Core modules initialized for state %d", workerIndex) - - // Check if http module is properly registered - testResult, err := state.ExecuteWithResult(` - if type(http) == "table" and type(http.client) == "table" and - type(http.client.get) == "function" then - return true - else - return false - end - `) - if err != nil || testResult != true { - r.debugLog("HTTP module verification failed for state %d: %v, result: %v", workerIndex, err, testResult) - } else { - r.debugLog("HTTP module verified OK for state %d", workerIndex) - } - - // Verify __http_request function - testResult, _ = state.ExecuteWithResult(`return type(__http_request)`) - r.debugLog("__http_request function for state %d is of type: %v", workerIndex, testResult) + r.debugLog("Core modules initialized for state %d", index) // Set up sandbox after core modules are initialized - if err := r.sandboxes[workerIndex].Setup(state); err != nil { - r.debugLog("Failed to set up sandbox for state %d: %v", workerIndex, err) + if err := sandbox.Setup(state); err != nil { + r.debugLog("Failed to set up sandbox for state %d: %v", index, err) state.Cleanup() state.Close() - return ErrInitFailed + return nil, ErrInitFailed } - r.debugLog("Sandbox environment set up for state %d", workerIndex) + r.debugLog("Sandbox environment set up for state %d", index) // Preload all modules into package.loaded if err := r.moduleLoader.PreloadAllModules(state); err != nil { - r.debugLog("Failed to preload modules for state %d: %v", workerIndex, err) + r.debugLog("Failed to preload modules for state %d: %v", index, err) state.Cleanup() state.Close() - return errors.New("failed to preload modules") + return nil, errors.New("failed to preload modules") } - r.debugLog("All modules preloaded for state %d", workerIndex) + r.debugLog("All modules preloaded for state %d", index) // Run init function if provided if r.initFunc != nil { if err := r.initFunc(state); err != nil { - r.debugLog("Custom init function failed for state %d: %v", workerIndex, err) + r.debugLog("Custom init function failed for state %d: %v", index, err) state.Cleanup() state.Close() - return ErrInitFailed + return nil, ErrInitFailed } - r.debugLog("Custom init function completed for state %d", workerIndex) + r.debugLog("Custom init function completed for state %d", index) } - // Test for HTTP module again after full initialization - testResult, err = state.ExecuteWithResult(` - if type(http) == "table" and type(http.client) == "table" and - type(http.client.get) == "function" then - return true - else - return false - end - `) - if err != nil || testResult != true { - r.debugLog("Final HTTP module verification failed for state %d: %v, result: %v", workerIndex, err, testResult) - } else { - r.debugLog("Final HTTP module verification OK for state %d", workerIndex) - } + r.debugLog("State %d initialization complete", index) - r.states[workerIndex] = state - r.debugLog("State %d initialization complete", workerIndex) - return nil -} - -// processJobs handles the job queue for a specific worker -func (r *LuaRunner) processJobs(workerIndex int) { - defer r.wg.Done() - defer func() { - if r.states[workerIndex] != nil { - r.debugLog("Cleaning up Lua state %d in processJobs", workerIndex) - r.states[workerIndex].Cleanup() - r.states[workerIndex].Close() - r.states[workerIndex] = nil - } - }() - - for job := range r.jobQueues[workerIndex] { - // Execute the job and send result - result := r.executeJob(workerIndex, job) - select { - case job.Result <- result: - // Result sent successfully - default: - // Result channel closed or full, discard the result - } - } -} - -// executeJob runs a script in the sandbox environment -func (r *LuaRunner) executeJob(workerIndex int, j job) JobResult { - // If the job has a script path, update script dir for module resolution - if j.ScriptPath != "" { - r.mu.Lock() - r.moduleLoader.config.ScriptDir = filepath.Dir(j.ScriptPath) - r.mu.Unlock() - } - - // Convert context for sandbox - var ctx map[string]any - if j.Context != nil { - ctx = j.Context.Values - } - - r.mu.RLock() - state := r.states[workerIndex] - sandbox := r.sandboxes[workerIndex] - r.mu.RUnlock() - - if state == nil { - return JobResult{nil, errors.New("lua state is not initialized")} - } - - // Execute in sandbox - value, err := sandbox.Execute(state, j.Bytecode, ctx) - return JobResult{value, err} + return &StateWrapper{ + state: state, + sandbox: sandbox, + index: index, + }, nil } // RunWithContext executes a Lua script with context and timeout func (r *LuaRunner) RunWithContext(ctx context.Context, bytecode []byte, execCtx *Context, scriptPath string) (any, error) { - r.mu.RLock() if !r.isRunning.Load() { - r.mu.RUnlock() return nil, ErrRunnerClosed } - r.mu.RUnlock() - // Get a result channel from the pool - resultChanInterface := resultChanPool.Get() - resultChan := resultChanInterface.(chan JobResult) + // Create a result channel + resultChan := make(chan JobResult, 1) - // Make sure to clear any previous results + // Get a state index with timeout + var stateIndex int select { - case <-resultChan: - // Drain the channel if it has a value - default: - // Channel is already empty - } - - j := job{ - Bytecode: bytecode, - Context: execCtx, - ScriptPath: scriptPath, - Result: resultChan, - } - - // Choose worker in round-robin fashion - workerIndex := int(atomic.AddInt32(&r.nextWorker, 1) % int32(r.workerCount)) - - // Submit job with context - select { - case r.jobQueues[workerIndex] <- j: - // Job submitted - r.debugLog("Job submitted to worker %d", workerIndex) + case stateIndex = <-r.stateSem: + // Got a state case <-ctx.Done(): - // Return the channel to the pool before exiting - resultChanPool.Put(resultChan) return nil, ctx.Err() } + // Launch a goroutine to execute the job + go func() { + // Make sure to return the state to the pool when done + defer func() { + // Only return if runner is still open + if r.isRunning.Load() { + select { + case r.stateSem <- stateIndex: + // State returned to pool + default: + // Pool is full or closed (shouldn't happen) + } + } + }() + + // Execute the job + var result JobResult + + r.mu.RLock() + state := r.states[stateIndex] + r.mu.RUnlock() + + if state == nil { + result = JobResult{nil, errors.New("state is not initialized")} + } else { + // Set script directory for module resolution + if scriptPath != "" { + r.mu.Lock() + r.moduleLoader.config.ScriptDir = filepath.Dir(scriptPath) + r.mu.Unlock() + } + + // Convert context + var ctxMap map[string]any + if execCtx != nil { + ctxMap = execCtx.Values + } + + // Execute in sandbox + value, err := state.sandbox.Execute(state.state, bytecode, ctxMap) + result = JobResult{value, err} + } + + // Send result + select { + case resultChan <- result: + // Result sent + default: + // Result channel closed or full (shouldn't happen with buffered channel) + } + }() + // Wait for result with context - var result JobResult select { - case result = <-resultChan: - // Got result + case result := <-resultChan: + return result.Value, result.Error case <-ctx.Done(): - // Return the channel to the pool before exiting - resultChanPool.Put(resultChan) + // Note: we can't cancel the Lua execution, but we can stop waiting for it + // The state will be returned to the pool when the goroutine completes return nil, ctx.Err() } - - // Return the channel to the pool - resultChanPool.Put(resultChan) - - return result.Value, result.Error } // Run executes a Lua script @@ -387,15 +305,26 @@ func (r *LuaRunner) Close() error { r.isRunning.Store(false) - // Close all job queues - for i := 0; i < r.workerCount; i++ { - if r.jobQueues[i] != nil { - close(r.jobQueues[i]) + // Drain the semaphore (non-blocking) + for { + select { + case <-r.stateSem: + // Drained one slot + default: + // Empty + goto drained } } +drained: - // Wait for all workers to finish - r.wg.Wait() + // Clean up all states + for i := 0; i < len(r.states); i++ { + if r.states[i] != nil { + r.states[i].state.Cleanup() + r.states[i].state.Close() + r.states[i] = nil + } + } return nil } @@ -407,15 +336,54 @@ func (r *LuaRunner) NotifyFileChanged(filePath string) bool { r.mu.Lock() defer r.mu.Unlock() - // Reset all states on file changes + // Check if runner is closed + if !r.isRunning.Load() { + return false + } + + // Create a new semaphore + newSem := make(chan int, cap(r.stateSem)) + + // Drain the current semaphore (non-blocking) + for { + select { + case <-r.stateSem: + // Drained one slot + default: + // Empty + goto drained + } + } +drained: + + r.stateSem = newSem + + // Reinitialize all states success := true - for i := 0; i < r.workerCount; i++ { - err := r.initState(i, false) + for i := 0; i < len(r.states); i++ { + // Clean up old state + if r.states[i] != nil { + r.states[i].state.Cleanup() + r.states[i].state.Close() + } + + // Initialize new state + wrapper, err := r.initState(i) if err != nil { r.debugLog("Failed to reinitialize state %d: %v", i, err) success = false - } else { - r.debugLog("State %d successfully reinitialized", i) + r.states[i] = nil + continue + } + + r.states[i] = wrapper + + // Add to semaphore + select { + case newSem <- i: + // Added to semaphore + default: + // Semaphore full (shouldn't happen) } } @@ -429,9 +397,9 @@ func (r *LuaRunner) ResetModuleCache() { r.mu.RLock() defer r.mu.RUnlock() - for i := 0; i < r.workerCount; i++ { - if r.states[i] != nil { - r.moduleLoader.ResetModules(r.states[i]) + for i := 0; i < len(r.states); i++ { + if r.states[i] != nil && r.states[i].state != nil { + r.moduleLoader.ResetModules(r.states[i].state) } } } @@ -444,9 +412,9 @@ func (r *LuaRunner) ReloadAllModules() error { r.mu.RLock() defer r.mu.RUnlock() - for i := 0; i < r.workerCount; i++ { - if r.states[i] != nil { - if err := r.moduleLoader.PreloadAllModules(r.states[i]); err != nil { + for i := 0; i < len(r.states); i++ { + if r.states[i] != nil && r.states[i].state != nil { + if err := r.moduleLoader.PreloadAllModules(r.states[i].state); err != nil { return err } } @@ -461,10 +429,10 @@ func (r *LuaRunner) RefreshModuleByName(modName string) bool { defer r.mu.RUnlock() success := true - for i := 0; i < r.workerCount; i++ { - if r.states[i] != nil { + for i := 0; i < len(r.states); i++ { + if r.states[i] != nil && r.states[i].state != nil { r.debugLog("Refreshing module %s in state %d", modName, i) - if err := r.states[i].DoString(`package.loaded["` + modName + `"] = nil`); err != nil { + if err := r.states[i].state.DoString(`package.loaded["` + modName + `"] = nil`); err != nil { success = false } } @@ -475,8 +443,13 @@ func (r *LuaRunner) RefreshModuleByName(modName string) bool { // AddModule adds a module to all sandbox environments func (r *LuaRunner) AddModule(name string, module any) { r.debugLog("Adding module %s to all sandboxes", name) - for i := 0; i < r.workerCount; i++ { - r.sandboxes[i].AddModule(name, module) + r.mu.RLock() + defer r.mu.RUnlock() + + for i := 0; i < len(r.states); i++ { + if r.states[i] != nil && r.states[i].sandbox != nil { + r.states[i].sandbox.AddModule(name, module) + } } } @@ -487,19 +460,22 @@ func (r *LuaRunner) GetModuleCount() int { count := 0 - // Get module count from the first Lua state - if r.states[0] != nil { - // Execute a Lua snippet to count modules - if res, err := r.states[0].ExecuteWithResult(` - local count = 0 - for _ in pairs(package.loaded) do - count = count + 1 - end - return count - `); err == nil { - if num, ok := res.(float64); ok { - count = int(num) + // Get count from the first available state + for i := 0; i < len(r.states); i++ { + if r.states[i] != nil && r.states[i].state != nil { + // Execute a Lua snippet to count modules + if res, err := r.states[i].state.ExecuteWithResult(` + local count = 0 + for _ in pairs(package.loaded) do + count = count + 1 + end + return count + `); err == nil { + if num, ok := res.(float64); ok { + count = int(num) + } } + break } } diff --git a/core/utils/Debug.go b/core/utils/Debug.go index 4c5491f..0cfc768 100644 --- a/core/utils/Debug.go +++ b/core/utils/Debug.go @@ -129,6 +129,7 @@ table tr:nth-child(even), tbody tr:nth-child(even) { background-color: rgba(0, 0 Active Routes{{.Components.RouteCount}} Bytecode Size{{ByteCount .Components.BytecodeBytes}} Loaded Modules{{.Components.ModuleCount}} + State Pool Size{{.Config.PoolSize}} @@ -138,17 +139,27 @@ table tr:nth-child(even), tbody tr:nth-child(even) { background-color: rgba(0, 0
+ - - - - - - - - + + + + + + + +
Port{{.Config.Port}}
Pool Size{{.Config.PoolSize}}
Debug Mode{{.Config.Debug}}
Log Level{{.Config.LogLevel}}
Routes Directory{{.Config.RoutesDir}}
Static Directory{{.Config.StaticDir}}
Override Directory{{.Config.OverrideDir}}
Buffer Size{{.Config.BufferSize}}
HTTP Logging{{.Config.HTTPLoggingEnabled}}
Lib Directories{{range .Config.LibDirs}}{{.}}
{{end}}
Watch Routes{{index .Config.Watchers "routes"}}
Watch Static{{index .Config.Watchers "static"}}
Watch Modules{{index .Config.Watchers "modules"}}
Directories +
Routes: {{.Config.RoutesDir}}
+
Static: {{.Config.StaticDir}}
+
Override: {{.Config.OverrideDir}}
+
Libs: {{range .Config.LibDirs}}{{.}}, {{end}}
+
Watchers +
Routes: {{index .Config.Watchers "routes"}}
+
Static: {{index .Config.Watchers "static"}}
+
Modules: {{index .Config.Watchers "modules"}}
+
diff --git a/luajit b/luajit index 13686b3..0756cab 160000 --- a/luajit +++ b/luajit @@ -1 +1 @@ -Subproject commit 13686b3e66b388a31d459fe95d1aa3bfa05aeb27 +Subproject commit 0756cabcaaf1e33f2b8eb535e5a24e448c2501a9