package watchers

import (
	"fmt"
	"os"
	"path/filepath"
	"sync"
	"time"

	"git.sharkk.net/Sky/Moonshark/core/logger"
)

// Polling intervals and the inactivity thresholds used by adaptive polling.
const (
	defaultPollInterval   = 1 * time.Second  // Initial polling interval
	extendedPollInterval  = 5 * time.Second  // Extended polling interval after inactivity
	maxPollInterval       = 10 * time.Second // Maximum polling interval after long inactivity
	inactivityThreshold   = 5 * time.Minute  // Time before extending polling interval
	secondExtendThreshold = 30 * time.Minute // Time before second extension
)

// defaultDebounceTime is the default delay between a detected change and the
// callback invocation.
const defaultDebounceTime = 300 * time.Millisecond

// FileInfo stores metadata about a file for change detection.
type FileInfo struct {
	ModTime time.Time
	Size    int64
	IsDir   bool
}

// Watcher implements a simple polling-based file watcher.
type Watcher struct {
	// Directory to watch
	dir string

	// Map of file paths to their metadata. A map stored here is never
	// mutated after it has been published; updates swap in a whole new map.
	files   map[string]FileInfo
	filesMu sync.RWMutex

	// Configuration
	callback         func() error
	log              *logger.Logger
	pollInterval     time.Duration // Current polling interval
	basePollInterval time.Duration // Base (starting) polling interval
	debounceTime     time.Duration
	recursive        bool
	adaptive         bool // Whether to use adaptive polling intervals

	// Adaptive polling
	lastChangeTime time.Time // When we last detected a change

	// Control channels
	done       chan struct{}
	debounceCh chan struct{}

	// Guards closing done so that Close may be called more than once
	closeOnce sync.Once

	// Wait group for shutdown coordination
	wg sync.WaitGroup
}

// WatcherConfig contains configuration for the file watcher.
type WatcherConfig struct {
	// Directory to watch
	Dir string

	// Callback function to call when changes are detected (required)
	Callback func() error

	// Logger instance
	Log *logger.Logger

	// Poll interval (zero or negative means use default)
	PollInterval time.Duration

	// Debounce time (0 means use default)
	DebounceTime time.Duration

	// Recursive watching (watch subdirectories)
	Recursive bool

	// Use adaptive polling intervals
	Adaptive bool
}

// WatchDirectory sets up filesystem monitoring on a directory.
//
// It takes an initial snapshot of config.Dir and then starts two goroutines:
// one that polls for changes and one that debounces change notifications
// before invoking config.Callback. It returns the watcher for later cleanup
// and any setup error. A nil Callback is rejected up front, because it would
// otherwise panic on the timer goroutine at the first detected change.
func WatchDirectory(config WatcherConfig) (*Watcher, error) {
	if config.Callback == nil {
		return nil, fmt.Errorf("watching %q: callback must not be nil", config.Dir)
	}

	// time.NewTicker panics on a non-positive duration, so anything that is
	// not a usable interval falls back to the default.
	pollInterval := config.PollInterval
	if pollInterval <= 0 {
		pollInterval = defaultPollInterval
	}

	debounceTime := config.DebounceTime
	if debounceTime == 0 {
		debounceTime = defaultDebounceTime
	}

	w := &Watcher{
		dir:              config.Dir,
		files:            make(map[string]FileInfo),
		callback:         config.Callback,
		log:              config.Log,
		pollInterval:     pollInterval,
		basePollInterval: pollInterval,
		debounceTime:     debounceTime,
		recursive:        config.Recursive,
		adaptive:         config.Adaptive,
		lastChangeTime:   time.Now(),
		done:             make(chan struct{}),
		debounceCh:       make(chan struct{}, 1),
	}

	// Perform initial scan
	if err := w.scanDirectory(); err != nil {
		return nil, err
	}

	// Start the watcher routines
	w.wg.Add(2)
	go w.watchLoop()
	go w.debounceLoop()

	if config.Adaptive {
		w.logDebug("Started watching with adaptive polling")
	} else {
		w.logDebug("Started watching with fixed polling interval: %v", pollInterval)
	}

	return w, nil
}

// Close stops the watcher and waits for its goroutines to exit.
//
// It is safe to call Close more than once. The debounced callback runs on the
// timer's own goroutine, so a callback that has already started is not waited
// for. Close always returns nil.
func (w *Watcher) Close() error {
	w.closeOnce.Do(func() { close(w.done) })
	w.wg.Wait()
	return nil
}

// watchLoop periodically scans the directory for changes until the watcher is
// closed. It is the only goroutine that touches pollInterval and
// lastChangeTime once the watcher has started.
func (w *Watcher) watchLoop() {
	defer w.wg.Done()

	ticker := time.NewTicker(w.pollInterval)
	defer ticker.Stop()

	for {
		select {
		case <-w.done:
			return

		case <-ticker.C:
			changed, err := w.checkForChanges()
			if err != nil {
				w.logError("Error checking for changes: %v", err)
				continue
			}

			switch {
			case changed:
				w.handleChange(ticker)
			case w.adaptive:
				w.extendInterval(ticker)
			}
		}
	}
}

// handleChange records a detected change, drops back to the base polling
// interval if adaptive polling had extended it, and queues a debounced
// callback notification.
func (w *Watcher) handleChange(ticker *time.Ticker) {
	w.lastChangeTime = time.Now()

	if w.adaptive && w.pollInterval > w.basePollInterval {
		w.pollInterval = w.basePollInterval
		ticker.Reset(w.pollInterval)
		w.logDebug("Reset to base polling interval: %v", w.pollInterval)
	}

	// Non-blocking send: a full buffer means a notification is already pending.
	select {
	case w.debounceCh <- struct{}{}:
	default:
	}
}

