diff --git a/core/http/server.go b/core/http/server.go index 48d9f88..a546eda 100644 --- a/core/http/server.go +++ b/core/http/server.go @@ -9,24 +9,24 @@ import ( "git.sharkk.net/Sky/Moonshark/core/logger" "git.sharkk.net/Sky/Moonshark/core/routers" - "git.sharkk.net/Sky/Moonshark/core/workers" + "git.sharkk.net/Sky/Moonshark/core/runner" ) // Server handles HTTP requests using Lua and static file routers type Server struct { luaRouter *routers.LuaRouter staticRouter *routers.StaticRouter - workerPool *workers.Pool + luaRunner *runner.LuaRunner logger *logger.Logger httpServer *http.Server } // New creates a new HTTP server with optimized connection settings -func New(luaRouter *routers.LuaRouter, staticRouter *routers.StaticRouter, pool *workers.Pool, log *logger.Logger) *Server { +func New(luaRouter *routers.LuaRouter, staticRouter *routers.StaticRouter, runner *runner.LuaRunner, log *logger.Logger) *Server { server := &Server{ luaRouter: luaRouter, staticRouter: staticRouter, - workerPool: pool, + luaRunner: runner, logger: log, httpServer: &http.Server{ // Connection timeouts @@ -115,7 +115,7 @@ func (s *Server) handleRequest(w http.ResponseWriter, r *http.Request) { // handleLuaRoute executes a Lua route func (s *Server) handleLuaRoute(w http.ResponseWriter, r *http.Request, bytecode []byte, params *routers.Params) { - ctx := workers.NewContext() + ctx := runner.NewContext() // Log bytecode size s.logger.Debug("Executing Lua route with %d bytes of bytecode", len(bytecode)) @@ -148,7 +148,7 @@ func (s *Server) handleLuaRoute(w http.ResponseWriter, r *http.Request, bytecode } // Execute Lua script - result, err := s.workerPool.Submit(bytecode, ctx) + result, err := s.luaRunner.Run(bytecode, ctx) if err != nil { s.logger.Error("Error executing Lua route: %v", err) http.Error(w, "Internal Server Error", http.StatusInternalServerError) diff --git a/core/workers/context.go b/core/runner/context.go similarity index 97% rename from core/workers/context.go rename to core/runner/context.go index 17737b7..7052a3a 100644 --- a/core/workers/context.go +++ b/core/runner/context.go @@ -1,4 +1,4 @@ -package workers +package runner // Context represents execution context for a Lua script type Context struct { diff --git a/core/workers/job.go b/core/runner/job.go similarity index 96% rename from core/workers/job.go rename to core/runner/job.go index 845c611..70f6264 100644 --- a/core/workers/job.go +++ b/core/runner/job.go @@ -1,4 +1,4 @@ -package workers +package runner // JobResult represents the result of a Lua script execution type JobResult struct { diff --git a/core/runner/luarunner.go b/core/runner/luarunner.go new file mode 100644 index 0000000..7dc591c --- /dev/null +++ b/core/runner/luarunner.go @@ -0,0 +1,308 @@ +package runner + +import ( + "context" + "errors" + "sync" + "sync/atomic" + + luajit "git.sharkk.net/Sky/LuaJIT-to-Go" +) + +// Common errors +var ( + ErrRunnerClosed = errors.New("lua runner is closed") + ErrInitFailed = errors.New("initialization failed") +) + +// StateInitFunc is a function that initializes a Lua state +type StateInitFunc func(*luajit.State) error + +// LuaRunner runs Lua scripts using a single Lua state +type LuaRunner struct { + state *luajit.State // The Lua state + jobQueue chan job // Channel for incoming jobs + isRunning atomic.Bool // Flag indicating if the runner is active + mu sync.RWMutex // Mutex for thread safety + wg sync.WaitGroup // WaitGroup for clean shutdown + initFunc StateInitFunc // Optional function to initialize Lua state + bufferSize int // Size of the job queue buffer +} + +// NewRunner creates a new LuaRunner +func NewRunner(options ...RunnerOption) (*LuaRunner, error) { + // Default configuration + runner := &LuaRunner{ + bufferSize: 10, // Default buffer size + } + + // Apply options + for _, opt := range options { + opt(runner) + } + + // Initialize Lua state + state := luajit.New() + if state == nil { + return nil, errors.New("failed to create Lua state") + } + runner.state = state + + // Create job queue + runner.jobQueue = make(chan job, runner.bufferSize) + runner.isRunning.Store(true) + + // Set up sandbox + if err := runner.setupSandbox(); err != nil { + state.Close() + return nil, ErrInitFailed + } + + // Run init function if provided + if runner.initFunc != nil { + if err := runner.initFunc(state); err != nil { + state.Close() + return nil, ErrInitFailed + } + } + + // Start the event loop + runner.wg.Add(1) + go runner.eventLoop() + + return runner, nil +} + +// RunnerOption defines a functional option for configuring the LuaRunner +type RunnerOption func(*LuaRunner) + +// WithBufferSize sets the job queue buffer size +func WithBufferSize(size int) RunnerOption { + return func(r *LuaRunner) { + if size > 0 { + r.bufferSize = size + } + } +} + +// WithInitFunc sets the init function for the Lua state +func WithInitFunc(initFunc StateInitFunc) RunnerOption { + return func(r *LuaRunner) { + r.initFunc = initFunc + } +} + +// setupSandbox initializes the sandbox environment +func (r *LuaRunner) setupSandbox() error { + // This is the Lua script that creates our sandbox function + setupScript := ` + -- Create a function to run code in a sandbox environment + function __create_sandbox() + -- Create new environment table + local env = {} + + -- Add standard library modules (can be restricted as needed) + env.string = string + env.table = table + env.math = math + env.os = { + time = os.time, + date = os.date, + difftime = os.difftime, + clock = os.clock + } + env.tonumber = tonumber + env.tostring = tostring + env.type = type + env.pairs = pairs + env.ipairs = ipairs + env.next = next + env.select = select + env.unpack = unpack + env.pcall = pcall + env.xpcall = xpcall + env.error = error + env.assert = assert + + -- Allow access to package.loaded for modules + env.require = function(name) + return package.loaded[name] + end + + -- Create metatable to restrict access to _G + local mt = { + __index = function(t, k) + -- First check in env table + local v = rawget(env, k) + if v ~= nil then return v end + + -- If not found, check for registered modules/functions + local moduleValue = _G[k] + if type(moduleValue) == "table" or + type(moduleValue) == "function" then + return moduleValue + end + + return nil + end, + __newindex = function(t, k, v) + rawset(env, k, v) + end + } + + setmetatable(env, mt) + return env + end + + -- Create function to execute code with a sandbox + function __run_sandboxed(f, ctx) + local env = __create_sandbox() + + -- Add context to the environment if provided + if ctx then + env.ctx = ctx + end + + -- Set the environment and run the function + setfenv(f, env) + return f() + end + ` + + return r.state.DoString(setupScript) +} + +// eventLoop processes jobs from the queue +func (r *LuaRunner) eventLoop() { + defer r.wg.Done() + defer r.state.Close() + + // Process jobs until closure + for job := range r.jobQueue { + // Execute the job and send result + result := r.executeJob(job) + select { + case job.Result <- result: + // Result sent successfully + default: + // Result channel closed or full, discard the result + } + } +} + +// executeJob runs a script in the sandbox environment +func (r *LuaRunner) executeJob(j job) JobResult { + // Re-run init function if needed + if r.initFunc != nil { + if err := r.initFunc(r.state); err != nil { + return JobResult{nil, err} + } + } + + // Set up context if provided + if j.Context != nil { + // Push context table + r.state.NewTable() + + // Add values to context table + for key, value := range j.Context.Values { + // Push key + r.state.PushString(key) + + // Push value + if err := r.state.PushValue(value); err != nil { + return JobResult{nil, err} + } + + // Set table[key] = value + r.state.SetTable(-3) + } + } else { + // Push nil if no context + r.state.PushNil() + } + + // Load bytecode + if err := r.state.LoadBytecode(j.Bytecode, "script"); err != nil { + r.state.Pop(1) // Pop context + return JobResult{nil, err} + } + + // Get the sandbox runner function + r.state.GetGlobal("__run_sandboxed") + + // Push loaded function and context as arguments + r.state.PushCopy(-2) // Copy the loaded function + r.state.PushCopy(-4) // Copy the context table or nil + + // Remove the original function and context + r.state.Remove(-5) // Remove original context + r.state.Remove(-4) // Remove original function + + // Call the sandbox runner with 2 args (function and context), expecting 1 result + if err := r.state.Call(2, 1); err != nil { + return JobResult{nil, err} + } + + // Get result + value, err := r.state.ToValue(-1) + r.state.Pop(1) // Pop result + + return JobResult{value, err} +} + +// RunWithContext executes a Lua script with context and timeout +func (r *LuaRunner) RunWithContext(ctx context.Context, bytecode []byte, execCtx *Context) (any, error) { + r.mu.RLock() + if !r.isRunning.Load() { + r.mu.RUnlock() + return nil, ErrRunnerClosed + } + r.mu.RUnlock() + + resultChan := make(chan JobResult, 1) + j := job{ + Bytecode: bytecode, + Context: execCtx, + Result: resultChan, + } + + // Submit job with context + select { + case r.jobQueue <- j: + // Job submitted + case <-ctx.Done(): + return nil, ctx.Err() + } + + // Wait for result with context + select { + case result := <-resultChan: + return result.Value, result.Error + case <-ctx.Done(): + return nil, ctx.Err() + } +} + +// Run executes a Lua script +func (r *LuaRunner) Run(bytecode []byte, execCtx *Context) (any, error) { + return r.RunWithContext(context.Background(), bytecode, execCtx) +} + +// Close gracefully shuts down the LuaRunner +func (r *LuaRunner) Close() error { + r.mu.Lock() + defer r.mu.Unlock() + + if !r.isRunning.Load() { + return ErrRunnerClosed + } + + r.isRunning.Store(false) + close(r.jobQueue) + + // Wait for event loop to finish + r.wg.Wait() + + return nil +} diff --git a/core/runner/luarunner_test.go b/core/runner/luarunner_test.go new file mode 100644 index 0000000..7debb43 --- /dev/null +++ b/core/runner/luarunner_test.go @@ -0,0 +1,373 @@ +package runner + +import ( + "context" + "testing" + "time" + + luajit "git.sharkk.net/Sky/LuaJIT-to-Go" +) + +// Helper function to create bytecode for testing +func createTestBytecode(t *testing.T, code string) []byte { + state := luajit.New() + if state == nil { + t.Fatal("Failed to create Lua state") + } + defer state.Close() + + bytecode, err := state.CompileBytecode(code, "test") + if err != nil { + t.Fatalf("Failed to compile test bytecode: %v", err) + } + + return bytecode +} + +func TestRunnerBasic(t *testing.T) { + runner, err := NewRunner() + if err != nil { + t.Fatalf("Failed to create runner: %v", err) + } + defer runner.Close() + + bytecode := createTestBytecode(t, "return 42") + + result, err := runner.Run(bytecode, nil) + if err != nil { + t.Fatalf("Failed to run script: %v", err) + } + + num, ok := result.(float64) + if !ok { + t.Fatalf("Expected float64 result, got %T", result) + } + + if num != 42 { + t.Errorf("Expected 42, got %f", num) + } +} + +func TestRunnerWithContext(t *testing.T) { + runner, err := NewRunner() + if err != nil { + t.Fatalf("Failed to create runner: %v", err) + } + defer runner.Close() + + bytecode := createTestBytecode(t, ` + return { + num = ctx.number, + str = ctx.text, + flag = ctx.enabled, + list = {ctx.table[1], ctx.table[2], ctx.table[3]}, + } + `) + + execCtx := NewContext() + execCtx.Set("number", 42.5) + execCtx.Set("text", "hello") + execCtx.Set("enabled", true) + execCtx.Set("table", []float64{10, 20, 30}) + + result, err := runner.Run(bytecode, execCtx) + if err != nil { + t.Fatalf("Failed to run job: %v", err) + } + + // Result should be a map + resultMap, ok := result.(map[string]any) + if !ok { + t.Fatalf("Expected map result, got %T", result) + } + + // Check values + if resultMap["num"] != 42.5 { + t.Errorf("Expected num=42.5, got %v", resultMap["num"]) + } + if resultMap["str"] != "hello" { + t.Errorf("Expected str=hello, got %v", resultMap["str"]) + } + if resultMap["flag"] != true { + t.Errorf("Expected flag=true, got %v", resultMap["flag"]) + } + + arr, ok := resultMap["list"].([]float64) + if !ok { + t.Fatalf("Expected []float64, got %T", resultMap["list"]) + } + + expected := []float64{10, 20, 30} + for i, v := range expected { + if arr[i] != v { + t.Errorf("Expected list[%d]=%f, got %f", i, v, arr[i]) + } + } +} + +func TestRunnerWithTimeout(t *testing.T) { + runner, err := NewRunner() + if err != nil { + t.Fatalf("Failed to create runner: %v", err) + } + defer runner.Close() + + // Create bytecode that sleeps + bytecode := createTestBytecode(t, ` + -- Sleep for 500ms + local start = os.time() + while os.difftime(os.time(), start) < 0.5 do end + return "done" + `) + + // Test with timeout that should succeed + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + result, err := runner.RunWithContext(ctx, bytecode, nil) + if err != nil { + t.Fatalf("Unexpected error with sufficient timeout: %v", err) + } + if result != "done" { + t.Errorf("Expected 'done', got %v", result) + } + + // Test with timeout that should fail + ctx, cancel = context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() + + _, err = runner.RunWithContext(ctx, bytecode, nil) + if err == nil { + t.Errorf("Expected timeout error, got nil") + } +} + +func TestSandboxIsolation(t *testing.T) { + runner, err := NewRunner() + if err != nil { + t.Fatalf("Failed to create runner: %v", err) + } + defer runner.Close() + + // Create a script that tries to modify a global variable + bytecode1 := createTestBytecode(t, ` + -- Set a "global" variable + my_global = "test value" + return true + `) + + _, err = runner.Run(bytecode1, nil) + if err != nil { + t.Fatalf("Failed to execute first script: %v", err) + } + + // Now try to access that variable from another script + bytecode2 := createTestBytecode(t, ` + -- Try to access the previously set global + return my_global ~= nil + `) + + result, err := runner.Run(bytecode2, nil) + if err != nil { + t.Fatalf("Failed to execute second script: %v", err) + } + + // The variable should not be accessible (sandbox isolation) + if result.(bool) { + t.Errorf("Expected sandbox isolation, but global variable was accessible") + } +} + +func TestRunnerWithInit(t *testing.T) { + // Define an init function that registers a simple "math" module + mathInit := func(state *luajit.State) error { + // Register the "add" function + err := state.RegisterGoFunction("add", func(s *luajit.State) int { + a := s.ToNumber(1) + b := s.ToNumber(2) + s.PushNumber(a + b) + return 1 // Return one result + }) + + if err != nil { + return err + } + + // Register a whole module + mathFuncs := map[string]luajit.GoFunction{ + "multiply": func(s *luajit.State) int { + a := s.ToNumber(1) + b := s.ToNumber(2) + s.PushNumber(a * b) + return 1 + }, + "subtract": func(s *luajit.State) int { + a := s.ToNumber(1) + b := s.ToNumber(2) + s.PushNumber(a - b) + return 1 + }, + } + + return RegisterModule(state, "math2", mathFuncs) + } + + // Create a runner with our init function + runner, err := NewRunner(WithInitFunc(mathInit)) + if err != nil { + t.Fatalf("Failed to create runner: %v", err) + } + defer runner.Close() + + // Test the add function + bytecode1 := createTestBytecode(t, "return add(5, 7)") + result1, err := runner.Run(bytecode1, nil) + if err != nil { + t.Fatalf("Failed to call add function: %v", err) + } + + num1, ok := result1.(float64) + if !ok || num1 != 12 { + t.Errorf("Expected add(5, 7) = 12, got %v", result1) + } + + // Test the math2 module + bytecode2 := createTestBytecode(t, "return math2.multiply(6, 8)") + result2, err := runner.Run(bytecode2, nil) + if err != nil { + t.Fatalf("Failed to call math2.multiply: %v", err) + } + + num2, ok := result2.(float64) + if !ok || num2 != 48 { + t.Errorf("Expected math2.multiply(6, 8) = 48, got %v", result2) + } +} + +func TestConcurrentExecution(t *testing.T) { + const jobs = 20 + + runner, err := NewRunner(WithBufferSize(20)) + if err != nil { + t.Fatalf("Failed to create runner: %v", err) + } + defer runner.Close() + + // Create bytecode that returns its input + bytecode := createTestBytecode(t, "return ctx.n") + + // Run multiple jobs concurrently + results := make(chan int, jobs) + for i := 0; i < jobs; i++ { + i := i // Capture loop variable + go func() { + execCtx := NewContext() + execCtx.Set("n", float64(i)) + + result, err := runner.Run(bytecode, execCtx) + if err != nil { + t.Errorf("Job %d failed: %v", i, err) + results <- -1 + return + } + + num, ok := result.(float64) + if !ok { + t.Errorf("Job %d: expected float64, got %T", i, result) + results <- -1 + return + } + + results <- int(num) + }() + } + + // Collect results + seen := make(map[int]bool) + for i := 0; i < jobs; i++ { + result := <-results + if result != -1 { + seen[result] = true + } + } + + // Verify all jobs were processed + if len(seen) != jobs { + t.Errorf("Expected %d unique results, got %d", jobs, len(seen)) + } +} + +func TestRunnerClose(t *testing.T) { + runner, err := NewRunner() + if err != nil { + t.Fatalf("Failed to create runner: %v", err) + } + + // Submit a job to verify runner works + bytecode := createTestBytecode(t, "return 42") + _, err = runner.Run(bytecode, nil) + if err != nil { + t.Fatalf("Failed to run job: %v", err) + } + + // Close + if err := runner.Close(); err != nil { + t.Errorf("Close failed: %v", err) + } + + // Run after close should fail + _, err = runner.Run(bytecode, nil) + if err != ErrRunnerClosed { + t.Errorf("Expected ErrRunnerClosed, got %v", err) + } + + // Second close should return error + if err := runner.Close(); err != ErrRunnerClosed { + t.Errorf("Expected ErrRunnerClosed on second close, got %v", err) + } +} + +func TestErrorHandling(t *testing.T) { + runner, err := NewRunner() + if err != nil { + t.Fatalf("Failed to create runner: %v", err) + } + defer runner.Close() + + // Test invalid bytecode + _, err = runner.Run([]byte("not valid bytecode"), nil) + if err == nil { + t.Errorf("Expected error for invalid bytecode, got nil") + } + + // Test Lua runtime error + bytecode := createTestBytecode(t, ` + error("intentional error") + return true + `) + + _, err = runner.Run(bytecode, nil) + if err == nil { + t.Errorf("Expected error from Lua error() call, got nil") + } + + // Test with nil context + bytecode = createTestBytecode(t, "return ctx == nil") + result, err := runner.Run(bytecode, nil) + if err != nil { + t.Errorf("Unexpected error with nil context: %v", err) + } + if result.(bool) != true { + t.Errorf("Expected ctx to be nil in Lua, but it wasn't") + } + + // Test invalid context value + execCtx := NewContext() + execCtx.Set("param", complex128(1+2i)) // Unsupported type + + bytecode = createTestBytecode(t, "return ctx.param") + _, err = runner.Run(bytecode, execCtx) + if err == nil { + t.Errorf("Expected error for unsupported context value type, got nil") + } +} diff --git a/core/workers/modules.go b/core/runner/modules.go similarity index 98% rename from core/workers/modules.go rename to core/runner/modules.go index b1194ce..3a66921 100644 --- a/core/workers/modules.go +++ b/core/runner/modules.go @@ -1,4 +1,4 @@ -package workers +package runner import ( luajit "git.sharkk.net/Sky/LuaJIT-to-Go" @@ -7,18 +7,6 @@ import ( // ModuleFunc is a function that returns a map of module functions type ModuleFunc func() map[string]luajit.GoFunction -// ModuleInitFunc creates a state initializer that registers multiple modules -func ModuleInitFunc(modules map[string]ModuleFunc) StateInitFunc { - return func(state *luajit.State) error { - for name, moduleFunc := range modules { - if err := RegisterModule(state, name, moduleFunc()); err != nil { - return err - } - } - return nil - } -} - // RegisterModule registers a map of functions as a Lua module func RegisterModule(state *luajit.State, name string, funcs map[string]luajit.GoFunction) error { // Create a new table for the module @@ -44,6 +32,18 @@ func RegisterModule(state *luajit.State, name string, funcs map[string]luajit.Go return nil } +// ModuleInitFunc creates a state initializer that registers multiple modules +func ModuleInitFunc(modules map[string]ModuleFunc) StateInitFunc { + return func(state *luajit.State) error { + for name, moduleFunc := range modules { + if err := RegisterModule(state, name, moduleFunc()); err != nil { + return err + } + } + return nil + } +} + // CombineInitFuncs combines multiple state initializer functions into one func CombineInitFuncs(funcs ...StateInitFunc) StateInitFunc { return func(state *luajit.State) error { diff --git a/core/workers/README.md b/core/workers/README.md deleted file mode 100644 index ba84b8f..0000000 --- a/core/workers/README.md +++ /dev/null @@ -1,107 +0,0 @@ -# Worker Pool - -### Pool - -```go -type Pool struct { ... } - -// Create a pool with specified number of workers -func NewPool(numWorkers int) (*Pool, error) - -// Submit a job with default context -func (p *Pool) Submit(bytecode []byte, ctx *Context) (any, error) - -// Submit with timeout/cancellation support -func (p *Pool) SubmitWithContext(ctx context.Context, bytecode []byte, execCtx *Context) (any, error) - -// Shutdown the pool -func (p *Pool) Shutdown() error - -// Get number of active workers -func (p *Pool) ActiveWorkers() uint32 -``` - -### Context - -```go -type Context struct { ... } - -// Create a new execution context -func NewContext() *Context - -// Set a value -func (c *Context) Set(key string, value any) - -// Get a value -func (c *Context) Get(key string) any -``` - -## Basic Usage - -```go -// Create worker pool -pool, err := workers.NewPool(4) -if err != nil { - log.Fatal(err) -} -defer pool.Shutdown() - -// Compile bytecode (typically done once and reused) -state := luajit.New() -bytecode, err := state.CompileBytecode(` - return ctx.message .. " from Lua" -`, "script") -state.Close() - -// Set up execution context -ctx := workers.NewContext() -ctx.Set("message", "Hello") -ctx.Set("params", map[string]any{"id": "123"}) - -// Execute bytecode -result, err := pool.Submit(bytecode, ctx) -if err != nil { - log.Fatal(err) -} - -fmt.Println(result) // "Hello from Lua" -``` - -## With Timeout - -```go -// Create context with timeout -ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) -defer cancel() - -// Execute with timeout -result, err := pool.SubmitWithContext(ctx, bytecode, execCtx) -if err != nil { - // Handle timeout or error -} -``` - -## In Lua Scripts - -Inside Lua, the context is available as the global `ctx` table: - -```lua --- Access a simple value -local msg = ctx.message - --- Access nested values -local id = ctx.params.id - --- Return a result to Go -return { - status = "success", - data = msg -} -``` - -## Important Notes - -- The pool is thread-safe; multiple goroutines can submit jobs concurrently -- Each execution is isolated; global state is reset between executions -- Bytecode should be compiled once and reused for better performance -- Context values should be serializable to Lua (numbers, strings, booleans, maps, slices) \ No newline at end of file diff --git a/core/workers/init_test.go b/core/workers/init_test.go deleted file mode 100644 index ea0557d..0000000 --- a/core/workers/init_test.go +++ /dev/null @@ -1,346 +0,0 @@ -package workers - -import ( - "testing" - - luajit "git.sharkk.net/Sky/LuaJIT-to-Go" -) - -func TestModuleRegistration(t *testing.T) { - // Define an init function that registers a simple "math" module - mathInit := func(state *luajit.State) error { - // Register the "add" function - err := state.RegisterGoFunction("add", func(s *luajit.State) int { - a := s.ToNumber(1) - b := s.ToNumber(2) - s.PushNumber(a + b) - return 1 // Return one result - }) - - if err != nil { - return err - } - - // Register a whole module - mathFuncs := map[string]luajit.GoFunction{ - "multiply": func(s *luajit.State) int { - a := s.ToNumber(1) - b := s.ToNumber(2) - s.PushNumber(a * b) - return 1 - }, - "subtract": func(s *luajit.State) int { - a := s.ToNumber(1) - b := s.ToNumber(2) - s.PushNumber(a - b) - return 1 - }, - } - - return RegisterModule(state, "math2", mathFuncs) - } - - // Create a pool with our init function - pool, err := NewPoolWithInit(2, mathInit) - if err != nil { - t.Fatalf("Failed to create pool: %v", err) - } - defer pool.Shutdown() - - // Test the add function - bytecode1 := createTestBytecode(t, "return add(5, 7)") - result1, err := pool.Submit(bytecode1, nil) - if err != nil { - t.Fatalf("Failed to call add function: %v", err) - } - - num1, ok := result1.(float64) - if !ok || num1 != 12 { - t.Errorf("Expected add(5, 7) = 12, got %v", result1) - } - - // Test the math2 module - bytecode2 := createTestBytecode(t, "return math2.multiply(6, 8)") - result2, err := pool.Submit(bytecode2, nil) - if err != nil { - t.Fatalf("Failed to call math2.multiply: %v", err) - } - - num2, ok := result2.(float64) - if !ok || num2 != 48 { - t.Errorf("Expected math2.multiply(6, 8) = 48, got %v", result2) - } - - // Test multiple operations - bytecode3 := createTestBytecode(t, ` - local a = add(10, 20) - local b = math2.subtract(a, 5) - return math2.multiply(b, 2) - `) - - result3, err := pool.Submit(bytecode3, nil) - if err != nil { - t.Fatalf("Failed to execute combined operations: %v", err) - } - - num3, ok := result3.(float64) - if !ok || num3 != 50 { - t.Errorf("Expected ((10 + 20) - 5) * 2 = 50, got %v", result3) - } -} - -func TestModuleInitFunc(t *testing.T) { - // Define math module functions - mathModule := func() map[string]luajit.GoFunction { - return map[string]luajit.GoFunction{ - "add": func(s *luajit.State) int { - a := s.ToNumber(1) - b := s.ToNumber(2) - s.PushNumber(a + b) - return 1 - }, - "multiply": func(s *luajit.State) int { - a := s.ToNumber(1) - b := s.ToNumber(2) - s.PushNumber(a * b) - return 1 - }, - } - } - - // Define string module functions - strModule := func() map[string]luajit.GoFunction { - return map[string]luajit.GoFunction{ - "concat": func(s *luajit.State) int { - a := s.ToString(1) - b := s.ToString(2) - s.PushString(a + b) - return 1 - }, - } - } - - // Create module map - modules := map[string]ModuleFunc{ - "math2": mathModule, - "str": strModule, - } - - // Create pool with module init - pool, err := NewPoolWithInit(2, ModuleInitFunc(modules)) - if err != nil { - t.Fatalf("Failed to create pool: %v", err) - } - defer pool.Shutdown() - - // Test math module - bytecode1 := createTestBytecode(t, "return math2.add(5, 7)") - result1, err := pool.Submit(bytecode1, nil) - if err != nil { - t.Fatalf("Failed to call math2.add: %v", err) - } - - num1, ok := result1.(float64) - if !ok || num1 != 12 { - t.Errorf("Expected math2.add(5, 7) = 12, got %v", result1) - } - - // Test string module - bytecode2 := createTestBytecode(t, "return str.concat('hello', 'world')") - result2, err := pool.Submit(bytecode2, nil) - if err != nil { - t.Fatalf("Failed to call str.concat: %v", err) - } - - str2, ok := result2.(string) - if !ok || str2 != "helloworld" { - t.Errorf("Expected str.concat('hello', 'world') = 'helloworld', got %v", result2) - } -} - -func TestCombineInitFuncs(t *testing.T) { - // First init function adds a function to get a constant value - init1 := func(state *luajit.State) error { - return state.RegisterGoFunction("getAnswer", func(s *luajit.State) int { - s.PushNumber(42) - return 1 - }) - } - - // Second init function registers a function that multiplies a number by 2 - init2 := func(state *luajit.State) error { - return state.RegisterGoFunction("double", func(s *luajit.State) int { - n := s.ToNumber(1) - s.PushNumber(n * 2) - return 1 - }) - } - - // Combine the init functions - combinedInit := CombineInitFuncs(init1, init2) - - // Create a pool with the combined init function - pool, err := NewPoolWithInit(1, combinedInit) - if err != nil { - t.Fatalf("Failed to create pool: %v", err) - } - defer pool.Shutdown() - - // Test using both functions together in a single script - bytecode := createTestBytecode(t, "return double(getAnswer())") - result, err := pool.Submit(bytecode, nil) - if err != nil { - t.Fatalf("Failed to execute: %v", err) - } - - num, ok := result.(float64) - if !ok || num != 84 { - t.Errorf("Expected double(getAnswer()) = 84, got %v", result) - } -} - -func TestSandboxIsolation(t *testing.T) { - // Create a pool - pool, err := NewPool(2) - if err != nil { - t.Fatalf("Failed to create pool: %v", err) - } - defer pool.Shutdown() - - // Create a script that tries to modify a global variable - bytecode1 := createTestBytecode(t, ` - -- Set a "global" variable - my_global = "test value" - return true - `) - - _, err = pool.Submit(bytecode1, nil) - if err != nil { - t.Fatalf("Failed to execute first script: %v", err) - } - - // Now try to access that variable from another script - bytecode2 := createTestBytecode(t, ` - -- Try to access the previously set global - return my_global ~= nil - `) - - result, err := pool.Submit(bytecode2, nil) - if err != nil { - t.Fatalf("Failed to execute second script: %v", err) - } - - // The variable should not be accessible (sandbox isolation) - if result.(bool) { - t.Errorf("Expected sandbox isolation, but global variable was accessible") - } -} - -func TestContextInSandbox(t *testing.T) { - // Create a pool - pool, err := NewPool(2) - if err != nil { - t.Fatalf("Failed to create pool: %v", err) - } - defer pool.Shutdown() - - // Create a context with test data - ctx := NewContext() - ctx.Set("name", "test") - ctx.Set("value", 42.5) - ctx.Set("items", []float64{1, 2, 3}) - - bytecode := createTestBytecode(t, ` - -- Access and manipulate context values - local sum = 0 - for i, v in ipairs(ctx.items) do - sum = sum + v - end - - return { - name_length = string.len(ctx.name), - value_doubled = ctx.value * 2, - items_sum = sum - } - `) - - result, err := pool.Submit(bytecode, ctx) - if err != nil { - t.Fatalf("Failed to execute script with context: %v", err) - } - - resultMap, ok := result.(map[string]any) - if !ok { - t.Fatalf("Expected map result, got %T", result) - } - - // Check context values were correctly accessible - if resultMap["name_length"].(float64) != 4 { - t.Errorf("Expected name_length = 4, got %v", resultMap["name_length"]) - } - - if resultMap["value_doubled"].(float64) != 85 { - t.Errorf("Expected value_doubled = 85, got %v", resultMap["value_doubled"]) - } - - if resultMap["items_sum"].(float64) != 6 { - t.Errorf("Expected items_sum = 6, got %v", resultMap["items_sum"]) - } -} - -func TestStandardLibsInSandbox(t *testing.T) { - // Create a pool - pool, err := NewPool(2) - if err != nil { - t.Fatalf("Failed to create pool: %v", err) - } - defer pool.Shutdown() - - // Test access to standard libraries - bytecode := createTestBytecode(t, ` - local results = {} - - -- Test string library - results.string_upper = string.upper("test") - - -- Test math library - results.math_sqrt = math.sqrt(16) - - -- Test table library - local tbl = {10, 20, 30} - table.insert(tbl, 40) - results.table_length = #tbl - - -- Test os library (limited functions) - results.has_os_time = type(os.time) == "function" - - return results - `) - - result, err := pool.Submit(bytecode, nil) - if err != nil { - t.Fatalf("Failed to execute script: %v", err) - } - - resultMap, ok := result.(map[string]any) - if !ok { - t.Fatalf("Expected map result, got %T", result) - } - - // Check standard library functions worked - if resultMap["string_upper"] != "TEST" { - t.Errorf("Expected string_upper = 'TEST', got %v", resultMap["string_upper"]) - } - - if resultMap["math_sqrt"].(float64) != 4 { - t.Errorf("Expected math_sqrt = 4, got %v", resultMap["math_sqrt"]) - } - - if resultMap["table_length"].(float64) != 4 { - t.Errorf("Expected table_length = 4, got %v", resultMap["table_length"]) - } - - if resultMap["has_os_time"] != true { - t.Errorf("Expected has_os_time = true, got %v", resultMap["has_os_time"]) - } -} diff --git a/core/workers/pool.go b/core/workers/pool.go deleted file mode 100644 index e6b85f5..0000000 --- a/core/workers/pool.go +++ /dev/null @@ -1,123 +0,0 @@ -package workers - -import ( - "context" - "sync" - "sync/atomic" - - luajit "git.sharkk.net/Sky/LuaJIT-to-Go" -) - -// StateInitFunc is a function that initializes a Lua state -// It can be used to register custom functions and modules -type StateInitFunc func(*luajit.State) error - -// Pool manages a pool of Lua worker goroutines -type Pool struct { - workers uint32 // Number of workers - jobs chan job // Channel to send jobs to workers - wg sync.WaitGroup // WaitGroup to track active workers - quit chan struct{} // Channel to signal shutdown - isRunning atomic.Bool // Flag to track if pool is running - stateInit StateInitFunc // Optional function to initialize Lua state -} - -// NewPool creates a new worker pool with the specified number of workers -func NewPool(numWorkers int) (*Pool, error) { - return NewPoolWithInit(numWorkers, nil) -} - -// NewPoolWithInit creates a new worker pool with the specified number of workers -// and a function to initialize each worker's Lua state -func NewPoolWithInit(numWorkers int, initFunc StateInitFunc) (*Pool, error) { - if numWorkers <= 0 { - return nil, ErrNoWorkers - } - - p := &Pool{ - workers: uint32(numWorkers), - jobs: make(chan job, numWorkers), // Buffer equal to worker count - quit: make(chan struct{}), - stateInit: initFunc, - } - p.isRunning.Store(true) - - // Start workers - p.wg.Add(numWorkers) - for i := 0; i < numWorkers; i++ { - w := &worker{ - pool: p, - id: uint32(i), - } - go w.run() - } - - return p, nil -} - -// RegisterGlobal is no longer needed with the sandbox approach -// but kept as a no-op for backward compatibility -func (p *Pool) RegisterGlobal(name string) { - // No-op in sandbox mode -} - -// SubmitWithContext sends a job to the worker pool with context -func (p *Pool) SubmitWithContext(ctx context.Context, bytecode []byte, execCtx *Context) (any, error) { - if !p.isRunning.Load() { - return nil, ErrPoolClosed - } - - resultChan := make(chan JobResult, 1) - j := job{ - Bytecode: bytecode, - Context: execCtx, - Result: resultChan, - } - - // Submit job with context - select { - case p.jobs <- j: - // Job submitted - case <-ctx.Done(): - return nil, ctx.Err() - } - - // Wait for result with context - select { - case result := <-resultChan: - return result.Value, result.Error - case <-ctx.Done(): - // Note: The job will still be processed by a worker, - // but the result will be discarded - return nil, ctx.Err() - } -} - -// Submit sends a job to the worker pool -func (p *Pool) Submit(bytecode []byte, execCtx *Context) (any, error) { - return p.SubmitWithContext(context.Background(), bytecode, execCtx) -} - -// Shutdown gracefully shuts down the worker pool -func (p *Pool) Shutdown() error { - if !p.isRunning.Load() { - return ErrPoolClosed - } - p.isRunning.Store(false) - - // Signal workers to quit - close(p.quit) - - // Wait for workers to finish - p.wg.Wait() - - // Close jobs channel - close(p.jobs) - - return nil -} - -// ActiveWorkers returns the number of active workers -func (p *Pool) ActiveWorkers() uint32 { - return atomic.LoadUint32(&p.workers) -} diff --git a/core/workers/sandbox.go b/core/workers/sandbox.go deleted file mode 100644 index ddf9930..0000000 --- a/core/workers/sandbox.go +++ /dev/null @@ -1,144 +0,0 @@ -package workers - -// setupSandbox initializes the sandbox environment creation function -func (w *worker) setupSandbox() error { - // This is the Lua script that creates our sandbox function - setupScript := ` - -- Create a function to run code in a sandbox environment - function __create_sandbox() - -- Create new environment table - local env = {} - - -- Add standard library modules (can be restricted as needed) - env.string = string - env.table = table - env.math = math - env.os = { - time = os.time, - date = os.date, - difftime = os.difftime, - clock = os.clock - } - env.tonumber = tonumber - env.tostring = tostring - env.type = type - env.pairs = pairs - env.ipairs = ipairs - env.next = next - env.select = select - env.unpack = unpack - env.pcall = pcall - env.xpcall = xpcall - env.error = error - env.assert = assert - - -- Allow access to package.loaded for modules - env.require = function(name) - return package.loaded[name] - end - - -- Create metatable to restrict access to _G - local mt = { - __index = function(t, k) - -- First check in env table - local v = rawget(env, k) - if v ~= nil then return v end - - -- If not found, check for registered modules/functions - local moduleValue = _G[k] - if type(moduleValue) == "table" or - type(moduleValue) == "function" then - return moduleValue - end - - return nil - end, - __newindex = function(t, k, v) - rawset(env, k, v) - end - } - - setmetatable(env, mt) - return env - end - - -- Create function to execute code with a sandbox - function __run_sandboxed(f, ctx) - local env = __create_sandbox() - - -- Add context to the environment if provided - if ctx then - env.ctx = ctx - end - - -- Set the environment and run the function - setfenv(f, env) - return f() - end - ` - - return w.state.DoString(setupScript) -} - -// executeJobSandboxed runs a script in a sandbox environment -func (w *worker) executeJobSandboxed(j job) JobResult { - // No need to reset the state for each execution, since we're using a sandbox - - // Re-run init function to register functions and modules if needed - if w.pool.stateInit != nil { - if err := w.pool.stateInit(w.state); err != nil { - return JobResult{nil, err} - } - } - - // Set up context if provided - if j.Context != nil { - // Push context table - w.state.NewTable() - - // Add values to context table - for key, value := range j.Context.Values { - // Push key - w.state.PushString(key) - - // Push value - if err := w.state.PushValue(value); err != nil { - return JobResult{nil, err} - } - - // Set table[key] = value - w.state.SetTable(-3) - } - } else { - // Push nil if no context - w.state.PushNil() - } - - // Load bytecode - if err := w.state.LoadBytecode(j.Bytecode, "script"); err != nil { - w.state.Pop(1) // Pop context - return JobResult{nil, err} - } - - // Get the sandbox runner function - w.state.GetGlobal("__run_sandboxed") - - // Push loaded function and context as arguments - w.state.PushCopy(-2) // Copy the loaded function - w.state.PushCopy(-4) // Copy the context table or nil - - // Remove the original function and context - w.state.Remove(-5) // Remove original context - w.state.Remove(-4) // Remove original function - - // Call the sandbox runner with 2 args (function and context), expecting 1 result - if err := w.state.Call(2, 1); err != nil { - return JobResult{nil, err} - } - - // Get result - value, err := w.state.ToValue(-1) - w.state.Pop(1) // Pop result - - return JobResult{value, err} -} diff --git a/core/workers/worker.go b/core/workers/worker.go deleted file mode 100644 index e83ce0c..0000000 --- a/core/workers/worker.go +++ /dev/null @@ -1,71 +0,0 @@ -package workers - -import ( - "errors" - "sync/atomic" - - luajit "git.sharkk.net/Sky/LuaJIT-to-Go" -) - -// Common errors -var ( - ErrPoolClosed = errors.New("worker pool is closed") - ErrNoWorkers = errors.New("no workers available") - ErrInitFailed = errors.New("worker initialization failed") -) - -// worker represents a single Lua execution worker -type worker struct { - pool *Pool // Reference to the pool - state *luajit.State // Lua state - id uint32 // Worker ID -} - -// run is the main worker function that processes jobs -func (w *worker) run() { - defer w.pool.wg.Done() - - // Initialize Lua state - w.state = luajit.New() - if w.state == nil { - // Worker failed to initialize, decrement counter - atomic.AddUint32(&w.pool.workers, ^uint32(0)) - return - } - defer w.state.Close() - - // Set up sandbox environment - if err := w.setupSandbox(); err != nil { - // Worker failed to initialize sandbox, decrement counter - atomic.AddUint32(&w.pool.workers, ^uint32(0)) - return - } - - // Run init function if provided - if w.pool.stateInit != nil { - if err := w.pool.stateInit(w.state); err != nil { - // Worker failed to initialize with custom init function - atomic.AddUint32(&w.pool.workers, ^uint32(0)) - return - } - } - - // Main worker loop - for { - select { - case job, ok := <-w.pool.jobs: - if !ok { - // Jobs channel closed, exit - return - } - - // Execute job - result := w.executeJobSandboxed(job) - job.Result <- result - - case <-w.pool.quit: - // Quit signal received, exit - return - } - } -} diff --git a/core/workers/workers_test.go b/core/workers/workers_test.go deleted file mode 100644 index 23dd712..0000000 --- a/core/workers/workers_test.go +++ /dev/null @@ -1,445 +0,0 @@ -package workers - -import ( - "context" - "testing" - "time" - - luajit "git.sharkk.net/Sky/LuaJIT-to-Go" -) - -// This helper function creates real LuaJIT bytecode for our tests. Instead of using -// mocks, we compile actual Lua code into bytecode just like we would in production. -func createTestBytecode(t *testing.T, code string) []byte { - state := luajit.New() - if state == nil { - t.Fatal("Failed to create Lua state") - } - defer state.Close() - - bytecode, err := state.CompileBytecode(code, "test") - if err != nil { - t.Fatalf("Failed to compile test bytecode: %v", err) - } - - return bytecode -} - -// This test makes sure we can create a worker pool with a valid number of workers, -// and that we properly reject attempts to create a pool with zero or negative workers. -func TestNewPool(t *testing.T) { - tests := []struct { - name string - workers int - expectErr bool - }{ - {"valid workers", 4, false}, - {"zero workers", 0, true}, - {"negative workers", -1, true}, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - pool, err := NewPool(tt.workers) - - if tt.expectErr { - if err == nil { - t.Errorf("Expected error for %d workers, got nil", tt.workers) - } - } else { - if err != nil { - t.Errorf("Unexpected error: %v", err) - } - if pool == nil { - t.Errorf("Expected non-nil pool") - } else { - pool.Shutdown() - } - } - }) - } -} - -// Here we're testing the basic job submission flow. We run a simple Lua script -// that returns the number 42 and make sure we get that same value back from the worker pool. -func TestPoolSubmit(t *testing.T) { - pool, err := NewPool(2) - if err != nil { - t.Fatalf("Failed to create pool: %v", err) - } - defer pool.Shutdown() - - bytecode := createTestBytecode(t, "return 42") - - result, err := pool.Submit(bytecode, nil) - if err != nil { - t.Fatalf("Failed to submit job: %v", err) - } - - num, ok := result.(float64) - if !ok { - t.Fatalf("Expected float64 result, got %T", result) - } - - if num != 42 { - t.Errorf("Expected 42, got %f", num) - } -} - -// This test checks how our worker pool handles timeouts. We run a script that takes -// some time to complete and verify two scenarios: one where the timeout is long enough -// for successful completion, and another where we expect the operation to be canceled -// due to a short timeout. -func TestPoolSubmitWithContext(t *testing.T) { - pool, err := NewPool(2) - if err != nil { - t.Fatalf("Failed to create pool: %v", err) - } - defer pool.Shutdown() - - // Create bytecode that sleeps - bytecode := createTestBytecode(t, ` - -- Sleep for 500ms - local start = os.time() - while os.difftime(os.time(), start) < 0.5 do end - return "done" - `) - - // Test with timeout that should succeed - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - defer cancel() - - result, err := pool.SubmitWithContext(ctx, bytecode, nil) - if err != nil { - t.Fatalf("Unexpected error with sufficient timeout: %v", err) - } - if result != "done" { - t.Errorf("Expected 'done', got %v", result) - } - - // Test with timeout that should fail - ctx, cancel = context.WithTimeout(context.Background(), 50*time.Millisecond) - defer cancel() - - _, err = pool.SubmitWithContext(ctx, bytecode, nil) - if err == nil { - t.Errorf("Expected timeout error, got nil") - } -} - -// We need to make sure we can pass different types of context values from Go to Lua and -// get them back properly. This test sends numbers, strings, booleans, and arrays to -// a Lua script and verifies they're all handled correctly in both directions. -func TestContextValues(t *testing.T) { - pool, err := NewPool(2) - if err != nil { - t.Fatalf("Failed to create pool: %v", err) - } - defer pool.Shutdown() - - bytecode := createTestBytecode(t, ` - return { - num = ctx.number, - str = ctx.text, - flag = ctx.enabled, - list = {ctx.table[1], ctx.table[2], ctx.table[3]}, - } - `) - - execCtx := NewContext() - execCtx.Set("number", 42.5) - execCtx.Set("text", "hello") - execCtx.Set("enabled", true) - execCtx.Set("table", []float64{10, 20, 30}) - - result, err := pool.Submit(bytecode, execCtx) - if err != nil { - t.Fatalf("Failed to submit job: %v", err) - } - - // Result should be a map - resultMap, ok := result.(map[string]any) - if !ok { - t.Fatalf("Expected map result, got %T", result) - } - - // Check values - if resultMap["num"] != 42.5 { - t.Errorf("Expected num=42.5, got %v", resultMap["num"]) - } - if resultMap["str"] != "hello" { - t.Errorf("Expected str=hello, got %v", resultMap["str"]) - } - if resultMap["flag"] != true { - t.Errorf("Expected flag=true, got %v", resultMap["flag"]) - } - - arr, ok := resultMap["list"].([]float64) - if !ok { - t.Fatalf("Expected []float64, got %T", resultMap["list"]) - } - - expected := []float64{10, 20, 30} - for i, v := range expected { - if arr[i] != v { - t.Errorf("Expected list[%d]=%f, got %f", i, v, arr[i]) - } - } -} - -// Test context with nested data structures -func TestNestedContext(t *testing.T) { - pool, err := NewPool(2) - if err != nil { - t.Fatalf("Failed to create pool: %v", err) - } - defer pool.Shutdown() - - bytecode := createTestBytecode(t, ` - return { - id = ctx.params.id, - name = ctx.params.name, - method = ctx.request.method, - path = ctx.request.path - } - `) - - execCtx := NewContext() - - // Set nested params - params := map[string]any{ - "id": "123", - "name": "test", - } - execCtx.Set("params", params) - - // Set nested request info - request := map[string]any{ - "method": "GET", - "path": "/api/test", - } - execCtx.Set("request", request) - - result, err := pool.Submit(bytecode, execCtx) - if err != nil { - t.Fatalf("Failed to submit job: %v", err) - } - - // Result should be a map - resultMap, ok := result.(map[string]any) - if !ok { - t.Fatalf("Expected map result, got %T", result) - } - - if resultMap["id"] != "123" { - t.Errorf("Expected id=123, got %v", resultMap["id"]) - } - if resultMap["name"] != "test" { - t.Errorf("Expected name=test, got %v", resultMap["name"]) - } - if resultMap["method"] != "GET" { - t.Errorf("Expected method=GET, got %v", resultMap["method"]) - } - if resultMap["path"] != "/api/test" { - t.Errorf("Expected path=/api/test, got %v", resultMap["path"]) - } -} - -// A key requirement for our worker pool is that we don't leak state between executions. -// This test confirms that by setting a global variable in one job and then checking -// that it's been cleared before the next job runs on the same worker. -func TestStateReset(t *testing.T) { - pool, err := NewPool(1) // Use 1 worker to ensure same state is reused - if err != nil { - t.Fatalf("Failed to create pool: %v", err) - } - defer pool.Shutdown() - - // First job sets a global - bytecode1 := createTestBytecode(t, ` - global_var = "should be cleared" - return true - `) - - // Second job checks if global exists - bytecode2 := createTestBytecode(t, ` - return global_var ~= nil - `) - - // Run first job - _, err = pool.Submit(bytecode1, nil) - if err != nil { - t.Fatalf("Failed to submit first job: %v", err) - } - - // Run second job - result, err := pool.Submit(bytecode2, nil) - if err != nil { - t.Fatalf("Failed to submit second job: %v", err) - } - - // Global should be cleared - if result.(bool) { - t.Errorf("Expected global_var to be cleared, but it still exists") - } -} - -// Let's make sure our pool shuts down cleanly. This test confirms that jobs work -// before shutdown, that we get the right error when trying to submit after shutdown, -// and that we properly handle attempts to shut down an already closed pool. -func TestPoolShutdown(t *testing.T) { - pool, err := NewPool(2) - if err != nil { - t.Fatalf("Failed to create pool: %v", err) - } - - // Submit a job to verify pool works - bytecode := createTestBytecode(t, "return 42") - _, err = pool.Submit(bytecode, nil) - if err != nil { - t.Fatalf("Failed to submit job: %v", err) - } - - // Shutdown - if err := pool.Shutdown(); err != nil { - t.Errorf("Shutdown failed: %v", err) - } - - // Submit after shutdown should fail - _, err = pool.Submit(bytecode, nil) - if err != ErrPoolClosed { - t.Errorf("Expected ErrPoolClosed, got %v", err) - } - - // Second shutdown should return error - if err := pool.Shutdown(); err != ErrPoolClosed { - t.Errorf("Expected ErrPoolClosed on second shutdown, got %v", err) - } -} - -// A robust worker pool needs to handle errors gracefully. This test checks various -// error scenarios: invalid bytecode, Lua runtime errors, nil context (which -// should work fine), and unsupported parameter types (which should properly error out). -func TestErrorHandling(t *testing.T) { - pool, err := NewPool(2) - if err != nil { - t.Fatalf("Failed to create pool: %v", err) - } - defer pool.Shutdown() - - // Test invalid bytecode - _, err = pool.Submit([]byte("not valid bytecode"), nil) - if err == nil { - t.Errorf("Expected error for invalid bytecode, got nil") - } - - // Test Lua runtime error - bytecode := createTestBytecode(t, ` - error("intentional error") - return true - `) - - _, err = pool.Submit(bytecode, nil) - if err == nil { - t.Errorf("Expected error from Lua error() call, got nil") - } - - // Test with nil context - bytecode = createTestBytecode(t, "return ctx == nil") - result, err := pool.Submit(bytecode, nil) - if err != nil { - t.Errorf("Unexpected error with nil context: %v", err) - } - if result.(bool) != true { - t.Errorf("Expected ctx to be nil in Lua, but it wasn't") - } - - // Test invalid context value - execCtx := NewContext() - execCtx.Set("param", complex128(1+2i)) // Unsupported type - - bytecode = createTestBytecode(t, "return ctx.param") - _, err = pool.Submit(bytecode, execCtx) - if err == nil { - t.Errorf("Expected error for unsupported context value type, got nil") - } -} - -// The whole point of a worker pool is concurrent processing, so we need to verify -// it works under load. This test submits multiple jobs simultaneously and makes sure -// they all complete correctly with their own unique results. -func TestConcurrentExecution(t *testing.T) { - const workers = 4 - const jobs = 20 - - pool, err := NewPool(workers) - if err != nil { - t.Fatalf("Failed to create pool: %v", err) - } - defer pool.Shutdown() - - // Create bytecode that returns its input - bytecode := createTestBytecode(t, "return ctx.n") - - // Run multiple jobs concurrently - results := make(chan int, jobs) - for i := 0; i < jobs; i++ { - i := i // Capture loop variable - go func() { - execCtx := NewContext() - execCtx.Set("n", float64(i)) - - result, err := pool.Submit(bytecode, execCtx) - if err != nil { - t.Errorf("Job %d failed: %v", i, err) - results <- -1 - return - } - - num, ok := result.(float64) - if !ok { - t.Errorf("Job %d: expected float64, got %T", i, result) - results <- -1 - return - } - - results <- int(num) - }() - } - - // Collect results - counts := make(map[int]bool) - for i := 0; i < jobs; i++ { - result := <-results - if result != -1 { - counts[result] = true - } - } - - // Verify all jobs were processed - if len(counts) != jobs { - t.Errorf("Expected %d unique results, got %d", jobs, len(counts)) - } -} - -// Test context operations -func TestContext(t *testing.T) { - ctx := NewContext() - - // Test Set and Get - ctx.Set("key", "value") - if ctx.Get("key") != "value" { - t.Errorf("Expected value, got %v", ctx.Get("key")) - } - - // Test overwriting - ctx.Set("key", 123) - if ctx.Get("key") != 123 { - t.Errorf("Expected 123, got %v", ctx.Get("key")) - } - - // Test missing key - if ctx.Get("missing") != nil { - t.Errorf("Expected nil for missing key, got %v", ctx.Get("missing")) - } -} diff --git a/moonshark.go b/moonshark.go index 1e17c49..77fc25a 100644 --- a/moonshark.go +++ b/moonshark.go @@ -12,9 +12,9 @@ import ( "git.sharkk.net/Sky/Moonshark/core/http" "git.sharkk.net/Sky/Moonshark/core/logger" "git.sharkk.net/Sky/Moonshark/core/routers" + "git.sharkk.net/Sky/Moonshark/core/runner" "git.sharkk.net/Sky/Moonshark/core/utils" "git.sharkk.net/Sky/Moonshark/core/watchers" - "git.sharkk.net/Sky/Moonshark/core/workers" ) // initRouters sets up the Lua and static routers @@ -47,7 +47,7 @@ func initRouters(routesDir, staticDir string, log *logger.Logger) (*routers.LuaR func main() { // Initialize logger - log := logger.New(logger.LevelInfo, true) + log := logger.New(logger.LevelFatal, true) log.Info("Starting Moonshark server") @@ -87,19 +87,21 @@ func main() { log.Info("File watcher active for static files") } - // Get worker pool size from config or use default - workerPoolSize := cfg.GetInt("worker_pool_size", 4) + // Get buffer size from config or use default (used to be worker pool size) + bufferSize := cfg.GetInt("buffer_size", 20) - // Initialize worker pool - pool, err := workers.NewPool(workerPoolSize) + // Initialize Lua runner (replacing worker pool) + runner, err := runner.NewRunner( + runner.WithBufferSize(bufferSize), + ) if err != nil { - log.Fatal("Failed to initialize worker pool: %v", err) + log.Fatal("Failed to initialize Lua runner: %v", err) } - log.Info("Worker pool initialized with %d workers", workerPoolSize) - defer pool.Shutdown() + log.Info("Lua runner initialized with buffer size %d", bufferSize) + defer runner.Close() // Create HTTP server - server := http.New(luaRouter, staticRouter, pool, log) + server := http.New(luaRouter, staticRouter, runner, log) // Handle graceful shutdown stop := make(chan os.Signal, 1)