104 lines
2.2 KiB
Go
104 lines
2.2 KiB
Go
package workers
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"sync/atomic"
|
|
)
|
|
|
|
// Pool manages a pool of Lua worker goroutines
|
|
type Pool struct {
|
|
workers uint32 // Number of workers
|
|
jobs chan job // Channel to send jobs to workers
|
|
wg sync.WaitGroup // WaitGroup to track active workers
|
|
quit chan struct{} // Channel to signal shutdown
|
|
isRunning atomic.Bool // Flag to track if pool is running
|
|
}
|
|
|
|
// NewPool creates a new worker pool with the specified number of workers
|
|
func NewPool(numWorkers int) (*Pool, error) {
|
|
if numWorkers <= 0 {
|
|
return nil, ErrNoWorkers
|
|
}
|
|
|
|
p := &Pool{
|
|
workers: uint32(numWorkers),
|
|
jobs: make(chan job, numWorkers), // Buffer equal to worker count
|
|
quit: make(chan struct{}),
|
|
}
|
|
p.isRunning.Store(true)
|
|
|
|
// Start workers
|
|
p.wg.Add(numWorkers)
|
|
for i := 0; i < numWorkers; i++ {
|
|
w := &worker{
|
|
pool: p,
|
|
id: uint32(i),
|
|
}
|
|
go w.run()
|
|
}
|
|
|
|
return p, nil
|
|
}
|
|
|
|
// SubmitWithContext sends a job to the worker pool with context
|
|
func (p *Pool) SubmitWithContext(ctx context.Context, bytecode []byte, execCtx *Context) (any, error) {
|
|
if !p.isRunning.Load() {
|
|
return nil, ErrPoolClosed
|
|
}
|
|
|
|
resultChan := make(chan JobResult, 1)
|
|
j := job{
|
|
Bytecode: bytecode,
|
|
Context: execCtx,
|
|
Result: resultChan,
|
|
}
|
|
|
|
// Submit job with context
|
|
select {
|
|
case p.jobs <- j:
|
|
// Job submitted
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
}
|
|
|
|
// Wait for result with context
|
|
select {
|
|
case result := <-resultChan:
|
|
return result.Value, result.Error
|
|
case <-ctx.Done():
|
|
// Note: The job will still be processed by a worker,
|
|
// but the result will be discarded
|
|
return nil, ctx.Err()
|
|
}
|
|
}
|
|
|
|
// Submit sends a job to the worker pool
|
|
func (p *Pool) Submit(bytecode []byte, execCtx *Context) (any, error) {
|
|
return p.SubmitWithContext(context.Background(), bytecode, execCtx)
|
|
}
|
|
|
|
// Shutdown gracefully shuts down the worker pool
|
|
func (p *Pool) Shutdown() error {
|
|
if !p.isRunning.Load() {
|
|
return ErrPoolClosed
|
|
}
|
|
p.isRunning.Store(false)
|
|
|
|
// Signal workers to quit
|
|
close(p.quit)
|
|
|
|
// Wait for workers to finish
|
|
p.wg.Wait()
|
|
|
|
// Close jobs channel
|
|
close(p.jobs)
|
|
|
|
return nil
|
|
}
|
|
|
|
// ActiveWorkers returns the number of active workers
|
|
func (p *Pool) ActiveWorkers() uint32 {
|
|
return atomic.LoadUint32(&p.workers)
|
|
}
|