workers 2

This commit is contained in:
Sky Johnson 2025-03-06 06:23:17 -06:00
parent e95babaa25
commit 6395b3ee58
6 changed files with 264 additions and 50 deletions

107
core/workers/README.md Normal file
View File

@ -0,0 +1,107 @@
# Worker Pool
### Pool
```go
type Pool struct { ... }
// Create a pool with specified number of workers
func NewPool(numWorkers int) (*Pool, error)
// Submit a job with default context
func (p *Pool) Submit(bytecode []byte, ctx *Context) (any, error)
// Submit with timeout/cancellation support
func (p *Pool) SubmitWithContext(ctx context.Context, bytecode []byte, execCtx *Context) (any, error)
// Shutdown the pool
func (p *Pool) Shutdown() error
// Get number of active workers
func (p *Pool) ActiveWorkers() uint32
```
### Context
```go
type Context struct { ... }
// Create a new execution context
func NewContext() *Context
// Set a value
func (c *Context) Set(key string, value any)
// Get a value
func (c *Context) Get(key string) any
```
## Basic Usage
```go
// Create worker pool
pool, err := workers.NewPool(4)
if err != nil {
log.Fatal(err)
}
defer pool.Shutdown()
// Compile bytecode (typically done once and reused)
state := luajit.New()
bytecode, err := state.CompileBytecode(`
return ctx.message .. " from Lua"
`, "script")
state.Close()
// Set up execution context
ctx := workers.NewContext()
ctx.Set("message", "Hello")
ctx.Set("params", map[string]any{"id": "123"})
// Execute bytecode
result, err := pool.Submit(bytecode, ctx)
if err != nil {
log.Fatal(err)
}
fmt.Println(result) // "Hello from Lua"
```
## With Timeout
```go
// Create context with timeout
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
// Execute with timeout
result, err := pool.SubmitWithContext(ctx, bytecode, execCtx)
if err != nil {
// Handle timeout or error
}
```
## In Lua Scripts
Inside Lua, the context is available as the global `ctx` table:
```lua
-- Access a simple value
local msg = ctx.message
-- Access nested values
local id = ctx.params.id
-- Return a result to Go
return {
status = "success",
data = msg
}
```
## Important Notes
- The pool is thread-safe; multiple goroutines can submit jobs concurrently
- Each execution is isolated; global state is reset between executions
- Bytecode should be compiled once and reused for better performance
- Context values should be serializable to Lua (numbers, strings, booleans, maps, slices)

24
core/workers/context.go Normal file
View File

@ -0,0 +1,24 @@
package workers
// Context represents execution context for a Lua script
type Context struct {
// Generic map for any context values (route params, HTTP request info, etc.)
Values map[string]any
}
// NewContext creates a new context with initialized maps
func NewContext() *Context {
return &Context{
Values: make(map[string]any),
}
}
// Set adds a value to the context
func (c *Context) Set(key string, value any) {
c.Values[key] = value
}
// Get retrieves a value from the context
func (c *Context) Get(key string) any {
return c.Values[key]
}

View File

