Moonshark/runner/runner.go
2025-06-06 18:57:47 -05:00

336 lines
6.6 KiB
Go

package runner
import (
"errors"
"fmt"
"os"
"path/filepath"
"runtime"
"sync"
"sync/atomic"
"time"
"Moonshark/config"
"Moonshark/logger"
"Moonshark/runner/lualibs"
"Moonshark/runner/sqlite"
luajit "git.sharkk.net/Sky/LuaJIT-to-Go"
)
var emptyMap = make(map[string]any)
var (
ErrRunnerClosed = errors.New("lua runner is closed")
ErrTimeout = errors.New("operation timed out")
ErrStateNotReady = errors.New("lua state not ready")
)
type State struct {
L *luajit.State
sandbox *Sandbox
index int
inUse atomic.Bool
}
type Runner struct {
states []*State
statePool chan int
poolSize int
moduleLoader *ModuleLoader
isRunning atomic.Bool
mu sync.RWMutex
scriptDir string
// Pre-allocated pools for HTTP processing
ctxPool sync.Pool
paramsPool sync.Pool
}
func NewRunner(cfg *config.Config, poolSize int) (*Runner, error) {
if poolSize <= 0 && cfg.Runner.PoolSize <= 0 {
poolSize = runtime.GOMAXPROCS(0)
}
moduleConfig := &ModuleConfig{
LibDirs: cfg.Dirs.Libs,
}
r := &Runner{
poolSize: poolSize,
moduleLoader: NewModuleLoader(moduleConfig),
ctxPool: sync.Pool{
New: func() any { return make(map[string]any, 8) },
},
paramsPool: sync.Pool{
New: func() any { return make(map[string]any, 4) },
},
}
sqlite.InitSQLite(cfg.Dirs.Data)
sqlite.SetSQLitePoolSize(poolSize)
lualibs.InitFS(cfg.Dirs.FS)
lualibs.InitEnv(cfg.Dirs.Data)
r.states = make([]*State, poolSize)
r.statePool = make(chan int, poolSize)
if err := r.initStates(); err != nil {
sqlite.CleanupSQLite()
return nil, err
}
r.isRunning.Store(true)
return r, nil
}
func (r *Runner) Execute(bytecode []byte, ctx ExecutionContext) (*Response, error) {
if !r.isRunning.Load() {
return nil, ErrRunnerClosed
}
var stateIndex int
select {
case stateIndex = <-r.statePool:
case <-time.After(time.Second):
return nil, ErrTimeout
}
state := r.states[stateIndex]
if state == nil {
r.statePool <- stateIndex
return nil, ErrStateNotReady
}
state.inUse.Store(true)
defer func() {
state.inUse.Store(false)
if r.isRunning.Load() {
select {
case r.statePool <- stateIndex:
default:
}
}
}()
return state.sandbox.Execute(state.L, bytecode, ctx, state.index)
}
func (r *Runner) initStates() error {
logger.Infof("[LuaRunner] Creating %d states...", r.poolSize)
for i := range r.poolSize {
state, err := r.createState(i)
if err != nil {
return err
}
r.states[i] = state
r.statePool <- i
}
return nil
}
func (r *Runner) createState(index int) (*State, error) {
L := luajit.New(true)
if L == nil {
return nil, errors.New("failed to create Lua state")
}
sb := NewSandbox()
if err := sb.Setup(L, index, index == 0); err != nil {
L.Cleanup()
L.Close()
return nil, err
}
if err := r.moduleLoader.SetupRequire(L); err != nil {
L.Cleanup()
L.Close()
return nil, err
}
if err := r.moduleLoader.PreloadModules(L); err != nil {
L.Cleanup()
L.Close()
return nil, err
}
return &State{L: L, sandbox: sb, index: index}, nil
}
func (r *Runner) Close() error {
r.mu.Lock()
defer r.mu.Unlock()
if !r.isRunning.Load() {
return ErrRunnerClosed
}
r.isRunning.Store(false)
// Drain pool
for {
select {
case <-r.statePool:
default:
goto cleanup
}
}
cleanup:
// Wait for states to finish
timeout := time.Now().Add(10 * time.Second)
for time.Now().Before(timeout) {
allIdle := true
for _, state := range r.states {
if state != nil && state.inUse.Load() {
allIdle = false
break
}
}
if allIdle {
break
}
time.Sleep(10 * time.Millisecond)
}
// Close states
for i, state := range r.states {
if state != nil {
state.L.Cleanup()
state.L.Close()
r.states[i] = nil
}
}
lualibs.CleanupFS()
sqlite.CleanupSQLite()
lualibs.CleanupEnv()
return nil
}
// NotifyFileChanged alerts the runner about file changes
func (r *Runner) NotifyFileChanged(filePath string) bool {
logger.Debugf("Runner notified of file change: %s", filePath)
module, isModule := r.moduleLoader.GetModuleByPath(filePath)
if isModule {
logger.Debugf("Refreshing module: %s", module)
return r.RefreshModule(module)
}
logger.Debugf("File change noted but no refresh needed: %s", filePath)
return true
}
// RefreshModule refreshes a specific module across all states
func (r *Runner) RefreshModule(moduleName string) bool {
r.mu.RLock()
defer r.mu.RUnlock()
if !r.isRunning.Load() {
return false
}
logger.Debugf("Refreshing module: %s", moduleName)
success := true
for _, state := range r.states {
if state == nil || state.inUse.Load() {
continue
}
if err := r.moduleLoader.RefreshModule(state.L, moduleName); err != nil {
success = false
logger.Debugf("Failed to refresh module %s in state %d: %v", moduleName, state.index, err)
}
}
if success {
logger.Debugf("Successfully refreshed module: %s", moduleName)
}
return success
}
// RunScriptFile loads, compiles and executes a Lua script file
func (r *Runner) RunScriptFile(filePath string) (*Response, error) {
if !r.isRunning.Load() {
return nil, ErrRunnerClosed
}
if _, err := os.Stat(filePath); os.IsNotExist(err) {
return nil, fmt.Errorf("script file not found: %s", filePath)
}
content, err := os.ReadFile(filePath)
if err != nil {
return nil, fmt.Errorf("failed to read file: %w", err)
}
absPath, err := filepath.Abs(filePath)
if err != nil {
return nil, fmt.Errorf("failed to get absolute path: %w", err)
}
scriptDir := filepath.Dir(absPath)
r.mu.Lock()
prevScriptDir := r.scriptDir
r.scriptDir = scriptDir
r.moduleLoader.SetScriptDir(scriptDir)
r.mu.Unlock()
defer func() {
r.mu.Lock()
r.scriptDir = prevScriptDir
r.moduleLoader.SetScriptDir(prevScriptDir)
r.mu.Unlock()
}()
// Get state from pool
var stateIndex int
select {
case stateIndex = <-r.statePool:
case <-time.After(5 * time.Second):
return nil, ErrTimeout
}
state := r.states[stateIndex]
if state == nil {
r.statePool <- stateIndex
return nil, ErrStateNotReady
}
state.inUse.Store(true)
defer func() {
state.inUse.Store(false)
if r.isRunning.Load() {
select {
case r.statePool <- stateIndex:
default:
}
}
}()
// Compile script
bytecode, err := state.L.CompileBytecode(string(content), filepath.Base(absPath))
if err != nil {
return nil, fmt.Errorf("compilation error: %w", err)
}
// Create simple context for script execution
ctx := NewContext()
defer ctx.Release()
ctx.Set("_script_path", absPath)
ctx.Set("_script_dir", scriptDir)
// Execute script
response, err := state.sandbox.Execute(state.L, bytecode, ctx, state.index)
if err != nil {
return nil, fmt.Errorf("execution error: %w", err)
}
return response, nil
}