Moonshark/core/workers/pool.go
2025-03-06 06:13:53 -06:00

99 lines
2.1 KiB
Go

package workers
import (
"context"
"sync"
"sync/atomic"
)
// Pool manages a pool of Lua worker goroutines.
//
// NewPool constructs and starts a Pool, SubmitWithContext hands jobs to it
// through the jobs channel, and Shutdown stops it via the quit channel.
type Pool struct {
workers uint32 // Number of workers; set once by NewPool and read by ActiveWorkers
jobs chan job // Channel to send jobs to workers; NewPool buffers it to the worker count
wg sync.WaitGroup // Counts the workers started by NewPool; Shutdown waits on it (presumably each worker calls Done on exit — confirm in worker.run)
quit chan struct{} // Closed by Shutdown to signal workers to stop
isRunning atomic.Bool // True from NewPool until Shutdown is called; gates new submissions
}
// NewPool builds a Pool and launches numWorkers worker goroutines.
//
// It returns ErrNoWorkers when numWorkers is zero or negative. The job queue
// is buffered with one slot per worker, and the pool is marked as running
// before any worker goroutine is started.
func NewPool(numWorkers int) (*Pool, error) {
	if numWorkers < 1 {
		return nil, ErrNoWorkers
	}

	pool := &Pool{
		workers: uint32(numWorkers),
		jobs:    make(chan job, numWorkers),
		quit:    make(chan struct{}),
	}
	pool.isRunning.Store(true)

	// Register every worker with the WaitGroup up front, before any of them
	// has a chance to run.
	pool.wg.Add(numWorkers)
	for n := 0; n < numWorkers; n++ {
		go (&worker{pool: pool, id: uint32(n)}).run()
	}

	return pool, nil
}
// SubmitWithContext queues a job for the worker pool and blocks until a
// worker delivers its result, ctx ends, or the pool shuts down before the job
// could be queued.
//
// bytecode and params are stored on the job unchanged.
//
// It returns:
//   - the Value and Error of the JobResult received on the job's result
//     channel;
//   - ErrPoolClosed if the pool is not running on entry, or if it is shut
//     down while this call is still waiting for space in the queue;
//   - ctx.Err() if ctx is already done on entry or becomes done while
//     waiting.
//
// NOTE(review): once a job has been accepted into the queue this call waits
// only on the result and on ctx. Callers that may race with Shutdown should
// pass a context with a deadline or cancellation.
func (p *Pool) SubmitWithContext(ctx context.Context, bytecode []byte, params map[string]any) (any, error) {
	if !p.isRunning.Load() {
		return nil, ErrPoolClosed
	}

	// select picks randomly among ready cases, so without this check an
	// already-cancelled context could still enqueue a job nobody waits for.
	if err := ctx.Err(); err != nil {
		return nil, err
	}

	// Capacity 1 leaves room for one result to be delivered even after this
	// call has given up waiting.
	resultChan := make(chan JobResult, 1)
	j := job{
		Bytecode: bytecode,
		Params:   params,
		Result:   resultChan,
	}

	// Enqueue the job. Watching p.quit releases callers that are blocked on a
	// full queue when the pool is shut down, instead of leaving them stuck
	// until their context ends.
	select {
	case p.jobs <- j:
		// Job submitted.
	case <-p.quit:
		return nil, ErrPoolClosed
	case <-ctx.Done():
		return nil, ctx.Err()
	}

	// Wait for the result, still honouring the context.
	select {
	case result := <-resultChan:
		return result.Value, result.Error
	case <-ctx.Done():
		// The job may still be processed by a worker, but its result is
		// discarded.
		return nil, ctx.Err()
	}
}
// Shutdown stops the worker pool and waits for the workers to finish.
//
// The first call marks the pool as not running, closes the quit channel to
// signal the workers, and blocks until the WaitGroup reports that all of them
// are done. It then returns nil. Any later or concurrent call returns
// ErrPoolClosed without touching the channels.
func (p *Pool) Shutdown() error {
	// CompareAndSwap makes the running -> stopped transition a single atomic
	// step. With a separate Load and Store, two concurrent callers could both
	// pass the check and both close p.quit, and closing a closed channel
	// panics.
	if !p.isRunning.CompareAndSwap(true, false) {
		return ErrPoolClosed
	}

	// Signal workers to quit, then wait for them to finish.
	close(p.quit)
	p.wg.Wait()

	// p.jobs is intentionally left open. A SubmitWithContext call that passed
	// its running check just before the flag flipped may still be sending on
	// it, and a send on a closed channel panics. Workers are told to stop via
	// p.quit and have already been waited on, so the close is not needed; the
	// channel is reclaimed by the garbage collector along with the Pool.
	return nil
}
// ActiveWorkers returns the number of workers serving the pool.
//
// While the pool is running this is the worker count stored at construction.
// Once the pool is no longer running (Shutdown has been called, so workers
// are stopping or stopped) it returns 0 rather than reporting workers that no
// longer accept jobs.
func (p *Pool) ActiveWorkers() uint32 {
	if !p.isRunning.Load() {
		return 0
	}
	return atomic.LoadUint32(&p.workers)
}