Moonshark/core/workers/worker.go

227 lines
5.3 KiB
Go

package workers
import (
"errors"
"sync/atomic"
luajit "git.sharkk.net/Sky/LuaJIT-to-Go"
)
// Sentinel errors exposed by the workers package.
// NOTE(review): the code that returns these is not in this part of the file;
// the descriptions below restate the error messages only.
var (
	// ErrPoolClosed carries the message "worker pool is closed".
	ErrPoolClosed = errors.New("worker pool is closed")

	// ErrNoWorkers carries the message "no workers available".
	ErrNoWorkers = errors.New("no workers available")
)
// worker represents a single Lua execution worker. Each worker creates its
// own Lua state in run and uses it for every job it executes.
type worker struct {
	pool  *Pool         // Owning pool; run reads its jobs/quit channels, config, wait group and worker counter
	state *luajit.State // Lua state; created at the start of run and closed when run returns
	id    uint32        // Worker ID (not read by the methods in this file section)
}
// run is the worker's goroutine body. It creates the Lua state, installs the
// reset function, applies the pool configuration when one is set, and then
// serves jobs until the jobs channel is closed or the quit channel fires.
//
// The pool's wait group is always released on return. If any initialization
// step fails, the pool's worker counter is decremented and the worker exits
// without serving a job.
func (w *worker) run() {
	defer w.pool.wg.Done()

	// Create the Lua state; without one there is nothing to clean up.
	w.state = luajit.New()
	if w.state == nil {
		atomic.AddUint32(&w.pool.workers, ^uint32(0))
		return
	}
	defer w.state.Close()

	// Run the remaining setup steps in order, stopping at the first failure:
	// the reset function first, then the optional pool configuration.
	initErr := w.setupResetFunction()
	if initErr == nil && w.pool.config != nil {
		initErr = w.applyConfig()
	}
	if initErr != nil {
		// Adding ^uint32(0) is the atomic way to subtract one.
		atomic.AddUint32(&w.pool.workers, ^uint32(0))
		return
	}

	// Serve jobs until told to stop.
	for {
		select {
		case <-w.pool.quit:
			// Quit signal received.
			return
		case task, open := <-w.pool.jobs:
			if !open {
				// Jobs channel closed.
				return
			}
			// Execute the job and hand the outcome back on its result channel.
			task.Result <- w.executeJob(task)
		}
	}
}
// applyConfig applies the pool's worker configuration to the Lua state.
//
// In order, it sets the package path (when non-empty), registers every Go
// function in config.Functions, installs a package.preload loader for each
// in-memory module in config.Modules and for each file-backed module in
// config.ModulePaths, and finally runs config.CustomInit when it is set.
// The first error encountered is returned unchanged and the remaining steps
// are skipped. run only calls this when the pool has a configuration.
//
// Module names, module source and file paths are embedded in the generated
// Lua loaders as escaped string literals (see luaStringLiteral). Values that
// contain quotes, backslashes (for example Windows paths), newlines or a
// "]=====]" sequence therefore cannot break or alter the generated chunk.
func (w *worker) applyConfig() error {
	config := w.pool.config

	// Set package path if specified.
	if config.PackagePath != "" {
		if err := w.state.SetPackagePath(config.PackagePath); err != nil {
			return err
		}
	}

	// Register Go functions.
	for name, fn := range config.Functions {
		if err := w.state.RegisterGoFunction(name, fn); err != nil {
			return err
		}
	}

	// Load modules from strings. The loader compiles the source lazily, on
	// the first require of the module.
	for name, code := range config.Modules {
		quotedName := luaStringLiteral(name)
		moduleLoader := `
local module_code = ` + luaStringLiteral(code) + `
package.preload[` + quotedName + `] = function()
	local fn, err = loadstring(module_code, ` + quotedName + `)
	if not fn then error(err) end
	return fn()
end
`
		if err := w.state.DoString(moduleLoader); err != nil {
			return err
		}
	}

	// Load modules from files. The file is read lazily, on the first require
	// of the module.
	for name, path := range config.ModulePaths {
		moduleLoader := `
package.preload[` + luaStringLiteral(name) + `] = function()
	local fn, err = loadfile(` + luaStringLiteral(path) + `)
	if not fn then error(err) end
	return fn()
end
`
		if err := w.state.DoString(moduleLoader); err != nil {
			return err
		}
	}

	// Apply custom initialization if provided.
	if config.CustomInit != nil {
		if err := config.CustomInit(w.state); err != nil {
			return err
		}
	}

	return nil
}

