package runner import ( "context" "errors" "path/filepath" "runtime" "sync" "sync/atomic" luajit "git.sharkk.net/Sky/LuaJIT-to-Go" "git.sharkk.net/Sky/Moonshark/core/logger" ) // Common errors var ( ErrRunnerClosed = errors.New("lua runner is closed") ErrInitFailed = errors.New("initialization failed") ) // StateInitFunc is a function that initializes a Lua state type StateInitFunc func(*luajit.State) error // RunnerOption defines a functional option for configuring the LuaRunner type RunnerOption func(*LuaRunner) // Result channel pool to reduce allocations var resultChanPool = sync.Pool{ New: func() interface{} { return make(chan JobResult, 1) }, } // LuaRunner runs Lua scripts using multiple Lua states in a round-robin fashion type LuaRunner struct { states []*luajit.State // Multiple Lua states for parallel execution jobQueues []chan job // Each state has its own job queue workerCount int // Number of worker states (default 4) nextWorker int32 // Atomic counter for round-robin distribution isRunning atomic.Bool // Flag indicating if the runner is active mu sync.RWMutex // Mutex for thread safety wg sync.WaitGroup // WaitGroup for clean shutdown initFunc StateInitFunc // Optional function to initialize Lua states bufferSize int // Size of each job queue buffer moduleLoader *NativeModuleLoader // Native module loader for require sandboxes []*Sandbox // Sandbox for each state debug bool // Enable debug logging } // WithBufferSize sets the job queue buffer size func WithBufferSize(size int) RunnerOption { return func(r *LuaRunner) { if size > 0 { r.bufferSize = size } } } // WithInitFunc sets the init function for the Lua state func WithInitFunc(initFunc StateInitFunc) RunnerOption { return func(r *LuaRunner) { r.initFunc = initFunc } } // WithLibDirs sets additional library directories func WithLibDirs(dirs ...string) RunnerOption { return func(r *LuaRunner) { if r.moduleLoader == nil || r.moduleLoader.config == nil { r.moduleLoader = NewNativeModuleLoader(&RequireConfig{ LibDirs: dirs, }) } else { r.moduleLoader.config.LibDirs = dirs } } } // WithDebugEnabled enables debug output func WithDebugEnabled() RunnerOption { return func(r *LuaRunner) { r.debug = true } } // WithWorkerCount sets the number of worker states (min 1) func WithWorkerCount(count int) RunnerOption { return func(r *LuaRunner) { if count > 0 { r.workerCount = count } } } // NewRunner creates a new LuaRunner with multiple worker states func NewRunner(options ...RunnerOption) (*LuaRunner, error) { // Default configuration runner := &LuaRunner{ bufferSize: 10, workerCount: runtime.GOMAXPROCS(0), debug: false, } // Apply options for _, opt := range options { opt(runner) } // Initialize states and job queues runner.states = make([]*luajit.State, runner.workerCount) runner.jobQueues = make([]chan job, runner.workerCount) runner.sandboxes = make([]*Sandbox, runner.workerCount) // Set up module loader if not already initialized if runner.moduleLoader == nil { requireConfig := &RequireConfig{ ScriptDir: "", LibDirs: []string{}, } runner.moduleLoader = NewNativeModuleLoader(requireConfig) } // Create job queues and initialize states for i := 0; i < runner.workerCount; i++ { // Create job queue runner.jobQueues[i] = make(chan job, runner.bufferSize) // Create sandbox runner.sandboxes[i] = NewSandbox() // Initialize state if err := runner.initState(i, true); err != nil { // Clean up if initialization fails runner.Close() return nil, err } // Start worker goroutine runner.wg.Add(1) go runner.processJobs(i) } runner.isRunning.Store(true) runner.nextWorker = 0 return runner, nil } // debugLog logs a message if debug mode is enabled func (r *LuaRunner) debugLog(format string, args ...interface{}) { if r.debug { logger.Debug("[LuaRunner] "+format, args...) } } // initState initializes or reinitializes a specific Lua state func (r *LuaRunner) initState(workerIndex int, initial bool) error { r.debugLog("Initializing Lua state %d (initial=%v)", workerIndex, initial) // Clean up existing state if there is one if r.states[workerIndex] != nil { r.debugLog("Cleaning up existing state %d", workerIndex) // Always call Cleanup before Close to properly free function pointers r.states[workerIndex].Cleanup() r.states[workerIndex].Close() r.states[workerIndex] = nil } // Create fresh state state := luajit.New() if state == nil { return errors.New("failed to create Lua state") } r.debugLog("Created new Lua state %d", workerIndex) // Set up require paths and mechanism if err := r.moduleLoader.SetupRequire(state); err != nil { r.debugLog("Failed to set up require for state %d: %v", workerIndex, err) state.Cleanup() state.Close() return ErrInitFailed } r.debugLog("Require system initialized for state %d", workerIndex) // Initialize all core modules from the registry if err := GlobalRegistry.Initialize(state); err != nil { r.debugLog("Failed to initialize core modules for state %d: %v", workerIndex, err) state.Cleanup() state.Close() return ErrInitFailed } r.debugLog("Core modules initialized for state %d", workerIndex) // Check if http module is properly registered testResult, err := state.ExecuteWithResult(` if type(http) == "table" and type(http.client) == "table" and type(http.client.get) == "function" then return true else return false end `) if err != nil || testResult != true { r.debugLog("HTTP module verification failed for state %d: %v, result: %v", workerIndex, err, testResult) } else { r.debugLog("HTTP module verified OK for state %d", workerIndex) } // Verify __http_request function testResult, _ = state.ExecuteWithResult(`return type(__http_request)`) r.debugLog("__http_request function for state %d is of type: %v", workerIndex, testResult) // Set up sandbox after core modules are initialized if err := r.sandboxes[workerIndex].Setup(state); err != nil { r.debugLog("Failed to set up sandbox for state %d: %v", workerIndex, err) state.Cleanup() state.Close() return ErrInitFailed } r.debugLog("Sandbox environment set up for state %d", workerIndex) // Preload all modules into package.loaded if err := r.moduleLoader.PreloadAllModules(state); err != nil { r.debugLog("Failed to preload modules for state %d: %v", workerIndex, err) state.Cleanup() state.Close() return errors.New("failed to preload modules") } r.debugLog("All modules preloaded for state %d", workerIndex) // Run init function if provided if r.initFunc != nil { if err := r.initFunc(state); err != nil { r.debugLog("Custom init function failed for state %d: %v", workerIndex, err) state.Cleanup() state.Close() return ErrInitFailed } r.debugLog("Custom init function completed for state %d", workerIndex) } // Test for HTTP module again after full initialization testResult, err = state.ExecuteWithResult(` if type(http) == "table" and type(http.client) == "table" and type(http.client.get) == "function" then return true else return false end `) if err != nil || testResult != true { r.debugLog("Final HTTP module verification failed for state %d: %v, result: %v", workerIndex, err, testResult) } else { r.debugLog("Final HTTP module verification OK for state %d", workerIndex) } r.states[workerIndex] = state r.debugLog("State %d initialization complete", workerIndex) return nil } // processJobs handles the job queue for a specific worker func (r *LuaRunner) processJobs(workerIndex int) { defer r.wg.Done() defer func() { if r.states[workerIndex] != nil { r.debugLog("Cleaning up Lua state %d in processJobs", workerIndex) r.states[workerIndex].Cleanup() r.states[workerIndex].Close() r.states[workerIndex] = nil } }() for job := range r.jobQueues[workerIndex] { // Execute the job and send result result := r.executeJob(workerIndex, job) select { case job.Result <- result: // Result sent successfully default: // Result channel closed or full, discard the result } } } // executeJob runs a script in the sandbox environment func (r *LuaRunner) executeJob(workerIndex int, j job) JobResult { // If the job has a script path, update script dir for module resolution if j.ScriptPath != "" { r.mu.Lock() r.moduleLoader.config.ScriptDir = filepath.Dir(j.ScriptPath) r.mu.Unlock() } // Convert context for sandbox var ctx map[string]any if j.Context != nil { ctx = j.Context.Values } r.mu.RLock() state := r.states[workerIndex] sandbox := r.sandboxes[workerIndex] r.mu.RUnlock() if state == nil { return JobResult{nil, errors.New("lua state is not initialized")} } // Execute in sandbox value, err := sandbox.Execute(state, j.Bytecode, ctx) return JobResult{value, err} } // RunWithContext executes a Lua script with context and timeout func (r *LuaRunner) RunWithContext(ctx context.Context, bytecode []byte, execCtx *Context, scriptPath string) (any, error) { r.mu.RLock() if !r.isRunning.Load() { r.mu.RUnlock() return nil, ErrRunnerClosed } r.mu.RUnlock() // Get a result channel from the pool resultChanInterface := resultChanPool.Get() resultChan := resultChanInterface.(chan JobResult) // Make sure to clear any previous results select { case <-resultChan: // Drain the channel if it has a value default: // Channel is already empty } j := job{ Bytecode: bytecode, Context: execCtx, ScriptPath: scriptPath, Result: resultChan, } // Choose worker in round-robin fashion workerIndex := int(atomic.AddInt32(&r.nextWorker, 1) % int32(r.workerCount)) // Submit job with context select { case r.jobQueues[workerIndex] <- j: // Job submitted r.debugLog("Job submitted to worker %d", workerIndex) case <-ctx.Done(): // Return the channel to the pool before exiting resultChanPool.Put(resultChan) return nil, ctx.Err() } // Wait for result with context var result JobResult select { case result = <-resultChan: // Got result case <-ctx.Done(): // Return the channel to the pool before exiting resultChanPool.Put(resultChan) return nil, ctx.Err() } // Return the channel to the pool resultChanPool.Put(resultChan) return result.Value, result.Error } // Run executes a Lua script func (r *LuaRunner) Run(bytecode []byte, execCtx *Context, scriptPath string) (any, error) { return r.RunWithContext(context.Background(), bytecode, execCtx, scriptPath) } // Close gracefully shuts down the LuaRunner func (r *LuaRunner) Close() error { r.mu.Lock() defer r.mu.Unlock() if !r.isRunning.Load() { return ErrRunnerClosed } r.isRunning.Store(false) // Close all job queues for i := 0; i < r.workerCount; i++ { if r.jobQueues[i] != nil { close(r.jobQueues[i]) } } // Wait for all workers to finish r.wg.Wait() return nil } // NotifyFileChanged handles file change notifications from watchers func (r *LuaRunner) NotifyFileChanged(filePath string) bool { r.debugLog("File change detected: %s", filePath) r.mu.Lock() defer r.mu.Unlock() // Reset all states on file changes success := true for i := 0; i < r.workerCount; i++ { err := r.initState(i, false) if err != nil { r.debugLog("Failed to reinitialize state %d: %v", i, err) success = false } else { r.debugLog("State %d successfully reinitialized", i) } } return success } // ResetModuleCache clears non-core modules from package.loaded in all states func (r *LuaRunner) ResetModuleCache() { if r.moduleLoader != nil { r.debugLog("Resetting module cache in all states") r.mu.RLock() defer r.mu.RUnlock() for i := 0; i < r.workerCount; i++ { if r.states[i] != nil { r.moduleLoader.ResetModules(r.states[i]) } } } } // ReloadAllModules reloads all modules into package.loaded in all states func (r *LuaRunner) ReloadAllModules() error { if r.moduleLoader != nil { r.debugLog("Reloading all modules in all states") r.mu.RLock() defer r.mu.RUnlock() for i := 0; i < r.workerCount; i++ { if r.states[i] != nil { if err := r.moduleLoader.PreloadAllModules(r.states[i]); err != nil { return err } } } } return nil } // RefreshModuleByName invalidates a specific module in package.loaded in all states func (r *LuaRunner) RefreshModuleByName(modName string) bool { r.mu.RLock() defer r.mu.RUnlock() success := true for i := 0; i < r.workerCount; i++ { if r.states[i] != nil { r.debugLog("Refreshing module %s in state %d", modName, i) if err := r.states[i].DoString(`package.loaded["` + modName + `"] = nil`); err != nil { success = false } } } return success } // AddModule adds a module to all sandbox environments func (r *LuaRunner) AddModule(name string, module any) { r.debugLog("Adding module %s to all sandboxes", name) for i := 0; i < r.workerCount; i++ { r.sandboxes[i].AddModule(name, module) } } // GetModuleCount returns the number of loaded modules in the first state func (r *LuaRunner) GetModuleCount() int { r.mu.RLock() defer r.mu.RUnlock() count := 0 // Get module count from the first Lua state if r.states[0] != nil { // Execute a Lua snippet to count modules if res, err := r.states[0].ExecuteWithResult(` local count = 0 for _ in pairs(package.loaded) do count = count + 1 end return count `); err == nil { if num, ok := res.(float64); ok { count = int(num) } } } return count }