Moonshark/core/workers/eventloop.go
2025-03-15 18:27:33 -05:00

373 lines
9.0 KiB
Go

package workers
import (
"context"
"errors"
"sync"
"sync/atomic"
"time"
luajit "git.sharkk.net/Sky/LuaJIT-to-Go"
)
// Sentinel errors returned by the event loop. They are plain values, so
// callers can compare against them with errors.Is.
var (
// ErrLoopClosed is returned by SubmitWithContext and Shutdown when the
// loop's running flag is already false.
ErrLoopClosed = errors.New("event loop is closed")
// ErrExecutionTimeout is delivered as a job's result by
// executeJobWithTimeout when the configured timeout elapses before the
// script produces a result.
ErrExecutionTimeout = errors.New("script execution timed out")
)
// StateInitFunc is a callback that prepares a freshly created Lua state.
// NewEventLoopWithConfig invokes it once, after the sandbox helpers have been
// installed; a non-nil error aborts construction and the state is closed.
type StateInitFunc func(*luajit.State) error
// EventLoop owns one Lua state together with the goroutine (run) that takes
// jobs from jobQueue and executes them against that state.
type EventLoop struct {
state *luajit.State // The single Lua state every job is executed on; closed when run exits
jobQueue chan job // Buffered queue of pending jobs, filled by SubmitWithContext and drained by run
quit chan struct{} // Closed by Shutdown to tell run to exit
wg sync.WaitGroup // Tracks the run goroutine so Shutdown can wait for it
isRunning atomic.Bool // True from construction until Shutdown; checked before accepting a job
timeout time.Duration // Per-job execution timeout; run only enforces it when > 0. Written by SetTimeout and read by run without synchronization.
stateInit StateInitFunc // Optional user callback applied to the state during construction
bufferSize int // Capacity jobQueue was created with (stored for reference; not read elsewhere in this file's visible code)
}
// EventLoopConfig holds the options accepted by NewEventLoopWithConfig.
// The zero value selects every default.
type EventLoopConfig struct {
// StateInit, when non-nil, is called on the Lua state after the sandbox
// has been set up so custom modules and functions can be registered.
StateInit StateInitFunc
// BufferSize is the capacity of the job queue. Values <= 0 are replaced
// with the default of 100.
BufferSize int
// Timeout is the default per-job execution timeout. A value of 0 is
// replaced with the default of 30s. A negative value is stored as-is, and
// because the loop only applies a timeout when it is > 0, a negative value
// is the way to disable the timeout through this config.
Timeout time.Duration
}
// NewEventLoop creates and starts an event loop using the zero-value
// EventLoopConfig, i.e. every default chosen by NewEventLoopWithConfig and no
// state initialization callback. It returns whatever NewEventLoopWithConfig
// returns.
func NewEventLoop() (*EventLoop, error) {
return NewEventLoopWithConfig(EventLoopConfig{})
}
// NewEventLoopWithInit builds and starts an event loop that uses default
// settings for everything except the state initializer, which is set to init.
func NewEventLoopWithInit(init StateInitFunc) (*EventLoop, error) {
	var cfg EventLoopConfig
	cfg.StateInit = init
	return NewEventLoopWithConfig(cfg)
}
// NewEventLoopWithConfig builds an event loop from config, prepares its Lua
// state, and starts the goroutine that services the job queue.
//
// A BufferSize <= 0 becomes 100 and a Timeout of exactly 0 becomes 30s. If
// the Lua state cannot be created, or the sandbox setup or the optional
// StateInit callback fails, the state is closed and the error is returned.
func NewEventLoopWithConfig(config EventLoopConfig) (*EventLoop, error) {
	// Resolve defaults for anything the caller left unset.
	queueLen := config.BufferSize
	if queueLen <= 0 {
		queueLen = 100
	}

	execTimeout := config.Timeout
	if execTimeout == 0 {
		execTimeout = 30 * time.Second
	}

	lua := luajit.New()
	if lua == nil {
		return nil, errors.New("failed to create Lua state")
	}

	loop := &EventLoop{
		state:      lua,
		jobQueue:   make(chan job, queueLen),
		quit:       make(chan struct{}),
		timeout:    execTimeout,
		stateInit:  config.StateInit,
		bufferSize: queueLen,
	}
	loop.isRunning.Store(true)

	// Sandbox first, then the user's initializer; both share one failure path.
	prepare := func() error {
		if err := setupSandbox(loop.state); err != nil {
			return err
		}
		if loop.stateInit == nil {
			return nil
		}
		return loop.stateInit(loop.state)
	}
	if err := prepare(); err != nil {
		lua.Close()
		return nil, err
	}

	// The worker goroutine is registered before it is launched so Shutdown
	// can always wait for it.
	loop.wg.Add(1)
	go loop.run()

	return loop, nil
}
// run is the body of the event loop goroutine. It services jobs from the
// queue until either the quit channel or the job queue is closed, then closes
// the Lua state and marks the goroutine as done.
func (el *EventLoop) run() {
	defer el.wg.Done()
	defer el.state.Close()

	for {
		select {
		case <-el.quit:
			// Shutdown requested.
			return

		case next, open := <-el.jobQueue:
			if !open {
				// Queue closed; nothing more will arrive.
				return
			}

			if el.timeout <= 0 {
				// No timeout configured: run inline and hand back the result.
				next.Result <- executeJobSandboxed(el.state, next)
				continue
			}
			el.executeJobWithTimeout(next)
		}
	}
}
// executeJobWithTimeout runs j against the loop's Lua state and delivers
// exactly one JobResult on j.Result: the script's own result if it finishes
// within el.timeout, or ErrExecutionTimeout otherwise.
//
// The script is never forcibly interrupted. When the timeout fires the caller
// is answered immediately, but this method then waits for the script to
// finish before returning. The wait is required because the loop owns a
// single Lua state: returning early would let run start the next job (or
// close the state on shutdown) while the timed-out script is still executing
// on it, and a Lua state must not be used from two goroutines at once.
// The consequence is that a script that never terminates stalls the loop.
func (el *EventLoop) executeJobWithTimeout(j job) {
	ctx, cancel := context.WithTimeout(context.Background(), el.timeout)
	defer cancel()

	// Buffered so the worker can always deposit its result and exit, even
	// after the timeout has already been reported.
	resultCh := make(chan JobResult, 1)
	go func() {
		resultCh <- executeJobSandboxed(el.state, j)
	}()

	select {
	case result := <-resultCh:
		j.Result <- result
	case <-ctx.Done():
		// Answer the caller now; the late result is discarded.
		j.Result <- JobResult{nil, ErrExecutionTimeout}
		// Keep exclusive ownership of the Lua state until the script is
		// actually done with it.
		<-resultCh
	}
}
// Submit queues bytecode for execution with the optional execCtx and blocks
// until the job's result is available. It is SubmitWithContext called with
// context.Background(), so the caller has no cancellation or deadline of its
// own; the returned value and error are those of SubmitWithContext.
func (el *EventLoop) Submit(bytecode []byte, execCtx *Context) (any, error) {
return el.SubmitWithContext(context.Background(), bytecode, execCtx)
}
// SubmitWithTimeout queues a job and waits at most timeout for it to be
// accepted and to produce a result. The limit bounds how long the caller
// waits; it is enforced through a context handed to SubmitWithContext, whose
// value and error are returned unchanged.
func (el *EventLoop) SubmitWithTimeout(bytecode []byte, execCtx *Context, timeout time.Duration) (any, error) {
	deadlineCtx, release := context.WithTimeout(context.Background(), timeout)
	defer release()

	value, err := el.SubmitWithContext(deadlineCtx, bytecode, execCtx)
	return value, err
}
// SubmitWithContext queues bytecode for execution with the optional execCtx
// and waits for its result.
//
// It returns:
//   - the script's value and error once the loop has executed the job;
//   - ErrLoopClosed if the loop is not running when called, or is shut down
//     while the job is waiting to be queued or waiting for its result;
//   - ctx.Err() if ctx is done first. In that case a job that was already
//     queued may still be executed; its result is discarded.
func (el *EventLoop) SubmitWithContext(ctx context.Context, bytecode []byte, execCtx *Context) (any, error) {
	if !el.isRunning.Load() {
		return nil, ErrLoopClosed
	}

	// Buffered so the loop can always deliver the result without blocking,
	// even if this caller has already returned.
	resultChan := make(chan JobResult, 1)
	j := job{
		Bytecode: bytecode,
		Context:  execCtx,
		Result:   resultChan,
	}

	// Enqueue. Watching quit keeps a caller from blocking forever on a full
	// queue that nobody will drain after shutdown.
	select {
	case el.jobQueue <- j:
	case <-el.quit:
		return nil, ErrLoopClosed
	case <-ctx.Done():
		return nil, ctx.Err()
	}

	// Wait for the result. Watching quit keeps a caller from blocking forever
	// on a job that was queued but will never be picked up.
	select {
	case result := <-resultChan:
		return result.Value, result.Error
	case <-el.quit:
		// Prefer a result that has already been delivered over the
		// shutdown error.
		select {
		case result := <-resultChan:
			return result.Value, result.Error
		default:
			return nil, ErrLoopClosed
		}
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}
// SetTimeout replaces the default per-job execution timeout. The loop applies
// a timeout only when the value is > 0, so passing 0 or a negative duration
// makes subsequent jobs run without one.
//
// NOTE(review): the field is written here and read by the run goroutine with
// no lock or atomic, so calling this while the loop is processing jobs is a
// data race — confirm callers only use it when that is acceptable.
func (el *EventLoop) SetTimeout(timeout time.Duration) {
el.timeout = timeout
}
// Shutdown stops the event loop and waits for its goroutine to exit, which
// also closes the Lua state. A job that is executing when Shutdown is called
// is allowed to finish; jobs still sitting in the queue are not executed and
// are answered with ErrLoopClosed.
//
// It returns ErrLoopClosed if the loop has already been shut down, and nil
// otherwise. It is safe to call from several goroutines: only one call
// performs the shutdown.
func (el *EventLoop) Shutdown() error {
	// A single atomic transition guarantees quit is closed exactly once;
	// a separate Load followed by Store would let two callers both proceed
	// and panic on the second close.
	if !el.isRunning.CompareAndSwap(true, false) {
		return ErrLoopClosed
	}

	close(el.quit)
	el.wg.Wait()

	// jobQueue is deliberately left open: it has many senders, and closing
	// it would panic any Submit call that is concurrently sending. Instead,
	// fail whatever is still queued so its submitter is not left waiting.
	for {
		select {
		case pending := <-el.jobQueue:
			select {
			case pending.Result <- JobResult{nil, ErrLoopClosed}:
			default:
				// Nobody can receive; drop the notification rather than
				// block shutdown.
			}
		default:
			return nil
		}
	}
}
// setupSandbox installs two global Lua helpers into state by executing an
// embedded script, and returns the error from state.DoString.
//
//   - __create_sandbox() builds a fresh environment table holding string,
//     table, math, a reduced os table (time, date, difftime, clock), a set of
//     base functions, and a require that only looks names up in
//     package.loaded. Writes from sandboxed code land in that table via
//     __newindex rather than in _G.
//   - __run_sandboxed(f, ctx) creates such an environment, stores ctx in it as
//     "ctx" when ctx is truthy, applies it to f with setfenv, and calls f.
//
// NOTE(review): the environment's __index falls back to _G[k] and returns the
// value whenever it is a table or a function. That is what exposes modules
// registered by a StateInitFunc, but it equally exposes any other table or
// function present in _G. If the state has the standard libraries opened,
// that would include names such as io, debug, loadstring or getfenv — confirm
// whether this is intended, since it would bypass the reduced os table.
func setupSandbox(state *luajit.State) error {
// This is the Lua script that creates our sandbox function
setupScript := `
-- Create a function to run code in a sandbox environment
function __create_sandbox()
-- Create new environment table
local env = {}
-- Add standard library modules (can be restricted as needed)
env.string = string
env.table = table
env.math = math
env.os = {
time = os.time,
date = os.date,
difftime = os.difftime,
clock = os.clock
}
env.tonumber = tonumber
env.tostring = tostring
env.type = type
env.pairs = pairs
env.ipairs = ipairs
env.next = next
env.select = select
env.unpack = unpack
env.pcall = pcall
env.xpcall = xpcall
env.error = error
env.assert = assert
-- Allow access to package.loaded for modules
env.require = function(name)
return package.loaded[name]
end
-- Create metatable to restrict access to _G
local mt = {
__index = function(t, k)
-- First check in env table
local v = rawget(env, k)
if v ~= nil then return v end
-- If not found, check for registered modules/functions
local moduleValue = _G[k]
if type(moduleValue) == "table" or
type(moduleValue) == "function" then
return moduleValue
end
return nil
end,
__newindex = function(t, k, v)
rawset(env, k, v)
end
}
setmetatable(env, mt)
return env
end
-- Create function to execute code with a sandbox
function __run_sandboxed(f, ctx)
local env = __create_sandbox()
-- Add context to the environment if provided
if ctx then
env.ctx = ctx
end
-- Set the environment and run the function
setfenv(f, env)
return f()
end
`
// Run the script once so both helpers exist as globals in this state.
return state.DoString(setupScript)
}
// executeJobSandboxed runs j.Bytecode on state through the __run_sandboxed
// Lua helper and returns the script's single result converted to a Go value.
//
// When j.Context is non-nil its Values are copied into a new Lua table that
// the script sees as ctx; otherwise nil is passed. Any failure while pushing
// a context value, loading the bytecode, calling the runner, or converting
// the result is returned in the JobResult's error.
//
// The state is reused for every job, so each exit path removes what this
// function pushed onto the Lua stack.
func executeJobSandboxed(state *luajit.State, j job) JobResult {
	// Stage 1: context argument.          stack afterwards: [ctx]
	if j.Context == nil {
		state.PushNil()
	} else {
		state.NewTable()
		for key, value := range j.Context.Values {
			state.PushString(key)
			if err := state.PushValue(value); err != nil {
				// The stack holds the context table and the key at this
				// point; both must go. Popping only one slot would leave
				// the table behind on the long-lived state.
				// (Assumes a failed PushValue pushes nothing itself.)
				state.Pop(2)
				return JobResult{nil, err}
			}
			// table[key] = value; consumes key and value.
			state.SetTable(-3)
		}
	}

	// Stage 2: compiled chunk.            stack afterwards: [ctx, fn]
	if err := state.LoadBytecode(j.Bytecode, "script"); err != nil {
		state.Pop(1) // drop the context
		return JobResult{nil, err}
	}

	// Stage 3: arrange the call frame as [runner, fn, ctx].
	state.GetGlobal("__run_sandboxed") // [ctx, fn, runner]
	state.PushCopy(-2)                 // [ctx, fn, runner, fn]
	state.PushCopy(-4)                 // [ctx, fn, runner, fn, ctx]
	state.Remove(-5)                   // [fn, runner, fn, ctx]
	state.Remove(-4)                   // [runner, fn, ctx]

	// Stage 4: __run_sandboxed(fn, ctx) with one result.
	if err := state.Call(2, 1); err != nil {
		return JobResult{nil, err}
	}

	// Stage 5: convert and pop the result.
	value, err := state.ToValue(-1)
	state.Pop(1)
	return JobResult{value, err}
}