simplify watcher

This commit is contained in:
Sky Johnson 2025-05-01 13:04:39 -05:00
parent 87eaddd8e4
commit d50081ef55
4 changed files with 118 additions and 124 deletions

View File

@ -199,12 +199,17 @@ func (s *Moonshark) initRunner() error {
// setupWatchers initializes and starts all configured file watchers // setupWatchers initializes and starts all configured file watchers
func (s *Moonshark) setupWatchers() error { func (s *Moonshark) setupWatchers() error {
manager := watchers.GetWatcherManager()
// Set up watcher for Lua routes // Set up watcher for Lua routes
luaRouterWatcher, err := watchers.WatchLuaRouter(s.LuaRouter, s.LuaRunner, s.Config.Dirs.Routes) routeWatcher, err := watchers.WatchLuaRouter(s.LuaRouter, s.LuaRunner, s.Config.Dirs.Routes)
if err != nil { if err != nil {
logger.Warning("Failed to watch routes directory: %v", err) logger.Warning("Failed to watch routes directory: %v", err)
} else { } else {
s.cleanupFuncs = append(s.cleanupFuncs, luaRouterWatcher.Close) routesDir := routeWatcher.GetDir()
s.cleanupFuncs = append(s.cleanupFuncs, func() error {
return manager.UnwatchDirectory(routesDir)
})
} }
// Set up watchers for Lua modules libraries // Set up watchers for Lua modules libraries
@ -213,8 +218,10 @@ func (s *Moonshark) setupWatchers() error {
logger.Warning("Failed to watch Lua module directories: %v", err) logger.Warning("Failed to watch Lua module directories: %v", err)
} else { } else {
for _, watcher := range moduleWatchers { for _, watcher := range moduleWatchers {
w := watcher // Capture variable for closure dirPath := watcher.GetDir()
s.cleanupFuncs = append(s.cleanupFuncs, w.Close) s.cleanupFuncs = append(s.cleanupFuncs, func() error {
return manager.UnwatchDirectory(dirPath)
})
} }
logger.Info("File watchers active for %d Lua module directories", len(moduleWatchers)) logger.Info("File watchers active for %d Lua module directories", len(moduleWatchers))
} }

View File

@ -9,56 +9,31 @@ import (
"Moonshark/core/utils/logger" "Moonshark/core/utils/logger"
) )
// Global watcher manager instance // Global watcher manager instance with explicit creation
var ( var (
globalManager *WatcherManager globalManager *WatcherManager
globalManagerOnce sync.Once globalManagerOnce sync.Once
) )
// GetWatcherManager returns the global watcher manager, creating it if needed // GetWatcherManager returns the global watcher manager, creating it if needed
func GetWatcherManager(adaptive bool) *WatcherManager { func GetWatcherManager() *WatcherManager {
globalManagerOnce.Do(func() { globalManagerOnce.Do(func() {
globalManager = NewWatcherManager() globalManager = NewWatcherManager(DefaultPollInterval)
}) })
return globalManager return globalManager
} }
// WatchDirectory creates a new directory watcher and registers it with the manager // ShutdownWatcherManager closes the global watcher manager if it exists
func WatchDirectory(config DirectoryWatcherConfig, manager *WatcherManager) (*Watcher, error) { func ShutdownWatcherManager() {
dirWatcher, err := NewDirectoryWatcher(config) if globalManager != nil {
if err != nil { globalManager.Close()
return nil, fmt.Errorf("failed to create directory watcher: %w", err) globalManager = nil
} }
manager.AddWatcher(dirWatcher)
// Create a wrapper Watcher that implements the old interface
w := &Watcher{
dir: config.Dir,
dirWatch: dirWatcher,
manager: manager,
}
return w, nil
} }
// Watcher is a compatibility wrapper that maintains the old API // WatchLuaRouter sets up a watcher for a LuaRouter's routes directory
type Watcher struct { func WatchLuaRouter(router *routers.LuaRouter, runner *runner.Runner, routesDir string) (*DirectoryWatcher, error) {
dir string manager := GetWatcherManager()
dirWatch *DirectoryWatcher
manager *WatcherManager
}
// Close unregisters the watcher from the manager
func (w *Watcher) Close() error {
w.manager.RemoveWatcher(w.dir)
return nil
}
// WatchLuaRouter sets up a watcher for a LuaRouter's routes directory; also updates
// the LuaRunner so that the state can be rebuilt
func WatchLuaRouter(router *routers.LuaRouter, runner *runner.Runner, routesDir string) (*Watcher, error) {
manager := GetWatcherManager(true)
runnerRefresh := func() error { runnerRefresh := func() error {
logger.Debug("Refreshing LuaRunner state due to file change") logger.Debug("Refreshing LuaRunner state due to file change")
@ -74,9 +49,9 @@ func WatchLuaRouter(router *routers.LuaRouter, runner *runner.Runner, routesDir
Recursive: true, Recursive: true,
} }
watcher, err := WatchDirectory(config, manager) watcher, err := manager.WatchDirectory(config)
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("failed to watch directory: %w", err)
} }
logger.Info("Started watching Lua routes directory: %s", routesDir) logger.Info("Started watching Lua routes directory: %s", routesDir)
@ -84,13 +59,12 @@ func WatchLuaRouter(router *routers.LuaRouter, runner *runner.Runner, routesDir
} }
// WatchLuaModules sets up watchers for Lua module directories // WatchLuaModules sets up watchers for Lua module directories
func WatchLuaModules(luaRunner *runner.Runner, libDirs []string) ([]*Watcher, error) { func WatchLuaModules(luaRunner *runner.Runner, libDirs []string) ([]*DirectoryWatcher, error) {
manager := GetWatcherManager(true) manager := GetWatcherManager()
watchers := make([]*Watcher, 0, len(libDirs)) watchers := make([]*DirectoryWatcher, 0, len(libDirs))
for _, dir := range libDirs { for _, dir := range libDirs {
// Create a directory-specific callback dirCopy := dir
dirCopy := dir // Capture for closure
callback := func() error { callback := func() error {
logger.Debug("Detected changes in Lua module directory: %s", dirCopy) logger.Debug("Detected changes in Lua module directory: %s", dirCopy)
@ -108,13 +82,12 @@ func WatchLuaModules(luaRunner *runner.Runner, libDirs []string) ([]*Watcher, er
Recursive: true, Recursive: true,
} }
watcher, err := WatchDirectory(config, manager) watcher, err := manager.WatchDirectory(config)
if err != nil { if err != nil {
// Clean up already created watchers
for _, w := range watchers { for _, w := range watchers {
w.Close() manager.UnwatchDirectory(w.dir)
} }
return nil, err return nil, fmt.Errorf("failed to watch directory %s: %w", dir, err)
} }
watchers = append(watchers, watcher) watchers = append(watchers, watcher)
@ -124,14 +97,6 @@ func WatchLuaModules(luaRunner *runner.Runner, libDirs []string) ([]*Watcher, er
return watchers, nil return watchers, nil
} }
// ShutdownWatcherManager closes the global watcher manager if it exists
func ShutdownWatcherManager() {
if globalManager != nil {
globalManager.Close()
globalManager = nil
}
}
// combineCallbacks creates a single callback function from multiple callbacks // combineCallbacks creates a single callback function from multiple callbacks
func combineCallbacks(callbacks ...func() error) func() error { func combineCallbacks(callbacks ...func() error) func() error {
return func() error { return func() error {

View File

@ -13,11 +13,9 @@ import (
// Default debounce time between detected change and callback // Default debounce time between detected change and callback
const defaultDebounceTime = 300 * time.Millisecond const defaultDebounceTime = 300 * time.Millisecond
// FileInfo stores metadata about a file for change detection // FileInfo stores minimal metadata about a file for change detection
type FileInfo struct { type FileInfo struct {
ModTime time.Time ModTime time.Time
Size int64
IsDir bool
} }
// DirectoryWatcher watches a specific directory for changes // DirectoryWatcher watches a specific directory for changes
@ -38,6 +36,10 @@ type DirectoryWatcher struct {
debounceTimer *time.Timer debounceTimer *time.Timer
debouncing bool debouncing bool
debounceMu sync.Mutex debounceMu sync.Mutex
// Error tracking
consecutiveErrors int
lastError error
} }
// DirectoryWatcherConfig contains configuration for a directory watcher // DirectoryWatcherConfig contains configuration for a directory watcher
@ -48,35 +50,11 @@ type DirectoryWatcherConfig struct {
Recursive bool // Recursive watching (watch subdirectories) Recursive bool // Recursive watching (watch subdirectories)
} }
// NewDirectoryWatcher creates a new directory watcher
func NewDirectoryWatcher(config DirectoryWatcherConfig) (*DirectoryWatcher, error) {
debounceTime := config.DebounceTime
if debounceTime == 0 {
debounceTime = defaultDebounceTime
}
w := &DirectoryWatcher{
dir: config.Dir,
files: make(map[string]FileInfo),
callback: config.Callback,
debounceTime: debounceTime,
recursive: config.Recursive,
}
// Perform initial scan
if err := w.scanDirectory(); err != nil {
return nil, err
}
return w, nil
}
// scanDirectory builds the initial file list // scanDirectory builds the initial file list
func (w *DirectoryWatcher) scanDirectory() error { func (w *DirectoryWatcher) scanDirectory() error {
w.filesMu.Lock() w.filesMu.Lock()
defer w.filesMu.Unlock() defer w.filesMu.Unlock()
// Clear existing files map
w.files = make(map[string]FileInfo) w.files = make(map[string]FileInfo)
return filepath.Walk(w.dir, func(path string, info os.FileInfo, err error) error { return filepath.Walk(w.dir, func(path string, info os.FileInfo, err error) error {
@ -84,15 +62,12 @@ func (w *DirectoryWatcher) scanDirectory() error {
return nil // Skip files with errors return nil // Skip files with errors
} }
// Skip subdirectories if not recursive
if !w.recursive && info.IsDir() && path != w.dir { if !w.recursive && info.IsDir() && path != w.dir {
return filepath.SkipDir return filepath.SkipDir
} }
w.files[path] = FileInfo{ w.files[path] = FileInfo{
ModTime: info.ModTime(), ModTime: info.ModTime(),
Size: info.Size(),
IsDir: info.IsDir(),
} }
return nil return nil
@ -101,42 +76,34 @@ func (w *DirectoryWatcher) scanDirectory() error {
// checkForChanges detects if any files have been added, modified, or deleted // checkForChanges detects if any files have been added, modified, or deleted
func (w *DirectoryWatcher) checkForChanges() (bool, error) { func (w *DirectoryWatcher) checkForChanges() (bool, error) {
// Lock for reading previous state
w.filesMu.RLock() w.filesMu.RLock()
prevFileCount := len(w.files) prevFileCount := len(w.files)
w.filesMu.RUnlock() w.filesMu.RUnlock()
// Track new state and whether changes were detected
newFiles := make(map[string]FileInfo) newFiles := make(map[string]FileInfo)
changed := false changed := false
// Walk the directory to check for changes
err := filepath.Walk(w.dir, func(path string, info os.FileInfo, err error) error { err := filepath.Walk(w.dir, func(path string, info os.FileInfo, err error) error {
// Skip errors (file might have been deleted) // Skip errors (file might have been deleted)
if err != nil { if err != nil {
return nil return nil
} }
// Skip subdirectories if not recursive
if !w.recursive && info.IsDir() && path != w.dir { if !w.recursive && info.IsDir() && path != w.dir {
return filepath.SkipDir return filepath.SkipDir
} }
// Store current file info
currentInfo := FileInfo{ currentInfo := FileInfo{
ModTime: info.ModTime(), ModTime: info.ModTime(),
Size: info.Size(),
IsDir: info.IsDir(),
} }
newFiles[path] = currentInfo newFiles[path] = currentInfo
// Check if file is new or modified (only if we haven't already detected a change)
if !changed { if !changed {
w.filesMu.RLock() w.filesMu.RLock()
prevInfo, exists := w.files[path] prevInfo, exists := w.files[path]
w.filesMu.RUnlock() w.filesMu.RUnlock()
if !exists || currentInfo.ModTime != prevInfo.ModTime || currentInfo.Size != prevInfo.Size { if !exists || currentInfo.ModTime != prevInfo.ModTime {
changed = true changed = true
w.logDebug("File changed: %s", path) w.logDebug("File changed: %s", path)
} }
@ -146,10 +113,13 @@ func (w *DirectoryWatcher) checkForChanges() (bool, error) {
}) })
if err != nil { if err != nil {
w.consecutiveErrors++
w.lastError = err
return false, err return false, err
} }
// Check for deleted files (only if we haven't already detected a change) w.consecutiveErrors = 0
if !changed && len(newFiles) != prevFileCount { if !changed && len(newFiles) != prevFileCount {
w.filesMu.RLock() w.filesMu.RLock()
for path := range w.files { for path := range w.files {
@ -162,7 +132,6 @@ func (w *DirectoryWatcher) checkForChanges() (bool, error) {
w.filesMu.RUnlock() w.filesMu.RUnlock()
} }
// Update files map if changed
if changed { if changed {
w.filesMu.Lock() w.filesMu.Lock()
w.files = newFiles w.files = newFiles
@ -177,7 +146,6 @@ func (w *DirectoryWatcher) notifyChange() {
w.debounceMu.Lock() w.debounceMu.Lock()
defer w.debounceMu.Unlock() defer w.debounceMu.Unlock()
// Reset timer if already debouncing
if w.debouncing { if w.debouncing {
if w.debounceTimer != nil { if w.debounceTimer != nil {
w.debounceTimer.Stop() w.debounceTimer.Stop()
@ -191,7 +159,6 @@ func (w *DirectoryWatcher) notifyChange() {
w.logError("Callback error: %v", err) w.logError("Callback error: %v", err)
} }
// Reset debouncing state
w.debounceMu.Lock() w.debounceMu.Lock()
w.debouncing = false w.debouncing = false
w.debounceMu.Unlock() w.debounceMu.Unlock()
@ -203,12 +170,12 @@ func (w *DirectoryWatcher) logDebug(format string, args ...any) {
logger.Debug("[Watcher] [%s] %s", w.dir, fmt.Sprintf(format, args...)) logger.Debug("[Watcher] [%s] %s", w.dir, fmt.Sprintf(format, args...))
} }
// logWarning logs a warning message with the watcher's directory prefix
func (w *DirectoryWatcher) logWarning(format string, args ...any) {
logger.Warning("[Watcher] [%s] %s", w.dir, fmt.Sprintf(format, args...))
}
// logError logs an error message with the watcher's directory prefix // logError logs an error message with the watcher's directory prefix
func (w *DirectoryWatcher) logError(format string, args ...any) { func (w *DirectoryWatcher) logError(format string, args ...any) {
logger.Error("[Watcher] [%s] %s", w.dir, fmt.Sprintf(format, args...)) logger.Error("[Watcher] [%s] %s", w.dir, fmt.Sprintf(format, args...))
} }
// GetDir gets the DirectoryWatcher's current directory
func (w *DirectoryWatcher) GetDir() string {
return w.dir
}

View File

@ -1,15 +1,20 @@
package watchers package watchers
import ( import (
"errors"
"sync" "sync"
"time" "time"
"Moonshark/core/utils/logger" "Moonshark/core/utils/logger"
) )
// Default polling interval // DefaultPollInterval is the time between directory checks
const ( const DefaultPollInterval = 1 * time.Second
defaultPollInterval = 1 * time.Second
// Common errors
var (
ErrDirectoryNotFound = errors.New("directory not found")
ErrAlreadyWatching = errors.New("already watching directory")
) )
// WatcherManager coordinates file watching across multiple directories // WatcherManager coordinates file watching across multiple directories
@ -19,18 +24,24 @@ type WatcherManager struct {
done chan struct{} done chan struct{}
ticker *time.Ticker ticker *time.Ticker
interval time.Duration
wg sync.WaitGroup wg sync.WaitGroup
} }
// NewWatcherManager creates a new watcher manager // NewWatcherManager creates a new watcher manager with a specified poll interval
func NewWatcherManager() *WatcherManager { func NewWatcherManager(pollInterval time.Duration) *WatcherManager {
if pollInterval <= 0 {
pollInterval = DefaultPollInterval
}
manager := &WatcherManager{ manager := &WatcherManager{
watchers: make(map[string]*DirectoryWatcher), watchers: make(map[string]*DirectoryWatcher),
done: make(chan struct{}), done: make(chan struct{}),
interval: pollInterval,
} }
manager.ticker = time.NewTicker(defaultPollInterval) manager.ticker = time.NewTicker(pollInterval)
manager.wg.Add(1) manager.wg.Add(1)
go manager.pollLoop() go manager.pollLoop()
@ -47,22 +58,50 @@ func (m *WatcherManager) Close() error {
return nil return nil
} }
// AddWatcher registers a new directory watcher // WatchDirectory adds a new directory to watch and returns the watcher
func (m *WatcherManager) AddWatcher(watcher *DirectoryWatcher) { func (m *WatcherManager) WatchDirectory(config DirectoryWatcherConfig) (*DirectoryWatcher, error) {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
m.watchers[watcher.dir] = watcher if _, exists := m.watchers[config.Dir]; exists {
logger.Debug("WatcherManager added watcher for %s", watcher.dir) return nil, ErrAlreadyWatching
}
if config.DebounceTime == 0 {
config.DebounceTime = defaultDebounceTime
}
watcher := &DirectoryWatcher{
dir: config.Dir,
files: make(map[string]FileInfo),
callback: config.Callback,
debounceTime: config.DebounceTime,
recursive: config.Recursive,
}
// Perform initial scan
if err := watcher.scanDirectory(); err != nil {
return nil, err
}
m.watchers[config.Dir] = watcher
logger.Debug("WatcherManager added watcher for %s", config.Dir)
return watcher, nil
} }
// RemoveWatcher unregisters a directory watcher // UnwatchDirectory removes a directory from being watched
func (m *WatcherManager) RemoveWatcher(dir string) { func (m *WatcherManager) UnwatchDirectory(dir string) error {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
if _, exists := m.watchers[dir]; !exists {
return ErrDirectoryNotFound
}
delete(m.watchers, dir) delete(m.watchers, dir)
logger.Debug("WatcherManager removed watcher for %s", dir) logger.Debug("WatcherManager removed watcher for %s", dir)
return nil
} }
// pollLoop is the main polling loop that checks all watched directories // pollLoop is the main polling loop that checks all watched directories
@ -82,16 +121,32 @@ func (m *WatcherManager) pollLoop() {
// checkAllDirectories polls all registered directories for changes // checkAllDirectories polls all registered directories for changes
func (m *WatcherManager) checkAllDirectories() { func (m *WatcherManager) checkAllDirectories() {
m.mu.RLock() m.mu.RLock()
defer m.mu.RUnlock() watchers := make([]*DirectoryWatcher, 0, len(m.watchers))
for _, w := range m.watchers {
watchers = append(watchers, w)
}
m.mu.RUnlock()
changesDetected := 0
for _, watcher := range watchers {
if watcher.consecutiveErrors > 3 {
if watcher.consecutiveErrors == 4 {
logger.Error("Temporarily skipping directory %s due to errors: %v",
watcher.dir, watcher.lastError)
watcher.consecutiveErrors++
}
continue
}
for _, watcher := range m.watchers {
changed, err := watcher.checkForChanges() changed, err := watcher.checkForChanges()
if err != nil { if err != nil {
logger.Error("[WatcherManager] Error checking directory %s: %v", watcher.dir, err) logger.Error("Error checking directory %s: %v", watcher.dir, err)
continue continue
} }
if changed { if changed {
changesDetected++
watcher.notifyChange() watcher.notifyChange()
} }
} }