diff --git a/core/workers/README.md b/core/workers/README.md new file mode 100644 index 0000000..ba84b8f --- /dev/null +++ b/core/workers/README.md @@ -0,0 +1,107 @@ +# Worker Pool + +### Pool + +```go +type Pool struct { ... } + +// Create a pool with specified number of workers +func NewPool(numWorkers int) (*Pool, error) + +// Submit a job with default context +func (p *Pool) Submit(bytecode []byte, ctx *Context) (any, error) + +// Submit with timeout/cancellation support +func (p *Pool) SubmitWithContext(ctx context.Context, bytecode []byte, execCtx *Context) (any, error) + +// Shutdown the pool +func (p *Pool) Shutdown() error + +// Get number of active workers +func (p *Pool) ActiveWorkers() uint32 +``` + +### Context + +```go +type Context struct { ... } + +// Create a new execution context +func NewContext() *Context + +// Set a value +func (c *Context) Set(key string, value any) + +// Get a value +func (c *Context) Get(key string) any +``` + +## Basic Usage + +```go +// Create worker pool +pool, err := workers.NewPool(4) +if err != nil { + log.Fatal(err) +} +defer pool.Shutdown() + +// Compile bytecode (typically done once and reused) +state := luajit.New() +bytecode, err := state.CompileBytecode(` + return ctx.message .. " from Lua" +`, "script") +state.Close() + +// Set up execution context +ctx := workers.NewContext() +ctx.Set("message", "Hello") +ctx.Set("params", map[string]any{"id": "123"}) + +// Execute bytecode +result, err := pool.Submit(bytecode, ctx) +if err != nil { + log.Fatal(err) +} + +fmt.Println(result) // "Hello from Lua" +``` + +## With Timeout + +```go +// Create context with timeout +ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) +defer cancel() + +// Execute with timeout +result, err := pool.SubmitWithContext(ctx, bytecode, execCtx) +if err != nil { + // Handle timeout or error +} +``` + +## In Lua Scripts + +Inside Lua, the context is available as the global `ctx` table: + +```lua +-- Access a simple value +local msg = ctx.message + +-- Access nested values +local id = ctx.params.id + +-- Return a result to Go +return { + status = "success", + data = msg +} +``` + +## Important Notes + +- The pool is thread-safe; multiple goroutines can submit jobs concurrently +- Each execution is isolated; global state is reset between executions +- Bytecode should be compiled once and reused for better performance +- Context values should be serializable to Lua (numbers, strings, booleans, maps, slices) \ No newline at end of file diff --git a/core/workers/context.go b/core/workers/context.go new file mode 100644 index 0000000..17737b7 --- /dev/null +++ b/core/workers/context.go @@ -0,0 +1,24 @@ +package workers + +// Context represents execution context for a Lua script +type Context struct { + // Generic map for any context values (route params, HTTP request info, etc.) + Values map[string]any +} + +// NewContext creates a new context with initialized maps +func NewContext() *Context { + return &Context{ + Values: make(map[string]any), + } +} + +// Set adds a value to the context +func (c *Context) Set(key string, value any) { + c.Values[key] = value +} + +// Get retrieves a value from the context +func (c *Context) Get(key string) any { + return c.Values[key] +} diff --git a/core/workers/job.go b/core/workers/job.go index b268508..845c611 100644 --- a/core/workers/job.go +++ b/core/workers/job.go @@ -9,6 +9,6 @@ type JobResult struct { // job represents a Lua script execution request type job struct { Bytecode []byte // Compiled LuaJIT bytecode - Params map[string]any // Parameters to pass to the script + Context *Context // Execution context Result chan<- JobResult // Channel to send result back } diff --git a/core/workers/pool.go b/core/workers/pool.go index 0d92159..2859caf 100644 --- a/core/workers/pool.go +++ b/core/workers/pool.go @@ -42,7 +42,7 @@ func NewPool(numWorkers int) (*Pool, error) { } // SubmitWithContext sends a job to the worker pool with context -func (p *Pool) SubmitWithContext(ctx context.Context, bytecode []byte, params map[string]any) (any, error) { +func (p *Pool) SubmitWithContext(ctx context.Context, bytecode []byte, execCtx *Context) (any, error) { if !p.isRunning.Load() { return nil, ErrPoolClosed } @@ -50,7 +50,7 @@ func (p *Pool) SubmitWithContext(ctx context.Context, bytecode []byte, params ma resultChan := make(chan JobResult, 1) j := job{ Bytecode: bytecode, - Params: params, + Context: execCtx, Result: resultChan, } @@ -73,6 +73,11 @@ func (p *Pool) SubmitWithContext(ctx context.Context, bytecode []byte, params ma } } +// Submit sends a job to the worker pool +func (p *Pool) Submit(bytecode []byte, execCtx *Context) (any, error) { + return p.SubmitWithContext(context.Background(), bytecode, execCtx) +} + // Shutdown gracefully shuts down the worker pool func (p *Pool) Shutdown() error { if !p.isRunning.Load() { diff --git a/core/workers/worker.go b/core/workers/worker.go index e02259c..958cfed 100644 --- a/core/workers/worker.go +++ b/core/workers/worker.go @@ -1,7 +1,6 @@ package workers import ( - "context" "errors" "sync/atomic" @@ -102,13 +101,17 @@ func (w *worker) resetState() { w.state.DoString("__reset_globals()") } -// setParams sets job parameters as a global 'params' table -func (w *worker) setParams(params map[string]any) error { - // Create new table for params +// setContext sets job context as global tables in Lua state +func (w *worker) setContext(ctx *Context) error { + if ctx == nil { + return nil + } + + // Create context table w.state.NewTable() - // Add each parameter to the table - for key, value := range params { + // Add values to context table + for key, value := range ctx.Values { // Push key w.state.PushString(key) @@ -121,8 +124,8 @@ func (w *worker) setParams(params map[string]any) error { w.state.SetTable(-3) } - // Set the table as global 'params' - w.state.SetGlobal("params") + // Set the table as global 'ctx' + w.state.SetGlobal("ctx") return nil } @@ -132,9 +135,9 @@ func (w *worker) executeJob(j job) JobResult { // Reset state before execution w.resetState() - // Set parameters - if j.Params != nil { - if err := w.setParams(j.Params); err != nil { + // Set context + if j.Context != nil { + if err := w.setContext(j.Context); err != nil { return JobResult{nil, err} } } @@ -155,8 +158,3 @@ func (w *worker) executeJob(j job) JobResult { return JobResult{value, err} } - -// Submit sends a job to the worker pool -func (p *Pool) Submit(bytecode []byte, params map[string]any) (any, error) { - return p.SubmitWithContext(context.Background(), bytecode, params) -} diff --git a/core/workers/workers_test.go b/core/workers/workers_test.go index 63517f8..23dd712 100644 --- a/core/workers/workers_test.go +++ b/core/workers/workers_test.go @@ -127,10 +127,10 @@ func TestPoolSubmitWithContext(t *testing.T) { } } -// We need to make sure we can pass different types of parameters from Go to Lua and +// We need to make sure we can pass different types of context values from Go to Lua and // get them back properly. This test sends numbers, strings, booleans, and arrays to // a Lua script and verifies they're all handled correctly in both directions. -func TestJobParameters(t *testing.T) { +func TestContextValues(t *testing.T) { pool, err := NewPool(2) if err != nil { t.Fatalf("Failed to create pool: %v", err) @@ -139,21 +139,20 @@ func TestJobParameters(t *testing.T) { bytecode := createTestBytecode(t, ` return { - num = params.number, - str = params.text, - flag = params.enabled, - list = {params.table[1], params.table[2], params.table[3]}, + num = ctx.number, + str = ctx.text, + flag = ctx.enabled, + list = {ctx.table[1], ctx.table[2], ctx.table[3]}, } `) - params := map[string]any{ - "number": 42.5, - "text": "hello", - "enabled": true, - "table": []float64{10, 20, 30}, - } + execCtx := NewContext() + execCtx.Set("number", 42.5) + execCtx.Set("text", "hello") + execCtx.Set("enabled", true) + execCtx.Set("table", []float64{10, 20, 30}) - result, err := pool.Submit(bytecode, params) + result, err := pool.Submit(bytecode, execCtx) if err != nil { t.Fatalf("Failed to submit job: %v", err) } @@ -188,6 +187,64 @@ func TestJobParameters(t *testing.T) { } } +// Test context with nested data structures +func TestNestedContext(t *testing.T) { + pool, err := NewPool(2) + if err != nil { + t.Fatalf("Failed to create pool: %v", err) + } + defer pool.Shutdown() + + bytecode := createTestBytecode(t, ` + return { + id = ctx.params.id, + name = ctx.params.name, + method = ctx.request.method, + path = ctx.request.path + } + `) + + execCtx := NewContext() + + // Set nested params + params := map[string]any{ + "id": "123", + "name": "test", + } + execCtx.Set("params", params) + + // Set nested request info + request := map[string]any{ + "method": "GET", + "path": "/api/test", + } + execCtx.Set("request", request) + + result, err := pool.Submit(bytecode, execCtx) + if err != nil { + t.Fatalf("Failed to submit job: %v", err) + } + + // Result should be a map + resultMap, ok := result.(map[string]any) + if !ok { + t.Fatalf("Expected map result, got %T", result) + } + + if resultMap["id"] != "123" { + t.Errorf("Expected id=123, got %v", resultMap["id"]) + } + if resultMap["name"] != "test" { + t.Errorf("Expected name=test, got %v", resultMap["name"]) + } + if resultMap["method"] != "GET" { + t.Errorf("Expected method=GET, got %v", resultMap["method"]) + } + if resultMap["path"] != "/api/test" { + t.Errorf("Expected path=/api/test, got %v", resultMap["path"]) + } +} + // A key requirement for our worker pool is that we don't leak state between executions. // This test confirms that by setting a global variable in one job and then checking // that it's been cleared before the next job runs on the same worker. @@ -261,7 +318,7 @@ func TestPoolShutdown(t *testing.T) { } // A robust worker pool needs to handle errors gracefully. This test checks various -// error scenarios: invalid bytecode, Lua runtime errors, nil parameters (which +// error scenarios: invalid bytecode, Lua runtime errors, nil context (which // should work fine), and unsupported parameter types (which should properly error out). func TestErrorHandling(t *testing.T) { pool, err := NewPool(2) @@ -287,25 +344,24 @@ func TestErrorHandling(t *testing.T) { t.Errorf("Expected error from Lua error() call, got nil") } - // Test invalid parameter - bytecode = createTestBytecode(t, "return param") - - // This should work with nil value - _, err = pool.Submit(bytecode, map[string]any{ - "param": nil, - }) + // Test with nil context + bytecode = createTestBytecode(t, "return ctx == nil") + result, err := pool.Submit(bytecode, nil) if err != nil { - t.Errorf("Unexpected error with nil param: %v", err) + t.Errorf("Unexpected error with nil context: %v", err) + } + if result.(bool) != true { + t.Errorf("Expected ctx to be nil in Lua, but it wasn't") } - // Complex type that can't be converted - complex := map[string]any{ - "param": complex128(1 + 2i), // Unsupported type - } + // Test invalid context value + execCtx := NewContext() + execCtx.Set("param", complex128(1+2i)) // Unsupported type - _, err = pool.Submit(bytecode, complex) + bytecode = createTestBytecode(t, "return ctx.param") + _, err = pool.Submit(bytecode, execCtx) if err == nil { - t.Errorf("Expected error for unsupported parameter type, got nil") + t.Errorf("Expected error for unsupported context value type, got nil") } } @@ -323,15 +379,17 @@ func TestConcurrentExecution(t *testing.T) { defer pool.Shutdown() // Create bytecode that returns its input - bytecode := createTestBytecode(t, "return params.n") + bytecode := createTestBytecode(t, "return ctx.n") // Run multiple jobs concurrently results := make(chan int, jobs) for i := 0; i < jobs; i++ { i := i // Capture loop variable go func() { - params := map[string]any{"n": float64(i)} - result, err := pool.Submit(bytecode, params) + execCtx := NewContext() + execCtx.Set("n", float64(i)) + + result, err := pool.Submit(bytecode, execCtx) if err != nil { t.Errorf("Job %d failed: %v", i, err) results <- -1 @@ -363,3 +421,25 @@ func TestConcurrentExecution(t *testing.T) { t.Errorf("Expected %d unique results, got %d", jobs, len(counts)) } } + +// Test context operations +func TestContext(t *testing.T) { + ctx := NewContext() + + // Test Set and Get + ctx.Set("key", "value") + if ctx.Get("key") != "value" { + t.Errorf("Expected value, got %v", ctx.Get("key")) + } + + // Test overwriting + ctx.Set("key", 123) + if ctx.Get("key") != 123 { + t.Errorf("Expected 123, got %v", ctx.Get("key")) + } + + // Test missing key + if ctx.Get("missing") != nil { + t.Errorf("Expected nil for missing key, got %v", ctx.Get("missing")) + } +}