Moonshark/watchers/manager.go

154 lines
3.3 KiB
Go

package watchers
import (
"errors"
"sync"
"time"
"Moonshark/logger"
)
// DefaultPollInterval is the polling period NewWatcherManager falls back to
// when it is given a non-positive interval.
const DefaultPollInterval = time.Second
// Sentinel errors returned by WatcherManager methods.
var (
	// ErrAlreadyWatching is returned by WatchDirectory when the directory
	// already has a registered watcher.
	ErrAlreadyWatching = errors.New("already watching directory")

	// ErrDirectoryNotFound is returned by UnwatchDirectory when no watcher is
	// registered for the directory.
	ErrDirectoryNotFound = errors.New("directory not found")
)
// WatcherManager polls a set of watched directories from a single background
// goroutine that is driven by one shared ticker.
type WatcherManager struct {
	mu       sync.RWMutex                 // guards watchers
	watchers map[string]*DirectoryWatcher // registered watchers, keyed by directory

	interval time.Duration // poll period the ticker was created with
	ticker   *time.Ticker  // each tick triggers one polling pass

	done chan struct{}  // closed by Close to stop the poll loop
	wg   sync.WaitGroup // tracks the poll-loop goroutine
}
// NewWatcherManager builds a manager and immediately starts its background
// poll loop. A pollInterval of zero or less is replaced by DefaultPollInterval.
// Call Close to stop the loop and the ticker.
func NewWatcherManager(pollInterval time.Duration) *WatcherManager {
	interval := pollInterval
	if interval <= 0 {
		interval = DefaultPollInterval
	}

	m := &WatcherManager{
		watchers: make(map[string]*DirectoryWatcher),
		done:     make(chan struct{}),
		interval: interval,
		ticker:   time.NewTicker(interval),
	}

	// Register the goroutine before starting it so Close can wait for it.
	m.wg.Add(1)
	go m.pollLoop()

	return m
}
// Close stops the poll loop and blocks until its goroutine has exited. The
// registered watchers are left in the map; they simply stop being polled.
//
// Close is idempotent: the first call closes the done channel, and later
// calls skip that step instead of panicking on a double close. It always
// returns nil.
func (m *WatcherManager) Close() error {
	// m.mu serializes the closed-check with close(m.done) so that concurrent
	// callers cannot both reach close.
	m.mu.Lock()
	select {
	case <-m.done:
		// Already closed by an earlier call.
	default:
		close(m.done)
	}
	m.mu.Unlock()

	if m.ticker != nil {
		m.ticker.Stop()
	}
	m.wg.Wait()
	return nil
}
// WatchDirectory registers config.Dir for polling. It returns
// ErrAlreadyWatching if the directory is already registered, or the error from
// the initial scan if that scan fails; in both cases nothing is registered.
// A zero config.DebounceTime is replaced by defaultDebounceTime.
//
// The manager's write lock is held for the whole call, including the scan.
func (m *WatcherManager) WatchDirectory(config DirectoryWatcherConfig) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	dir := config.Dir
	if _, dup := m.watchers[dir]; dup {
		return ErrAlreadyWatching
	}

	debounce := config.DebounceTime
	if debounce == 0 {
		debounce = defaultDebounceTime
	}

	dw := &DirectoryWatcher{
		dir:              dir,
		files:            make(map[string]FileInfo),
		callback:         config.Callback,
		enhancedCallback: config.EnhancedCallback,
		debounceTime:     debounce,
		recursive:        config.Recursive,
	}

	// Run the initial scan before the watcher is registered.
	if err := dw.scanDirectory(); err != nil {
		return err
	}

	m.watchers[dir] = dw
	logger.Debugf("WatcherManager added watcher for %s", dir)
	return nil
}
// UnwatchDirectory drops the watcher registered for dir. It returns
// ErrDirectoryNotFound when dir is not currently being watched.
func (m *WatcherManager) UnwatchDirectory(dir string) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	_, watching := m.watchers[dir]
	if !watching {
		return ErrDirectoryNotFound
	}

	delete(m.watchers, dir)
	logger.Debugf("WatcherManager removed watcher for %s", dir)
	return nil
}
// pollLoop runs on the manager's background goroutine. Each ticker tick
// triggers one pass over every watched directory; the loop exits once done is
// closed, and marks the goroutine finished on the WaitGroup as it returns.
func (m *WatcherManager) pollLoop() {
	defer m.wg.Done()

	stop, tick := m.done, m.ticker.C
	for {
		select {
		case <-stop:
			return
		case <-tick:
			m.checkAllDirectories()
		}
	}
}
// checkAllDirectories runs one polling pass over every registered watcher and
// calls notifyChange on those that report a change.
//
// The watcher set is snapshotted under the read lock and processed after the
// lock is released, so the manager's lock is not held while checkForChanges
// and notifyChange run.
func (m *WatcherManager) checkAllDirectories() {
	m.mu.RLock()
	watchers := make([]*DirectoryWatcher, 0, len(m.watchers))
	for _, w := range m.watchers {
		watchers = append(watchers, w)
	}
	m.mu.RUnlock()

	for _, watcher := range watchers {
		// Skip watchers with more than three consecutive errors. The skip is
		// logged once: the counter is bumped from 4 to 5 so the message is not
		// repeated on later passes.
		// NOTE(review): nothing in this function resets consecutiveErrors, so a
		// skipped watcher stays skipped unless other code resets it — confirm
		// that "temporarily" is accurate.
		if watcher.consecutiveErrors > 3 {
			if watcher.consecutiveErrors == 4 {
				logger.Errorf("Temporarily skipping directory %s due to errors: %v",
					watcher.dir, watcher.lastError)
				watcher.consecutiveErrors++
			}
			continue
		}

		changed, err := watcher.checkForChanges()
		if err != nil {
			logger.Errorf("Error checking directory %s: %v", watcher.dir, err)
			continue
		}
		if changed {
			watcher.notifyChange()
		}
	}
}