Moonshark/core/watchers/watcher.go
2025-03-22 14:16:45 -05:00

342 lines
8.3 KiB
Go

package watchers
import (
"fmt"
"os"
"path/filepath"
"sync"
"time"
"git.sharkk.net/Sky/Moonshark/core/logger"
)
// Default polling intervals
const (
defaultPollInterval = 1 * time.Second // Initial polling interval
extendedPollInterval = 5 * time.Second // Extended polling interval after inactivity
inactivityThreshold = 10 * time.Minute // Time before extending polling interval
)
// Default debounce time between detected change and callback
const defaultDebounceTime = 300 * time.Millisecond
// FileInfo stores metadata about a file for change detection
type FileInfo struct {
ModTime time.Time
Size int64
IsDir bool
}
// Watcher implements a simple polling-based file watcher
type Watcher struct {
// Directory to watch
dir string
// Map of file paths to their metadata
files map[string]FileInfo
filesMu sync.RWMutex
// Configuration
callback func() error
log *logger.Logger
pollInterval time.Duration // Current polling interval
basePollInterval time.Duration // Base (starting) polling interval
debounceTime time.Duration
recursive bool
adaptive bool // Whether to use adaptive polling intervals
// Adaptive polling
lastChangeTime time.Time // When we last detected a change
// Control channels
done chan struct{}
debounceCh chan struct{}
// Wait group for shutdown coordination
wg sync.WaitGroup
}
// WatcherConfig contains configuration for the file watcher
type WatcherConfig struct {
// Directory to watch
Dir string
// Callback function to call when changes are detected
Callback func() error
// Logger instance
Log *logger.Logger
// Poll interval (0 means use default)
PollInterval time.Duration
// Debounce time (0 means use default)
DebounceTime time.Duration
// Recursive watching (watch subdirectories)
Recursive bool
// Use adaptive polling intervals
Adaptive bool
}
// WatchDirectory sets up filesystem monitoring on a directory
// Returns the watcher for later cleanup and any setup error
func WatchDirectory(config WatcherConfig) (*Watcher, error) {
pollInterval := config.PollInterval
if pollInterval == 0 {
pollInterval = defaultPollInterval
}
debounceTime := config.DebounceTime
if debounceTime == 0 {
debounceTime = defaultDebounceTime
}
w := &Watcher{
dir: config.Dir,
files: make(map[string]FileInfo),
callback: config.Callback,
log: config.Log,
pollInterval: pollInterval,
basePollInterval: pollInterval,
debounceTime: debounceTime,
recursive: config.Recursive,
adaptive: config.Adaptive,
lastChangeTime: time.Now(),
done: make(chan struct{}),
debounceCh: make(chan struct{}, 1),
}
// Perform initial scan
if err := w.scanDirectory(); err != nil {
return nil, err
}
// Start the watcher routines
w.wg.Add(2)
go w.watchLoop()
go w.debounceLoop()
if config.Adaptive {
w.logDebug("Started watching with adaptive polling (1s default, 5s after 10m inactivity)")
} else {
w.logDebug("Started watching with fixed polling interval: %v", pollInterval)
}
return w, nil
}
// Close stops the watcher
func (w *Watcher) Close() error {
close(w.done)
w.wg.Wait()
return nil
}
// watchLoop periodically scans the directory for changes
func (w *Watcher) watchLoop() {
defer w.wg.Done()
ticker := time.NewTicker(w.pollInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
changed, err := w.checkForChanges()
if err != nil {
w.logError("Error checking for changes: %v", err)
continue
}
if changed {
// Update last change time
w.lastChangeTime = time.Now()
if w.adaptive && w.pollInterval > w.basePollInterval {
w.pollInterval = w.basePollInterval
ticker.Reset(w.pollInterval)
w.logDebug("Reset to base polling interval: %v", w.pollInterval)
}
// Try to send a change notification, non-blocking
select {
case w.debounceCh <- struct{}{}:
default:
// Channel already has a pending notification
}
} else if w.adaptive {
// Consider extending polling interval if enough time has passed since last change
inactiveDuration := time.Since(w.lastChangeTime)
if w.pollInterval == w.basePollInterval && inactiveDuration > inactivityThreshold {
// Extend polling interval
w.pollInterval = extendedPollInterval
ticker.Reset(w.pollInterval)
w.logDebug("Extended polling interval to: %v after %v of inactivity",
w.pollInterval, inactiveDuration.Round(time.Minute))
}
}
case <-w.done:
return
}
}
}
// debounceLoop handles debouncing change notifications
func (w *Watcher) debounceLoop() {
defer w.wg.Done()
var timer *time.Timer
for {
select {
case <-w.debounceCh:
// Cancel existing timer if there is one
if timer != nil {
timer.Stop()
}
// Start a new timer
timer = time.AfterFunc(w.debounceTime, func() {
if err := w.callback(); err != nil {
w.logError("Refresh callback error: %v", err)
}
})
case <-w.done:
if timer != nil {
timer.Stop()
}
return
}
}
}
// scanDirectory builds the initial file list
func (w *Watcher) scanDirectory() error {
w.filesMu.Lock()
defer w.filesMu.Unlock()
return filepath.Walk(w.dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
w.logWarning("Error accessing path %s: %v", path, err)
return nil // Continue with other files
}
// Skip if not recursive and this is a subdirectory
if !w.recursive && info.IsDir() && path != w.dir {
return filepath.SkipDir
}
w.files[path] = FileInfo{
ModTime: info.ModTime(),
Size: info.Size(),
IsDir: info.IsDir(),
}
return nil
})
}
// logDebug logs a debug message with the watcher's directory prefix
func (w *Watcher) logDebug(format string, args ...any) {
w.log.Debug("[Watcher] [%s] %s", w.dir, fmt.Sprintf(format, args...))
}
// logInfo logs an info message with the watcher's directory prefix
func (w *Watcher) logInfo(format string, args ...any) {
w.log.Info("[Watcher] [%s] %s", w.dir, fmt.Sprintf(format, args...))
}
// logWarning logs a warning message with the watcher's directory prefix
func (w *Watcher) logWarning(format string, args ...any) {
w.log.Warning("[Watcher] [%s] %s", w.dir, fmt.Sprintf(format, args...))
}
// logError logs an error message with the watcher's directory prefix
func (w *Watcher) logError(format string, args ...any) {
w.log.Error("[Watcher] [%s] %s", w.dir, fmt.Sprintf(format, args...))
}
// checkForChanges detects if any files have been added, modified, or deleted
func (w *Watcher) checkForChanges() (bool, error) {
// Get current state
currentFiles := make(map[string]FileInfo)
err := filepath.Walk(w.dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
// File might have been deleted between directory read and stat
return nil
}
// Skip if not recursive and this is a subdirectory
if !w.recursive && info.IsDir() && path != w.dir {
return filepath.SkipDir
}
currentFiles[path] = FileInfo{
ModTime: info.ModTime(),
Size: info.Size(),
IsDir: info.IsDir(),
}
return nil
})
if err != nil {
return false, err
}
// Compare with previous state
w.filesMu.RLock()
previousFiles := w.files
w.filesMu.RUnlock()
// Check for different file count (quick check)
if len(currentFiles) != len(previousFiles) {
w.logDebug("File count changed: %d -> %d", len(previousFiles), len(currentFiles))
w.updateFiles(currentFiles)
return true, nil
}
// Check for modified, added, or removed files
for path, currentInfo := range currentFiles {
prevInfo, exists := previousFiles[path]
if !exists {
// New file
w.logDebug("New file detected: %s", path)
w.updateFiles(currentFiles)
return true, nil
}
if currentInfo.ModTime != prevInfo.ModTime || currentInfo.Size != prevInfo.Size {
// File modified
w.logDebug("File modified: %s", path)
w.updateFiles(currentFiles)
return true, nil
}
}
// Check for deleted files
for path := range previousFiles {
if _, exists := currentFiles[path]; !exists {
// File deleted
w.logDebug("File deleted: %s", path)
w.updateFiles(currentFiles)
return true, nil
}
}
// No changes detected
return false, nil
}
// updateFiles updates the internal file list
func (w *Watcher) updateFiles(newFiles map[string]FileInfo) {
w.filesMu.Lock()
w.files = newFiles
w.filesMu.Unlock()
}