Moonshark/core/watchers/Manager.go

140 lines
3.4 KiB
Go

package watchers
import (
"sync"
"time"
"git.sharkk.net/Sky/Moonshark/core/logger"
)
// Default polling intervals
const (
defaultPollInterval = 1 * time.Second // Initial polling interval
extendedPollInterval = 5 * time.Second // Extended polling interval after inactivity
inactivityThreshold = 10 * time.Minute // Time before extending polling interval
)
// WatcherManager coordinates file watching across multiple directories
type WatcherManager struct {
// Registry of directories and their watchers
watchers map[string]*DirectoryWatcher
mu sync.RWMutex
// Shared polling state
pollInterval time.Duration
adaptive bool
lastActivity time.Time
// Control channels
done chan struct{}
ticker *time.Ticker
// Wait group for shutdown coordination
wg sync.WaitGroup
}
// NewWatcherManager creates a new watcher manager
func NewWatcherManager(adaptive bool) *WatcherManager {
manager := &WatcherManager{
watchers: make(map[string]*DirectoryWatcher),
pollInterval: defaultPollInterval,
adaptive: adaptive,
lastActivity: time.Now(),
done: make(chan struct{}),
}
// Start the polling loop
manager.ticker = time.NewTicker(manager.pollInterval)
manager.wg.Add(1)
go manager.pollLoop()
return manager
}
// Close stops all watchers and the manager
func (m *WatcherManager) Close() error {
close(m.done)
if m.ticker != nil {
m.ticker.Stop()
}
m.wg.Wait()
return nil
}
// AddWatcher registers a new directory watcher
func (m *WatcherManager) AddWatcher(watcher *DirectoryWatcher) {
m.mu.Lock()
defer m.mu.Unlock()
m.watchers[watcher.dir] = watcher
logger.Debug("[WatcherManager] Added watcher for directory: %s", watcher.dir)
}
// RemoveWatcher unregisters a directory watcher
func (m *WatcherManager) RemoveWatcher(dir string) {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.watchers, dir)
logger.Debug("[WatcherManager] Removed watcher for directory: %s", dir)
}
// pollLoop is the main polling loop that checks all watched directories
func (m *WatcherManager) pollLoop() {
defer m.wg.Done()
for {
select {
case <-m.ticker.C:
anyActivity := m.checkAllDirectories()
// Update polling interval based on activity
if m.adaptive {
if anyActivity {
// Activity detected, reset to fast polling
m.lastActivity = time.Now()
if m.pollInterval > defaultPollInterval {
m.pollInterval = defaultPollInterval
m.ticker.Reset(m.pollInterval)
logger.Debug("[WatcherManager] Reset to base polling interval: %v", m.pollInterval)
}
} else {
// No activity, consider slowing down polling
inactiveDuration := time.Since(m.lastActivity)
if m.pollInterval == defaultPollInterval && inactiveDuration > inactivityThreshold {
m.pollInterval = extendedPollInterval
m.ticker.Reset(m.pollInterval)
logger.Debug("[WatcherManager] Extended polling interval to: %v after %v of inactivity",
m.pollInterval, inactiveDuration.Round(time.Minute))
}
}
}
case <-m.done:
return
}
}
}
// checkAllDirectories polls all registered directories for changes
func (m *WatcherManager) checkAllDirectories() bool {
m.mu.RLock()
defer m.mu.RUnlock()
anyActivity := false
for _, watcher := range m.watchers {
changed, err := watcher.checkForChanges()
if err != nil {
logger.Error("[WatcherManager] Error checking directory %s: %v", watcher.dir, err)
continue
}
if changed {
anyActivity = true
watcher.notifyChange()
}
}
return anyActivity
}