// luaStringLiteral returns s as a double-quoted Lua string literal that
// evaluates to exactly the bytes of s. Quotes and backslashes are
// backslash-escaped, newline and carriage return use \n and \r, and the
// remaining control bytes use three-digit decimal escapes so that a following
// digit cannot be absorbed into the escape. Bytes >= 0x80 are copied as-is.
func luaStringLiteral(s string) string {
	buf := make([]byte, 0, len(s)+2)
	buf = append(buf, '"')
	for i := 0; i < len(s); i++ {
		switch c := s[i]; {
		case c == '"' || c == '\\':
			buf = append(buf, '\\', c)
		case c == '\n':
			buf = append(buf, '\\', 'n')
		case c == '\r':
			buf = append(buf, '\\', 'r')
		case c < 0x20 || c == 0x7f:
			buf = append(buf, '\\', '0'+c/100, '0'+c/10%10, '0'+c%10)
		default:
			buf = append(buf, c)
		}
	}
	buf = append(buf, '"')
	return string(buf)
}
// setupResetFunction defines the global Lua function __reset_globals, which
// resetState calls to clear globals between jobs. It returns the error from
// running the defining chunk.
//
// The set of globals to keep is captured the first time __reset_globals is
// called rather than being hard-coded. executeJob resets before running a
// script, so that first call happens after worker initialization and before
// any job code has run. The snapshot therefore contains:
//   - every built-in global of the runtime, including the LuaJIT ones a fixed
//     Lua 5.1 list misses (bit, jit, getfenv, setfenv, module, ...), and
//   - any globals installed during initialization by applyConfig, which run
//     calls after this function (presumably the functions registered through
//     RegisterGoFunction, and anything CustomInit defines; confirm against
//     the luajit wrapper).
//
// Every later call removes only the globals that are not in the snapshot,
// i.e. those created by job scripts or by setContext, and then runs a full
// garbage collection. Capturing once also avoids rebuilding the table on
// every request.
func (w *worker) setupResetFunction() error {
	resetScript := `
-- Names of the globals that survive a reset; filled in on the first call.
local preserve

-- Reset function to clear request-scoped globals after each request
function __reset_globals()
	if not preserve then
		-- First call: no job has run yet, so everything present now is part
		-- of the worker's baseline environment.
		preserve = {}
		for name in pairs(_G) do
			preserve[name] = true
		end
	end

	-- Clear every global that is not part of the baseline. Assigning nil to
	-- an existing field during traversal is allowed by pairs/next.
	for name in pairs(_G) do
		if not preserve[name] then
			_G[name] = nil
		end
	end

	-- Run garbage collection to release memory
	collectgarbage('collect')
end
`
	return w.state.DoString(resetScript)
}
// resetState prepares the Lua state for a new job by calling the Lua function
// __reset_globals, which setupResetFunction installs.
//
// It returns the error from that call. A failure (for example because a
// script replaced __reset_globals) means globals from the previous job may
// still be present, so callers should not run another job on the state
// without checking the result.
func (w *worker) resetState() error {
	return w.state.DoString("__reset_globals()")
}

// setContext exposes the job context to the script as the global table "ctx",
// with one entry per key in ctx.Values. A nil ctx is a no-op.
//
// If a value cannot be pushed, the error from PushValue is returned, "ctx" is
// not set, and the Lua stack is restored to its height on entry.
func (w *worker) setContext(ctx *Context) error {
	if ctx == nil {
		return nil
	}

	// Create context table.
	w.state.NewTable()

	for key, value := range ctx.Values {
		w.state.PushString(key)
		if err := w.state.PushValue(value); err != nil {
			// Drop the pending key and the half-built table. The state is
			// reused across jobs, so leaving them behind would grow the stack
			// on every failing job. Assumes PushValue leaves nothing on the
			// stack when it fails.
			w.state.Pop(2)
			return err
		}
		// table[key] = value; pops both key and value.
		w.state.SetTable(-3)
	}

	// Pops the table and stores it as global 'ctx'.
	w.state.SetGlobal("ctx")
	return nil
}

// executeJob runs one job in the worker's Lua state and returns its outcome.
//
// Steps: reset the state, publish j.Context as the global "ctx" (skipped when
// it is nil), load j.Bytecode under the chunk name "script", run it asking
// for one result, and convert that result to a Go value. The first failing
// step ends the job and its error is returned in the JobResult with a nil
// value.
func (w *worker) executeJob(j job) JobResult {
	// Reset state before execution. Fail the job rather than running it with
	// globals that may have leaked from the previous job.
	if err := w.resetState(); err != nil {
		return JobResult{nil, err}
	}

	// Set context; setContext ignores a nil context.
	if err := w.setContext(j.Context); err != nil {
		return JobResult{nil, err}
	}

	// Load bytecode.
	if err := w.state.LoadBytecode(j.Bytecode, "script"); err != nil {
		return JobResult{nil, err}
	}

	// Execute script with one result.
	if err := w.state.RunBytecodeWithResults(1); err != nil {
		return JobResult{nil, err}
	}

	// Convert the result, then pop it whether or not the conversion worked.
	value, err := w.state.ToValue(-1)
	w.state.Pop(1)
	return JobResult{value, err}
}