442 lines
11 KiB
Go
442 lines
11 KiB
Go
package runner
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"path/filepath"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
luajit "git.sharkk.net/Sky/LuaJIT-to-Go"
|
|
"git.sharkk.net/Sky/Moonshark/core/logger"
|
|
)
|
|
|
|
// ErrRunnerClosed is returned when an operation is attempted on a runner
// that is no longer running.
var ErrRunnerClosed = errors.New("lua runner is closed")

// ErrInitFailed is returned when a step of Lua state initialization fails.
var ErrInitFailed = errors.New("initialization failed")
|
|
|
|
// StateInitFunc is a function that initializes a Lua state
|
|
type StateInitFunc func(*luajit.State) error
|
|
|
|
// RunnerOption defines a functional option for configuring the LuaRunner
|
|
type RunnerOption func(*LuaRunner)
|
|
|
|
// Result channel pool to reduce allocations
|
|
var resultChanPool = sync.Pool{
|
|
New: func() interface{} {
|
|
return make(chan JobResult, 1)
|
|
},
|
|
}
|
|
|
|
// LuaRunner runs Lua scripts using a single Lua state
|
|
type LuaRunner struct {
|
|
state *luajit.State // The Lua state
|
|
jobQueue chan job // Channel for incoming jobs
|
|
isRunning atomic.Bool // Flag indicating if the runner is active
|
|
mu sync.RWMutex // Mutex for thread safety
|
|
wg sync.WaitGroup // WaitGroup for clean shutdown
|
|
initFunc StateInitFunc // Optional function to initialize Lua state
|
|
bufferSize int // Size of the job queue buffer
|
|
moduleLoader *NativeModuleLoader // Native module loader for require
|
|
sandbox *Sandbox // The sandbox environment
|
|
debug bool // Enable debug logging
|
|
}
|
|
|
|
// WithBufferSize sets the job queue buffer size
|
|
func WithBufferSize(size int) RunnerOption {
|
|
return func(r *LuaRunner) {
|
|
if size > 0 {
|
|
r.bufferSize = size
|
|
}
|
|
}
|
|
}
|
|
|
|
// WithInitFunc sets the init function for the Lua state
|
|
func WithInitFunc(initFunc StateInitFunc) RunnerOption {
|
|
return func(r *LuaRunner) {
|
|
r.initFunc = initFunc
|
|
}
|
|
}
|
|
|
|
// WithLibDirs sets additional library directories
|
|
func WithLibDirs(dirs ...string) RunnerOption {
|
|
return func(r *LuaRunner) {
|
|
if r.moduleLoader == nil || r.moduleLoader.config == nil {
|
|
r.moduleLoader = NewNativeModuleLoader(&RequireConfig{
|
|
LibDirs: dirs,
|
|
})
|
|
} else {
|
|
r.moduleLoader.config.LibDirs = dirs
|
|
}
|
|
}
|
|
}
|
|
|
|
// WithDebugEnabled enables debug output
|
|
func WithDebugEnabled() RunnerOption {
|
|
return func(r *LuaRunner) {
|
|
r.debug = true
|
|
}
|
|
}
|
|
|
|
// NewRunner creates a new LuaRunner
|
|
func NewRunner(options ...RunnerOption) (*LuaRunner, error) {
|
|
// Default configuration
|
|
runner := &LuaRunner{
|
|
bufferSize: 10, // Default buffer size
|
|
sandbox: NewSandbox(),
|
|
debug: false,
|
|
}
|
|
|
|
// Apply options
|
|
for _, opt := range options {
|
|
opt(runner)
|
|
}
|
|
|
|
// Create job queue
|
|
runner.jobQueue = make(chan job, runner.bufferSize)
|
|
runner.isRunning.Store(true)
|
|
|
|
// Set up module loader if not already initialized
|
|
if runner.moduleLoader == nil {
|
|
requireConfig := &RequireConfig{
|
|
ScriptDir: "",
|
|
LibDirs: []string{},
|
|
}
|
|
runner.moduleLoader = NewNativeModuleLoader(requireConfig)
|
|
}
|
|
|
|
// Initialize Lua state
|
|
if err := runner.initState(true); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Start the event loop
|
|
runner.wg.Add(1)
|
|
go runner.processJobs()
|
|
|
|
return runner, nil
|
|
}
|
|
|
|
// debugLog logs a message if debug mode is enabled
|
|
func (r *LuaRunner) debugLog(format string, args ...interface{}) {
|
|
if r.debug {
|
|
logger.Debug("[LuaRunner] "+format, args...)
|
|
}
|
|
}
|
|
|
|
// initState initializes or reinitializes the Lua state
|
|
func (r *LuaRunner) initState(initial bool) error {
|
|
r.debugLog("Initializing Lua state (initial=%v)", initial)
|
|
|
|
// Clean up existing state if there is one
|
|
if r.state != nil {
|
|
r.debugLog("Cleaning up existing state")
|
|
// Always call Cleanup before Close to properly free function pointers
|
|
r.state.Cleanup()
|
|
r.state.Close()
|
|
r.state = nil
|
|
}
|
|
|
|
// Create fresh state
|
|
state := luajit.New()
|
|
if state == nil {
|
|
return errors.New("failed to create Lua state")
|
|
}
|
|
r.debugLog("Created new Lua state")
|
|
|
|
// Set up require paths and mechanism
|
|
if err := r.moduleLoader.SetupRequire(state); err != nil {
|
|
r.debugLog("Failed to set up require: %v", err)
|
|
state.Cleanup()
|
|
state.Close()
|
|
return ErrInitFailed
|
|
}
|
|
r.debugLog("Require system initialized")
|
|
|
|
// Initialize all core modules from the registry
|
|
if err := GlobalRegistry.Initialize(state); err != nil {
|
|
r.debugLog("Failed to initialize core modules: %v", err)
|
|
state.Cleanup()
|
|
state.Close()
|
|
return ErrInitFailed
|
|
}
|
|
r.debugLog("Core modules initialized")
|
|
|
|
// Check if http module is properly registered
|
|
testResult, err := state.ExecuteWithResult(`
|
|
if type(http) == "table" and type(http.client) == "table" and
|
|
type(http.client.get) == "function" then
|
|
return true
|
|
else
|
|
return false
|
|
end
|
|
`)
|
|
if err != nil || testResult != true {
|
|
r.debugLog("HTTP module verification failed: %v, result: %v", err, testResult)
|
|
} else {
|
|
r.debugLog("HTTP module verified OK")
|
|
}
|
|
|
|
// Verify __http_request function
|
|
testResult, _ = state.ExecuteWithResult(`return type(__http_request)`)
|
|
r.debugLog("__http_request function is of type: %v", testResult)
|
|
|
|
// Set up sandbox after core modules are initialized
|
|
if err := r.sandbox.Setup(state); err != nil {
|
|
r.debugLog("Failed to set up sandbox: %v", err)
|
|
state.Cleanup()
|
|
state.Close()
|
|
return ErrInitFailed
|
|
}
|
|
r.debugLog("Sandbox environment set up")
|
|
|
|
// Preload all modules into package.loaded
|
|
if err := r.moduleLoader.PreloadAllModules(state); err != nil {
|
|
r.debugLog("Failed to preload modules: %v", err)
|
|
state.Cleanup()
|
|
state.Close()
|
|
return errors.New("failed to preload modules")
|
|
}
|
|
r.debugLog("All modules preloaded")
|
|
|
|
// Run init function if provided
|
|
if r.initFunc != nil {
|
|
if err := r.initFunc(state); err != nil {
|
|
r.debugLog("Custom init function failed: %v", err)
|
|
state.Cleanup()
|
|
state.Close()
|
|
return ErrInitFailed
|
|
}
|
|
r.debugLog("Custom init function completed")
|
|
}
|
|
|
|
// Test for HTTP module again after full initialization
|
|
testResult, err = state.ExecuteWithResult(`
|
|
if type(http) == "table" and type(http.client) == "table" and
|
|
type(http.client.get) == "function" then
|
|
return true
|
|
else
|
|
return false
|
|
end
|
|
`)
|
|
if err != nil || testResult != true {
|
|
r.debugLog("Final HTTP module verification failed: %v, result: %v", err, testResult)
|
|
} else {
|
|
r.debugLog("Final HTTP module verification OK")
|
|
}
|
|
|
|
r.state = state
|
|
r.debugLog("State initialization complete")
|
|
return nil
|
|
}
|
|
|
|
// processJobs handles the job queue
|
|
func (r *LuaRunner) processJobs() {
|
|
defer r.wg.Done()
|
|
defer func() {
|
|
if r.state != nil {
|
|
r.debugLog("Cleaning up Lua state in processJobs")
|
|
r.state.Cleanup()
|
|
r.state.Close()
|
|
r.state = nil
|
|
}
|
|
}()
|
|
|
|
for job := range r.jobQueue {
|
|
// Execute the job and send result
|
|
result := r.executeJob(job)
|
|
select {
|
|
case job.Result <- result:
|
|
// Result sent successfully
|
|
default:
|
|
// Result channel closed or full, discard the result
|
|
}
|
|
}
|
|
}
|
|
|
|
// executeJob runs a script in the sandbox environment
|
|
func (r *LuaRunner) executeJob(j job) JobResult {
|
|
// If the job has a script path, update script dir for module resolution
|
|
if j.ScriptPath != "" {
|
|
r.mu.Lock()
|
|
r.moduleLoader.config.ScriptDir = filepath.Dir(j.ScriptPath)
|
|
r.mu.Unlock()
|
|
}
|
|
|
|
// Convert context for sandbox
|
|
var ctx map[string]any
|
|
if j.Context != nil {
|
|
ctx = j.Context.Values
|
|
}
|
|
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
|
|
if r.state == nil {
|
|
return JobResult{nil, errors.New("lua state is not initialized")}
|
|
}
|
|
|
|
// Execute in sandbox
|
|
value, err := r.sandbox.Execute(r.state, j.Bytecode, ctx)
|
|
return JobResult{value, err}
|
|
}
|
|
|
|
// RunWithContext executes a Lua script with context and timeout
|
|
func (r *LuaRunner) RunWithContext(ctx context.Context, bytecode []byte, execCtx *Context, scriptPath string) (any, error) {
|
|
r.mu.RLock()
|
|
if !r.isRunning.Load() {
|
|
r.mu.RUnlock()
|
|
return nil, ErrRunnerClosed
|
|
}
|
|
r.mu.RUnlock()
|
|
|
|
// Get a result channel from the pool
|
|
resultChanInterface := resultChanPool.Get()
|
|
resultChan := resultChanInterface.(chan JobResult)
|
|
|
|
// Make sure to clear any previous results
|
|
select {
|
|
case <-resultChan:
|
|
// Drain the channel if it has a value
|
|
default:
|
|
// Channel is already empty
|
|
}
|
|
|
|
j := job{
|
|
Bytecode: bytecode,
|
|
Context: execCtx,
|
|
ScriptPath: scriptPath,
|
|
Result: resultChan,
|
|
}
|
|
|
|
// Submit job with context
|
|
select {
|
|
case r.jobQueue <- j:
|
|
// Job submitted
|
|
case <-ctx.Done():
|
|
// Return the channel to the pool before exiting
|
|
resultChanPool.Put(resultChan)
|
|
return nil, ctx.Err()
|
|
}
|
|
|
|
// Wait for result with context
|
|
var result JobResult
|
|
select {
|
|
case result = <-resultChan:
|
|
// Got result
|
|
case <-ctx.Done():
|
|
// Return the channel to the pool before exiting
|
|
resultChanPool.Put(resultChan)
|
|
return nil, ctx.Err()
|
|
}
|
|
|
|
// Return the channel to the pool
|
|
resultChanPool.Put(resultChan)
|
|
|
|
return result.Value, result.Error
|
|
}
|
|
|
|
// Run executes a Lua script
|
|
func (r *LuaRunner) Run(bytecode []byte, execCtx *Context, scriptPath string) (any, error) {
|
|
return r.RunWithContext(context.Background(), bytecode, execCtx, scriptPath)
|
|
}
|
|
|
|
// Close gracefully shuts down the LuaRunner
|
|
func (r *LuaRunner) Close() error {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
|
|
if !r.isRunning.Load() {
|
|
return ErrRunnerClosed
|
|
}
|
|
|
|
r.isRunning.Store(false)
|
|
close(r.jobQueue)
|
|
|
|
// Wait for event loop to finish
|
|
r.wg.Wait()
|
|
|
|
return nil
|
|
}
|
|
|
|
// NotifyFileChanged handles file change notifications from watchers
|
|
func (r *LuaRunner) NotifyFileChanged(filePath string) bool {
|
|
r.debugLog("File change detected: %s", filePath)
|
|
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
|
|
// Reset the entire state on file changes
|
|
err := r.initState(false)
|
|
if err != nil {
|
|
r.debugLog("Failed to reinitialize state: %v", err)
|
|
return false
|
|
}
|
|
|
|
r.debugLog("State successfully reinitialized")
|
|
return true
|
|
}
|
|
|
|
// ResetModuleCache clears non-core modules from package.loaded
|
|
func (r *LuaRunner) ResetModuleCache() {
|
|
if r.moduleLoader != nil {
|
|
r.debugLog("Resetting module cache")
|
|
r.moduleLoader.ResetModules(r.state)
|
|
}
|
|
}
|
|
|
|
// ReloadAllModules reloads all modules into package.loaded
|
|
func (r *LuaRunner) ReloadAllModules() error {
|
|
if r.moduleLoader != nil {
|
|
r.debugLog("Reloading all modules")
|
|
return r.moduleLoader.PreloadAllModules(r.state)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// RefreshModuleByName invalidates a specific module in package.loaded
|
|
func (r *LuaRunner) RefreshModuleByName(modName string) bool {
|
|
if r.state != nil {
|
|
r.debugLog("Refreshing module: %s", modName)
|
|
if err := r.state.DoString(`package.loaded["` + modName + `"] = nil`); err != nil {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
// AddModule adds a module to the sandbox environment
|
|
func (r *LuaRunner) AddModule(name string, module any) {
|
|
r.debugLog("Adding module: %s", name)
|
|
r.sandbox.AddModule(name, module)
|
|
}
|
|
|
|
// GetModuleCount returns the number of loaded modules
|
|
func (r *LuaRunner) GetModuleCount() int {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
|
|
count := 0
|
|
|
|
// Get module count from Lua
|
|
if r.state != nil {
|
|
// Execute a Lua snippet to count modules
|
|
if res, err := r.state.ExecuteWithResult(`
|
|
local count = 0
|
|
for _ in pairs(package.loaded) do
|
|
count = count + 1
|
|
end
|
|
return count
|
|
`); err == nil {
|
|
if num, ok := res.(float64); ok {
|
|
count = int(num)
|
|
}
|
|
}
|
|
}
|
|
|
|
return count
|
|
}
|