// extendInterval slows polling down once the directory has been quiet for
// long enough. It only ever lengthens the current interval.
func (w *Watcher) extendInterval(ticker *time.Ticker) {
	inactive := time.Since(w.lastChangeTime)

	target := adaptiveInterval(w.basePollInterval, inactive)
	if target <= w.pollInterval {
		return
	}

	w.pollInterval = target
	ticker.Reset(target)
	w.logDebug("Extended polling interval to: %v after %v of inactivity",
		w.pollInterval, inactive.Round(time.Second))
}

// adaptiveInterval returns the polling interval to use after the given period
// of inactivity. The result is never shorter than base, so a base interval at
// or above the extended intervals is left alone rather than shortened.
func adaptiveInterval(base, inactive time.Duration) time.Duration {
	target := base
	switch {
	case inactive > secondExtendThreshold:
		target = maxPollInterval
	case inactive > inactivityThreshold:
		target = extendedPollInterval
	}

	if target < base {
		return base
	}
	return target
}

// debounceLoop handles debouncing change notifications: each notification
// restarts a timer, and the callback fires only once the timer expires without
// a newer notification having replaced it.
func (w *Watcher) debounceLoop() {
	defer w.wg.Done()

	var timer *time.Timer

	for {
		select {
		case <-w.debounceCh:
			// Cancel existing timer if there is one
			if timer != nil {
				timer.Stop()
			}

			// Start a new timer; the callback runs on the timer's goroutine.
			timer = time.AfterFunc(w.debounceTime, func() {
				if err := w.callback(); err != nil {
					w.logError("Refresh callback error: %v", err)
				}
			})

		case <-w.done:
			if timer != nil {
				timer.Stop()
			}
			return
		}
	}
}

// snapshot walks the watched directory and returns the metadata of every path
// it visits. Paths that cannot be accessed are skipped, and logged as warnings
// when warn is set. In non-recursive mode subdirectories are neither recorded
// nor descended into.
func (w *Watcher) snapshot(warn bool) (map[string]FileInfo, error) {
	files := make(map[string]FileInfo)

	err := filepath.Walk(w.dir, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			// The file might have been deleted between directory read and
			// stat; continue with other files.
			if warn {
				w.logWarning("Error accessing path %s: %v", path, err)
			}
			return nil
		}

		// Skip if not recursive and this is a subdirectory
		if !w.recursive && info.IsDir() && path != w.dir {
			return filepath.SkipDir
		}

		files[path] = FileInfo{
			ModTime: info.ModTime(),
			Size:    info.Size(),
			IsDir:   info.IsDir(),
		}
		return nil
	})
	if err != nil {
		return nil, err
	}

	return files, nil
}

// scanDirectory builds the initial file list, logging a warning for every
// path that cannot be accessed.
func (w *Watcher) scanDirectory() error {
	files, err := w.snapshot(true)
	if err != nil {
		return err
	}

	w.updateFiles(files)
	return nil
}

// logDebug logs a debug message with the watcher's directory prefix.
func (w *Watcher) logDebug(format string, args ...any) {
	w.log.Debug("[Watcher] [%s] %s", w.dir, fmt.Sprintf(format, args...))
}

// logInfo logs an info message with the watcher's directory prefix.
func (w *Watcher) logInfo(format string, args ...any) {
	w.log.Info("[Watcher] [%s] %s", w.dir, fmt.Sprintf(format, args...))
}

// logWarning logs a warning message with the watcher's directory prefix.
func (w *Watcher) logWarning(format string, args ...any) {
	w.log.Warning("[Watcher] [%s] %s", w.dir, fmt.Sprintf(format, args...))
}

// logError logs an error message with the watcher's directory prefix.
func (w *Watcher) logError(format string, args ...any) {
	w.log.Error("[Watcher] [%s] %s", w.dir, fmt.Sprintf(format, args...))
}

// checkForChanges takes a fresh snapshot of the directory and compares it with
// the stored one. When they differ, the stored snapshot is replaced and true
// is returned.
func (w *Watcher) checkForChanges() (bool, error) {
	// Get current state
	current, err := w.snapshot(false)
	if err != nil {
		return false, err
	}

	// Published maps are never mutated, so the previous snapshot can be read
	// after the lock has been released.
	w.filesMu.RLock()
	previous := w.files
	w.filesMu.RUnlock()

	if !w.differs(previous, current) {
		return false, nil
	}

	w.updateFiles(current)
	return true, nil
}

// differs reports whether two snapshots describe different directory contents,
// logging the first difference it finds.
func (w *Watcher) differs(previous, current map[string]FileInfo) bool {
	// Check for different file count (quick check); this also covers pure
	// additions and pure deletions.
	if len(current) != len(previous) {
		w.logDebug("File count changed: %d -> %d", len(previous), len(current))
		return true
	}

	// With equal counts, a path missing from current implies some path in
	// current that is missing from previous, so this single pass also catches
	// deletions and no separate scan of previous is needed.
	for path, cur := range current {
		prev, known := previous[path]
		if !known {
			w.logDebug("New file detected: %s", path)
			return true
		}

		if !cur.ModTime.Equal(prev.ModTime) || cur.Size != prev.Size || cur.IsDir != prev.IsDir {
			w.logDebug("File modified: %s", path)
			return true
		}
	}

	// No changes detected
	return false
}

// updateFiles replaces the internal file list with newFiles.
func (w *Watcher) updateFiles(newFiles map[string]FileInfo) {
	w.filesMu.Lock()
	w.files = newFiles
	w.filesMu.Unlock()
}