Compare commits
No commits in common. "78337988bd7e2a3d004a6f83e72eb0ddc4951084" and "d6feb408ce8e52b1cca103d499f68c2d8ba0c7e6" have entirely different histories.
78337988bd
...
d6feb408ce
|
@ -4,7 +4,6 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"path/filepath"
|
||||
"runtime"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
|
@ -31,19 +30,17 @@ var resultChanPool = sync.Pool{
|
|||
},
|
||||
}
|
||||
|
||||
// LuaRunner runs Lua scripts using multiple Lua states in a round-robin fashion
|
||||
// LuaRunner runs Lua scripts using a single Lua state
|
||||
type LuaRunner struct {
|
||||
states []*luajit.State // Multiple Lua states for parallel execution
|
||||
jobQueues []chan job // Each state has its own job queue
|
||||
workerCount int // Number of worker states (default 4)
|
||||
nextWorker int32 // Atomic counter for round-robin distribution
|
||||
state *luajit.State // The Lua state
|
||||
jobQueue chan job // Channel for incoming jobs
|
||||
isRunning atomic.Bool // Flag indicating if the runner is active
|
||||
mu sync.RWMutex // Mutex for thread safety
|
||||
wg sync.WaitGroup // WaitGroup for clean shutdown
|
||||
initFunc StateInitFunc // Optional function to initialize Lua states
|
||||
bufferSize int // Size of each job queue buffer
|
||||
initFunc StateInitFunc // Optional function to initialize Lua state
|
||||
bufferSize int // Size of the job queue buffer
|
||||
moduleLoader *NativeModuleLoader // Native module loader for require
|
||||
sandboxes []*Sandbox // Sandbox for each state
|
||||
sandbox *Sandbox // The sandbox environment
|
||||
debug bool // Enable debug logging
|
||||
}
|
||||
|
||||
|
@ -83,22 +80,13 @@ func WithDebugEnabled() RunnerOption {
|
|||
}
|
||||
}
|
||||
|
||||
// WithWorkerCount sets the number of worker states (min 1)
|
||||
func WithWorkerCount(count int) RunnerOption {
|
||||
return func(r *LuaRunner) {
|
||||
if count > 0 {
|
||||
r.workerCount = count
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// NewRunner creates a new LuaRunner with multiple worker states
|
||||
// NewRunner creates a new LuaRunner
|
||||
func NewRunner(options ...RunnerOption) (*LuaRunner, error) {
|
||||
// Default configuration
|
||||
runner := &LuaRunner{
|
||||
bufferSize: 10,
|
||||
workerCount: runtime.GOMAXPROCS(0),
|
||||
debug: false,
|
||||
bufferSize: 10, // Default buffer size
|
||||
sandbox: NewSandbox(),
|
||||
debug: false,
|
||||
}
|
||||
|
||||
// Apply options
|
||||
|
@ -106,10 +94,9 @@ func NewRunner(options ...RunnerOption) (*LuaRunner, error) {
|
|||
opt(runner)
|
||||
}
|
||||
|
||||
// Initialize states and job queues
|
||||
runner.states = make([]*luajit.State, runner.workerCount)
|
||||
runner.jobQueues = make([]chan job, runner.workerCount)
|
||||
runner.sandboxes = make([]*Sandbox, runner.workerCount)
|
||||
// Create job queue
|
||||
runner.jobQueue = make(chan job, runner.bufferSize)
|
||||
runner.isRunning.Store(true)
|
||||
|
||||
// Set up module loader if not already initialized
|
||||
if runner.moduleLoader == nil {
|
||||
|
@ -120,28 +107,14 @@ func NewRunner(options ...RunnerOption) (*LuaRunner, error) {
|
|||
runner.moduleLoader = NewNativeModuleLoader(requireConfig)
|
||||
}
|
||||
|
||||
// Create job queues and initialize states
|
||||
for i := 0; i < runner.workerCount; i++ {
|
||||
// Create job queue
|
||||
runner.jobQueues[i] = make(chan job, runner.bufferSize)
|
||||
|
||||
// Create sandbox
|
||||
runner.sandboxes[i] = NewSandbox()
|
||||
|
||||
// Initialize state
|
||||
if err := runner.initState(i, true); err != nil {
|
||||
// Clean up if initialization fails
|
||||
runner.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Start worker goroutine
|
||||
runner.wg.Add(1)
|
||||
go runner.processJobs(i)
|
||||
// Initialize Lua state
|
||||
if err := runner.initState(true); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
runner.isRunning.Store(true)
|
||||
runner.nextWorker = 0
|
||||
// Start the event loop
|
||||
runner.wg.Add(1)
|
||||
go runner.processJobs()
|
||||
|
||||
return runner, nil
|
||||
}
|
||||
|
@ -153,17 +126,17 @@ func (r *LuaRunner) debugLog(format string, args ...interface{}) {
|
|||
}
|
||||
}
|
||||
|
||||
// initState initializes or reinitializes a specific Lua state
|
||||
func (r *LuaRunner) initState(workerIndex int, initial bool) error {
|
||||
r.debugLog("Initializing Lua state %d (initial=%v)", workerIndex, initial)
|
||||
// initState initializes or reinitializes the Lua state
|
||||
func (r *LuaRunner) initState(initial bool) error {
|
||||
r.debugLog("Initializing Lua state (initial=%v)", initial)
|
||||
|
||||
// Clean up existing state if there is one
|
||||
if r.states[workerIndex] != nil {
|
||||
r.debugLog("Cleaning up existing state %d", workerIndex)
|
||||
if r.state != nil {
|
||||
r.debugLog("Cleaning up existing state")
|
||||
// Always call Cleanup before Close to properly free function pointers
|
||||
r.states[workerIndex].Cleanup()
|
||||
r.states[workerIndex].Close()
|
||||
r.states[workerIndex] = nil
|
||||
r.state.Cleanup()
|
||||
r.state.Close()
|
||||
r.state = nil
|
||||
}
|
||||
|
||||
// Create fresh state
|
||||
|
@ -171,25 +144,25 @@ func (r *LuaRunner) initState(workerIndex int, initial bool) error {
|
|||
if state == nil {
|
||||
return errors.New("failed to create Lua state")
|
||||
}
|
||||
r.debugLog("Created new Lua state %d", workerIndex)
|
||||
r.debugLog("Created new Lua state")
|
||||
|
||||
// Set up require paths and mechanism
|
||||
if err := r.moduleLoader.SetupRequire(state); err != nil {
|
||||
r.debugLog("Failed to set up require for state %d: %v", workerIndex, err)
|
||||
r.debugLog("Failed to set up require: %v", err)
|
||||
state.Cleanup()
|
||||
state.Close()
|
||||
return ErrInitFailed
|
||||
}
|
||||
r.debugLog("Require system initialized for state %d", workerIndex)
|
||||
r.debugLog("Require system initialized")
|
||||
|
||||
// Initialize all core modules from the registry
|
||||
if err := GlobalRegistry.Initialize(state); err != nil {
|
||||
r.debugLog("Failed to initialize core modules for state %d: %v", workerIndex, err)
|
||||
r.debugLog("Failed to initialize core modules: %v", err)
|
||||
state.Cleanup()
|
||||
state.Close()
|
||||
return ErrInitFailed
|
||||
}
|
||||
r.debugLog("Core modules initialized for state %d", workerIndex)
|
||||
r.debugLog("Core modules initialized")
|
||||
|
||||
// Check if http module is properly registered
|
||||
testResult, err := state.ExecuteWithResult(`
|
||||
|
@ -201,42 +174,42 @@ func (r *LuaRunner) initState(workerIndex int, initial bool) error {
|
|||
end
|
||||
`)
|
||||
if err != nil || testResult != true {
|
||||
r.debugLog("HTTP module verification failed for state %d: %v, result: %v", workerIndex, err, testResult)
|
||||
r.debugLog("HTTP module verification failed: %v, result: %v", err, testResult)
|
||||
} else {
|
||||
r.debugLog("HTTP module verified OK for state %d", workerIndex)
|
||||
r.debugLog("HTTP module verified OK")
|
||||
}
|
||||
|
||||
// Verify __http_request function
|
||||
testResult, _ = state.ExecuteWithResult(`return type(__http_request)`)
|
||||
r.debugLog("__http_request function for state %d is of type: %v", workerIndex, testResult)
|
||||
r.debugLog("__http_request function is of type: %v", testResult)
|
||||
|
||||
// Set up sandbox after core modules are initialized
|
||||
if err := r.sandboxes[workerIndex].Setup(state); err != nil {
|
||||
r.debugLog("Failed to set up sandbox for state %d: %v", workerIndex, err)
|
||||
if err := r.sandbox.Setup(state); err != nil {
|
||||
r.debugLog("Failed to set up sandbox: %v", err)
|
||||
state.Cleanup()
|
||||
state.Close()
|
||||
return ErrInitFailed
|
||||
}
|
||||
r.debugLog("Sandbox environment set up for state %d", workerIndex)
|
||||
r.debugLog("Sandbox environment set up")
|
||||
|
||||
// Preload all modules into package.loaded
|
||||
if err := r.moduleLoader.PreloadAllModules(state); err != nil {
|
||||
r.debugLog("Failed to preload modules for state %d: %v", workerIndex, err)
|
||||
r.debugLog("Failed to preload modules: %v", err)
|
||||
state.Cleanup()
|
||||
state.Close()
|
||||
return errors.New("failed to preload modules")
|
||||
}
|
||||
r.debugLog("All modules preloaded for state %d", workerIndex)
|
||||
r.debugLog("All modules preloaded")
|
||||
|
||||
// Run init function if provided
|
||||
if r.initFunc != nil {
|
||||
if err := r.initFunc(state); err != nil {
|
||||
r.debugLog("Custom init function failed for state %d: %v", workerIndex, err)
|
||||
r.debugLog("Custom init function failed: %v", err)
|
||||
state.Cleanup()
|
||||
state.Close()
|
||||
return ErrInitFailed
|
||||
}
|
||||
r.debugLog("Custom init function completed for state %d", workerIndex)
|
||||
r.debugLog("Custom init function completed")
|
||||
}
|
||||
|
||||
// Test for HTTP module again after full initialization
|
||||
|
@ -249,31 +222,31 @@ func (r *LuaRunner) initState(workerIndex int, initial bool) error {
|
|||
end
|
||||
`)
|
||||
if err != nil || testResult != true {
|
||||
r.debugLog("Final HTTP module verification failed for state %d: %v, result: %v", workerIndex, err, testResult)
|
||||
r.debugLog("Final HTTP module verification failed: %v, result: %v", err, testResult)
|
||||
} else {
|
||||
r.debugLog("Final HTTP module verification OK for state %d", workerIndex)
|
||||
r.debugLog("Final HTTP module verification OK")
|
||||
}
|
||||
|
||||
r.states[workerIndex] = state
|
||||
r.debugLog("State %d initialization complete", workerIndex)
|
||||
r.state = state
|
||||
r.debugLog("State initialization complete")
|
||||
return nil
|
||||
}
|
||||
|
||||
// processJobs handles the job queue for a specific worker
|
||||
func (r *LuaRunner) processJobs(workerIndex int) {
|
||||
// processJobs handles the job queue
|
||||
func (r *LuaRunner) processJobs() {
|
||||
defer r.wg.Done()
|
||||
defer func() {
|
||||
if r.states[workerIndex] != nil {
|
||||
r.debugLog("Cleaning up Lua state %d in processJobs", workerIndex)
|
||||
r.states[workerIndex].Cleanup()
|
||||
r.states[workerIndex].Close()
|
||||
r.states[workerIndex] = nil
|
||||
if r.state != nil {
|
||||
r.debugLog("Cleaning up Lua state in processJobs")
|
||||
r.state.Cleanup()
|
||||
r.state.Close()
|
||||
r.state = nil
|
||||
}
|
||||
}()
|
||||
|
||||
for job := range r.jobQueues[workerIndex] {
|
||||
for job := range r.jobQueue {
|
||||
// Execute the job and send result
|
||||
result := r.executeJob(workerIndex, job)
|
||||
result := r.executeJob(job)
|
||||
select {
|
||||
case job.Result <- result:
|
||||
// Result sent successfully
|
||||
|
@ -284,7 +257,7 @@ func (r *LuaRunner) processJobs(workerIndex int) {
|
|||
}
|
||||
|
||||
// executeJob runs a script in the sandbox environment
|
||||
func (r *LuaRunner) executeJob(workerIndex int, j job) JobResult {
|
||||
func (r *LuaRunner) executeJob(j job) JobResult {
|
||||
// If the job has a script path, update script dir for module resolution
|
||||
if j.ScriptPath != "" {
|
||||
r.mu.Lock()
|
||||
|
@ -299,16 +272,14 @@ func (r *LuaRunner) executeJob(workerIndex int, j job) JobResult {
|
|||
}
|
||||
|
||||
r.mu.RLock()
|
||||
state := r.states[workerIndex]
|
||||
sandbox := r.sandboxes[workerIndex]
|
||||
r.mu.RUnlock()
|
||||
defer r.mu.RUnlock()
|
||||
|
||||
if state == nil {
|
||||
if r.state == nil {
|
||||
return JobResult{nil, errors.New("lua state is not initialized")}
|
||||
}
|
||||
|
||||
// Execute in sandbox
|
||||
value, err := sandbox.Execute(state, j.Bytecode, ctx)
|
||||
value, err := r.sandbox.Execute(r.state, j.Bytecode, ctx)
|
||||
return JobResult{value, err}
|
||||
}
|
||||
|
||||
|
@ -340,14 +311,10 @@ func (r *LuaRunner) RunWithContext(ctx context.Context, bytecode []byte, execCtx
|
|||
Result: resultChan,
|
||||
}
|
||||
|
||||
// Choose worker in round-robin fashion
|
||||
workerIndex := int(atomic.AddInt32(&r.nextWorker, 1) % int32(r.workerCount))
|
||||
|
||||
// Submit job with context
|
||||
select {
|
||||
case r.jobQueues[workerIndex] <- j:
|
||||
case r.jobQueue <- j:
|
||||
// Job submitted
|
||||
r.debugLog("Job submitted to worker %d", workerIndex)
|
||||
case <-ctx.Done():
|
||||
// Return the channel to the pool before exiting
|
||||
resultChanPool.Put(resultChan)
|
||||
|
@ -386,15 +353,9 @@ func (r *LuaRunner) Close() error {
|
|||
}
|
||||
|
||||
r.isRunning.Store(false)
|
||||
close(r.jobQueue)
|
||||
|
||||
// Close all job queues
|
||||
for i := 0; i < r.workerCount; i++ {
|
||||
if r.jobQueues[i] != nil {
|
||||
close(r.jobQueues[i])
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for all workers to finish
|
||||
// Wait for event loop to finish
|
||||
r.wg.Wait()
|
||||
|
||||
return nil
|
||||
|
@ -407,90 +368,63 @@ func (r *LuaRunner) NotifyFileChanged(filePath string) bool {
|
|||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
|
||||
// Reset all states on file changes
|
||||
success := true
|
||||
for i := 0; i < r.workerCount; i++ {
|
||||
err := r.initState(i, false)
|
||||
if err != nil {
|
||||
r.debugLog("Failed to reinitialize state %d: %v", i, err)
|
||||
success = false
|
||||
} else {
|
||||
r.debugLog("State %d successfully reinitialized", i)
|
||||
}
|
||||
// Reset the entire state on file changes
|
||||
err := r.initState(false)
|
||||
if err != nil {
|
||||
r.debugLog("Failed to reinitialize state: %v", err)
|
||||
return false
|
||||
}
|
||||
|
||||
return success
|
||||
r.debugLog("State successfully reinitialized")
|
||||
return true
|
||||
}
|
||||
|
||||
// ResetModuleCache clears non-core modules from package.loaded in all states
|
||||
// ResetModuleCache clears non-core modules from package.loaded
|
||||
func (r *LuaRunner) ResetModuleCache() {
|
||||
if r.moduleLoader != nil {
|
||||
r.debugLog("Resetting module cache in all states")
|
||||
r.mu.RLock()
|
||||
defer r.mu.RUnlock()
|
||||
|
||||
for i := 0; i < r.workerCount; i++ {
|
||||
if r.states[i] != nil {
|
||||
r.moduleLoader.ResetModules(r.states[i])
|
||||
}
|
||||
}
|
||||
r.debugLog("Resetting module cache")
|
||||
r.moduleLoader.ResetModules(r.state)
|
||||
}
|
||||
}
|
||||
|
||||
// ReloadAllModules reloads all modules into package.loaded in all states
|
||||
// ReloadAllModules reloads all modules into package.loaded
|
||||
func (r *LuaRunner) ReloadAllModules() error {
|
||||
if r.moduleLoader != nil {
|
||||
r.debugLog("Reloading all modules in all states")
|
||||
r.mu.RLock()
|
||||
defer r.mu.RUnlock()
|
||||
|
||||
for i := 0; i < r.workerCount; i++ {
|
||||
if r.states[i] != nil {
|
||||
if err := r.moduleLoader.PreloadAllModules(r.states[i]); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
r.debugLog("Reloading all modules")
|
||||
return r.moduleLoader.PreloadAllModules(r.state)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// RefreshModuleByName invalidates a specific module in package.loaded in all states
|
||||
// RefreshModuleByName invalidates a specific module in package.loaded
|
||||
func (r *LuaRunner) RefreshModuleByName(modName string) bool {
|
||||
r.mu.RLock()
|
||||
defer r.mu.RUnlock()
|
||||
|
||||
success := true
|
||||
for i := 0; i < r.workerCount; i++ {
|
||||
if r.states[i] != nil {
|
||||
r.debugLog("Refreshing module %s in state %d", modName, i)
|
||||
if err := r.states[i].DoString(`package.loaded["` + modName + `"] = nil`); err != nil {
|
||||
success = false
|
||||
}
|
||||
if r.state != nil {
|
||||
r.debugLog("Refreshing module: %s", modName)
|
||||
if err := r.state.DoString(`package.loaded["` + modName + `"] = nil`); err != nil {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
return success
|
||||
return false
|
||||
}
|
||||
|
||||
// AddModule adds a module to all sandbox environments
|
||||
// AddModule adds a module to the sandbox environment
|
||||
func (r *LuaRunner) AddModule(name string, module any) {
|
||||
r.debugLog("Adding module %s to all sandboxes", name)
|
||||
for i := 0; i < r.workerCount; i++ {
|
||||
r.sandboxes[i].AddModule(name, module)
|
||||
}
|
||||
r.debugLog("Adding module: %s", name)
|
||||
r.sandbox.AddModule(name, module)
|
||||
}
|
||||
|
||||
// GetModuleCount returns the number of loaded modules in the first state
|
||||
// GetModuleCount returns the number of loaded modules
|
||||
func (r *LuaRunner) GetModuleCount() int {
|
||||
r.mu.RLock()
|
||||
defer r.mu.RUnlock()
|
||||
|
||||
count := 0
|
||||
|
||||
// Get module count from the first Lua state
|
||||
if r.states[0] != nil {
|
||||
// Get module count from Lua
|
||||
if r.state != nil {
|
||||
// Execute a Lua snippet to count modules
|
||||
if res, err := r.states[0].ExecuteWithResult(`
|
||||
if res, err := r.state.ExecuteWithResult(`
|
||||
local count = 0
|
||||
for _ in pairs(package.loaded) do
|
||||
count = count + 1
|
||||
|
|
Loading…
Reference in New Issue
Block a user