Compare commits

...

2 Commits

Author SHA1 Message Date
95a4187d3f logger and config updates 2025-03-16 12:57:56 -05:00
d66cb603c6 luarunner 2025-03-15 22:38:03 -05:00
14 changed files with 760 additions and 1286 deletions

View File

@ -9,24 +9,24 @@ import (
"git.sharkk.net/Sky/Moonshark/core/logger"
"git.sharkk.net/Sky/Moonshark/core/routers"
"git.sharkk.net/Sky/Moonshark/core/workers"
"git.sharkk.net/Sky/Moonshark/core/runner"
)
// Server handles HTTP requests using Lua and static file routers
type Server struct {
luaRouter *routers.LuaRouter
staticRouter *routers.StaticRouter
workerPool *workers.Pool
luaRunner *runner.LuaRunner
logger *logger.Logger
httpServer *http.Server
}
// New creates a new HTTP server with optimized connection settings
func New(luaRouter *routers.LuaRouter, staticRouter *routers.StaticRouter, pool *workers.Pool, log *logger.Logger) *Server {
func New(luaRouter *routers.LuaRouter, staticRouter *routers.StaticRouter, runner *runner.LuaRunner, log *logger.Logger) *Server {
server := &Server{
luaRouter: luaRouter,
staticRouter: staticRouter,
workerPool: pool,
luaRunner: runner,
logger: log,
httpServer: &http.Server{
// Connection timeouts
@ -115,7 +115,7 @@ func (s *Server) handleRequest(w http.ResponseWriter, r *http.Request) {
// handleLuaRoute executes a Lua route
func (s *Server) handleLuaRoute(w http.ResponseWriter, r *http.Request, bytecode []byte, params *routers.Params) {
ctx := workers.NewContext()
ctx := runner.NewContext()
// Log bytecode size
s.logger.Debug("Executing Lua route with %d bytes of bytecode", len(bytecode))
@ -148,7 +148,7 @@ func (s *Server) handleLuaRoute(w http.ResponseWriter, r *http.Request, bytecode
}
// Execute Lua script
result, err := s.workerPool.Submit(bytecode, ctx)
result, err := s.luaRunner.Run(bytecode, ctx)
if err != nil {
s.logger.Error("Error executing Lua route: %v", err)
http.Error(w, "Internal Server Error", http.StatusInternalServerError)

View File

@ -27,6 +27,7 @@ const (
LevelWarning
LevelError
LevelFatal
LevelServer
)
// Level names and colors
@ -39,6 +40,7 @@ var levelProps = map[int]struct {
LevelWarning: {"WARN", colorYellow},
LevelError: {" ERR", colorRed},
LevelFatal: {"FATL", colorPurple},
LevelServer: {"SRVR", colorGreen},
}
// Time format for log messages
@ -227,6 +229,11 @@ func (l *Logger) Fatal(format string, args ...any) {
// No need for os.Exit here as it's handled in log()
}
// Server logs a server message
func (l *Logger) Server(format string, args ...any) {
l.log(LevelServer, format, args...)
}
// Default global logger
var defaultLogger = New(LevelInfo, true)
@ -255,6 +262,11 @@ func Fatal(format string, args ...any) {
defaultLogger.Fatal(format, args...)
}
// Server logs a server message to the default logger
func Server(format string, args ...any) {
defaultLogger.Server(format, args...)
}
// LogRaw logs a raw message to the default logger
func LogRaw(format string, args ...any) {
defaultLogger.LogRaw(format, args...)

View File

@ -1,4 +1,4 @@
package workers
package runner
// Context represents execution context for a Lua script
type Context struct {

View File

@ -1,4 +1,4 @@
package workers
package runner
// JobResult represents the result of a Lua script execution
type JobResult struct {

308
core/runner/luarunner.go Normal file
View File

@ -0,0 +1,308 @@
package runner
import (
"context"
"errors"
"sync"
"sync/atomic"
luajit "git.sharkk.net/Sky/LuaJIT-to-Go"
)
// Common errors
var (
ErrRunnerClosed = errors.New("lua runner is closed")
ErrInitFailed = errors.New("initialization failed")
)
// StateInitFunc is a function that initializes a Lua state
type StateInitFunc func(*luajit.State) error
// LuaRunner runs Lua scripts using a single Lua state
type LuaRunner struct {
state *luajit.State // The Lua state
jobQueue chan job // Channel for incoming jobs
isRunning atomic.Bool // Flag indicating if the runner is active
mu sync.RWMutex // Mutex for thread safety
wg sync.WaitGroup // WaitGroup for clean shutdown
initFunc StateInitFunc // Optional function to initialize Lua state
bufferSize int // Size of the job queue buffer
}
// NewRunner creates a new LuaRunner
func NewRunner(options ...RunnerOption) (*LuaRunner, error) {
// Default configuration
runner := &LuaRunner{
bufferSize: 10, // Default buffer size
}
// Apply options
for _, opt := range options {
opt(runner)
}
// Initialize Lua state
state := luajit.New()
if state == nil {
return nil, errors.New("failed to create Lua state")
}
runner.state = state
// Create job queue
runner.jobQueue = make(chan job, runner.bufferSize)
runner.isRunning.Store(true)
// Set up sandbox
if err := runner.setupSandbox(); err != nil {
state.Close()
return nil, ErrInitFailed
}
// Run init function if provided
if runner.initFunc != nil {
if err := runner.initFunc(state); err != nil {
state.Close()
return nil, ErrInitFailed
}
}
// Start the event loop
runner.wg.Add(1)
go runner.eventLoop()
return runner, nil
}
// RunnerOption defines a functional option for configuring the LuaRunner
type RunnerOption func(*LuaRunner)
// WithBufferSize sets the job queue buffer size
func WithBufferSize(size int) RunnerOption {
return func(r *LuaRunner) {
if size > 0 {
r.bufferSize = size
}
}
}
// WithInitFunc sets the init function for the Lua state
func WithInitFunc(initFunc StateInitFunc) RunnerOption {
return func(r *LuaRunner) {
r.initFunc = initFunc
}
}
// setupSandbox initializes the sandbox environment
func (r *LuaRunner) setupSandbox() error {
// This is the Lua script that creates our sandbox function
setupScript := `
-- Create a function to run code in a sandbox environment
function __create_sandbox()
-- Create new environment table
local env = {}
-- Add standard library modules (can be restricted as needed)
env.string = string
env.table = table
env.math = math
env.os = {
time = os.time,
date = os.date,
difftime = os.difftime,
clock = os.clock
}
env.tonumber = tonumber
env.tostring = tostring
env.type = type
env.pairs = pairs
env.ipairs = ipairs
env.next = next
env.select = select
env.unpack = unpack
env.pcall = pcall
env.xpcall = xpcall
env.error = error
env.assert = assert
-- Allow access to package.loaded for modules
env.require = function(name)
return package.loaded[name]
end
-- Create metatable to restrict access to _G
local mt = {
__index = function(t, k)
-- First check in env table
local v = rawget(env, k)
if v ~= nil then return v end
-- If not found, check for registered modules/functions
local moduleValue = _G[k]
if type(moduleValue) == "table" or
type(moduleValue) == "function" then
return moduleValue
end
return nil
end,
__newindex = function(t, k, v)
rawset(env, k, v)
end
}
setmetatable(env, mt)
return env
end
-- Create function to execute code with a sandbox
function __run_sandboxed(f, ctx)
local env = __create_sandbox()
-- Add context to the environment if provided
if ctx then
env.ctx = ctx
end
-- Set the environment and run the function
setfenv(f, env)
return f()
end
`
return r.state.DoString(setupScript)
}
// eventLoop processes jobs from the queue
func (r *LuaRunner) eventLoop() {
defer r.wg.Done()
defer r.state.Close()
// Process jobs until closure
for job := range r.jobQueue {
// Execute the job and send result
result := r.executeJob(job)
select {
case job.Result <- result:
// Result sent successfully
default:
// Result channel closed or full, discard the result
}
}
}
// executeJob runs a script in the sandbox environment
func (r *LuaRunner) executeJob(j job) JobResult {
// Re-run init function if needed
if r.initFunc != nil {
if err := r.initFunc(r.state); err != nil {
return JobResult{nil, err}
}
}
// Set up context if provided
if j.Context != nil {
// Push context table
r.state.NewTable()
// Add values to context table
for key, value := range j.Context.Values {
// Push key
r.state.PushString(key)
// Push value
if err := r.state.PushValue(value); err != nil {
return JobResult{nil, err}
}
// Set table[key] = value
r.state.SetTable(-3)
}
} else {
// Push nil if no context
r.state.PushNil()
}
// Load bytecode
if err := r.state.LoadBytecode(j.Bytecode, "script"); err != nil {
r.state.Pop(1) // Pop context
return JobResult{nil, err}
}
// Get the sandbox runner function
r.state.GetGlobal("__run_sandboxed")
// Push loaded function and context as arguments
r.state.PushCopy(-2) // Copy the loaded function
r.state.PushCopy(-4) // Copy the context table or nil
// Remove the original function and context
r.state.Remove(-5) // Remove original context
r.state.Remove(-4) // Remove original function
// Call the sandbox runner with 2 args (function and context), expecting 1 result
if err := r.state.Call(2, 1); err != nil {
return JobResult{nil, err}
}
// Get result
value, err := r.state.ToValue(-1)
r.state.Pop(1) // Pop result
return JobResult{value, err}
}
// RunWithContext executes a Lua script with context and timeout
func (r *LuaRunner) RunWithContext(ctx context.Context, bytecode []byte, execCtx *Context) (any, error) {
r.mu.RLock()
if !r.isRunning.Load() {
r.mu.RUnlock()
return nil, ErrRunnerClosed
}
r.mu.RUnlock()
resultChan := make(chan JobResult, 1)
j := job{
Bytecode: bytecode,
Context: execCtx,
Result: resultChan,
}
// Submit job with context
select {
case r.jobQueue <- j:
// Job submitted
case <-ctx.Done():
return nil, ctx.Err()
}
// Wait for result with context
select {
case result := <-resultChan:
return result.Value, result.Error
case <-ctx.Done():
return nil, ctx.Err()
}
}
// Run executes a Lua script
func (r *LuaRunner) Run(bytecode []byte, execCtx *Context) (any, error) {
return r.RunWithContext(context.Background(), bytecode, execCtx)
}
// Close gracefully shuts down the LuaRunner
func (r *LuaRunner) Close() error {
r.mu.Lock()
defer r.mu.Unlock()
if !r.isRunning.Load() {
return ErrRunnerClosed
}
r.isRunning.Store(false)
close(r.jobQueue)
// Wait for event loop to finish
r.wg.Wait()
return nil
}

View File

@ -0,0 +1,373 @@
package runner
import (
"context"
"testing"
"time"
luajit "git.sharkk.net/Sky/LuaJIT-to-Go"
)
// Helper function to create bytecode for testing
func createTestBytecode(t *testing.T, code string) []byte {
state := luajit.New()
if state == nil {
t.Fatal("Failed to create Lua state")
}
defer state.Close()
bytecode, err := state.CompileBytecode(code, "test")
if err != nil {
t.Fatalf("Failed to compile test bytecode: %v", err)
}
return bytecode
}
func TestRunnerBasic(t *testing.T) {
runner, err := NewRunner()
if err != nil {
t.Fatalf("Failed to create runner: %v", err)
}
defer runner.Close()
bytecode := createTestBytecode(t, "return 42")
result, err := runner.Run(bytecode, nil)
if err != nil {
t.Fatalf("Failed to run script: %v", err)
}
num, ok := result.(float64)
if !ok {
t.Fatalf("Expected float64 result, got %T", result)
}
if num != 42 {
t.Errorf("Expected 42, got %f", num)
}
}
func TestRunnerWithContext(t *testing.T) {
runner, err := NewRunner()
if err != nil {
t.Fatalf("Failed to create runner: %v", err)
}
defer runner.Close()
bytecode := createTestBytecode(t, `
return {
num = ctx.number,
str = ctx.text,
flag = ctx.enabled,
list = {ctx.table[1], ctx.table[2], ctx.table[3]},
}
`)
execCtx := NewContext()
execCtx.Set("number", 42.5)
execCtx.Set("text", "hello")
execCtx.Set("enabled", true)
execCtx.Set("table", []float64{10, 20, 30})
result, err := runner.Run(bytecode, execCtx)
if err != nil {
t.Fatalf("Failed to run job: %v", err)
}
// Result should be a map
resultMap, ok := result.(map[string]any)
if !ok {
t.Fatalf("Expected map result, got %T", result)
}
// Check values
if resultMap["num"] != 42.5 {
t.Errorf("Expected num=42.5, got %v", resultMap["num"])
}
if resultMap["str"] != "hello" {
t.Errorf("Expected str=hello, got %v", resultMap["str"])
}
if resultMap["flag"] != true {
t.Errorf("Expected flag=true, got %v", resultMap["flag"])
}
arr, ok := resultMap["list"].([]float64)
if !ok {
t.Fatalf("Expected []float64, got %T", resultMap["list"])
}
expected := []float64{10, 20, 30}
for i, v := range expected {
if arr[i] != v {
t.Errorf("Expected list[%d]=%f, got %f", i, v, arr[i])
}
}
}
func TestRunnerWithTimeout(t *testing.T) {
runner, err := NewRunner()
if err != nil {
t.Fatalf("Failed to create runner: %v", err)
}
defer runner.Close()
// Create bytecode that sleeps
bytecode := createTestBytecode(t, `
-- Sleep for 500ms
local start = os.time()
while os.difftime(os.time(), start) < 0.5 do end
return "done"
`)
// Test with timeout that should succeed
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
result, err := runner.RunWithContext(ctx, bytecode, nil)
if err != nil {
t.Fatalf("Unexpected error with sufficient timeout: %v", err)
}
if result != "done" {
t.Errorf("Expected 'done', got %v", result)
}
// Test with timeout that should fail
ctx, cancel = context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
_, err = runner.RunWithContext(ctx, bytecode, nil)
if err == nil {
t.Errorf("Expected timeout error, got nil")
}
}
func TestSandboxIsolation(t *testing.T) {
runner, err := NewRunner()
if err != nil {
t.Fatalf("Failed to create runner: %v", err)
}
defer runner.Close()
// Create a script that tries to modify a global variable
bytecode1 := createTestBytecode(t, `
-- Set a "global" variable
my_global = "test value"
return true
`)
_, err = runner.Run(bytecode1, nil)
if err != nil {
t.Fatalf("Failed to execute first script: %v", err)
}
// Now try to access that variable from another script
bytecode2 := createTestBytecode(t, `
-- Try to access the previously set global
return my_global ~= nil
`)
result, err := runner.Run(bytecode2, nil)
if err != nil {
t.Fatalf("Failed to execute second script: %v", err)
}
// The variable should not be accessible (sandbox isolation)
if result.(bool) {
t.Errorf("Expected sandbox isolation, but global variable was accessible")
}
}
func TestRunnerWithInit(t *testing.T) {
// Define an init function that registers a simple "math" module
mathInit := func(state *luajit.State) error {
// Register the "add" function
err := state.RegisterGoFunction("add", func(s *luajit.State) int {
a := s.ToNumber(1)
b := s.ToNumber(2)
s.PushNumber(a + b)
return 1 // Return one result
})
if err != nil {
return err
}
// Register a whole module
mathFuncs := map[string]luajit.GoFunction{
"multiply": func(s *luajit.State) int {
a := s.ToNumber(1)
b := s.ToNumber(2)
s.PushNumber(a * b)
return 1
},
"subtract": func(s *luajit.State) int {
a := s.ToNumber(1)
b := s.ToNumber(2)
s.PushNumber(a - b)
return 1
},
}
return RegisterModule(state, "math2", mathFuncs)
}
// Create a runner with our init function
runner, err := NewRunner(WithInitFunc(mathInit))
if err != nil {
t.Fatalf("Failed to create runner: %v", err)
}
defer runner.Close()
// Test the add function
bytecode1 := createTestBytecode(t, "return add(5, 7)")
result1, err := runner.Run(bytecode1, nil)
if err != nil {
t.Fatalf("Failed to call add function: %v", err)
}
num1, ok := result1.(float64)
if !ok || num1 != 12 {
t.Errorf("Expected add(5, 7) = 12, got %v", result1)
}
// Test the math2 module
bytecode2 := createTestBytecode(t, "return math2.multiply(6, 8)")
result2, err := runner.Run(bytecode2, nil)
if err != nil {
t.Fatalf("Failed to call math2.multiply: %v", err)
}
num2, ok := result2.(float64)
if !ok || num2 != 48 {
t.Errorf("Expected math2.multiply(6, 8) = 48, got %v", result2)
}
}
func TestConcurrentExecution(t *testing.T) {
const jobs = 20
runner, err := NewRunner(WithBufferSize(20))
if err != nil {
t.Fatalf("Failed to create runner: %v", err)
}
defer runner.Close()
// Create bytecode that returns its input
bytecode := createTestBytecode(t, "return ctx.n")
// Run multiple jobs concurrently
results := make(chan int, jobs)
for i := 0; i < jobs; i++ {
i := i // Capture loop variable
go func() {
execCtx := NewContext()
execCtx.Set("n", float64(i))
result, err := runner.Run(bytecode, execCtx)
if err != nil {
t.Errorf("Job %d failed: %v", i, err)
results <- -1
return
}
num, ok := result.(float64)
if !ok {
t.Errorf("Job %d: expected float64, got %T", i, result)
results <- -1
return
}
results <- int(num)
}()
}
// Collect results
seen := make(map[int]bool)
for i := 0; i < jobs; i++ {
result := <-results
if result != -1 {
seen[result] = true
}
}
// Verify all jobs were processed
if len(seen) != jobs {
t.Errorf("Expected %d unique results, got %d", jobs, len(seen))
}
}
func TestRunnerClose(t *testing.T) {
runner, err := NewRunner()
if err != nil {
t.Fatalf("Failed to create runner: %v", err)
}
// Submit a job to verify runner works
bytecode := createTestBytecode(t, "return 42")
_, err = runner.Run(bytecode, nil)
if err != nil {
t.Fatalf("Failed to run job: %v", err)
}
// Close
if err := runner.Close(); err != nil {
t.Errorf("Close failed: %v", err)
}
// Run after close should fail
_, err = runner.Run(bytecode, nil)
if err != ErrRunnerClosed {
t.Errorf("Expected ErrRunnerClosed, got %v", err)
}
// Second close should return error
if err := runner.Close(); err != ErrRunnerClosed {
t.Errorf("Expected ErrRunnerClosed on second close, got %v", err)
}
}
func TestErrorHandling(t *testing.T) {
runner, err := NewRunner()
if err != nil {
t.Fatalf("Failed to create runner: %v", err)
}
defer runner.Close()
// Test invalid bytecode
_, err = runner.Run([]byte("not valid bytecode"), nil)
if err == nil {
t.Errorf("Expected error for invalid bytecode, got nil")
}
// Test Lua runtime error
bytecode := createTestBytecode(t, `
error("intentional error")
return true
`)
_, err = runner.Run(bytecode, nil)
if err == nil {
t.Errorf("Expected error from Lua error() call, got nil")
}
// Test with nil context
bytecode = createTestBytecode(t, "return ctx == nil")
result, err := runner.Run(bytecode, nil)
if err != nil {
t.Errorf("Unexpected error with nil context: %v", err)
}
if result.(bool) != true {
t.Errorf("Expected ctx to be nil in Lua, but it wasn't")
}
// Test invalid context value
execCtx := NewContext()
execCtx.Set("param", complex128(1+2i)) // Unsupported type
bytecode = createTestBytecode(t, "return ctx.param")
_, err = runner.Run(bytecode, execCtx)
if err == nil {
t.Errorf("Expected error for unsupported context value type, got nil")
}
}

View File

@ -1,4 +1,4 @@
package workers
package runner
import (
luajit "git.sharkk.net/Sky/LuaJIT-to-Go"
@ -7,18 +7,6 @@ import (
// ModuleFunc is a function that returns a map of module functions
type ModuleFunc func() map[string]luajit.GoFunction
// ModuleInitFunc creates a state initializer that registers multiple modules
func ModuleInitFunc(modules map[string]ModuleFunc) StateInitFunc {
return func(state *luajit.State) error {
for name, moduleFunc := range modules {
if err := RegisterModule(state, name, moduleFunc()); err != nil {
return err
}
}
return nil
}
}
// RegisterModule registers a map of functions as a Lua module
func RegisterModule(state *luajit.State, name string, funcs map[string]luajit.GoFunction) error {
// Create a new table for the module
@ -44,6 +32,18 @@ func RegisterModule(state *luajit.State, name string, funcs map[string]luajit.Go
return nil
}
// ModuleInitFunc creates a state initializer that registers multiple modules
func ModuleInitFunc(modules map[string]ModuleFunc) StateInitFunc {
return func(state *luajit.State) error {
for name, moduleFunc := range modules {
if err := RegisterModule(state, name, moduleFunc()); err != nil {
return err
}
}
return nil
}
}
// CombineInitFuncs combines multiple state initializer functions into one
func CombineInitFuncs(funcs ...StateInitFunc) StateInitFunc {
return func(state *luajit.State) error {

View File

@ -1,107 +0,0 @@
# Worker Pool
### Pool
```go
type Pool struct { ... }
// Create a pool with specified number of workers
func NewPool(numWorkers int) (*Pool, error)
// Submit a job with default context
func (p *Pool) Submit(bytecode []byte, ctx *Context) (any, error)
// Submit with timeout/cancellation support
func (p *Pool) SubmitWithContext(ctx context.Context, bytecode []byte, execCtx *Context) (any, error)
// Shutdown the pool
func (p *Pool) Shutdown() error
// Get number of active workers
func (p *Pool) ActiveWorkers() uint32
```
### Context
```go
type Context struct { ... }
// Create a new execution context
func NewContext() *Context
// Set a value
func (c *Context) Set(key string, value any)
// Get a value
func (c *Context) Get(key string) any
```
## Basic Usage
```go
// Create worker pool
pool, err := workers.NewPool(4)
if err != nil {
log.Fatal(err)
}
defer pool.Shutdown()
// Compile bytecode (typically done once and reused)
state := luajit.New()
bytecode, err := state.CompileBytecode(`
return ctx.message .. " from Lua"
`, "script")
state.Close()
// Set up execution context
ctx := workers.NewContext()
ctx.Set("message", "Hello")
ctx.Set("params", map[string]any{"id": "123"})
// Execute bytecode
result, err := pool.Submit(bytecode, ctx)
if err != nil {
log.Fatal(err)
}
fmt.Println(result) // "Hello from Lua"
```
## With Timeout
```go
// Create context with timeout
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
// Execute with timeout
result, err := pool.SubmitWithContext(ctx, bytecode, execCtx)
if err != nil {
// Handle timeout or error
}
```
## In Lua Scripts
Inside Lua, the context is available as the global `ctx` table:
```lua
-- Access a simple value
local msg = ctx.message
-- Access nested values
local id = ctx.params.id
-- Return a result to Go
return {
status = "success",
data = msg
}
```
## Important Notes
- The pool is thread-safe; multiple goroutines can submit jobs concurrently
- Each execution is isolated; global state is reset between executions
- Bytecode should be compiled once and reused for better performance
- Context values should be serializable to Lua (numbers, strings, booleans, maps, slices)

View File

@ -1,346 +0,0 @@
package workers
import (
"testing"
luajit "git.sharkk.net/Sky/LuaJIT-to-Go"
)
func TestModuleRegistration(t *testing.T) {
// Define an init function that registers a simple "math" module
mathInit := func(state *luajit.State) error {
// Register the "add" function
err := state.RegisterGoFunction("add", func(s *luajit.State) int {
a := s.ToNumber(1)
b := s.ToNumber(2)
s.PushNumber(a + b)
return 1 // Return one result
})
if err != nil {
return err
}
// Register a whole module
mathFuncs := map[string]luajit.GoFunction{
"multiply": func(s *luajit.State) int {
a := s.ToNumber(1)
b := s.ToNumber(2)
s.PushNumber(a * b)
return 1
},
"subtract": func(s *luajit.State) int {
a := s.ToNumber(1)
b := s.ToNumber(2)
s.PushNumber(a - b)
return 1
},
}
return RegisterModule(state, "math2", mathFuncs)
}
// Create a pool with our init function
pool, err := NewPoolWithInit(2, mathInit)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
defer pool.Shutdown()
// Test the add function
bytecode1 := createTestBytecode(t, "return add(5, 7)")
result1, err := pool.Submit(bytecode1, nil)
if err != nil {
t.Fatalf("Failed to call add function: %v", err)
}
num1, ok := result1.(float64)
if !ok || num1 != 12 {
t.Errorf("Expected add(5, 7) = 12, got %v", result1)
}
// Test the math2 module
bytecode2 := createTestBytecode(t, "return math2.multiply(6, 8)")
result2, err := pool.Submit(bytecode2, nil)
if err != nil {
t.Fatalf("Failed to call math2.multiply: %v", err)
}
num2, ok := result2.(float64)
if !ok || num2 != 48 {
t.Errorf("Expected math2.multiply(6, 8) = 48, got %v", result2)
}
// Test multiple operations
bytecode3 := createTestBytecode(t, `
local a = add(10, 20)
local b = math2.subtract(a, 5)
return math2.multiply(b, 2)
`)
result3, err := pool.Submit(bytecode3, nil)
if err != nil {
t.Fatalf("Failed to execute combined operations: %v", err)
}
num3, ok := result3.(float64)
if !ok || num3 != 50 {
t.Errorf("Expected ((10 + 20) - 5) * 2 = 50, got %v", result3)
}
}
func TestModuleInitFunc(t *testing.T) {
// Define math module functions
mathModule := func() map[string]luajit.GoFunction {
return map[string]luajit.GoFunction{
"add": func(s *luajit.State) int {
a := s.ToNumber(1)
b := s.ToNumber(2)
s.PushNumber(a + b)
return 1
},
"multiply": func(s *luajit.State) int {
a := s.ToNumber(1)
b := s.ToNumber(2)
s.PushNumber(a * b)
return 1
},
}
}
// Define string module functions
strModule := func() map[string]luajit.GoFunction {
return map[string]luajit.GoFunction{
"concat": func(s *luajit.State) int {
a := s.ToString(1)
b := s.ToString(2)
s.PushString(a + b)
return 1
},
}
}
// Create module map
modules := map[string]ModuleFunc{
"math2": mathModule,
"str": strModule,
}
// Create pool with module init
pool, err := NewPoolWithInit(2, ModuleInitFunc(modules))
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
defer pool.Shutdown()
// Test math module
bytecode1 := createTestBytecode(t, "return math2.add(5, 7)")
result1, err := pool.Submit(bytecode1, nil)
if err != nil {
t.Fatalf("Failed to call math2.add: %v", err)
}
num1, ok := result1.(float64)
if !ok || num1 != 12 {
t.Errorf("Expected math2.add(5, 7) = 12, got %v", result1)
}
// Test string module
bytecode2 := createTestBytecode(t, "return str.concat('hello', 'world')")
result2, err := pool.Submit(bytecode2, nil)
if err != nil {
t.Fatalf("Failed to call str.concat: %v", err)
}
str2, ok := result2.(string)
if !ok || str2 != "helloworld" {
t.Errorf("Expected str.concat('hello', 'world') = 'helloworld', got %v", result2)
}
}
func TestCombineInitFuncs(t *testing.T) {
// First init function adds a function to get a constant value
init1 := func(state *luajit.State) error {
return state.RegisterGoFunction("getAnswer", func(s *luajit.State) int {
s.PushNumber(42)
return 1
})
}
// Second init function registers a function that multiplies a number by 2
init2 := func(state *luajit.State) error {
return state.RegisterGoFunction("double", func(s *luajit.State) int {
n := s.ToNumber(1)
s.PushNumber(n * 2)
return 1
})
}
// Combine the init functions
combinedInit := CombineInitFuncs(init1, init2)
// Create a pool with the combined init function
pool, err := NewPoolWithInit(1, combinedInit)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
defer pool.Shutdown()
// Test using both functions together in a single script
bytecode := createTestBytecode(t, "return double(getAnswer())")
result, err := pool.Submit(bytecode, nil)
if err != nil {
t.Fatalf("Failed to execute: %v", err)
}
num, ok := result.(float64)
if !ok || num != 84 {
t.Errorf("Expected double(getAnswer()) = 84, got %v", result)
}
}
func TestSandboxIsolation(t *testing.T) {
// Create a pool
pool, err := NewPool(2)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
defer pool.Shutdown()
// Create a script that tries to modify a global variable
bytecode1 := createTestBytecode(t, `
-- Set a "global" variable
my_global = "test value"
return true
`)
_, err = pool.Submit(bytecode1, nil)
if err != nil {
t.Fatalf("Failed to execute first script: %v", err)
}
// Now try to access that variable from another script
bytecode2 := createTestBytecode(t, `
-- Try to access the previously set global
return my_global ~= nil
`)
result, err := pool.Submit(bytecode2, nil)
if err != nil {
t.Fatalf("Failed to execute second script: %v", err)
}
// The variable should not be accessible (sandbox isolation)
if result.(bool) {
t.Errorf("Expected sandbox isolation, but global variable was accessible")
}
}
func TestContextInSandbox(t *testing.T) {
// Create a pool
pool, err := NewPool(2)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
defer pool.Shutdown()
// Create a context with test data
ctx := NewContext()
ctx.Set("name", "test")
ctx.Set("value", 42.5)
ctx.Set("items", []float64{1, 2, 3})
bytecode := createTestBytecode(t, `
-- Access and manipulate context values
local sum = 0
for i, v in ipairs(ctx.items) do
sum = sum + v
end
return {
name_length = string.len(ctx.name),
value_doubled = ctx.value * 2,
items_sum = sum
}
`)
result, err := pool.Submit(bytecode, ctx)
if err != nil {
t.Fatalf("Failed to execute script with context: %v", err)
}
resultMap, ok := result.(map[string]any)
if !ok {
t.Fatalf("Expected map result, got %T", result)
}
// Check context values were correctly accessible
if resultMap["name_length"].(float64) != 4 {
t.Errorf("Expected name_length = 4, got %v", resultMap["name_length"])
}
if resultMap["value_doubled"].(float64) != 85 {
t.Errorf("Expected value_doubled = 85, got %v", resultMap["value_doubled"])
}
if resultMap["items_sum"].(float64) != 6 {
t.Errorf("Expected items_sum = 6, got %v", resultMap["items_sum"])
}
}
func TestStandardLibsInSandbox(t *testing.T) {
// Create a pool
pool, err := NewPool(2)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
defer pool.Shutdown()
// Test access to standard libraries
bytecode := createTestBytecode(t, `
local results = {}
-- Test string library
results.string_upper = string.upper("test")
-- Test math library
results.math_sqrt = math.sqrt(16)
-- Test table library
local tbl = {10, 20, 30}
table.insert(tbl, 40)
results.table_length = #tbl
-- Test os library (limited functions)
results.has_os_time = type(os.time) == "function"
return results
`)
result, err := pool.Submit(bytecode, nil)
if err != nil {
t.Fatalf("Failed to execute script: %v", err)
}
resultMap, ok := result.(map[string]any)
if !ok {
t.Fatalf("Expected map result, got %T", result)
}
// Check standard library functions worked
if resultMap["string_upper"] != "TEST" {
t.Errorf("Expected string_upper = 'TEST', got %v", resultMap["string_upper"])
}
if resultMap["math_sqrt"].(float64) != 4 {
t.Errorf("Expected math_sqrt = 4, got %v", resultMap["math_sqrt"])
}
if resultMap["table_length"].(float64) != 4 {
t.Errorf("Expected table_length = 4, got %v", resultMap["table_length"])
}
if resultMap["has_os_time"] != true {
t.Errorf("Expected has_os_time = true, got %v", resultMap["has_os_time"])
}
}

View File

@ -1,123 +0,0 @@
package workers
import (
"context"
"sync"
"sync/atomic"
luajit "git.sharkk.net/Sky/LuaJIT-to-Go"
)
// StateInitFunc is a function that initializes a Lua state
// It can be used to register custom functions and modules
type StateInitFunc func(*luajit.State) error
// Pool manages a pool of Lua worker goroutines
type Pool struct {
workers uint32 // Number of workers
jobs chan job // Channel to send jobs to workers
wg sync.WaitGroup // WaitGroup to track active workers
quit chan struct{} // Channel to signal shutdown
isRunning atomic.Bool // Flag to track if pool is running
stateInit StateInitFunc // Optional function to initialize Lua state
}
// NewPool creates a new worker pool with the specified number of workers
func NewPool(numWorkers int) (*Pool, error) {
return NewPoolWithInit(numWorkers, nil)
}
// NewPoolWithInit creates a new worker pool with the specified number of workers
// and a function to initialize each worker's Lua state
func NewPoolWithInit(numWorkers int, initFunc StateInitFunc) (*Pool, error) {
if numWorkers <= 0 {
return nil, ErrNoWorkers
}
p := &Pool{
workers: uint32(numWorkers),
jobs: make(chan job, numWorkers), // Buffer equal to worker count
quit: make(chan struct{}),
stateInit: initFunc,
}
p.isRunning.Store(true)
// Start workers
p.wg.Add(numWorkers)
for i := 0; i < numWorkers; i++ {
w := &worker{
pool: p,
id: uint32(i),
}
go w.run()
}
return p, nil
}
// RegisterGlobal is no longer needed with the sandbox approach
// but kept as a no-op for backward compatibility
func (p *Pool) RegisterGlobal(name string) {
// No-op in sandbox mode
}
// SubmitWithContext sends a job to the worker pool with context
func (p *Pool) SubmitWithContext(ctx context.Context, bytecode []byte, execCtx *Context) (any, error) {
if !p.isRunning.Load() {
return nil, ErrPoolClosed
}
resultChan := make(chan JobResult, 1)
j := job{
Bytecode: bytecode,
Context: execCtx,
Result: resultChan,
}
// Submit job with context
select {
case p.jobs <- j:
// Job submitted
case <-ctx.Done():
return nil, ctx.Err()
}
// Wait for result with context
select {
case result := <-resultChan:
return result.Value, result.Error
case <-ctx.Done():
// Note: The job will still be processed by a worker,
// but the result will be discarded
return nil, ctx.Err()
}
}
// Submit sends a job to the worker pool
func (p *Pool) Submit(bytecode []byte, execCtx *Context) (any, error) {
return p.SubmitWithContext(context.Background(), bytecode, execCtx)
}
// Shutdown gracefully shuts down the worker pool
func (p *Pool) Shutdown() error {
if !p.isRunning.Load() {
return ErrPoolClosed
}
p.isRunning.Store(false)
// Signal workers to quit
close(p.quit)
// Wait for workers to finish
p.wg.Wait()
// Close jobs channel
close(p.jobs)
return nil
}
// ActiveWorkers returns the number of active workers
func (p *Pool) ActiveWorkers() uint32 {
return atomic.LoadUint32(&p.workers)
}

View File

@ -1,144 +0,0 @@
package workers
// setupSandbox initializes the sandbox environment creation function
func (w *worker) setupSandbox() error {
// This is the Lua script that creates our sandbox function
setupScript := `
-- Create a function to run code in a sandbox environment
function __create_sandbox()
-- Create new environment table
local env = {}
-- Add standard library modules (can be restricted as needed)
env.string = string
env.table = table
env.math = math
env.os = {
time = os.time,
date = os.date,
difftime = os.difftime,
clock = os.clock
}
env.tonumber = tonumber
env.tostring = tostring
env.type = type
env.pairs = pairs
env.ipairs = ipairs
env.next = next
env.select = select
env.unpack = unpack
env.pcall = pcall
env.xpcall = xpcall
env.error = error
env.assert = assert
-- Allow access to package.loaded for modules
env.require = function(name)
return package.loaded[name]
end
-- Create metatable to restrict access to _G
local mt = {
__index = function(t, k)
-- First check in env table
local v = rawget(env, k)
if v ~= nil then return v end
-- If not found, check for registered modules/functions
local moduleValue = _G[k]
if type(moduleValue) == "table" or
type(moduleValue) == "function" then
return moduleValue
end
return nil
end,
__newindex = function(t, k, v)
rawset(env, k, v)
end
}
setmetatable(env, mt)
return env
end
-- Create function to execute code with a sandbox
function __run_sandboxed(f, ctx)
local env = __create_sandbox()
-- Add context to the environment if provided
if ctx then
env.ctx = ctx
end
-- Set the environment and run the function
setfenv(f, env)
return f()
end
`
return w.state.DoString(setupScript)
}
// executeJobSandboxed runs a script in a sandbox environment
func (w *worker) executeJobSandboxed(j job) JobResult {
// No need to reset the state for each execution, since we're using a sandbox
// Re-run init function to register functions and modules if needed
if w.pool.stateInit != nil {
if err := w.pool.stateInit(w.state); err != nil {
return JobResult{nil, err}
}
}
// Set up context if provided
if j.Context != nil {
// Push context table
w.state.NewTable()
// Add values to context table
for key, value := range j.Context.Values {
// Push key
w.state.PushString(key)
// Push value
if err := w.state.PushValue(value); err != nil {
return JobResult{nil, err}
}
// Set table[key] = value
w.state.SetTable(-3)
}
} else {
// Push nil if no context
w.state.PushNil()
}
// Load bytecode
if err := w.state.LoadBytecode(j.Bytecode, "script"); err != nil {
w.state.Pop(1) // Pop context
return JobResult{nil, err}
}
// Get the sandbox runner function
w.state.GetGlobal("__run_sandboxed")
// Push loaded function and context as arguments
w.state.PushCopy(-2) // Copy the loaded function
w.state.PushCopy(-4) // Copy the context table or nil
// Remove the original function and context
w.state.Remove(-5) // Remove original context
w.state.Remove(-4) // Remove original function
// Call the sandbox runner with 2 args (function and context), expecting 1 result
if err := w.state.Call(2, 1); err != nil {
return JobResult{nil, err}
}
// Get result
value, err := w.state.ToValue(-1)
w.state.Pop(1) // Pop result
return JobResult{value, err}
}

View File

@ -1,71 +0,0 @@
package workers
import (
"errors"
"sync/atomic"
luajit "git.sharkk.net/Sky/LuaJIT-to-Go"
)
// Common errors
var (
ErrPoolClosed = errors.New("worker pool is closed")
ErrNoWorkers = errors.New("no workers available")
ErrInitFailed = errors.New("worker initialization failed")
)
// worker represents a single Lua execution worker
type worker struct {
pool *Pool // Reference to the pool
state *luajit.State // Lua state
id uint32 // Worker ID
}
// run is the main worker function that processes jobs
func (w *worker) run() {
defer w.pool.wg.Done()
// Initialize Lua state
w.state = luajit.New()
if w.state == nil {
// Worker failed to initialize, decrement counter
atomic.AddUint32(&w.pool.workers, ^uint32(0))
return
}
defer w.state.Close()
// Set up sandbox environment
if err := w.setupSandbox(); err != nil {
// Worker failed to initialize sandbox, decrement counter
atomic.AddUint32(&w.pool.workers, ^uint32(0))
return
}
// Run init function if provided
if w.pool.stateInit != nil {
if err := w.pool.stateInit(w.state); err != nil {
// Worker failed to initialize with custom init function
atomic.AddUint32(&w.pool.workers, ^uint32(0))
return
}
}
// Main worker loop
for {
select {
case job, ok := <-w.pool.jobs:
if !ok {
// Jobs channel closed, exit
return
}
// Execute job
result := w.executeJobSandboxed(job)
job.Result <- result
case <-w.pool.quit:
// Quit signal received, exit
return
}
}
}

View File

@ -1,445 +0,0 @@
package workers
import (
"context"
"testing"
"time"
luajit "git.sharkk.net/Sky/LuaJIT-to-Go"
)
// This helper function creates real LuaJIT bytecode for our tests. Instead of using
// mocks, we compile actual Lua code into bytecode just like we would in production.
func createTestBytecode(t *testing.T, code string) []byte {
state := luajit.New()
if state == nil {
t.Fatal("Failed to create Lua state")
}
defer state.Close()
bytecode, err := state.CompileBytecode(code, "test")
if err != nil {
t.Fatalf("Failed to compile test bytecode: %v", err)
}
return bytecode
}
// This test makes sure we can create a worker pool with a valid number of workers,
// and that we properly reject attempts to create a pool with zero or negative workers.
func TestNewPool(t *testing.T) {
tests := []struct {
name string
workers int
expectErr bool
}{
{"valid workers", 4, false},
{"zero workers", 0, true},
{"negative workers", -1, true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
pool, err := NewPool(tt.workers)
if tt.expectErr {
if err == nil {
t.Errorf("Expected error for %d workers, got nil", tt.workers)
}
} else {
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if pool == nil {
t.Errorf("Expected non-nil pool")
} else {
pool.Shutdown()
}
}
})
}
}
// Here we're testing the basic job submission flow. We run a simple Lua script
// that returns the number 42 and make sure we get that same value back from the worker pool.
func TestPoolSubmit(t *testing.T) {
pool, err := NewPool(2)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
defer pool.Shutdown()
bytecode := createTestBytecode(t, "return 42")
result, err := pool.Submit(bytecode, nil)
if err != nil {
t.Fatalf("Failed to submit job: %v", err)
}
num, ok := result.(float64)
if !ok {
t.Fatalf("Expected float64 result, got %T", result)
}
if num != 42 {
t.Errorf("Expected 42, got %f", num)
}
}
// This test checks how our worker pool handles timeouts. We run a script that takes
// some time to complete and verify two scenarios: one where the timeout is long enough
// for successful completion, and another where we expect the operation to be canceled
// due to a short timeout.
func TestPoolSubmitWithContext(t *testing.T) {
pool, err := NewPool(2)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
defer pool.Shutdown()
// Create bytecode that sleeps
bytecode := createTestBytecode(t, `
-- Sleep for 500ms
local start = os.time()
while os.difftime(os.time(), start) < 0.5 do end
return "done"
`)
// Test with timeout that should succeed
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
result, err := pool.SubmitWithContext(ctx, bytecode, nil)
if err != nil {
t.Fatalf("Unexpected error with sufficient timeout: %v", err)
}
if result != "done" {
t.Errorf("Expected 'done', got %v", result)
}
// Test with timeout that should fail
ctx, cancel = context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel()
_, err = pool.SubmitWithContext(ctx, bytecode, nil)
if err == nil {
t.Errorf("Expected timeout error, got nil")
}
}
// We need to make sure we can pass different types of context values from Go to Lua and
// get them back properly. This test sends numbers, strings, booleans, and arrays to
// a Lua script and verifies they're all handled correctly in both directions.
func TestContextValues(t *testing.T) {
pool, err := NewPool(2)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
defer pool.Shutdown()
bytecode := createTestBytecode(t, `
return {
num = ctx.number,
str = ctx.text,
flag = ctx.enabled,
list = {ctx.table[1], ctx.table[2], ctx.table[3]},
}
`)
execCtx := NewContext()
execCtx.Set("number", 42.5)
execCtx.Set("text", "hello")
execCtx.Set("enabled", true)
execCtx.Set("table", []float64{10, 20, 30})
result, err := pool.Submit(bytecode, execCtx)
if err != nil {
t.Fatalf("Failed to submit job: %v", err)
}
// Result should be a map
resultMap, ok := result.(map[string]any)
if !ok {
t.Fatalf("Expected map result, got %T", result)
}
// Check values
if resultMap["num"] != 42.5 {
t.Errorf("Expected num=42.5, got %v", resultMap["num"])
}
if resultMap["str"] != "hello" {
t.Errorf("Expected str=hello, got %v", resultMap["str"])
}
if resultMap["flag"] != true {
t.Errorf("Expected flag=true, got %v", resultMap["flag"])
}
arr, ok := resultMap["list"].([]float64)
if !ok {
t.Fatalf("Expected []float64, got %T", resultMap["list"])
}
expected := []float64{10, 20, 30}
for i, v := range expected {
if arr[i] != v {
t.Errorf("Expected list[%d]=%f, got %f", i, v, arr[i])
}
}
}
// Test context with nested data structures
func TestNestedContext(t *testing.T) {
pool, err := NewPool(2)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
defer pool.Shutdown()
bytecode := createTestBytecode(t, `
return {
id = ctx.params.id,
name = ctx.params.name,
method = ctx.request.method,
path = ctx.request.path
}
`)
execCtx := NewContext()
// Set nested params
params := map[string]any{
"id": "123",
"name": "test",
}
execCtx.Set("params", params)
// Set nested request info
request := map[string]any{
"method": "GET",
"path": "/api/test",
}
execCtx.Set("request", request)
result, err := pool.Submit(bytecode, execCtx)
if err != nil {
t.Fatalf("Failed to submit job: %v", err)
}
// Result should be a map
resultMap, ok := result.(map[string]any)
if !ok {
t.Fatalf("Expected map result, got %T", result)
}
if resultMap["id"] != "123" {
t.Errorf("Expected id=123, got %v", resultMap["id"])
}
if resultMap["name"] != "test" {
t.Errorf("Expected name=test, got %v", resultMap["name"])
}
if resultMap["method"] != "GET" {
t.Errorf("Expected method=GET, got %v", resultMap["method"])
}
if resultMap["path"] != "/api/test" {
t.Errorf("Expected path=/api/test, got %v", resultMap["path"])
}
}
// A key requirement for our worker pool is that we don't leak state between executions.
// This test confirms that by setting a global variable in one job and then checking
// that it's been cleared before the next job runs on the same worker.
func TestStateReset(t *testing.T) {
pool, err := NewPool(1) // Use 1 worker to ensure same state is reused
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
defer pool.Shutdown()
// First job sets a global
bytecode1 := createTestBytecode(t, `
global_var = "should be cleared"
return true
`)
// Second job checks if global exists
bytecode2 := createTestBytecode(t, `
return global_var ~= nil
`)
// Run first job
_, err = pool.Submit(bytecode1, nil)
if err != nil {
t.Fatalf("Failed to submit first job: %v", err)
}
// Run second job
result, err := pool.Submit(bytecode2, nil)
if err != nil {
t.Fatalf("Failed to submit second job: %v", err)
}
// Global should be cleared
if result.(bool) {
t.Errorf("Expected global_var to be cleared, but it still exists")
}
}
// Let's make sure our pool shuts down cleanly. This test confirms that jobs work
// before shutdown, that we get the right error when trying to submit after shutdown,
// and that we properly handle attempts to shut down an already closed pool.
func TestPoolShutdown(t *testing.T) {
pool, err := NewPool(2)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
// Submit a job to verify pool works
bytecode := createTestBytecode(t, "return 42")
_, err = pool.Submit(bytecode, nil)
if err != nil {
t.Fatalf("Failed to submit job: %v", err)
}
// Shutdown
if err := pool.Shutdown(); err != nil {
t.Errorf("Shutdown failed: %v", err)
}
// Submit after shutdown should fail
_, err = pool.Submit(bytecode, nil)
if err != ErrPoolClosed {
t.Errorf("Expected ErrPoolClosed, got %v", err)
}
// Second shutdown should return error
if err := pool.Shutdown(); err != ErrPoolClosed {
t.Errorf("Expected ErrPoolClosed on second shutdown, got %v", err)
}
}
// A robust worker pool needs to handle errors gracefully. This test checks various
// error scenarios: invalid bytecode, Lua runtime errors, nil context (which
// should work fine), and unsupported parameter types (which should properly error out).
func TestErrorHandling(t *testing.T) {
pool, err := NewPool(2)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
defer pool.Shutdown()
// Test invalid bytecode
_, err = pool.Submit([]byte("not valid bytecode"), nil)
if err == nil {
t.Errorf("Expected error for invalid bytecode, got nil")
}
// Test Lua runtime error
bytecode := createTestBytecode(t, `
error("intentional error")
return true
`)
_, err = pool.Submit(bytecode, nil)
if err == nil {
t.Errorf("Expected error from Lua error() call, got nil")
}
// Test with nil context
bytecode = createTestBytecode(t, "return ctx == nil")
result, err := pool.Submit(bytecode, nil)
if err != nil {
t.Errorf("Unexpected error with nil context: %v", err)
}
if result.(bool) != true {
t.Errorf("Expected ctx to be nil in Lua, but it wasn't")
}
// Test invalid context value
execCtx := NewContext()
execCtx.Set("param", complex128(1+2i)) // Unsupported type
bytecode = createTestBytecode(t, "return ctx.param")
_, err = pool.Submit(bytecode, execCtx)
if err == nil {
t.Errorf("Expected error for unsupported context value type, got nil")
}
}
// The whole point of a worker pool is concurrent processing, so we need to verify
// it works under load. This test submits multiple jobs simultaneously and makes sure
// they all complete correctly with their own unique results.
func TestConcurrentExecution(t *testing.T) {
const workers = 4
const jobs = 20
pool, err := NewPool(workers)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
defer pool.Shutdown()
// Create bytecode that returns its input
bytecode := createTestBytecode(t, "return ctx.n")
// Run multiple jobs concurrently
results := make(chan int, jobs)
for i := 0; i < jobs; i++ {
i := i // Capture loop variable
go func() {
execCtx := NewContext()
execCtx.Set("n", float64(i))
result, err := pool.Submit(bytecode, execCtx)
if err != nil {
t.Errorf("Job %d failed: %v", i, err)
results <- -1
return
}
num, ok := result.(float64)
if !ok {
t.Errorf("Job %d: expected float64, got %T", i, result)
results <- -1
return
}
results <- int(num)
}()
}
// Collect results
counts := make(map[int]bool)
for i := 0; i < jobs; i++ {
result := <-results
if result != -1 {
counts[result] = true
}
}
// Verify all jobs were processed
if len(counts) != jobs {
t.Errorf("Expected %d unique results, got %d", jobs, len(counts))
}
}
// Test context operations
func TestContext(t *testing.T) {
ctx := NewContext()
// Test Set and Get
ctx.Set("key", "value")
if ctx.Get("key") != "value" {
t.Errorf("Expected value, got %v", ctx.Get("key"))
}
// Test overwriting
ctx.Set("key", 123)
if ctx.Get("key") != 123 {
t.Errorf("Expected 123, got %v", ctx.Get("key"))
}
// Test missing key
if ctx.Get("missing") != nil {
t.Errorf("Expected nil for missing key, got %v", ctx.Get("missing"))
}
}

View File

@ -12,9 +12,9 @@ import (
"git.sharkk.net/Sky/Moonshark/core/http"
"git.sharkk.net/Sky/Moonshark/core/logger"
"git.sharkk.net/Sky/Moonshark/core/routers"
"git.sharkk.net/Sky/Moonshark/core/runner"
"git.sharkk.net/Sky/Moonshark/core/utils"
"git.sharkk.net/Sky/Moonshark/core/watchers"
"git.sharkk.net/Sky/Moonshark/core/workers"
)
// initRouters sets up the Lua and static routers
@ -47,9 +47,9 @@ func initRouters(routesDir, staticDir string, log *logger.Logger) (*routers.LuaR
func main() {
// Initialize logger
log := logger.New(logger.LevelInfo, true)
log := logger.New(logger.LevelDebug, true)
log.Info("Starting Moonshark server")
log.Server("Starting Moonshark server")
// Load configuration from config.lua
cfg, err := config.Load("config.lua")
@ -59,6 +59,19 @@ func main() {
cfg = config.New()
}
switch cfg.GetString("log_level", "info") {
case "debug":
log.SetLevel(logger.LevelDebug)
case "warn":
log.SetLevel(logger.LevelWarning)
case "error":
log.SetLevel(logger.LevelError)
case "fatal":
log.SetLevel(logger.LevelFatal)
default:
log.SetLevel(logger.LevelInfo)
}
// Get port from config or use default
port := cfg.GetInt("port", 3117)
@ -70,36 +83,40 @@ func main() {
log.Fatal("Router initialization failed: %v", err)
}
// Set up file watchers for automatic reloading
luaWatcher, err := watchers.WatchLuaRouter(luaRouter, routesDir, log)
if err != nil {
log.Warning("Failed to watch routes directory: %v", err)
} else {
defer luaWatcher.Close()
log.Info("File watcher active for Lua routes")
if cfg.GetBool("watchers", false) {
// Set up file watchers for automatic reloading
luaWatcher, err := watchers.WatchLuaRouter(luaRouter, routesDir, log)
if err != nil {
log.Warning("Failed to watch routes directory: %v", err)
} else {
defer luaWatcher.Close()
log.Info("File watcher active for Lua routes")
}
staticWatcher, err := watchers.WatchStaticRouter(staticRouter, staticDir, log)
if err != nil {
log.Warning("Failed to watch static directory: %v", err)
} else {
defer staticWatcher.Close()
log.Info("File watcher active for static files")
}
}
staticWatcher, err := watchers.WatchStaticRouter(staticRouter, staticDir, log)
if err != nil {
log.Warning("Failed to watch static directory: %v", err)
} else {
defer staticWatcher.Close()
log.Info("File watcher active for static files")
}
// Get buffer size from config or use default (used to be worker pool size)
bufferSize := cfg.GetInt("buffer_size", 20)
// Get worker pool size from config or use default
workerPoolSize := cfg.GetInt("worker_pool_size", 4)
// Initialize worker pool
pool, err := workers.NewPool(workerPoolSize)
// Initialize Lua runner (replacing worker pool)
runner, err := runner.NewRunner(
runner.WithBufferSize(bufferSize),
)
if err != nil {
log.Fatal("Failed to initialize worker pool: %v", err)
log.Fatal("Failed to initialize Lua runner: %v", err)
}
log.Info("Worker pool initialized with %d workers", workerPoolSize)
defer pool.Shutdown()
log.Server("Lua runner initialized with buffer size %d", bufferSize)
defer runner.Close()
// Create HTTP server
server := http.New(luaRouter, staticRouter, pool, log)
server := http.New(luaRouter, staticRouter, runner, log)
// Handle graceful shutdown
stop := make(chan os.Signal, 1)
@ -114,11 +131,11 @@ func main() {
}
}()
log.Info("Server started on port %d", port)
log.Server("Server started on port %d", port)
// Wait for interrupt signal
<-stop
log.Info("Shutdown signal received")
log.Server("Shutdown signal received")
// Gracefully shut down the server
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
@ -128,5 +145,5 @@ func main() {
log.Error("Server shutdown error: %v", err)
}
log.Info("Server stopped")
log.Server("Server stopped")
}