@ -9,6 +9,6 @@ type JobResult struct {
// job represents a Lua script execution request
type job struct {
Bytecode []byte // Compiled LuaJIT bytecode
Params map[string]any // Parameters to pass to the script
Context *Context // Execution context
Result chan<- JobResult // Channel to send result back
}

View File

@ -42,7 +42,7 @@ func NewPool(numWorkers int) (*Pool, error) {
}
// SubmitWithContext sends a job to the worker pool with context
func (p *Pool) SubmitWithContext(ctx context.Context, bytecode []byte, params map[string]any) (any, error) {
func (p *Pool) SubmitWithContext(ctx context.Context, bytecode []byte, execCtx *Context) (any, error) {
if !p.isRunning.Load() {
return nil, ErrPoolClosed
}
@ -50,7 +50,7 @@ func (p *Pool) SubmitWithContext(ctx context.Context, bytecode []byte, params ma
resultChan := make(chan JobResult, 1)
j := job{
Bytecode: bytecode,
Params: params,
Context: execCtx,
Result: resultChan,
}
@ -73,6 +73,11 @@ func (p *Pool) SubmitWithContext(ctx context.Context, bytecode []byte, params ma
}
}
// Submit sends a job to the worker pool
func (p *Pool) Submit(bytecode []byte, execCtx *Context) (any, error) {
return p.SubmitWithContext(context.Background(), bytecode, execCtx)
}
// Shutdown gracefully shuts down the worker pool
func (p *Pool) Shutdown() error {
if !p.isRunning.Load() {

View File

@ -1,7 +1,6 @@
package workers
import (
"context"
"errors"
"sync/atomic"
@ -102,13 +101,17 @@ func (w *worker) resetState() {
w.state.DoString("__reset_globals()")
}
// setParams sets job parameters as a global 'params' table
func (w *worker) setParams(params map[string]any) error {
// Create new table for params
// setContext sets job context as global tables in Lua state
func (w *worker) setContext(ctx *Context) error {
if ctx == nil {
return nil
}
// Create context table
w.state.NewTable()
// Add each parameter to the table
for key, value := range params {
// Add values to context table
for key, value := range ctx.Values {
// Push key
w.state.PushString(key)
@ -121,8 +124,8 @@ func (w *worker) setParams(params map[string]any) error {
w.state.SetTable(-3)
}
// Set the table as global 'params'
w.state.SetGlobal("params")
// Set the table as global 'ctx'
w.state.SetGlobal("ctx")
return nil
}
@ -132,9 +135,9 @@ func (w *worker) executeJob(j job) JobResult {
// Reset state before execution
w.resetState()
// Set parameters
if j.Params != nil {
if err := w.setParams(j.Params); err != nil {
// Set context
if j.Context != nil {
if err := w.setContext(j.Context); err != nil {
return JobResult{nil, err}
}
}
@ -155,8 +158,3 @@ func (w *worker) executeJob(j job) JobResult {
return JobResult{value, err}
}
// Submit sends a job to the worker pool
func (p *Pool) Submit(bytecode []byte, params map[string]any) (any, error) {
return p.SubmitWithContext(context.Background(), bytecode, params)
}

View File

@ -127,10 +127,10 @@ func TestPoolSubmitWithContext(t *testing.T) {
}
}
// We need to make sure we can pass different types of parameters from Go to Lua and
// We need to make sure we can pass different types of context values from Go to Lua and
// get them back properly. This test sends numbers, strings, booleans, and arrays to
// a Lua script and verifies they're all handled correctly in both directions.
func TestJobParameters(t *testing.T) {
func TestContextValues(t *testing.T) {
pool, err := NewPool(2)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
@ -139,21 +139,20 @@ func TestJobParameters(t *testing.T) {
bytecode := createTestBytecode(t, `
return {
num = params.number,
str = params.text,
flag = params.enabled,
list = {params.table[1], params.table[2], params.table[3]},
num = ctx.number,
str = ctx.text,
flag = ctx.enabled,
list = {ctx.table[1], ctx.table[2], ctx.table[3]},
}
`)
params := map[string]any{
"number": 42.5,
"text": "hello",
"enabled": true,
"table": []float64{10, 20, 30},
}
execCtx := NewContext()
execCtx.Set("number", 42.5)
execCtx.Set("text", "hello")
execCtx.Set("enabled", true)
execCtx.Set("table", []float64{10, 20, 30})
result, err := pool.Submit(bytecode, params)
result, err := pool.Submit(bytecode, execCtx)
if err != nil {
t.Fatalf("Failed to submit job: %v", err)
}
@ -188,6 +187,64 @@ func TestJobParameters(t *testing.T) {
}
}
// Test context with nested data structures
func TestNestedContext(t *testing.T) {
pool, err := NewPool(2)
if err != nil {
t.Fatalf("Failed to create pool: %v", err)
}
defer pool.Shutdown()
bytecode := createTestBytecode(t, `
return {
id = ctx.params.id,
name = ctx.params.name,
method = ctx.request.method,
path = ctx.request.path
}
`)
execCtx := NewContext()
// Set nested params
params := map[string]any{
"id": "123",
"name": "test",
}
execCtx.Set("params", params)
// Set nested request info
request := map[string]any{
"method": "GET",
"path": "/api/test",
}
execCtx.Set("request", request)
result, err := pool.Submit(bytecode, execCtx)
if err != nil {
t.Fatalf("Failed to submit job: %v", err)
}
// Result should be a map
resultMap, ok := result.(map[string]any)
if !ok {
t.Fatalf("Expected map result, got %T", result)
}
if resultMap["id"] != "123" {
t.Errorf("Expected id=123, got %v", resultMap["id"])
}
if resultMap["name"] != "test" {
t.Errorf("Expected name=test, got %v", resultMap["name"])
}
if resultMap["method"] != "GET" {
t.Errorf("Expected method=GET, got %v", resultMap["method"])
}
if resultMap["path"] != "/api/test" {
t.Errorf("Expected path=/api/test, got %v", resultMap["path"])
}
}
// A key requirement for our worker pool is that we don't leak state between executions.
// This test confirms that by setting a global variable in one job and then checking
// that it's been cleared before the next job runs on the same worker.
@ -261,7 +318,7 @@ func TestPoolShutdown(t *testing.T) {
}
// A robust worker pool needs to handle errors gracefully. This test checks various
// error scenarios: invalid bytecode, Lua runtime errors, nil parameters (which
// error scenarios: invalid bytecode, Lua runtime errors, nil context (which
// should work fine), and unsupported parameter types (which should properly error out).
func TestErrorHandling(t *testing.T) {
pool, err := NewPool(2)
@ -287,25 +344,24 @@ func TestErrorHandling(t *testing.T) {
t.Errorf("Expected error from Lua error() call, got nil")
}
// Test invalid parameter
bytecode = createTestBytecode(t, "return param")
// This should work with nil value
_, err = pool.Submit(bytecode, map[string]any{
"param": nil,
})
// Test with nil context
bytecode = createTestBytecode(t, "return ctx == nil")
result, err := pool.Submit(bytecode, nil)
if err != nil {
t.Errorf("Unexpected error with nil param: %v", err)
t.Errorf("Unexpected error with nil context: %v", err)
}
if result.(bool) != true {
t.Errorf("Expected ctx to be nil in Lua, but it wasn't")
}
// Complex type that can't be converted
complex := map[string]any{
"param": complex128(1 + 2i), // Unsupported type
}
// Test invalid context value
execCtx := NewContext()
execCtx.Set("param", complex128(1+2i)) // Unsupported type
_, err = pool.Submit(bytecode, complex)
bytecode = createTestBytecode(t, "return ctx.param")
_, err = pool.Submit(bytecode, execCtx)
if err == nil {
t.Errorf("Expected error for unsupported parameter type, got nil")
t.Errorf("Expected error for unsupported context value type, got nil")
}
}
@ -323,15 +379,17 @@ func TestConcurrentExecution(t *testing.T) {
defer pool.Shutdown()
// Create bytecode that returns its input
bytecode := createTestBytecode(t, "return params.n")
bytecode := createTestBytecode(t, "return ctx.n")
// Run multiple jobs concurrently
results := make(chan int, jobs)
for i := 0; i < jobs; i++ {
i := i // Capture loop variable
go func() {
params := map[string]any{"n": float64(i)}
result, err := pool.Submit(bytecode, params)
execCtx := NewContext()
execCtx.Set("n", float64(i))
result, err := pool.Submit(bytecode, execCtx)
if err != nil {
t.Errorf("Job %d failed: %v", i, err)
results <- -1
@ -363,3 +421,25 @@ func TestConcurrentExecution(t *testing.T) {
t.Errorf("Expected %d unique results, got %d", jobs, len(counts))
}
}
// Test context operations
func TestContext(t *testing.T) {
ctx := NewContext()
// Test Set and Get
ctx.Set("key", "value")
if ctx.Get("key") != "value" {
t.Errorf("Expected value, got %v", ctx.Get("key"))
}
// Test overwriting
ctx.Set("key", 123)
if ctx.Get("key") != 123 {
t.Errorf("Expected 123, got %v", ctx.Get("key"))
}
// Test missing key
if ctx.Get("missing") != nil {
t.Errorf("Expected nil for missing key, got %v", ctx.Get("missing"))
}
}