Moonshark/core/runner/LuaRunner.go

325 lines
7.7 KiB
Go

package runner
import (
"context"
"errors"
"path/filepath"
"sync"
"sync/atomic"
luajit "git.sharkk.net/Sky/LuaJIT-to-Go"
)
// Common errors
var (
ErrRunnerClosed = errors.New("lua runner is closed")
ErrInitFailed = errors.New("initialization failed")
)
// StateInitFunc is a function that initializes a Lua state
type StateInitFunc func(*luajit.State) error
// RunnerOption defines a functional option for configuring the LuaRunner
type RunnerOption func(*LuaRunner)
// Result channel pool to reduce allocations
var resultChanPool = sync.Pool{
New: func() interface{} {
return make(chan JobResult, 1)
},
}
// LuaRunner runs Lua scripts using a single Lua state
type LuaRunner struct {
state *luajit.State // The Lua state
jobQueue chan job // Channel for incoming jobs
isRunning atomic.Bool // Flag indicating if the runner is active
mu sync.RWMutex // Mutex for thread safety
wg sync.WaitGroup // WaitGroup for clean shutdown
initFunc StateInitFunc // Optional function to initialize Lua state
bufferSize int // Size of the job queue buffer
moduleLoader *NativeModuleLoader // Native module loader for require
sandbox *Sandbox // The sandbox environment
}
// WithBufferSize sets the job queue buffer size
func WithBufferSize(size int) RunnerOption {
return func(r *LuaRunner) {
if size > 0 {
r.bufferSize = size
}
}
}
// WithInitFunc sets the init function for the Lua state
func WithInitFunc(initFunc StateInitFunc) RunnerOption {
return func(r *LuaRunner) {
r.initFunc = initFunc
}
}
// WithLibDirs sets additional library directories
func WithLibDirs(dirs ...string) RunnerOption {
return func(r *LuaRunner) {
if r.moduleLoader == nil || r.moduleLoader.config == nil {
r.moduleLoader = NewNativeModuleLoader(&RequireConfig{
LibDirs: dirs,
})
} else {
r.moduleLoader.config.LibDirs = dirs
}
}
}
// NewRunner creates a new LuaRunner
func NewRunner(options ...RunnerOption) (*LuaRunner, error) {
// Default configuration
runner := &LuaRunner{
bufferSize: 10, // Default buffer size
sandbox: NewSandbox(),
}
// Apply options
for _, opt := range options {
opt(runner)
}
// Initialize Lua state
state := luajit.New()
if state == nil {
return nil, errors.New("failed to create Lua state")
}
runner.state = state
// Create job queue
runner.jobQueue = make(chan job, runner.bufferSize)
runner.isRunning.Store(true)
// Set up module loader if not already initialized
if runner.moduleLoader == nil {
requireConfig := &RequireConfig{
ScriptDir: "",
LibDirs: []string{},
}
runner.moduleLoader = NewNativeModuleLoader(requireConfig)
}
// Set up require paths and mechanism
if err := runner.moduleLoader.SetupRequire(state); err != nil {
state.Close()
return nil, ErrInitFailed
}
// Initialize all core modules from the registry
if err := GlobalRegistry.Initialize(state); err != nil {
state.Close()
return nil, ErrInitFailed
}
// Set up sandbox after core modules are initialized
if err := runner.sandbox.Setup(state); err != nil {
state.Close()
return nil, ErrInitFailed
}
// Preload all modules into package.loaded
if err := runner.moduleLoader.PreloadAllModules(state); err != nil {
state.Close()
return nil, errors.New("failed to preload modules")
}
// Run init function if provided
if runner.initFunc != nil {
if err := runner.initFunc(state); err != nil {
state.Close()
return nil, ErrInitFailed
}
}
// Start the event loop
runner.wg.Add(1)
go runner.processJobs()
return runner, nil
}
// processJobs handles the job queue
func (r *LuaRunner) processJobs() {
defer r.wg.Done()
defer r.state.Close()
for job := range r.jobQueue {
// Execute the job and send result
result := r.executeJob(job)
select {
case job.Result <- result:
// Result sent successfully
default:
// Result channel closed or full, discard the result
}
}
}
// executeJob runs a script in the sandbox environment
func (r *LuaRunner) executeJob(j job) JobResult {
// If the job has a script path, update script dir for module resolution
if j.ScriptPath != "" {
r.mu.Lock()
r.moduleLoader.config.ScriptDir = filepath.Dir(j.ScriptPath)
r.mu.Unlock()
}
// Convert context for sandbox
var ctx map[string]any
if j.Context != nil {
ctx = j.Context.Values
}
// Execute in sandbox
value, err := r.sandbox.Execute(r.state, j.Bytecode, ctx)
return JobResult{value, err}
}
// RunWithContext executes a Lua script with context and timeout
func (r *LuaRunner) RunWithContext(ctx context.Context, bytecode []byte, execCtx *Context, scriptPath string) (any, error) {
r.mu.RLock()
if !r.isRunning.Load() {
r.mu.RUnlock()
return nil, ErrRunnerClosed
}
r.mu.RUnlock()
// Get a result channel from the pool
resultChanInterface := resultChanPool.Get()
resultChan := resultChanInterface.(chan JobResult)
// Make sure to clear any previous results
select {
case <-resultChan:
// Drain the channel if it has a value
default:
// Channel is already empty
}
j := job{
Bytecode: bytecode,
Context: execCtx,
ScriptPath: scriptPath,
Result: resultChan,
}
// Submit job with context
select {
case r.jobQueue <- j:
// Job submitted
case <-ctx.Done():
// Return the channel to the pool before exiting
resultChanPool.Put(resultChan)
return nil, ctx.Err()
}
// Wait for result with context
var result JobResult
select {
case result = <-resultChan:
// Got result
case <-ctx.Done():
// Return the channel to the pool before exiting
resultChanPool.Put(resultChan)
return nil, ctx.Err()
}
// Return the channel to the pool
resultChanPool.Put(resultChan)
return result.Value, result.Error
}
// Run executes a Lua script
func (r *LuaRunner) Run(bytecode []byte, execCtx *Context, scriptPath string) (any, error) {
return r.RunWithContext(context.Background(), bytecode, execCtx, scriptPath)
}
// Close gracefully shuts down the LuaRunner
func (r *LuaRunner) Close() error {
r.mu.Lock()
defer r.mu.Unlock()
if !r.isRunning.Load() {
return ErrRunnerClosed
}
r.isRunning.Store(false)
close(r.jobQueue)
// Wait for event loop to finish
r.wg.Wait()
return nil
}
// NotifyFileChanged handles file change notifications from watchers
func (r *LuaRunner) NotifyFileChanged(filePath string) bool {
if r.moduleLoader != nil {
return r.moduleLoader.NotifyFileChanged(r.state, filePath)
}
return false
}
// ResetModuleCache clears non-core modules from package.loaded
func (r *LuaRunner) ResetModuleCache() {
if r.moduleLoader != nil {
r.moduleLoader.ResetModules(r.state)
}
}
// ReloadAllModules reloads all modules into package.loaded
func (r *LuaRunner) ReloadAllModules() error {
if r.moduleLoader != nil {
return r.moduleLoader.PreloadAllModules(r.state)
}
return nil
}
// RefreshModuleByName invalidates a specific module in package.loaded
func (r *LuaRunner) RefreshModuleByName(modName string) bool {
if r.state != nil {
if err := r.state.DoString(`package.loaded["` + modName + `"] = nil`); err != nil {
return false
}
return true
}
return false
}
// AddModule adds a module to the sandbox environment
func (r *LuaRunner) AddModule(name string, module any) {
r.sandbox.AddModule(name, module)
}
// GetModuleCount returns the number of loaded modules
func (r *LuaRunner) GetModuleCount() int {
r.mu.RLock()
defer r.mu.RUnlock()
count := 0
// Get module count from Lua
if r.state != nil {
// Execute a Lua snippet to count modules
if res, err := r.state.ExecuteWithResult(`
local count = 0
for _ in pairs(package.loaded) do
count = count + 1
end
return count
`); err == nil {
if num, ok := res.(float64); ok {
count = int(num)
}
}
}
return count
}