Compare commits

..

No commits in common. "9b1942903b4e593e25d4e9c307d49b27e1d1a642" and "727ce89da74454205e0a4d0b4f061de120d9bf2a" have entirely different histories.

11 changed files with 449 additions and 619 deletions

1
.gitignore vendored
View File

@ -27,4 +27,3 @@ config.lua
routes/
static/
libs/
override/

View File

@ -101,7 +101,7 @@ func (s *Server) handleRequest(w http.ResponseWriter, r *http.Request) {
bytecode, scriptPath, found := s.luaRouter.GetBytecode(r.Method, r.URL.Path, params)
// Check if we found a route but it has no valid bytecode (compile error)
if found && len(bytecode) == 0 {
if found && (bytecode == nil || len(bytecode) == 0) {
// Get the actual error from the router - this requires exposing the actual error
// from the node in the GetBytecode method
errorMsg := "Route exists but failed to compile. Check server logs for details."
@ -147,7 +147,6 @@ func (s *Server) HandleMethodNotAllowed(w http.ResponseWriter, r *http.Request)
// handleLuaRoute executes a Lua route
func (s *Server) handleLuaRoute(w http.ResponseWriter, r *http.Request, bytecode []byte, scriptPath string, params *routers.Params) {
ctx := runner.NewContext()
defer ctx.Release()
// Log bytecode size
s.logger.Debug("Executing Lua route with %d bytes of bytecode", len(bytecode))
@ -235,8 +234,6 @@ func writeResponse(w http.ResponseWriter, result any, log *logger.Logger) {
// Check for HTTPResponse type
if httpResp, ok := result.(*runner.HTTPResponse); ok {
defer runner.ReleaseResponse(httpResp)
// Set response headers
for name, value := range httpResp.Headers {
w.Header().Set(name, value)

View File

@ -1,34 +1,16 @@
package runner
import "sync"
// Context represents execution context for a Lua script
type Context struct {
// Generic map for any context values (route params, HTTP request info, etc.)
Values map[string]any
}
// Context pool to reduce allocations
var contextPool = sync.Pool{
New: func() interface{} {
return &Context{
Values: make(map[string]any, 16), // Pre-allocate with reasonable capacity
}
},
}
// NewContext creates a new context, potentially reusing one from the pool
// NewContext creates a new context with initialized maps
func NewContext() *Context {
return contextPool.Get().(*Context)
}
// Release returns the context to the pool after clearing its values
func (c *Context) Release() {
// Clear all values to prevent data leakage
for k := range c.Values {
delete(c.Values, k)
return &Context{
Values: make(map[string]any),
}
contextPool.Put(c)
}
// Set adds a value to the context

View File

@ -2,7 +2,6 @@ package runner
import (
"net/http"
"sync"
luajit "git.sharkk.net/Sky/LuaJIT-to-Go"
)
@ -15,43 +14,13 @@ type HTTPResponse struct {
Cookies []*http.Cookie `json:"-"`
}
// Response pool to reduce allocations
var responsePool = sync.Pool{
New: func() interface{} {
return &HTTPResponse{
Status: 200,
Headers: make(map[string]string, 8), // Pre-allocate with reasonable capacity
Cookies: make([]*http.Cookie, 0, 4), // Pre-allocate with reasonable capacity
}
},
}
// NewHTTPResponse creates a default HTTP response, potentially reusing one from the pool
// NewHTTPResponse creates a default HTTP response
func NewHTTPResponse() *HTTPResponse {
return responsePool.Get().(*HTTPResponse)
}
// ReleaseResponse returns the response to the pool after clearing its values
func ReleaseResponse(resp *HTTPResponse) {
if resp == nil {
return
return &HTTPResponse{
Status: 200,
Headers: make(map[string]string),
Cookies: []*http.Cookie{},
}
// Clear all values to prevent data leakage
resp.Status = 200 // Reset to default
// Clear headers
for k := range resp.Headers {
delete(resp.Headers, k)
}
// Clear cookies
resp.Cookies = resp.Cookies[:0] // Keep capacity but set length to 0
// Clear body
resp.Body = nil
responsePool.Put(resp)
}
// LuaHTTPModule is the pure Lua implementation of the HTTP module
@ -128,8 +97,7 @@ func GetHTTPResponse(state *luajit.State) (*HTTPResponse, bool) {
state.GetGlobal("__http_responses")
if state.IsNil(-1) {
state.Pop(1)
ReleaseResponse(response) // Return unused response to pool
return nil, false
return response, false
}
// Check for response at thread index
@ -137,8 +105,7 @@ func GetHTTPResponse(state *luajit.State) (*HTTPResponse, bool) {
state.GetTable(-2)
if state.IsNil(-1) {
state.Pop(2)
ReleaseResponse(response) // Return unused response to pool
return nil, false
return response, false
}
// Get status

View File

@ -22,13 +22,6 @@ type StateInitFunc func(*luajit.State) error
// RunnerOption defines a functional option for configuring the LuaRunner
type RunnerOption func(*LuaRunner)
// Result channel pool to reduce allocations
var resultChanPool = sync.Pool{
New: func() interface{} {
return make(chan JobResult, 1)
},
}
// LuaRunner runs Lua scripts using a single Lua state
type LuaRunner struct {
state *luajit.State // The Lua state
@ -198,18 +191,7 @@ func (r *LuaRunner) RunWithContext(ctx context.Context, bytecode []byte, execCtx
}
r.mu.RUnlock()
// Get a result channel from the pool
resultChanInterface := resultChanPool.Get()
resultChan := resultChanInterface.(chan JobResult)
// Make sure to clear any previous results
select {
case <-resultChan:
// Drain the channel if it has a value
default:
// Channel is already empty
}
resultChan := make(chan JobResult, 1)
j := job{
Bytecode: bytecode,
Context: execCtx,
@ -222,26 +204,16 @@ func (r *LuaRunner) RunWithContext(ctx context.Context, bytecode []byte, execCtx
case r.jobQueue <- j:
// Job submitted
case <-ctx.Done():
// Return the channel to the pool before exiting
resultChanPool.Put(resultChan)
return nil, ctx.Err()
}
// Wait for result with context
var result JobResult
select {
case result = <-resultChan:
// Got result
case result := <-resultChan:
return result.Value, result.Error
case <-ctx.Done():
// Return the channel to the pool before exiting
resultChanPool.Put(resultChan)
return nil, ctx.Err()
}
// Return the channel to the pool
resultChanPool.Put(resultChan)
return result.Value, result.Error
}
// Run executes a Lua script

View File

@ -1,148 +0,0 @@
package watchers
import (
"fmt"
"sync"
"git.sharkk.net/Sky/Moonshark/core/logger"
"git.sharkk.net/Sky/Moonshark/core/routers"
"git.sharkk.net/Sky/Moonshark/core/runner"
)
// Global watcher manager instance
var (
globalManager *WatcherManager
globalManagerOnce sync.Once
)
// GetWatcherManager returns the global watcher manager, creating it if needed
func GetWatcherManager(log *logger.Logger, adaptive bool) *WatcherManager {
globalManagerOnce.Do(func() {
globalManager = NewWatcherManager(log, adaptive)
})
return globalManager
}
// WatchDirectory creates a new directory watcher and registers it with the manager
func WatchDirectory(config DirectoryWatcherConfig, manager *WatcherManager) (*Watcher, error) {
dirWatcher, err := NewDirectoryWatcher(config)
if err != nil {
return nil, fmt.Errorf("failed to create directory watcher: %w", err)
}
manager.AddWatcher(dirWatcher)
// Create a wrapper Watcher that implements the old interface
w := &Watcher{
dir: config.Dir,
dirWatch: dirWatcher,
manager: manager,
}
config.Log.Info("Started watching directory: %s", config.Dir)
return w, nil
}
// Watcher is a compatibility wrapper that maintains the old API
type Watcher struct {
dir string
dirWatch *DirectoryWatcher
manager *WatcherManager
}
// Close unregisters the watcher from the manager
func (w *Watcher) Close() error {
w.manager.RemoveWatcher(w.dir)
return nil
}
// WatchLuaRouter sets up a watcher for a LuaRouter's routes directory
func WatchLuaRouter(router *routers.LuaRouter, routesDir string, log *logger.Logger) (*Watcher, error) {
manager := GetWatcherManager(log, true) // Use adaptive polling
config := DirectoryWatcherConfig{
Dir: routesDir,
Callback: router.Refresh,
Log: log,
Recursive: true,
}
watcher, err := WatchDirectory(config, manager)
if err != nil {
return nil, err
}
log.Info("Started watching Lua routes directory: %s", routesDir)
return watcher, nil
}
// WatchStaticRouter sets up a watcher for a StaticRouter's root directory
func WatchStaticRouter(router *routers.StaticRouter, staticDir string, log *logger.Logger) (*Watcher, error) {
manager := GetWatcherManager(log, true) // Use adaptive polling
config := DirectoryWatcherConfig{
Dir: staticDir,
Callback: router.Refresh,
Log: log,
Recursive: true,
}
watcher, err := WatchDirectory(config, manager)
if err != nil {
return nil, err
}
log.Info("Started watching static files directory: %s", staticDir)
return watcher, nil
}
// WatchLuaModules sets up watchers for Lua module directories
func WatchLuaModules(luaRunner *runner.LuaRunner, libDirs []string, log *logger.Logger) ([]*Watcher, error) {
manager := GetWatcherManager(log, true) // Use adaptive polling
watchers := make([]*Watcher, 0, len(libDirs))
for _, dir := range libDirs {
// Create a directory-specific callback
dirCopy := dir // Capture for closure
callback := func() error {
log.Debug("Detected changes in Lua module directory: %s", dirCopy)
// Reload modules from this directory
if err := luaRunner.ReloadAllModules(); err != nil {
log.Warning("Error reloading modules: %v", err)
}
return nil
}
config := DirectoryWatcherConfig{
Dir: dir,
Callback: callback,
Log: log,
Recursive: true,
}
watcher, err := WatchDirectory(config, manager)
if err != nil {
// Clean up already created watchers
for _, w := range watchers {
w.Close()
}
return nil, err
}
watchers = append(watchers, watcher)
log.Info("Started watching Lua modules directory: %s", dir)
}
return watchers, nil
}
// ShutdownWatcherManager closes the global watcher manager if it exists
func ShutdownWatcherManager() {
if globalManager != nil {
globalManager.Close()
globalManager = nil
}
}

View File

@ -1,231 +0,0 @@
package watchers
import (
"fmt"
"os"
"path/filepath"
"sync"
"time"
"git.sharkk.net/Sky/Moonshark/core/logger"
)
// Default debounce time between detected change and callback
const defaultDebounceTime = 300 * time.Millisecond
// FileInfo stores metadata about a file for change detection
type FileInfo struct {
ModTime time.Time
Size int64
IsDir bool
}
// DirectoryWatcher watches a specific directory for changes
type DirectoryWatcher struct {
// Directory to watch
dir string
// Map of file paths to their metadata
files map[string]FileInfo
filesMu sync.RWMutex
// Configuration
callback func() error
log *logger.Logger
debounceTime time.Duration
recursive bool
// Debounce timer
debounceTimer *time.Timer
debouncing bool
debounceMu sync.Mutex
}
// DirectoryWatcherConfig contains configuration for a directory watcher
type DirectoryWatcherConfig struct {
// Directory to watch
Dir string
// Callback function to call when changes are detected
Callback func() error
// Logger instance
Log *logger.Logger
// Debounce time (0 means use default)
DebounceTime time.Duration
// Recursive watching (watch subdirectories)
Recursive bool
}
// NewDirectoryWatcher creates a new directory watcher
func NewDirectoryWatcher(config DirectoryWatcherConfig) (*DirectoryWatcher, error) {
debounceTime := config.DebounceTime
if debounceTime == 0 {
debounceTime = defaultDebounceTime
}
w := &DirectoryWatcher{
dir: config.Dir,
files: make(map[string]FileInfo),
callback: config.Callback,
log: config.Log,
debounceTime: debounceTime,
recursive: config.Recursive,
}
// Perform initial scan
if err := w.scanDirectory(); err != nil {
return nil, err
}
return w, nil
}
// scanDirectory builds the initial file list
func (w *DirectoryWatcher) scanDirectory() error {
w.filesMu.Lock()
defer w.filesMu.Unlock()
return filepath.Walk(w.dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
w.logWarning("Error accessing path %s: %v", path, err)
return nil // Continue with other files
}
// Skip if not recursive and this is a subdirectory
if !w.recursive && info.IsDir() && path != w.dir {
return filepath.SkipDir
}
w.files[path] = FileInfo{
ModTime: info.ModTime(),
Size: info.Size(),
IsDir: info.IsDir(),
}
return nil
})
}
// checkForChanges detects if any files have been added, modified, or deleted
func (w *DirectoryWatcher) checkForChanges() (bool, error) {
// Get current state
currentFiles := make(map[string]FileInfo)
err := filepath.Walk(w.dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
// File might have been deleted between directory read and stat
return nil
}
// Skip if not recursive and this is a subdirectory
if !w.recursive && info.IsDir() && path != w.dir {
return filepath.SkipDir
}
currentFiles[path] = FileInfo{
ModTime: info.ModTime(),
Size: info.Size(),
IsDir: info.IsDir(),
}
return nil
})
if err != nil {
return false, err
}
// Compare with previous state
w.filesMu.RLock()
previousFiles := w.files
w.filesMu.RUnlock()
// Check for different file count (quick check)
if len(currentFiles) != len(previousFiles) {
w.logDebug("File count changed: %d -> %d", len(previousFiles), len(currentFiles))
w.updateFiles(currentFiles)
return true, nil
}
// Check for modified, added, or removed files
for path, currentInfo := range currentFiles {
prevInfo, exists := previousFiles[path]
if !exists {
// New file
w.logDebug("New file detected: %s", path)
w.updateFiles(currentFiles)
return true, nil
}
if currentInfo.ModTime != prevInfo.ModTime || currentInfo.Size != prevInfo.Size {
// File modified
w.logDebug("File modified: %s", path)
w.updateFiles(currentFiles)
return true, nil
}
}
// Check for deleted files
for path := range previousFiles {
if _, exists := currentFiles[path]; !exists {
// File deleted
w.logDebug("File deleted: %s", path)
w.updateFiles(currentFiles)
return true, nil
}
}
// No changes detected
return false, nil
}
// updateFiles updates the internal file list
func (w *DirectoryWatcher) updateFiles(newFiles map[string]FileInfo) {
w.filesMu.Lock()
w.files = newFiles
w.filesMu.Unlock()
}
// notifyChange triggers the callback with debouncing
func (w *DirectoryWatcher) notifyChange() {
w.debounceMu.Lock()
defer w.debounceMu.Unlock()
if w.debouncing {
// Reset timer if already debouncing
if w.debounceTimer != nil {
w.debounceTimer.Stop()
}
} else {
w.debouncing = true
}
w.debounceTimer = time.AfterFunc(w.debounceTime, func() {
if err := w.callback(); err != nil {
w.logError("Callback error: %v", err)
}
// Reset debouncing state
w.debounceMu.Lock()
w.debouncing = false
w.debounceMu.Unlock()
})
}
// logDebug logs a debug message with the watcher's directory prefix
func (w *DirectoryWatcher) logDebug(format string, args ...any) {
w.log.Debug("[Watcher] [%s] %s", w.dir, fmt.Sprintf(format, args...))
}
// logWarning logs a warning message with the watcher's directory prefix
func (w *DirectoryWatcher) logWarning(format string, args ...any) {
w.log.Warning("[Watcher] [%s] %s", w.dir, fmt.Sprintf(format, args...))
}
// logError logs an error message with the watcher's directory prefix
func (w *DirectoryWatcher) logError(format string, args ...any) {
w.log.Error("[Watcher] [%s] %s", w.dir, fmt.Sprintf(format, args...))
}

View File

@ -1,143 +0,0 @@
package watchers
import (
"sync"
"time"
"git.sharkk.net/Sky/Moonshark/core/logger"
)
// Default polling intervals
const (
defaultPollInterval = 1 * time.Second // Initial polling interval
extendedPollInterval = 5 * time.Second // Extended polling interval after inactivity
inactivityThreshold = 10 * time.Minute // Time before extending polling interval
)
// WatcherManager coordinates file watching across multiple directories
type WatcherManager struct {
// Registry of directories and their watchers
watchers map[string]*DirectoryWatcher
mu sync.RWMutex
// Shared polling state
pollInterval time.Duration
adaptive bool
lastActivity time.Time
// Control channels
done chan struct{}
ticker *time.Ticker
// Logger
log *logger.Logger
// Wait group for shutdown coordination
wg sync.WaitGroup
}
// NewWatcherManager creates a new watcher manager
func NewWatcherManager(log *logger.Logger, adaptive bool) *WatcherManager {
manager := &WatcherManager{
watchers: make(map[string]*DirectoryWatcher),
pollInterval: defaultPollInterval,
adaptive: adaptive,
lastActivity: time.Now(),
done: make(chan struct{}),
log: log,
}
// Start the polling loop
manager.ticker = time.NewTicker(manager.pollInterval)
manager.wg.Add(1)
go manager.pollLoop()
return manager
}
// Close stops all watchers and the manager
func (m *WatcherManager) Close() error {
close(m.done)
if m.ticker != nil {
m.ticker.Stop()
}
m.wg.Wait()
return nil
}
// AddWatcher registers a new directory watcher
func (m *WatcherManager) AddWatcher(watcher *DirectoryWatcher) {
m.mu.Lock()
defer m.mu.Unlock()
m.watchers[watcher.dir] = watcher
m.log.Debug("[WatcherManager] Added watcher for directory: %s", watcher.dir)
}
// RemoveWatcher unregisters a directory watcher
func (m *WatcherManager) RemoveWatcher(dir string) {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.watchers, dir)
m.log.Debug("[WatcherManager] Removed watcher for directory: %s", dir)
}
// pollLoop is the main polling loop that checks all watched directories
func (m *WatcherManager) pollLoop() {
defer m.wg.Done()
for {
select {
case <-m.ticker.C:
anyActivity := m.checkAllDirectories()
// Update polling interval based on activity
if m.adaptive {
if anyActivity {
// Activity detected, reset to fast polling
m.lastActivity = time.Now()
if m.pollInterval > defaultPollInterval {
m.pollInterval = defaultPollInterval
m.ticker.Reset(m.pollInterval)
m.log.Debug("[WatcherManager] Reset to base polling interval: %v", m.pollInterval)
}
} else {
// No activity, consider slowing down polling
inactiveDuration := time.Since(m.lastActivity)
if m.pollInterval == defaultPollInterval && inactiveDuration > inactivityThreshold {
m.pollInterval = extendedPollInterval
m.ticker.Reset(m.pollInterval)
m.log.Debug("[WatcherManager] Extended polling interval to: %v after %v of inactivity",
m.pollInterval, inactiveDuration.Round(time.Minute))
}
}
}
case <-m.done:
return
}
}
}
// checkAllDirectories polls all registered directories for changes
func (m *WatcherManager) checkAllDirectories() bool {
m.mu.RLock()
defer m.mu.RUnlock()
anyActivity := false
for _, watcher := range m.watchers {
changed, err := watcher.checkForChanges()
if err != nil {
m.log.Error("[WatcherManager] Error checking directory %s: %v", watcher.dir, err)
continue
}
if changed {
anyActivity = true
watcher.notifyChange()
}
}
return anyActivity
}

View File

@ -0,0 +1,50 @@
package watchers
import (
"git.sharkk.net/Sky/Moonshark/core/logger"
"git.sharkk.net/Sky/Moonshark/core/runner"
)
// WatchLuaModules sets up an optimized watcher for Lua module directories
func WatchLuaModules(luaRunner *runner.LuaRunner, libDirs []string, log *logger.Logger) ([]*Watcher, error) {
watchers := make([]*Watcher, 0, len(libDirs))
for _, dir := range libDirs {
// Create a directory-specific callback that identifies changed files
dirCopy := dir // Capture for closure
callback := func() error {
log.Debug("Detected changes in Lua module directory: %s", dirCopy)
// Instead of clearing everything, use directory-level smart refresh
// This will scan lib directory and refresh all modified Lua modules
if err := luaRunner.ReloadAllModules(); err != nil {
log.Warning("Error reloading modules: %v", err)
}
return nil
}
config := WatcherConfig{
Dir: dir,
Callback: callback,
Log: log,
Recursive: true,
Adaptive: true,
DebounceTime: defaultDebounceTime,
}
watcher, err := WatchDirectory(config)
if err != nil {
for _, w := range watchers {
w.Close()
}
return nil, err
}
watchers = append(watchers, watcher)
log.Info("Started watching Lua modules directory: %s", dir)
}
return watchers, nil
}

View File

@ -0,0 +1,44 @@
package watchers
import (
"git.sharkk.net/Sky/Moonshark/core/logger"
"git.sharkk.net/Sky/Moonshark/core/routers"
)
// WatchLuaRouter sets up a watcher for a LuaRouter's routes directory
func WatchLuaRouter(router *routers.LuaRouter, routesDir string, log *logger.Logger) (*Watcher, error) {
config := WatcherConfig{
Dir: routesDir,
Callback: router.Refresh,
Log: log,
Recursive: true,
Adaptive: true,
}
watcher, err := WatchDirectory(config)
if err != nil {
return nil, err
}
log.Info("Started watching Lua routes directory with adaptive polling: %s", routesDir)
return watcher, nil
}
// WatchStaticRouter sets up a watcher for a StaticRouter's root directory
func WatchStaticRouter(router *routers.StaticRouter, staticDir string, log *logger.Logger) (*Watcher, error) {
config := WatcherConfig{
Dir: staticDir,
Callback: router.Refresh,
Log: log,
Recursive: true,
Adaptive: true,
}
watcher, err := WatchDirectory(config)
if err != nil {
return nil, err
}
log.Info("Started watching static files directory with adaptive polling: %s", staticDir)
return watcher, nil
}

341
core/watchers/watcher.go Normal file
View File

@ -0,0 +1,341 @@
package watchers
import (
"fmt"
"os"
"path/filepath"
"sync"
"time"
"git.sharkk.net/Sky/Moonshark/core/logger"
)
// Default polling intervals
const (
defaultPollInterval = 1 * time.Second // Initial polling interval
extendedPollInterval = 5 * time.Second // Extended polling interval after inactivity
inactivityThreshold = 10 * time.Minute // Time before extending polling interval
)
// Default debounce time between detected change and callback
const defaultDebounceTime = 300 * time.Millisecond
// FileInfo stores metadata about a file for change detection
type FileInfo struct {
ModTime time.Time
Size int64
IsDir bool
}
// Watcher implements a simple polling-based file watcher
type Watcher struct {
// Directory to watch
dir string
// Map of file paths to their metadata
files map[string]FileInfo
filesMu sync.RWMutex
// Configuration
callback func() error
log *logger.Logger
pollInterval time.Duration // Current polling interval
basePollInterval time.Duration // Base (starting) polling interval
debounceTime time.Duration
recursive bool
adaptive bool // Whether to use adaptive polling intervals
// Adaptive polling
lastChangeTime time.Time // When we last detected a change
// Control channels
done chan struct{}
debounceCh chan struct{}
// Wait group for shutdown coordination
wg sync.WaitGroup
}
// WatcherConfig contains configuration for the file watcher
type WatcherConfig struct {
// Directory to watch
Dir string
// Callback function to call when changes are detected
Callback func() error
// Logger instance
Log *logger.Logger
// Poll interval (0 means use default)
PollInterval time.Duration
// Debounce time (0 means use default)
DebounceTime time.Duration
// Recursive watching (watch subdirectories)
Recursive bool
// Use adaptive polling intervals
Adaptive bool
}
// WatchDirectory sets up filesystem monitoring on a directory
// Returns the watcher for later cleanup and any setup error
func WatchDirectory(config WatcherConfig) (*Watcher, error) {
pollInterval := config.PollInterval
if pollInterval == 0 {
pollInterval = defaultPollInterval
}
debounceTime := config.DebounceTime
if debounceTime == 0 {
debounceTime = defaultDebounceTime
}
w := &Watcher{
dir: config.Dir,
files: make(map[string]FileInfo),
callback: config.Callback,
log: config.Log,
pollInterval: pollInterval,
basePollInterval: pollInterval,
debounceTime: debounceTime,
recursive: config.Recursive,
adaptive: config.Adaptive,
lastChangeTime: time.Now(),
done: make(chan struct{}),
debounceCh: make(chan struct{}, 1),
}
// Perform initial scan
if err := w.scanDirectory(); err != nil {
return nil, err
}
// Start the watcher routines
w.wg.Add(2)
go w.watchLoop()
go w.debounceLoop()
if config.Adaptive {
w.logDebug("Started watching with adaptive polling (1s default, 5s after 10m inactivity)")
} else {
w.logDebug("Started watching with fixed polling interval: %v", pollInterval)
}
return w, nil
}
// Close stops the watcher
func (w *Watcher) Close() error {
close(w.done)
w.wg.Wait()
return nil
}
// watchLoop periodically scans the directory for changes
func (w *Watcher) watchLoop() {
defer w.wg.Done()
ticker := time.NewTicker(w.pollInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
changed, err := w.checkForChanges()
if err != nil {
w.logError("Error checking for changes: %v", err)
continue
}
if changed {
// Update last change time
w.lastChangeTime = time.Now()
if w.adaptive && w.pollInterval > w.basePollInterval {
w.pollInterval = w.basePollInterval
ticker.Reset(w.pollInterval)
w.logDebug("Reset to base polling interval: %v", w.pollInterval)
}
// Try to send a change notification, non-blocking
select {
case w.debounceCh <- struct{}{}:
default:
// Channel already has a pending notification
}
} else if w.adaptive {
// Consider extending polling interval if enough time has passed since last change
inactiveDuration := time.Since(w.lastChangeTime)
if w.pollInterval == w.basePollInterval && inactiveDuration > inactivityThreshold {
// Extend polling interval
w.pollInterval = extendedPollInterval
ticker.Reset(w.pollInterval)
w.logDebug("Extended polling interval to: %v after %v of inactivity",
w.pollInterval, inactiveDuration.Round(time.Minute))
}
}
case <-w.done:
return
}
}
}
// debounceLoop handles debouncing change notifications
func (w *Watcher) debounceLoop() {
defer w.wg.Done()
var timer *time.Timer
for {
select {
case <-w.debounceCh:
// Cancel existing timer if there is one
if timer != nil {
timer.Stop()
}
// Start a new timer
timer = time.AfterFunc(w.debounceTime, func() {
if err := w.callback(); err != nil {
w.logError("Refresh callback error: %v", err)
}
})
case <-w.done:
if timer != nil {
timer.Stop()
}
return
}
}
}
// scanDirectory builds the initial file list
func (w *Watcher) scanDirectory() error {
w.filesMu.Lock()
defer w.filesMu.Unlock()
return filepath.Walk(w.dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
w.logWarning("Error accessing path %s: %v", path, err)
return nil // Continue with other files
}
// Skip if not recursive and this is a subdirectory
if !w.recursive && info.IsDir() && path != w.dir {
return filepath.SkipDir
}
w.files[path] = FileInfo{
ModTime: info.ModTime(),
Size: info.Size(),
IsDir: info.IsDir(),
}
return nil
})
}
// logDebug logs a debug message with the watcher's directory prefix
func (w *Watcher) logDebug(format string, args ...any) {
w.log.Debug("[Watcher] [%s] %s", w.dir, fmt.Sprintf(format, args...))
}
// logInfo logs an info message with the watcher's directory prefix
func (w *Watcher) logInfo(format string, args ...any) {
w.log.Info("[Watcher] [%s] %s", w.dir, fmt.Sprintf(format, args...))
}
// logWarning logs a warning message with the watcher's directory prefix
func (w *Watcher) logWarning(format string, args ...any) {
w.log.Warning("[Watcher] [%s] %s", w.dir, fmt.Sprintf(format, args...))
}
// logError logs an error message with the watcher's directory prefix
func (w *Watcher) logError(format string, args ...any) {
w.log.Error("[Watcher] [%s] %s", w.dir, fmt.Sprintf(format, args...))
}
// checkForChanges detects if any files have been added, modified, or deleted
func (w *Watcher) checkForChanges() (bool, error) {
// Get current state
currentFiles := make(map[string]FileInfo)
err := filepath.Walk(w.dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
// File might have been deleted between directory read and stat
return nil
}
// Skip if not recursive and this is a subdirectory
if !w.recursive && info.IsDir() && path != w.dir {
return filepath.SkipDir
}
currentFiles[path] = FileInfo{
ModTime: info.ModTime(),
Size: info.Size(),
IsDir: info.IsDir(),
}
return nil
})
if err != nil {
return false, err
}
// Compare with previous state
w.filesMu.RLock()
previousFiles := w.files
w.filesMu.RUnlock()
// Check for different file count (quick check)
if len(currentFiles) != len(previousFiles) {
w.logDebug("File count changed: %d -> %d", len(previousFiles), len(currentFiles))
w.updateFiles(currentFiles)
return true, nil
}
// Check for modified, added, or removed files
for path, currentInfo := range currentFiles {
prevInfo, exists := previousFiles[path]
if !exists {
// New file
w.logDebug("New file detected: %s", path)
w.updateFiles(currentFiles)
return true, nil
}
if currentInfo.ModTime != prevInfo.ModTime || currentInfo.Size != prevInfo.Size {
// File modified
w.logDebug("File modified: %s", path)
w.updateFiles(currentFiles)
return true, nil
}
}
// Check for deleted files
for path := range previousFiles {
if _, exists := currentFiles[path]; !exists {
// File deleted
w.logDebug("File deleted: %s", path)
w.updateFiles(currentFiles)
return true, nil
}
}
// No changes detected
return false, nil
}
// updateFiles updates the internal file list
func (w *Watcher) updateFiles(newFiles map[string]FileInfo) {
w.filesMu.Lock()
w.files = newFiles
w.filesMu.Unlock()
}