package runner import ( "context" "errors" "path/filepath" "runtime" "sync" "sync/atomic" "time" "github.com/panjf2000/ants/v2" "github.com/valyala/bytebufferpool" "Moonshark/core/runner/sandbox" "Moonshark/core/utils/logger" luajit "git.sharkk.net/Sky/LuaJIT-to-Go" ) // Common errors var ( ErrRunnerClosed = errors.New("lua runner is closed") ErrInitFailed = errors.New("initialization failed") ErrStateNotReady = errors.New("lua state not ready") ErrTimeout = errors.New("operation timed out") ) // RunnerOption defines a functional option for configuring the Runner type RunnerOption func(*Runner) // State wraps a Lua state with its sandbox type State struct { L *luajit.State // The Lua state sandbox *sandbox.Sandbox // Associated sandbox index int // Index for debugging inUse bool // Whether the state is currently in use initTime time.Time // When this state was initialized } // InitHook runs before executing a script type InitHook func(*luajit.State, *Context) error // FinalizeHook runs after executing a script type FinalizeHook func(*luajit.State, *Context, any) error // ExecuteTask represents a task in the execution goroutine pool type ExecuteTask struct { bytecode []byte context *Context scriptPath string result chan<- taskResult } // taskResult holds the result of an execution task type taskResult struct { value any err error } // Runner runs Lua scripts using a pool of Lua states type Runner struct { states []*State // All states managed by this runner statePool chan int // Pool of available state indexes poolSize int // Size of the state pool moduleLoader *ModuleLoader // Module loader isRunning atomic.Bool // Whether the runner is active mu sync.RWMutex // Mutex for thread safety debug bool // Enable debug logging initHooks []InitHook // Hooks run before script execution finalizeHooks []FinalizeHook // Hooks run after script execution scriptDir string // Current script directory pool *ants.Pool // Goroutine pool for task execution } // WithPoolSize sets the state pool size func WithPoolSize(size int) RunnerOption { return func(r *Runner) { if size > 0 { r.poolSize = size } } } // WithDebugEnabled enables debug output func WithDebugEnabled() RunnerOption { return func(r *Runner) { r.debug = true } } // WithLibDirs sets additional library directories func WithLibDirs(dirs ...string) RunnerOption { return func(r *Runner) { if r.moduleLoader == nil { r.moduleLoader = NewModuleLoader(&ModuleConfig{ LibDirs: dirs, }) } else { r.moduleLoader.config.LibDirs = dirs } } } // WithInitHook adds a hook to run before script execution func WithInitHook(hook InitHook) RunnerOption { return func(r *Runner) { r.initHooks = append(r.initHooks, hook) } } // WithFinalizeHook adds a hook to run after script execution func WithFinalizeHook(hook FinalizeHook) RunnerOption { return func(r *Runner) { r.finalizeHooks = append(r.finalizeHooks, hook) } } // NewRunner creates a new Runner with a pool of states func NewRunner(options ...RunnerOption) (*Runner, error) { // Default configuration runner := &Runner{ poolSize: runtime.GOMAXPROCS(0), debug: false, initHooks: make([]InitHook, 0, 4), finalizeHooks: make([]FinalizeHook, 0, 4), } // Apply options for _, opt := range options { opt(runner) } // Set up module loader if not already initialized if runner.moduleLoader == nil { config := &ModuleConfig{ ScriptDir: "", LibDirs: []string{}, } runner.moduleLoader = NewModuleLoader(config) } // Initialize states and pool runner.states = make([]*State, runner.poolSize) runner.statePool = make(chan int, runner.poolSize) // Create ants goroutine pool var err error runner.pool, err = ants.NewPool(runner.poolSize * 2) if err != nil { return nil, err } // Create and initialize all states if err := runner.initializeStates(); err != nil { runner.Close() // Clean up already created states return nil, err } runner.isRunning.Store(true) return runner, nil } // debugLog logs a message if debug mode is enabled func (r *Runner) debugLog(format string, args ...interface{}) { if r.debug { logger.Debug("Runner "+format, args...) } } func (r *Runner) debugLogCont(format string, args ...interface{}) { if r.debug { logger.DebugCont(format, args...) } } // initializeStates creates and initializes all states in the pool func (r *Runner) initializeStates() error { r.debugLog("is initializing %d states", r.poolSize) // Create main template state first with full logging templateState, err := r.createState(0) if err != nil { return err } r.states[0] = templateState r.statePool <- 0 // Add index to the pool // Create remaining states with minimal logging successCount := 1 for i := 1; i < r.poolSize; i++ { state, err := r.createState(i) if err != nil { return err } r.states[i] = state r.statePool <- i // Add index to the pool successCount++ } r.debugLog("has built %d/%d states successfully", successCount, r.poolSize) return nil } // createState initializes a new Lua state func (r *Runner) createState(index int) (*State, error) { verbose := index == 0 if verbose { r.debugLog("Creating Lua state %d", index) } // Create a new state L := luajit.New() if L == nil { return nil, errors.New("failed to create Lua state") } // Create sandbox sandbox := sandbox.NewSandbox() if r.debug && verbose { sandbox.EnableDebug() } // Set up require system if err := r.moduleLoader.SetupRequire(L); err != nil { if verbose { r.debugLogCont("Failed to set up require for state %d: %v", index, err) } L.Cleanup() L.Close() return nil, ErrInitFailed } // Initialize all core modules from the registry if err := GlobalRegistry.Initialize(L, index); err != nil { if verbose { r.debugLogCont("Failed to initialize core modules for state %d: %v", index, err) } L.Cleanup() L.Close() return nil, ErrInitFailed } // Set up sandbox after core modules are initialized if err := sandbox.Setup(L, index); err != nil { if verbose { r.debugLogCont("Failed to set up sandbox for state %d: %v", index, err) } L.Cleanup() L.Close() return nil, ErrInitFailed } // Preload all modules if err := r.moduleLoader.PreloadModules(L); err != nil { if verbose { r.debugLogCont("Failed to preload modules for state %d: %v", index, err) } L.Cleanup() L.Close() return nil, errors.New("failed to preload modules") } state := &State{ L: L, sandbox: sandbox, index: index, inUse: false, initTime: time.Now(), } if verbose { r.debugLog("State %d created successfully", index) } return state, nil } // executeTask is the worker function for the ants pool func (r *Runner) executeTask(i interface{}) { task, ok := i.(*ExecuteTask) if !ok { return } // Set script directory if provided if task.scriptPath != "" { r.mu.Lock() r.scriptDir = filepath.Dir(task.scriptPath) r.moduleLoader.SetScriptDir(r.scriptDir) r.mu.Unlock() } // Get a state index from the pool var stateIndex int select { case stateIndex = <-r.statePool: // Got a state case <-time.After(5 * time.Second): // 5-second timeout // Timed out waiting for a state task.result <- taskResult{nil, errors.New("server busy - timed out waiting for a Lua state")} return } // Get the actual state r.mu.RLock() state := r.states[stateIndex] r.mu.RUnlock() if state == nil { r.statePool <- stateIndex task.result <- taskResult{nil, ErrStateNotReady} return } // Mark state as in use state.inUse = true // Ensure state is returned to pool when done defer func() { state.inUse = false if r.isRunning.Load() { select { case r.statePool <- stateIndex: // State returned to pool default: // Pool is full or closed (shouldn't happen) } } }() // Copy hooks to avoid holding lock during execution r.mu.RLock() initHooks := make([]InitHook, len(r.initHooks)) copy(initHooks, r.initHooks) finalizeHooks := make([]FinalizeHook, len(r.finalizeHooks)) copy(finalizeHooks, r.finalizeHooks) r.mu.RUnlock() // Run init hooks for _, hook := range initHooks { if err := hook(state.L, task.context); err != nil { task.result <- taskResult{nil, err} return } } // Prepare context values var ctxValues map[string]any if task.context != nil { ctxValues = task.context.Values } // Execute in sandbox result, err := state.sandbox.Execute(state.L, task.bytecode, ctxValues) // Run finalize hooks for _, hook := range finalizeHooks { if hookErr := hook(state.L, task.context, result); hookErr != nil && err == nil { err = hookErr } } task.result <- taskResult{result, err} } // Execute runs a script with context func (r *Runner) Execute(ctx context.Context, bytecode []byte, execCtx *Context, scriptPath string) (any, error) { if !r.isRunning.Load() { return nil, ErrRunnerClosed } // Create result channel resultChan := make(chan taskResult, 1) // Create task task := &ExecuteTask{ bytecode: bytecode, context: execCtx, scriptPath: scriptPath, result: resultChan, } // Submit task to pool if err := r.pool.Submit(func() { r.executeTask(task) }); err != nil { return nil, err } // Wait for result with context timeout select { case result := <-resultChan: return result.value, result.err case <-ctx.Done(): return nil, ctx.Err() } } // Run executes a Lua script (convenience wrapper) func (r *Runner) Run(bytecode []byte, execCtx *Context, scriptPath string) (any, error) { return r.Execute(context.Background(), bytecode, execCtx, scriptPath) } // Close gracefully shuts down the Runner func (r *Runner) Close() error { r.mu.Lock() defer r.mu.Unlock() if !r.isRunning.Load() { return ErrRunnerClosed } r.isRunning.Store(false) r.debugLog("Closing Runner and destroying all states") // Shut down goroutine pool r.pool.Release() // Drain the state pool r.drainStatePool() // Clean up all states for i, state := range r.states { if state != nil { state.L.Cleanup() state.L.Close() r.states[i] = nil } } return nil } // drainStatePool removes all states from the pool func (r *Runner) drainStatePool() { for { select { case <-r.statePool: // Drain one state default: // Pool is empty return } } } // RefreshStates rebuilds all states in the pool func (r *Runner) RefreshStates() error { r.mu.Lock() defer r.mu.Unlock() if !r.isRunning.Load() { return ErrRunnerClosed } r.debugLog("Refreshing all Lua states") // Drain all states from the pool r.drainStatePool() // Destroy all existing states for i, state := range r.states { if state != nil { if state.inUse { r.debugLog("Warning: attempting to refresh state %d that is in use", i) } state.L.Cleanup() state.L.Close() r.states[i] = nil } } // Reinitialize all states if err := r.initializeStates(); err != nil { return err } r.debugLog("All states refreshed successfully") return nil } // NotifyFileChanged handles file change notifications func (r *Runner) NotifyFileChanged(filePath string) bool { r.debugLog("File change detected: %s", filePath) // Check if it's a module file module, isModule := r.moduleLoader.GetModuleByPath(filePath) if isModule { r.debugLog("File is a module: %s", module) return r.RefreshModule(module) } // For non-module files, refresh all states if err := r.RefreshStates(); err != nil { r.debugLog("Failed to refresh states: %v", err) return false } return true } // RefreshModule refreshes a specific module across all states func (r *Runner) RefreshModule(moduleName string) bool { r.mu.RLock() defer r.mu.RUnlock() if !r.isRunning.Load() { return false } r.debugLog("Refreshing module: %s", moduleName) // Check if it's a core module coreName, isCore := GlobalRegistry.MatchModuleName(moduleName) success := true for _, state := range r.states { if state == nil { continue } // Skip states that are in use if state.inUse { r.debugLog("Skipping refresh for state %d (in use)", state.index) success = false continue } // Invalidate module in Lua if err := state.L.DoString(`package.loaded["` + moduleName + `"] = nil`); err != nil { r.debugLog("Failed to invalidate module %s in state %d: %v", moduleName, state.index, err) success = false continue } // For core modules, reinitialize them if isCore { if err := GlobalRegistry.InitializeModule(state.L, coreName); err != nil { r.debugLog("Failed to reinitialize core module %s in state %d: %v", coreName, state.index, err) success = false } } } if success { r.debugLog("Module %s refreshed successfully in all states", moduleName) } else { r.debugLog("Module %s refresh had some failures", moduleName) } return success } // AddModule adds a module to all sandbox environments func (r *Runner) AddModule(name string, module any) { r.debugLog("Adding module %s to all sandboxes", name) r.mu.RLock() defer r.mu.RUnlock() for _, state := range r.states { if state != nil && state.sandbox != nil && !state.inUse { state.sandbox.AddModule(name, module) } } } // AddInitHook adds a hook to be called before script execution func (r *Runner) AddInitHook(hook InitHook) { r.mu.Lock() defer r.mu.Unlock() r.initHooks = append(r.initHooks, hook) } // AddFinalizeHook adds a hook to be called after script execution func (r *Runner) AddFinalizeHook(hook FinalizeHook) { r.mu.Lock() defer r.mu.Unlock() r.finalizeHooks = append(r.finalizeHooks, hook) } // ResetModuleCache clears the module cache in all states func (r *Runner) ResetModuleCache() { r.mu.RLock() defer r.mu.RUnlock() if !r.isRunning.Load() { return } r.debugLog("Resetting module cache in all states") for _, state := range r.states { if state != nil && !state.inUse { r.moduleLoader.ResetModules(state.L) } } } // GetStateCount returns the number of initialized states func (r *Runner) GetStateCount() int { r.mu.RLock() defer r.mu.RUnlock() count := 0 for _, state := range r.states { if state != nil { count++ } } return count } // GetActiveStateCount returns the number of states currently in use func (r *Runner) GetActiveStateCount() int { r.mu.RLock() defer r.mu.RUnlock() count := 0 for _, state := range r.states { if state != nil && state.inUse { count++ } } return count } // GetWorkerPoolStats returns statistics about the worker pool func (r *Runner) GetWorkerPoolStats() (running, capacity int) { return r.pool.Running(), r.pool.Cap() } // GetModuleCount returns the number of loaded modules in the first available state func (r *Runner) GetModuleCount() int { r.mu.RLock() defer r.mu.RUnlock() if !r.isRunning.Load() { return 0 } // Find first available state for _, state := range r.states { if state != nil && !state.inUse { // Execute a Lua snippet to count modules if res, err := state.L.ExecuteWithResult(` local count = 0 for _ in pairs(package.loaded) do count = count + 1 end return count `); err == nil { if num, ok := res.(float64); ok { return int(num) } } break } } return 0 } // GetBufferPool returns a buffer from the bytebufferpool func GetBufferPool() *bytebufferpool.ByteBuffer { return bytebufferpool.Get() } // ReleaseBufferPool returns a buffer to the bytebufferpool func ReleaseBufferPool(buf *bytebufferpool.ByteBuffer) { bytebufferpool.Put(buf) }