goroutine runner 1
This commit is contained in:
parent
78337988bd
commit
6fadf26bea
22
Moonshark.go
22
Moonshark.go
|
@ -6,6 +6,7 @@ import (
|
|||
"fmt"
|
||||
"os"
|
||||
"os/signal"
|
||||
"runtime"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
|
@ -151,7 +152,7 @@ func main() {
|
|||
routesDir := cfg.GetString("routes_dir", "./routes")
|
||||
staticDir := cfg.GetString("static_dir", "./static")
|
||||
overrideDir := cfg.GetString("override_dir", "./override")
|
||||
bufferSize := cfg.GetInt("buffer_size", 20)
|
||||
poolSize := cfg.GetInt("pool_size", runtime.NumCPU())
|
||||
|
||||
if err := utils.EnsureDir(overrideDir); err != nil {
|
||||
logger.Warning("Override directory doesn't exist, and could not create it: %v", err)
|
||||
|
@ -181,16 +182,23 @@ func main() {
|
|||
logger.Fatal("Router initialization failed: %v", err)
|
||||
}
|
||||
|
||||
// Initialize Lua runner
|
||||
luaRunner, err := runner.NewRunner(
|
||||
runner.WithBufferSize(bufferSize),
|
||||
// Initialize Lua runner with options
|
||||
runnerOpts := []runner.RunnerOption{
|
||||
runner.WithPoolSize(poolSize),
|
||||
runner.WithLibDirs(libDirs...),
|
||||
runner.WithDebugEnabled(),
|
||||
)
|
||||
}
|
||||
|
||||
// Add debug option conditionally
|
||||
if debugMode {
|
||||
runnerOpts = append(runnerOpts, runner.WithDebugEnabled())
|
||||
logger.Debug("Debug logging enabled for Lua runner")
|
||||
}
|
||||
|
||||
luaRunner, err := runner.NewRunner(runnerOpts...)
|
||||
if err != nil {
|
||||
logger.Fatal("Failed to initialize Lua runner: %v", err)
|
||||
}
|
||||
logger.Server("Lua runner initialized with buffer size %d", bufferSize)
|
||||
logger.Server("Lua runner initialized with pool size %d", poolSize)
|
||||
defer luaRunner.Close()
|
||||
|
||||
// Set up file watchers if enabled
|
||||
|
|
|
@ -3,6 +3,7 @@ package config
|
|||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"runtime"
|
||||
|
||||
luajit "git.sharkk.net/Sky/LuaJIT-to-Go"
|
||||
)
|
||||
|
@ -22,6 +23,7 @@ type Config struct {
|
|||
|
||||
// Performance settings
|
||||
BufferSize int
|
||||
PoolSize int // Number of Lua states in the pool
|
||||
|
||||
// Feature flags
|
||||
HTTPLoggingEnabled bool
|
||||
|
@ -46,7 +48,7 @@ func New() *Config {
|
|||
LibDirs: []string{"./libs"},
|
||||
|
||||
// Performance defaults
|
||||
BufferSize: 20,
|
||||
PoolSize: runtime.NumCPU(),
|
||||
|
||||
// Feature flag defaults
|
||||
HTTPLoggingEnabled: true,
|
||||
|
@ -233,9 +235,9 @@ func processConfigValue(state *luajit.State, config *Config, key string, valueTy
|
|||
if strVal, ok := value.(string); ok {
|
||||
config.OverrideDir = strVal
|
||||
}
|
||||
case "buffer_size":
|
||||
case "pool_size":
|
||||
if numVal, ok := value.(float64); ok {
|
||||
config.BufferSize = int(numVal)
|
||||
config.PoolSize = int(numVal)
|
||||
}
|
||||
case "http_logging_enabled":
|
||||
if boolVal, ok := value.(bool); ok {
|
||||
|
@ -357,6 +359,8 @@ func (c *Config) GetInt(key string, defaultValue int) int {
|
|||
return c.Port
|
||||
case "buffer_size":
|
||||
return c.BufferSize
|
||||
case "pool_size":
|
||||
return c.PoolSize
|
||||
}
|
||||
|
||||
// Fall back to values map for other keys
|
||||
|
@ -608,11 +612,11 @@ func (c *Config) Set(key string, value any) {
|
|||
if strVal, ok := value.(string); ok {
|
||||
c.OverrideDir = strVal
|
||||
}
|
||||
case "buffer_size":
|
||||
case "pool_size":
|
||||
if numVal, ok := value.(float64); ok {
|
||||
c.BufferSize = int(numVal)
|
||||
c.PoolSize = int(numVal)
|
||||
} else if intVal, ok := value.(int); ok {
|
||||
c.BufferSize = intVal
|
||||
c.PoolSize = intVal
|
||||
}
|
||||
case "http_logging_enabled":
|
||||
if boolVal, ok := value.(bool); ok {
|
||||
|
|
|
@ -1,15 +0,0 @@
|
|||
package runner
|
||||
|
||||
// JobResult represents the result of a Lua script execution
|
||||
type JobResult struct {
|
||||
Value any // Return value from Lua
|
||||
Error error // Error if any
|
||||
}
|
||||
|
||||
// job represents a Lua script execution request
|
||||
type job struct {
|
||||
Bytecode []byte // Compiled LuaJIT bytecode
|
||||
Context *Context // Execution context
|
||||
ScriptPath string // Path to the original script (for require resolution)
|
||||
Result chan<- JobResult // Channel to send result back
|
||||
}
|
|
@ -24,34 +24,36 @@ type StateInitFunc func(*luajit.State) error
|
|||
// RunnerOption defines a functional option for configuring the LuaRunner
|
||||
type RunnerOption func(*LuaRunner)
|
||||
|
||||
// Result channel pool to reduce allocations
|
||||
var resultChanPool = sync.Pool{
|
||||
New: func() interface{} {
|
||||
return make(chan JobResult, 1)
|
||||
},
|
||||
// JobResult represents the result of a Lua script execution
|
||||
type JobResult struct {
|
||||
Value any // Return value from Lua
|
||||
Error error // Error if any
|
||||
}
|
||||
|
||||
// LuaRunner runs Lua scripts using multiple Lua states in a round-robin fashion
|
||||
// StateWrapper wraps a Lua state with its sandbox
|
||||
type StateWrapper struct {
|
||||
state *luajit.State // The Lua state
|
||||
sandbox *Sandbox // Associated sandbox
|
||||
index int // Index for debugging
|
||||
}
|
||||
|
||||
// LuaRunner runs Lua scripts using a pool of Lua states
|
||||
type LuaRunner struct {
|
||||
states []*luajit.State // Multiple Lua states for parallel execution
|
||||
jobQueues []chan job // Each state has its own job queue
|
||||
workerCount int // Number of worker states (default 4)
|
||||
nextWorker int32 // Atomic counter for round-robin distribution
|
||||
states []*StateWrapper // Pool of Lua states
|
||||
stateSem chan int // Semaphore with state indexes
|
||||
poolSize int // Size of the state pool
|
||||
initFunc StateInitFunc // Optional function to initialize Lua states
|
||||
moduleLoader *NativeModuleLoader // Native module loader for require
|
||||
isRunning atomic.Bool // Flag indicating if the runner is active
|
||||
mu sync.RWMutex // Mutex for thread safety
|
||||
wg sync.WaitGroup // WaitGroup for clean shutdown
|
||||
initFunc StateInitFunc // Optional function to initialize Lua states
|
||||
bufferSize int // Size of each job queue buffer
|
||||
moduleLoader *NativeModuleLoader // Native module loader for require
|
||||
sandboxes []*Sandbox // Sandbox for each state
|
||||
debug bool // Enable debug logging
|
||||
}
|
||||
|
||||
// WithBufferSize sets the job queue buffer size
|
||||
func WithBufferSize(size int) RunnerOption {
|
||||
// WithPoolSize sets the state pool size
|
||||
func WithPoolSize(size int) RunnerOption {
|
||||
return func(r *LuaRunner) {
|
||||
if size > 0 {
|
||||
r.bufferSize = size
|
||||
r.poolSize = size
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -83,21 +85,11 @@ func WithDebugEnabled() RunnerOption {
|
|||
}
|
||||
}
|
||||
|
||||
// WithWorkerCount sets the number of worker states (min 1)
|
||||
func WithWorkerCount(count int) RunnerOption {
|
||||
return func(r *LuaRunner) {
|
||||
if count > 0 {
|
||||
r.workerCount = count
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// NewRunner creates a new LuaRunner with multiple worker states
|
||||
// NewRunner creates a new LuaRunner with a pool of states
|
||||
func NewRunner(options ...RunnerOption) (*LuaRunner, error) {
|
||||
// Default configuration
|
||||
runner := &LuaRunner{
|
||||
bufferSize: 10,
|
||||
workerCount: runtime.GOMAXPROCS(0),
|
||||
poolSize: runtime.GOMAXPROCS(0), // Default pool size to number of CPUs
|
||||
debug: false,
|
||||
}
|
||||
|
||||
|
@ -106,11 +98,6 @@ func NewRunner(options ...RunnerOption) (*LuaRunner, error) {
|
|||
opt(runner)
|
||||
}
|
||||
|
||||
// Initialize states and job queues
|
||||
runner.states = make([]*luajit.State, runner.workerCount)
|
||||
runner.jobQueues = make([]chan job, runner.workerCount)
|
||||
runner.sandboxes = make([]*Sandbox, runner.workerCount)
|
||||
|
||||
// Set up module loader if not already initialized
|
||||
if runner.moduleLoader == nil {
|
||||
requireConfig := &RequireConfig{
|
||||
|
@ -120,28 +107,23 @@ func NewRunner(options ...RunnerOption) (*LuaRunner, error) {
|
|||
runner.moduleLoader = NewNativeModuleLoader(requireConfig)
|
||||
}
|
||||
|
||||
// Create job queues and initialize states
|
||||
for i := 0; i < runner.workerCount; i++ {
|
||||
// Create job queue
|
||||
runner.jobQueues[i] = make(chan job, runner.bufferSize)
|
||||
// Initialize states and semaphore
|
||||
runner.states = make([]*StateWrapper, runner.poolSize)
|
||||
runner.stateSem = make(chan int, runner.poolSize)
|
||||
|
||||
// Create sandbox
|
||||
runner.sandboxes[i] = NewSandbox()
|
||||
|
||||
// Initialize state
|
||||
if err := runner.initState(i, true); err != nil {
|
||||
// Clean up if initialization fails
|
||||
runner.Close()
|
||||
// Create and initialize all states
|
||||
for i := 0; i < runner.poolSize; i++ {
|
||||
wrapper, err := runner.initState(i)
|
||||
if err != nil {
|
||||
runner.Close() // Clean up already created states
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Start worker goroutine
|
||||
runner.wg.Add(1)
|
||||
go runner.processJobs(i)
|
||||
runner.states[i] = wrapper
|
||||
runner.stateSem <- i // Add index to semaphore
|
||||
}
|
||||
|
||||
runner.isRunning.Store(true)
|
||||
runner.nextWorker = 0
|
||||
|
||||
return runner, nil
|
||||
}
|
||||
|
@ -153,222 +135,158 @@ func (r *LuaRunner) debugLog(format string, args ...interface{}) {
|
|||
}
|
||||
}
|
||||
|
||||
// initState initializes or reinitializes a specific Lua state
|
||||
func (r *LuaRunner) initState(workerIndex int, initial bool) error {
|
||||
r.debugLog("Initializing Lua state %d (initial=%v)", workerIndex, initial)
|
||||
// initState creates and initializes a new state
|
||||
func (r *LuaRunner) initState(index int) (*StateWrapper, error) {
|
||||
r.debugLog("Initializing Lua state %d", index)
|
||||
|
||||
// Clean up existing state if there is one
|
||||
if r.states[workerIndex] != nil {
|
||||
r.debugLog("Cleaning up existing state %d", workerIndex)
|
||||
// Always call Cleanup before Close to properly free function pointers
|
||||
r.states[workerIndex].Cleanup()
|
||||
r.states[workerIndex].Close()
|
||||
r.states[workerIndex] = nil
|
||||
}
|
||||
|
||||
// Create fresh state
|
||||
// Create a new state
|
||||
state := luajit.New()
|
||||
if state == nil {
|
||||
return errors.New("failed to create Lua state")
|
||||
return nil, errors.New("failed to create Lua state")
|
||||
}
|
||||
r.debugLog("Created new Lua state %d", index)
|
||||
|
||||
// Create sandbox
|
||||
sandbox := NewSandbox()
|
||||
if r.debug {
|
||||
sandbox.EnableDebug()
|
||||
}
|
||||
r.debugLog("Created new Lua state %d", workerIndex)
|
||||
|
||||
// Set up require paths and mechanism
|
||||
if err := r.moduleLoader.SetupRequire(state); err != nil {
|
||||
r.debugLog("Failed to set up require for state %d: %v", workerIndex, err)
|
||||
r.debugLog("Failed to set up require for state %d: %v", index, err)
|
||||
state.Cleanup()
|
||||
state.Close()
|
||||
return ErrInitFailed
|
||||
return nil, ErrInitFailed
|
||||
}
|
||||
r.debugLog("Require system initialized for state %d", workerIndex)
|
||||
r.debugLog("Require system initialized for state %d", index)
|
||||
|
||||
// Initialize all core modules from the registry
|
||||
if err := GlobalRegistry.Initialize(state); err != nil {
|
||||
r.debugLog("Failed to initialize core modules for state %d: %v", workerIndex, err)
|
||||
r.debugLog("Failed to initialize core modules for state %d: %v", index, err)
|
||||
state.Cleanup()
|
||||
state.Close()
|
||||
return ErrInitFailed
|
||||
return nil, ErrInitFailed
|
||||
}
|
||||
r.debugLog("Core modules initialized for state %d", workerIndex)
|
||||
|
||||
// Check if http module is properly registered
|
||||
testResult, err := state.ExecuteWithResult(`
|
||||
if type(http) == "table" and type(http.client) == "table" and
|
||||
type(http.client.get) == "function" then
|
||||
return true
|
||||
else
|
||||
return false
|
||||
end
|
||||
`)
|
||||
if err != nil || testResult != true {
|
||||
r.debugLog("HTTP module verification failed for state %d: %v, result: %v", workerIndex, err, testResult)
|
||||
} else {
|
||||
r.debugLog("HTTP module verified OK for state %d", workerIndex)
|
||||
}
|
||||
|
||||
// Verify __http_request function
|
||||
testResult, _ = state.ExecuteWithResult(`return type(__http_request)`)
|
||||
r.debugLog("__http_request function for state %d is of type: %v", workerIndex, testResult)
|
||||
r.debugLog("Core modules initialized for state %d", index)
|
||||
|
||||
// Set up sandbox after core modules are initialized
|
||||
if err := r.sandboxes[workerIndex].Setup(state); err != nil {
|
||||
r.debugLog("Failed to set up sandbox for state %d: %v", workerIndex, err)
|
||||
if err := sandbox.Setup(state); err != nil {
|
||||
r.debugLog("Failed to set up sandbox for state %d: %v", index, err)
|
||||
state.Cleanup()
|
||||
state.Close()
|
||||
return ErrInitFailed
|
||||
return nil, ErrInitFailed
|
||||
}
|
||||
r.debugLog("Sandbox environment set up for state %d", workerIndex)
|
||||
r.debugLog("Sandbox environment set up for state %d", index)
|
||||
|
||||
// Preload all modules into package.loaded
|
||||
if err := r.moduleLoader.PreloadAllModules(state); err != nil {
|
||||
r.debugLog("Failed to preload modules for state %d: %v", workerIndex, err)
|
||||
r.debugLog("Failed to preload modules for state %d: %v", index, err)
|
||||
state.Cleanup()
|
||||
state.Close()
|
||||
return errors.New("failed to preload modules")
|
||||
return nil, errors.New("failed to preload modules")
|
||||
}
|
||||
r.debugLog("All modules preloaded for state %d", workerIndex)
|
||||
r.debugLog("All modules preloaded for state %d", index)
|
||||
|
||||
// Run init function if provided
|
||||
if r.initFunc != nil {
|
||||
if err := r.initFunc(state); err != nil {
|
||||
r.debugLog("Custom init function failed for state %d: %v", workerIndex, err)
|
||||
r.debugLog("Custom init function failed for state %d: %v", index, err)
|
||||
state.Cleanup()
|
||||
state.Close()
|
||||
return ErrInitFailed
|
||||
return nil, ErrInitFailed
|
||||
}
|
||||
r.debugLog("Custom init function completed for state %d", workerIndex)
|
||||
r.debugLog("Custom init function completed for state %d", index)
|
||||
}
|
||||
|
||||
// Test for HTTP module again after full initialization
|
||||
testResult, err = state.ExecuteWithResult(`
|
||||
if type(http) == "table" and type(http.client) == "table" and
|
||||
type(http.client.get) == "function" then
|
||||
return true
|
||||
else
|
||||
return false
|
||||
end
|
||||
`)
|
||||
if err != nil || testResult != true {
|
||||
r.debugLog("Final HTTP module verification failed for state %d: %v, result: %v", workerIndex, err, testResult)
|
||||
} else {
|
||||
r.debugLog("Final HTTP module verification OK for state %d", workerIndex)
|
||||
}
|
||||
r.debugLog("State %d initialization complete", index)
|
||||
|
||||
r.states[workerIndex] = state
|
||||
r.debugLog("State %d initialization complete", workerIndex)
|
||||
return nil
|
||||
}
|
||||
|
||||
// processJobs handles the job queue for a specific worker
|
||||
func (r *LuaRunner) processJobs(workerIndex int) {
|
||||
defer r.wg.Done()
|
||||
defer func() {
|
||||
if r.states[workerIndex] != nil {
|
||||
r.debugLog("Cleaning up Lua state %d in processJobs", workerIndex)
|
||||
r.states[workerIndex].Cleanup()
|
||||
r.states[workerIndex].Close()
|
||||
r.states[workerIndex] = nil
|
||||
}
|
||||
}()
|
||||
|
||||
for job := range r.jobQueues[workerIndex] {
|
||||
// Execute the job and send result
|
||||
result := r.executeJob(workerIndex, job)
|
||||
select {
|
||||
case job.Result <- result:
|
||||
// Result sent successfully
|
||||
default:
|
||||
// Result channel closed or full, discard the result
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// executeJob runs a script in the sandbox environment
|
||||
func (r *LuaRunner) executeJob(workerIndex int, j job) JobResult {
|
||||
// If the job has a script path, update script dir for module resolution
|
||||
if j.ScriptPath != "" {
|
||||
r.mu.Lock()
|
||||
r.moduleLoader.config.ScriptDir = filepath.Dir(j.ScriptPath)
|
||||
r.mu.Unlock()
|
||||
}
|
||||
|
||||
// Convert context for sandbox
|
||||
var ctx map[string]any
|
||||
if j.Context != nil {
|
||||
ctx = j.Context.Values
|
||||
}
|
||||
|
||||
r.mu.RLock()
|
||||
state := r.states[workerIndex]
|
||||
sandbox := r.sandboxes[workerIndex]
|
||||
r.mu.RUnlock()
|
||||
|
||||
if state == nil {
|
||||
return JobResult{nil, errors.New("lua state is not initialized")}
|
||||
}
|
||||
|
||||
// Execute in sandbox
|
||||
value, err := sandbox.Execute(state, j.Bytecode, ctx)
|
||||
return JobResult{value, err}
|
||||
return &StateWrapper{
|
||||
state: state,
|
||||
sandbox: sandbox,
|
||||
index: index,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// RunWithContext executes a Lua script with context and timeout
|
||||
func (r *LuaRunner) RunWithContext(ctx context.Context, bytecode []byte, execCtx *Context, scriptPath string) (any, error) {
|
||||
r.mu.RLock()
|
||||
if !r.isRunning.Load() {
|
||||
r.mu.RUnlock()
|
||||
return nil, ErrRunnerClosed
|
||||
}
|
||||
|
||||
// Create a result channel
|
||||
resultChan := make(chan JobResult, 1)
|
||||
|
||||
// Get a state index with timeout
|
||||
var stateIndex int
|
||||
select {
|
||||
case stateIndex = <-r.stateSem:
|
||||
// Got a state
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
|
||||
// Launch a goroutine to execute the job
|
||||
go func() {
|
||||
// Make sure to return the state to the pool when done
|
||||
defer func() {
|
||||
// Only return if runner is still open
|
||||
if r.isRunning.Load() {
|
||||
select {
|
||||
case r.stateSem <- stateIndex:
|
||||
// State returned to pool
|
||||
default:
|
||||
// Pool is full or closed (shouldn't happen)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Execute the job
|
||||
var result JobResult
|
||||
|
||||
r.mu.RLock()
|
||||
state := r.states[stateIndex]
|
||||
r.mu.RUnlock()
|
||||
|
||||
// Get a result channel from the pool
|
||||
resultChanInterface := resultChanPool.Get()
|
||||
resultChan := resultChanInterface.(chan JobResult)
|
||||
if state == nil {
|
||||
result = JobResult{nil, errors.New("state is not initialized")}
|
||||
} else {
|
||||
// Set script directory for module resolution
|
||||
if scriptPath != "" {
|
||||
r.mu.Lock()
|
||||
r.moduleLoader.config.ScriptDir = filepath.Dir(scriptPath)
|
||||
r.mu.Unlock()
|
||||
}
|
||||
|
||||
// Make sure to clear any previous results
|
||||
// Convert context
|
||||
var ctxMap map[string]any
|
||||
if execCtx != nil {
|
||||
ctxMap = execCtx.Values
|
||||
}
|
||||
|
||||
// Execute in sandbox
|
||||
value, err := state.sandbox.Execute(state.state, bytecode, ctxMap)
|
||||
result = JobResult{value, err}
|
||||
}
|
||||
|
||||
// Send result
|
||||
select {
|
||||
case <-resultChan:
|
||||
// Drain the channel if it has a value
|
||||
case resultChan <- result:
|
||||
// Result sent
|
||||
default:
|
||||
// Channel is already empty
|
||||
}
|
||||
|
||||
j := job{
|
||||
Bytecode: bytecode,
|
||||
Context: execCtx,
|
||||
ScriptPath: scriptPath,
|
||||
Result: resultChan,
|
||||
}
|
||||
|
||||
// Choose worker in round-robin fashion
|
||||
workerIndex := int(atomic.AddInt32(&r.nextWorker, 1) % int32(r.workerCount))
|
||||
|
||||
// Submit job with context
|
||||
select {
|
||||
case r.jobQueues[workerIndex] <- j:
|
||||
// Job submitted
|
||||
r.debugLog("Job submitted to worker %d", workerIndex)
|
||||
case <-ctx.Done():
|
||||
// Return the channel to the pool before exiting
|
||||
resultChanPool.Put(resultChan)
|
||||
return nil, ctx.Err()
|
||||
// Result channel closed or full (shouldn't happen with buffered channel)
|
||||
}
|
||||
}()
|
||||
|
||||
// Wait for result with context
|
||||
var result JobResult
|
||||
select {
|
||||
case result = <-resultChan:
|
||||
// Got result
|
||||
case result := <-resultChan:
|
||||
return result.Value, result.Error
|
||||
case <-ctx.Done():
|
||||
// Return the channel to the pool before exiting
|
||||
resultChanPool.Put(resultChan)
|
||||
// Note: we can't cancel the Lua execution, but we can stop waiting for it
|
||||
// The state will be returned to the pool when the goroutine completes
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
|
||||
// Return the channel to the pool
|
||||
resultChanPool.Put(resultChan)
|
||||
|
||||
return result.Value, result.Error
|
||||
}
|
||||
|
||||
// Run executes a Lua script
|
||||
|
@ -387,15 +305,26 @@ func (r *LuaRunner) Close() error {
|
|||
|
||||
r.isRunning.Store(false)
|
||||
|
||||
// Close all job queues
|
||||
for i := 0; i < r.workerCount; i++ {
|
||||
if r.jobQueues[i] != nil {
|
||||
close(r.jobQueues[i])
|
||||
// Drain the semaphore (non-blocking)
|
||||
for {
|
||||
select {
|
||||
case <-r.stateSem:
|
||||
// Drained one slot
|
||||
default:
|
||||
// Empty
|
||||
goto drained
|
||||
}
|
||||
}
|
||||
drained:
|
||||
|
||||
// Wait for all workers to finish
|
||||
r.wg.Wait()
|
||||
// Clean up all states
|
||||
for i := 0; i < len(r.states); i++ {
|
||||
if r.states[i] != nil {
|
||||
r.states[i].state.Cleanup()
|
||||
r.states[i].state.Close()
|
||||
r.states[i] = nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -407,15 +336,54 @@ func (r *LuaRunner) NotifyFileChanged(filePath string) bool {
|
|||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
|
||||
// Reset all states on file changes
|
||||
// Check if runner is closed
|
||||
if !r.isRunning.Load() {
|
||||
return false
|
||||
}
|
||||
|
||||
// Create a new semaphore
|
||||
newSem := make(chan int, cap(r.stateSem))
|
||||
|
||||
// Drain the current semaphore (non-blocking)
|
||||
for {
|
||||
select {
|
||||
case <-r.stateSem:
|
||||
// Drained one slot
|
||||
default:
|
||||
// Empty
|
||||
goto drained
|
||||
}
|
||||
}
|
||||
drained:
|
||||
|
||||
r.stateSem = newSem
|
||||
|
||||
// Reinitialize all states
|
||||
success := true
|
||||
for i := 0; i < r.workerCount; i++ {
|
||||
err := r.initState(i, false)
|
||||
for i := 0; i < len(r.states); i++ {
|
||||
// Clean up old state
|
||||
if r.states[i] != nil {
|
||||
r.states[i].state.Cleanup()
|
||||
r.states[i].state.Close()
|
||||
}
|
||||
|
||||
// Initialize new state
|
||||
wrapper, err := r.initState(i)
|
||||
if err != nil {
|
||||
r.debugLog("Failed to reinitialize state %d: %v", i, err)
|
||||
success = false
|
||||
} else {
|
||||
r.debugLog("State %d successfully reinitialized", i)
|
||||
r.states[i] = nil
|
||||
continue
|
||||
}
|
||||
|
||||
r.states[i] = wrapper
|
||||
|
||||
// Add to semaphore
|
||||
select {
|
||||
case newSem <- i:
|
||||
// Added to semaphore
|
||||
default:
|
||||
// Semaphore full (shouldn't happen)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -429,9 +397,9 @@ func (r *LuaRunner) ResetModuleCache() {
|
|||
r.mu.RLock()
|
||||
defer r.mu.RUnlock()
|
||||
|
||||
for i := 0; i < r.workerCount; i++ {
|
||||
if r.states[i] != nil {
|
||||
r.moduleLoader.ResetModules(r.states[i])
|
||||
for i := 0; i < len(r.states); i++ {
|
||||
if r.states[i] != nil && r.states[i].state != nil {
|
||||
r.moduleLoader.ResetModules(r.states[i].state)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -444,9 +412,9 @@ func (r *LuaRunner) ReloadAllModules() error {
|
|||
r.mu.RLock()
|
||||
defer r.mu.RUnlock()
|
||||
|
||||
for i := 0; i < r.workerCount; i++ {
|
||||
if r.states[i] != nil {
|
||||
if err := r.moduleLoader.PreloadAllModules(r.states[i]); err != nil {
|
||||
for i := 0; i < len(r.states); i++ {
|
||||
if r.states[i] != nil && r.states[i].state != nil {
|
||||
if err := r.moduleLoader.PreloadAllModules(r.states[i].state); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -461,10 +429,10 @@ func (r *LuaRunner) RefreshModuleByName(modName string) bool {
|
|||
defer r.mu.RUnlock()
|
||||
|
||||
success := true
|
||||
for i := 0; i < r.workerCount; i++ {
|
||||
if r.states[i] != nil {
|
||||
for i := 0; i < len(r.states); i++ {
|
||||
if r.states[i] != nil && r.states[i].state != nil {
|
||||
r.debugLog("Refreshing module %s in state %d", modName, i)
|
||||
if err := r.states[i].DoString(`package.loaded["` + modName + `"] = nil`); err != nil {
|
||||
if err := r.states[i].state.DoString(`package.loaded["` + modName + `"] = nil`); err != nil {
|
||||
success = false
|
||||
}
|
||||
}
|
||||
|
@ -475,8 +443,13 @@ func (r *LuaRunner) RefreshModuleByName(modName string) bool {
|
|||
// AddModule adds a module to all sandbox environments
|
||||
func (r *LuaRunner) AddModule(name string, module any) {
|
||||
r.debugLog("Adding module %s to all sandboxes", name)
|
||||
for i := 0; i < r.workerCount; i++ {
|
||||
r.sandboxes[i].AddModule(name, module)
|
||||
r.mu.RLock()
|
||||
defer r.mu.RUnlock()
|
||||
|
||||
for i := 0; i < len(r.states); i++ {
|
||||
if r.states[i] != nil && r.states[i].sandbox != nil {
|
||||
r.states[i].sandbox.AddModule(name, module)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -487,10 +460,11 @@ func (r *LuaRunner) GetModuleCount() int {
|
|||
|
||||
count := 0
|
||||
|
||||
// Get module count from the first Lua state
|
||||
if r.states[0] != nil {
|
||||
// Get count from the first available state
|
||||
for i := 0; i < len(r.states); i++ {
|
||||
if r.states[i] != nil && r.states[i].state != nil {
|
||||
// Execute a Lua snippet to count modules
|
||||
if res, err := r.states[0].ExecuteWithResult(`
|
||||
if res, err := r.states[i].state.ExecuteWithResult(`
|
||||
local count = 0
|
||||
for _ in pairs(package.loaded) do
|
||||
count = count + 1
|
||||
|
@ -501,6 +475,8 @@ func (r *LuaRunner) GetModuleCount() int {
|
|||
count = int(num)
|
||||
}
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return count
|
||||
|
|
|
@ -129,6 +129,7 @@ table tr:nth-child(even), tbody tr:nth-child(even) { background-color: rgba(0, 0
|
|||
<tr><th>Active Routes</th><td>{{.Components.RouteCount}}</td></tr>
|
||||
<tr><th>Bytecode Size</th><td>{{ByteCount .Components.BytecodeBytes}}</td></tr>
|
||||
<tr><th>Loaded Modules</th><td>{{.Components.ModuleCount}}</td></tr>
|
||||
<tr><th>State Pool Size</th><td>{{.Config.PoolSize}}</td></tr>
|
||||
</table>
|
||||
</div>
|
||||
</div>
|
||||
|
@ -138,17 +139,27 @@ table tr:nth-child(even), tbody tr:nth-child(even) { background-color: rgba(0, 0
|
|||
<div class="card">
|
||||
<table>
|
||||
<tr><th>Port</th><td>{{.Config.Port}}</td></tr>
|
||||
<tr><th>Pool Size</th><td>{{.Config.PoolSize}}</td></tr>
|
||||
<tr><th>Debug Mode</th><td>{{.Config.Debug}}</td></tr>
|
||||
<tr><th>Log Level</th><td>{{.Config.LogLevel}}</td></tr>
|
||||
<tr><th>Routes Directory</th><td>{{.Config.RoutesDir}}</td></tr>
|
||||
<tr><th>Static Directory</th><td>{{.Config.StaticDir}}</td></tr>
|
||||
<tr><th>Override Directory</th><td>{{.Config.OverrideDir}}</td></tr>
|
||||
<tr><th>Buffer Size</th><td>{{.Config.BufferSize}}</td></tr>
|
||||
<tr><th>HTTP Logging</th><td>{{.Config.HTTPLoggingEnabled}}</td></tr>
|
||||
<tr><th>Lib Directories</th><td>{{range .Config.LibDirs}}{{.}}<br>{{end}}</td></tr>
|
||||
<tr><th>Watch Routes</th><td>{{index .Config.Watchers "routes"}}</td></tr>
|
||||
<tr><th>Watch Static</th><td>{{index .Config.Watchers "static"}}</td></tr>
|
||||
<tr><th>Watch Modules</th><td>{{index .Config.Watchers "modules"}}</td></tr>
|
||||
<tr>
|
||||
<th>Directories</th>
|
||||
<td>
|
||||
<div>Routes: {{.Config.RoutesDir}}</div>
|
||||
<div>Static: {{.Config.StaticDir}}</div>
|
||||
<div>Override: {{.Config.OverrideDir}}</div>
|
||||
<div>Libs: {{range .Config.LibDirs}}{{.}}, {{end}}</div>
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<th>Watchers</th>
|
||||
<td>
|
||||
<div>Routes: {{index .Config.Watchers "routes"}}</div>
|
||||
<div>Static: {{index .Config.Watchers "static"}}</div>
|
||||
<div>Modules: {{index .Config.Watchers "modules"}}</div>
|
||||
</td>
|
||||
</tr>
|
||||
</table>
|
||||
</div>
|
||||
</div>
|
||||
|
|
2
luajit
2
luajit
|
@ -1 +1 @@
|
|||
Subproject commit 13686b3e66b388a31d459fe95d1aa3bfa05aeb27
|
||||
Subproject commit 0756cabcaaf1e33f2b8eb535e5a24e448c2501a9
|
Loading…
Reference in New Issue
Block a user