675 lines
15 KiB
Go
675 lines
15 KiB
Go
package runner
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"path/filepath"
|
|
"runtime"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/panjf2000/ants/v2"
|
|
"github.com/valyala/bytebufferpool"
|
|
|
|
"Moonshark/core/utils/logger"
|
|
|
|
luajit "git.sharkk.net/Sky/LuaJIT-to-Go"
|
|
)
|
|
|
|
// Common errors
|
|
var (
|
|
ErrRunnerClosed = errors.New("lua runner is closed")
|
|
ErrInitFailed = errors.New("initialization failed")
|
|
ErrStateNotReady = errors.New("lua state not ready")
|
|
ErrTimeout = errors.New("operation timed out")
|
|
)
|
|
|
|
// RunnerOption defines a functional option for configuring the Runner
|
|
type RunnerOption func(*Runner)
|
|
|
|
// State wraps a Lua state with its sandbox
|
|
type State struct {
|
|
L *luajit.State // The Lua state
|
|
sandbox *Sandbox // Associated sandbox
|
|
index int // Index for debugging
|
|
inUse bool // Whether the state is currently in use
|
|
initTime time.Time // When this state was initialized
|
|
}
|
|
|
|
// InitHook runs before executing a script
|
|
type InitHook func(*luajit.State, *Context) error
|
|
|
|
// FinalizeHook runs after executing a script
|
|
type FinalizeHook func(*luajit.State, *Context, any) error
|
|
|
|
// ExecuteTask represents a task in the execution goroutine pool
|
|
type ExecuteTask struct {
|
|
bytecode []byte
|
|
context *Context
|
|
scriptPath string
|
|
result chan<- taskResult
|
|
}
|
|
|
|
// taskResult holds the result of an execution task
|
|
type taskResult struct {
|
|
value any
|
|
err error
|
|
}
|
|
|
|
// Runner runs Lua scripts using a pool of Lua states
|
|
type Runner struct {
|
|
states []*State // All states managed by this runner
|
|
statePool chan int // Pool of available state indexes
|
|
poolSize int // Size of the state pool
|
|
moduleLoader *ModuleLoader // Module loader
|
|
isRunning atomic.Bool // Whether the runner is active
|
|
mu sync.RWMutex // Mutex for thread safety
|
|
debug bool // Enable debug logging
|
|
initHooks []InitHook // Hooks run before script execution
|
|
finalizeHooks []FinalizeHook // Hooks run after script execution
|
|
scriptDir string // Current script directory
|
|
pool *ants.Pool // Goroutine pool for task execution
|
|
}
|
|
|
|
// WithPoolSize sets the state pool size
|
|
func WithPoolSize(size int) RunnerOption {
|
|
return func(r *Runner) {
|
|
if size > 0 {
|
|
r.poolSize = size
|
|
}
|
|
}
|
|
}
|
|
|
|
// WithDebugEnabled enables debug output
|
|
func WithDebugEnabled() RunnerOption {
|
|
return func(r *Runner) {
|
|
r.debug = true
|
|
}
|
|
}
|
|
|
|
// WithLibDirs sets additional library directories
|
|
func WithLibDirs(dirs ...string) RunnerOption {
|
|
return func(r *Runner) {
|
|
if r.moduleLoader == nil {
|
|
r.moduleLoader = NewModuleLoader(&ModuleConfig{
|
|
LibDirs: dirs,
|
|
})
|
|
} else {
|
|
r.moduleLoader.config.LibDirs = dirs
|
|
}
|
|
}
|
|
}
|
|
|
|
// WithInitHook adds a hook to run before script execution
|
|
func WithInitHook(hook InitHook) RunnerOption {
|
|
return func(r *Runner) {
|
|
r.initHooks = append(r.initHooks, hook)
|
|
}
|
|
}
|
|
|
|
// WithFinalizeHook adds a hook to run after script execution
|
|
func WithFinalizeHook(hook FinalizeHook) RunnerOption {
|
|
return func(r *Runner) {
|
|
r.finalizeHooks = append(r.finalizeHooks, hook)
|
|
}
|
|
}
|
|
|
|
// NewRunner creates a new Runner with a pool of states
|
|
func NewRunner(options ...RunnerOption) (*Runner, error) {
|
|
// Default configuration
|
|
runner := &Runner{
|
|
poolSize: runtime.GOMAXPROCS(0),
|
|
debug: false,
|
|
initHooks: make([]InitHook, 0, 4),
|
|
finalizeHooks: make([]FinalizeHook, 0, 4),
|
|
}
|
|
|
|
// Apply options
|
|
for _, opt := range options {
|
|
opt(runner)
|
|
}
|
|
|
|
// Set up module loader if not already initialized
|
|
if runner.moduleLoader == nil {
|
|
config := &ModuleConfig{
|
|
ScriptDir: "",
|
|
LibDirs: []string{},
|
|
}
|
|
runner.moduleLoader = NewModuleLoader(config)
|
|
}
|
|
|
|
// Initialize states and pool
|
|
runner.states = make([]*State, runner.poolSize)
|
|
runner.statePool = make(chan int, runner.poolSize)
|
|
|
|
// Create ants goroutine pool
|
|
var err error
|
|
runner.pool, err = ants.NewPool(runner.poolSize * 2)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Create and initialize all states
|
|
if err := runner.initializeStates(); err != nil {
|
|
runner.Close() // Clean up already created states
|
|
return nil, err
|
|
}
|
|
|
|
runner.isRunning.Store(true)
|
|
return runner, nil
|
|
}
|
|
|
|
// debugLog logs a message if debug mode is enabled
|
|
func (r *Runner) debugLog(format string, args ...interface{}) {
|
|
if r.debug {
|
|
logger.Debug("Runner "+format, args...)
|
|
}
|
|
}
|
|
|
|
func (r *Runner) debugLogCont(format string, args ...interface{}) {
|
|
if r.debug {
|
|
logger.DebugCont(format, args...)
|
|
}
|
|
}
|
|
|
|
// initializeStates creates and initializes all states in the pool
|
|
func (r *Runner) initializeStates() error {
|
|
r.debugLog("is initializing %d states", r.poolSize)
|
|
|
|
// Create main template state first with full logging
|
|
templateState, err := r.createState(0)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
r.states[0] = templateState
|
|
r.statePool <- 0 // Add index to the pool
|
|
|
|
// Create remaining states with minimal logging
|
|
successCount := 1
|
|
for i := 1; i < r.poolSize; i++ {
|
|
state, err := r.createState(i)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
r.states[i] = state
|
|
r.statePool <- i // Add index to the pool
|
|
successCount++
|
|
}
|
|
|
|
r.debugLog("has built %d/%d states successfully", successCount, r.poolSize)
|
|
return nil
|
|
}
|
|
|
|
// createState initializes a new Lua state
|
|
func (r *Runner) createState(index int) (*State, error) {
|
|
verbose := index == 0
|
|
if verbose {
|
|
r.debugLog("Creating Lua state %d", index)
|
|
}
|
|
|
|
// Create a new state
|
|
L := luajit.New()
|
|
if L == nil {
|
|
return nil, errors.New("failed to create Lua state")
|
|
}
|
|
|
|
// Create sandbox
|
|
sandbox := NewSandbox()
|
|
if r.debug && verbose {
|
|
sandbox.EnableDebug()
|
|
}
|
|
|
|
// Set up require system
|
|
if err := r.moduleLoader.SetupRequire(L); err != nil {
|
|
if verbose {
|
|
r.debugLogCont("Failed to set up require for state %d: %v", index, err)
|
|
}
|
|
L.Cleanup()
|
|
L.Close()
|
|
return nil, ErrInitFailed
|
|
}
|
|
|
|
// Initialize all core modules from the registry
|
|
if err := GlobalRegistry.Initialize(L, index); err != nil {
|
|
if verbose {
|
|
r.debugLogCont("Failed to initialize core modules for state %d: %v", index, err)
|
|
}
|
|
L.Cleanup()
|
|
L.Close()
|
|
return nil, ErrInitFailed
|
|
}
|
|
|
|
// Set up sandbox after core modules are initialized
|
|
if err := sandbox.Setup(L, index); err != nil {
|
|
if verbose {
|
|
r.debugLogCont("Failed to set up sandbox for state %d: %v", index, err)
|
|
}
|
|
L.Cleanup()
|
|
L.Close()
|
|
return nil, ErrInitFailed
|
|
}
|
|
|
|
// Preload all modules
|
|
if err := r.moduleLoader.PreloadModules(L); err != nil {
|
|
if verbose {
|
|
r.debugLogCont("Failed to preload modules for state %d: %v", index, err)
|
|
}
|
|
L.Cleanup()
|
|
L.Close()
|
|
return nil, errors.New("failed to preload modules")
|
|
}
|
|
|
|
state := &State{
|
|
L: L,
|
|
sandbox: sandbox,
|
|
index: index,
|
|
inUse: false,
|
|
initTime: time.Now(),
|
|
}
|
|
|
|
if verbose {
|
|
r.debugLog("State %d created successfully", index)
|
|
}
|
|
return state, nil
|
|
}
|
|
|
|
// executeTask is the worker function for the ants pool
|
|
func (r *Runner) executeTask(i interface{}) {
|
|
task, ok := i.(*ExecuteTask)
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
// Set script directory if provided
|
|
if task.scriptPath != "" {
|
|
r.mu.Lock()
|
|
r.scriptDir = filepath.Dir(task.scriptPath)
|
|
r.moduleLoader.SetScriptDir(r.scriptDir)
|
|
r.mu.Unlock()
|
|
}
|
|
|
|
// Get a state index from the pool
|
|
var stateIndex int
|
|
select {
|
|
case stateIndex = <-r.statePool:
|
|
// Got a state
|
|
case <-time.After(5 * time.Second): // 5-second timeout
|
|
// Timed out waiting for a state
|
|
task.result <- taskResult{nil, errors.New("server busy - timed out waiting for a Lua state")}
|
|
return
|
|
}
|
|
|
|
// Get the actual state
|
|
r.mu.RLock()
|
|
state := r.states[stateIndex]
|
|
r.mu.RUnlock()
|
|
|
|
if state == nil {
|
|
r.statePool <- stateIndex
|
|
task.result <- taskResult{nil, ErrStateNotReady}
|
|
return
|
|
}
|
|
|
|
// Mark state as in use
|
|
state.inUse = true
|
|
|
|
// Ensure state is returned to pool when done
|
|
defer func() {
|
|
state.inUse = false
|
|
if r.isRunning.Load() {
|
|
select {
|
|
case r.statePool <- stateIndex:
|
|
// State returned to pool
|
|
default:
|
|
// Pool is full or closed (shouldn't happen)
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Copy hooks to avoid holding lock during execution
|
|
r.mu.RLock()
|
|
initHooks := make([]InitHook, len(r.initHooks))
|
|
copy(initHooks, r.initHooks)
|
|
finalizeHooks := make([]FinalizeHook, len(r.finalizeHooks))
|
|
copy(finalizeHooks, r.finalizeHooks)
|
|
r.mu.RUnlock()
|
|
|
|
// Run init hooks
|
|
for _, hook := range initHooks {
|
|
if err := hook(state.L, task.context); err != nil {
|
|
task.result <- taskResult{nil, err}
|
|
return
|
|
}
|
|
}
|
|
|
|
// Prepare context values
|
|
var ctxValues map[string]any
|
|
if task.context != nil {
|
|
ctxValues = task.context.Values
|
|
}
|
|
|
|
// Execute in sandbox
|
|
result, err := state.sandbox.Execute(state.L, task.bytecode, ctxValues)
|
|
|
|
// Run finalize hooks
|
|
for _, hook := range finalizeHooks {
|
|
if hookErr := hook(state.L, task.context, result); hookErr != nil && err == nil {
|
|
err = hookErr
|
|
}
|
|
}
|
|
|
|
task.result <- taskResult{result, err}
|
|
}
|
|
|
|
// Execute runs a script with context
|
|
func (r *Runner) Execute(ctx context.Context, bytecode []byte, execCtx *Context, scriptPath string) (any, error) {
|
|
if !r.isRunning.Load() {
|
|
return nil, ErrRunnerClosed
|
|
}
|
|
|
|
// Create result channel
|
|
resultChan := make(chan taskResult, 1)
|
|
|
|
// Create task
|
|
task := &ExecuteTask{
|
|
bytecode: bytecode,
|
|
context: execCtx,
|
|
scriptPath: scriptPath,
|
|
result: resultChan,
|
|
}
|
|
|
|
// Submit task to pool
|
|
if err := r.pool.Submit(func() { r.executeTask(task) }); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Wait for result with context timeout
|
|
select {
|
|
case result := <-resultChan:
|
|
return result.value, result.err
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
}
|
|
}
|
|
|
|
// Run executes a Lua script (convenience wrapper)
|
|
func (r *Runner) Run(bytecode []byte, execCtx *Context, scriptPath string) (any, error) {
|
|
return r.Execute(context.Background(), bytecode, execCtx, scriptPath)
|
|
}
|
|
|
|
// Close gracefully shuts down the Runner
|
|
func (r *Runner) Close() error {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
|
|
if !r.isRunning.Load() {
|
|
return ErrRunnerClosed
|
|
}
|
|
|
|
r.isRunning.Store(false)
|
|
r.debugLog("Closing Runner and destroying all states")
|
|
|
|
// Shut down goroutine pool
|
|
r.pool.Release()
|
|
|
|
// Drain the state pool
|
|
r.drainStatePool()
|
|
|
|
// Clean up all states
|
|
for i, state := range r.states {
|
|
if state != nil {
|
|
state.L.Cleanup()
|
|
state.L.Close()
|
|
r.states[i] = nil
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// drainStatePool removes all states from the pool
|
|
func (r *Runner) drainStatePool() {
|
|
for {
|
|
select {
|
|
case <-r.statePool:
|
|
// Drain one state
|
|
default:
|
|
// Pool is empty
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// RefreshStates rebuilds all states in the pool
|
|
func (r *Runner) RefreshStates() error {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
|
|
if !r.isRunning.Load() {
|
|
return ErrRunnerClosed
|
|
}
|
|
|
|
r.debugLog("Refreshing all Lua states")
|
|
|
|
// Drain all states from the pool
|
|
r.drainStatePool()
|
|
|
|
// Destroy all existing states
|
|
for i, state := range r.states {
|
|
if state != nil {
|
|
if state.inUse {
|
|
r.debugLog("Warning: attempting to refresh state %d that is in use", i)
|
|
}
|
|
state.L.Cleanup()
|
|
state.L.Close()
|
|
r.states[i] = nil
|
|
}
|
|
}
|
|
|
|
// Reinitialize all states
|
|
if err := r.initializeStates(); err != nil {
|
|
return err
|
|
}
|
|
|
|
r.debugLog("All states refreshed successfully")
|
|
return nil
|
|
}
|
|
|
|
// NotifyFileChanged handles file change notifications
|
|
func (r *Runner) NotifyFileChanged(filePath string) bool {
|
|
r.debugLog("File change detected: %s", filePath)
|
|
|
|
// Check if it's a module file
|
|
module, isModule := r.moduleLoader.GetModuleByPath(filePath)
|
|
if isModule {
|
|
r.debugLog("File is a module: %s", module)
|
|
return r.RefreshModule(module)
|
|
}
|
|
|
|
// For non-module files, refresh all states
|
|
if err := r.RefreshStates(); err != nil {
|
|
r.debugLog("Failed to refresh states: %v", err)
|
|
return false
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
// RefreshModule refreshes a specific module across all states
|
|
func (r *Runner) RefreshModule(moduleName string) bool {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
|
|
if !r.isRunning.Load() {
|
|
return false
|
|
}
|
|
|
|
r.debugLog("Refreshing module: %s", moduleName)
|
|
|
|
// Check if it's a core module
|
|
coreName, isCore := GlobalRegistry.MatchModuleName(moduleName)
|
|
|
|
success := true
|
|
for _, state := range r.states {
|
|
if state == nil {
|
|
continue
|
|
}
|
|
|
|
// Skip states that are in use
|
|
if state.inUse {
|
|
r.debugLog("Skipping refresh for state %d (in use)", state.index)
|
|
success = false
|
|
continue
|
|
}
|
|
|
|
// Invalidate module in Lua
|
|
if err := state.L.DoString(`package.loaded["` + moduleName + `"] = nil`); err != nil {
|
|
r.debugLog("Failed to invalidate module %s in state %d: %v",
|
|
moduleName, state.index, err)
|
|
success = false
|
|
continue
|
|
}
|
|
|
|
// For core modules, reinitialize them
|
|
if isCore {
|
|
if err := GlobalRegistry.InitializeModule(state.L, coreName); err != nil {
|
|
r.debugLog("Failed to reinitialize core module %s in state %d: %v",
|
|
coreName, state.index, err)
|
|
success = false
|
|
}
|
|
}
|
|
}
|
|
|
|
if success {
|
|
r.debugLog("Module %s refreshed successfully in all states", moduleName)
|
|
} else {
|
|
r.debugLog("Module %s refresh had some failures", moduleName)
|
|
}
|
|
|
|
return success
|
|
}
|
|
|
|
// AddModule adds a module to all sandbox environments
|
|
func (r *Runner) AddModule(name string, module any) {
|
|
r.debugLog("Adding module %s to all sandboxes", name)
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
|
|
for _, state := range r.states {
|
|
if state != nil && state.sandbox != nil && !state.inUse {
|
|
state.sandbox.AddModule(name, module)
|
|
}
|
|
}
|
|
}
|
|
|
|
// AddInitHook adds a hook to be called before script execution
|
|
func (r *Runner) AddInitHook(hook InitHook) {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
r.initHooks = append(r.initHooks, hook)
|
|
}
|
|
|
|
// AddFinalizeHook adds a hook to be called after script execution
|
|
func (r *Runner) AddFinalizeHook(hook FinalizeHook) {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
r.finalizeHooks = append(r.finalizeHooks, hook)
|
|
}
|
|
|
|
// ResetModuleCache clears the module cache in all states
|
|
func (r *Runner) ResetModuleCache() {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
|
|
if !r.isRunning.Load() {
|
|
return
|
|
}
|
|
|
|
r.debugLog("Resetting module cache in all states")
|
|
|
|
for _, state := range r.states {
|
|
if state != nil && !state.inUse {
|
|
r.moduleLoader.ResetModules(state.L)
|
|
}
|
|
}
|
|
}
|
|
|
|
// GetStateCount returns the number of initialized states
|
|
func (r *Runner) GetStateCount() int {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
|
|
count := 0
|
|
for _, state := range r.states {
|
|
if state != nil {
|
|
count++
|
|
}
|
|
}
|
|
|
|
return count
|
|
}
|
|
|
|
// GetActiveStateCount returns the number of states currently in use
|
|
func (r *Runner) GetActiveStateCount() int {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
|
|
count := 0
|
|
for _, state := range r.states {
|
|
if state != nil && state.inUse {
|
|
count++
|
|
}
|
|
}
|
|
|
|
return count
|
|
}
|
|
|
|
// GetWorkerPoolStats returns statistics about the worker pool
|
|
func (r *Runner) GetWorkerPoolStats() (running, capacity int) {
|
|
return r.pool.Running(), r.pool.Cap()
|
|
}
|
|
|
|
// GetModuleCount returns the number of loaded modules in the first available state
|
|
func (r *Runner) GetModuleCount() int {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
|
|
if !r.isRunning.Load() {
|
|
return 0
|
|
}
|
|
|
|
// Find first available state
|
|
for _, state := range r.states {
|
|
if state != nil && !state.inUse {
|
|
// Execute a Lua snippet to count modules
|
|
if res, err := state.L.ExecuteWithResult(`
|
|
local count = 0
|
|
for _ in pairs(package.loaded) do
|
|
count = count + 1
|
|
end
|
|
return count
|
|
`); err == nil {
|
|
if num, ok := res.(float64); ok {
|
|
return int(num)
|
|
}
|
|
}
|
|
break
|
|
}
|
|
}
|
|
|
|
return 0
|
|
}
|
|
|
|
// GetBufferPool returns a buffer from the bytebufferpool
|
|
func GetBufferPool() *bytebufferpool.ByteBuffer {
|
|
return bytebufferpool.Get()
|
|
}
|
|
|
|
// ReleaseBufferPool returns a buffer to the bytebufferpool
|
|
func ReleaseBufferPool(buf *bytebufferpool.ByteBuffer) {
|
|
bytebufferpool.Put(buf)
|
|
}
|