From 4a17388d836057f7586a1644a3e76bf5d3e17d56 Mon Sep 17 00:00:00 2001 From: Sky Johnson Date: Fri, 7 Mar 2025 07:43:04 -0600 Subject: [PATCH] watchers --- core/http/server.go | 2 +- core/watchers/routerwatchers.go | 44 +++++ core/watchers/watcher.go | 329 ++++++++++++++++++++++++++++++++ moonshark.go | 77 +++++--- 4 files changed, 426 insertions(+), 26 deletions(-) create mode 100644 core/watchers/routerwatchers.go create mode 100644 core/watchers/watcher.go diff --git a/core/http/server.go b/core/http/server.go index daab312..b06a687 100644 --- a/core/http/server.go +++ b/core/http/server.go @@ -36,7 +36,7 @@ func New(luaRouter *routers.LuaRouter, staticRouter *routers.StaticRouter, pool // ListenAndServe starts the server on the given address func (s *Server) ListenAndServe(addr string) error { s.httpServer.Addr = addr - s.logger.Info("Server starting on %s", addr) + s.logger.Info("Server listening at http://localhost%s", addr) return s.httpServer.ListenAndServe() } diff --git a/core/watchers/routerwatchers.go b/core/watchers/routerwatchers.go new file mode 100644 index 0000000..8fb7544 --- /dev/null +++ b/core/watchers/routerwatchers.go @@ -0,0 +1,44 @@ +package watchers + +import ( + "git.sharkk.net/Sky/Moonshark/core/logger" + "git.sharkk.net/Sky/Moonshark/core/routers" +) + +// WatchLuaRouter sets up a watcher for a LuaRouter's routes directory +func WatchLuaRouter(router *routers.LuaRouter, routesDir string, log *logger.Logger) (*Watcher, error) { + config := WatcherConfig{ + Dir: routesDir, + Callback: router.Refresh, + Log: log, + Recursive: true, + Adaptive: true, + } + + watcher, err := WatchDirectory(config) + if err != nil { + return nil, err + } + + log.Info("Started watching Lua routes directory with adaptive polling: %s", routesDir) + return watcher, nil +} + +// WatchStaticRouter sets up a watcher for a StaticRouter's root directory +func WatchStaticRouter(router *routers.StaticRouter, staticDir string, log *logger.Logger) (*Watcher, error) { + config := WatcherConfig{ + Dir: staticDir, + Callback: router.Refresh, + Log: log, + Recursive: true, + Adaptive: true, + } + + watcher, err := WatchDirectory(config) + if err != nil { + return nil, err + } + + log.Info("Started watching static files directory with adaptive polling: %s", staticDir) + return watcher, nil +} diff --git a/core/watchers/watcher.go b/core/watchers/watcher.go new file mode 100644 index 0000000..c984ce2 --- /dev/null +++ b/core/watchers/watcher.go @@ -0,0 +1,329 @@ +package watchers + +import ( + "os" + "path/filepath" + "sync" + "time" + + "git.sharkk.net/Sky/Moonshark/core/logger" +) + +// Default polling intervals +const ( + defaultPollInterval = 1 * time.Second // Initial polling interval + extendedPollInterval = 5 * time.Second // Extended polling interval after inactivity + maxPollInterval = 10 * time.Second // Maximum polling interval after long inactivity + inactivityThreshold = 30 * time.Second // Time before extending polling interval + secondExtendThreshold = 2 * time.Minute // Time before second extension +) + +// Default debounce time between detected change and callback +const defaultDebounceTime = 300 * time.Millisecond + +// FileInfo stores metadata about a file for change detection +type FileInfo struct { + ModTime time.Time + Size int64 + IsDir bool +} + +// Watcher implements a simple polling-based file watcher +type Watcher struct { + // Directory to watch + dir string + + // Map of file paths to their metadata + files map[string]FileInfo + filesMu sync.RWMutex + + // Configuration + callback func() error + log *logger.Logger + pollInterval time.Duration // Current polling interval + basePollInterval time.Duration // Base (starting) polling interval + debounceTime time.Duration + recursive bool + adaptive bool // Whether to use adaptive polling intervals + + // Adaptive polling + lastChangeTime time.Time // When we last detected a change + + // Control channels + done chan struct{} + debounceCh chan struct{} + + // Wait group for shutdown coordination + wg sync.WaitGroup +} + +// WatcherConfig contains configuration for the file watcher +type WatcherConfig struct { + // Directory to watch + Dir string + + // Callback function to call when changes are detected + Callback func() error + + // Logger instance + Log *logger.Logger + + // Poll interval (0 means use default) + PollInterval time.Duration + + // Debounce time (0 means use default) + DebounceTime time.Duration + + // Recursive watching (watch subdirectories) + Recursive bool + + // Use adaptive polling intervals + Adaptive bool +} + +// WatchDirectory sets up filesystem monitoring on a directory +// Returns the watcher for later cleanup and any setup error +func WatchDirectory(config WatcherConfig) (*Watcher, error) { + pollInterval := config.PollInterval + if pollInterval == 0 { + pollInterval = defaultPollInterval + } + + debounceTime := config.DebounceTime + if debounceTime == 0 { + debounceTime = defaultDebounceTime + } + + w := &Watcher{ + dir: config.Dir, + files: make(map[string]FileInfo), + callback: config.Callback, + log: config.Log, + pollInterval: pollInterval, + basePollInterval: pollInterval, + debounceTime: debounceTime, + recursive: config.Recursive, + adaptive: config.Adaptive, + lastChangeTime: time.Now(), + done: make(chan struct{}), + debounceCh: make(chan struct{}, 1), + } + + // Perform initial scan + if err := w.scanDirectory(); err != nil { + return nil, err + } + + // Start the watcher routines + w.wg.Add(2) + go w.watchLoop() + go w.debounceLoop() + + if config.Adaptive { + config.Log.Debug("Started watching directory with adaptive polling: %s", config.Dir) + } else { + config.Log.Debug("Started watching directory: %s", config.Dir) + } + + return w, nil +} + +// Close stops the watcher +func (w *Watcher) Close() error { + close(w.done) + w.wg.Wait() + return nil +} + +// watchLoop periodically scans the directory for changes +func (w *Watcher) watchLoop() { + defer w.wg.Done() + + ticker := time.NewTicker(w.pollInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + changed, err := w.checkForChanges() + if err != nil { + w.log.Error("Error checking for changes: %v", err) + continue + } + + if changed { + // Update last change time + w.lastChangeTime = time.Now() + + // Reset to base polling interval if we were using extended intervals + if w.adaptive && w.pollInterval > w.basePollInterval { + w.pollInterval = w.basePollInterval + ticker.Reset(w.pollInterval) + w.log.Debug("Reset to base polling interval: %v", w.pollInterval) + } + + // Try to send a change notification, non-blocking + select { + case w.debounceCh <- struct{}{}: + default: + // Channel already has a pending notification + } + } else if w.adaptive { + // Consider extending polling interval if enough time has passed since last change + inactiveDuration := time.Since(w.lastChangeTime) + + if w.pollInterval == w.basePollInterval && inactiveDuration > inactivityThreshold { + // First extension + w.pollInterval = extendedPollInterval + ticker.Reset(w.pollInterval) + w.log.Debug("Extended polling interval to %v after %v of inactivity", + w.pollInterval, inactiveDuration.Round(time.Second)) + } else if w.pollInterval == extendedPollInterval && inactiveDuration > secondExtendThreshold { + // Second extension + w.pollInterval = maxPollInterval + ticker.Reset(w.pollInterval) + w.log.Debug("Extended polling interval to %v after %v of inactivity", + w.pollInterval, inactiveDuration.Round(time.Second)) + } + } + + case <-w.done: + return + } + } +} + +// debounceLoop handles debouncing change notifications +func (w *Watcher) debounceLoop() { + defer w.wg.Done() + + var timer *time.Timer + for { + select { + case <-w.debounceCh: + // Cancel existing timer if there is one + if timer != nil { + timer.Stop() + } + + // Start a new timer + timer = time.AfterFunc(w.debounceTime, func() { + if err := w.callback(); err != nil { + w.log.Error("Refresh callback error: %v", err) + } + }) + + case <-w.done: + if timer != nil { + timer.Stop() + } + return + } + } +} + +// scanDirectory builds the initial file list +func (w *Watcher) scanDirectory() error { + w.filesMu.Lock() + defer w.filesMu.Unlock() + + return filepath.Walk(w.dir, func(path string, info os.FileInfo, err error) error { + if err != nil { + w.log.Warning("Error accessing path %s: %v", path, err) + return nil // Continue with other files + } + + // Skip if not recursive and this is a subdirectory + if !w.recursive && info.IsDir() && path != w.dir { + return filepath.SkipDir + } + + w.files[path] = FileInfo{ + ModTime: info.ModTime(), + Size: info.Size(), + IsDir: info.IsDir(), + } + + return nil + }) +} + +// checkForChanges compares current file state with the last scan +func (w *Watcher) checkForChanges() (bool, error) { + // Get current state + currentFiles := make(map[string]FileInfo) + + err := filepath.Walk(w.dir, func(path string, info os.FileInfo, err error) error { + if err != nil { + // File might have been deleted between directory read and stat + return nil + } + + // Skip if not recursive and this is a subdirectory + if !w.recursive && info.IsDir() && path != w.dir { + return filepath.SkipDir + } + + currentFiles[path] = FileInfo{ + ModTime: info.ModTime(), + Size: info.Size(), + IsDir: info.IsDir(), + } + + return nil + }) + + if err != nil { + return false, err + } + + // Compare with previous state + w.filesMu.RLock() + previousFiles := w.files + w.filesMu.RUnlock() + + // Check for different file count (quick check) + if len(currentFiles) != len(previousFiles) { + w.log.Debug("File count changed: %d -> %d", len(previousFiles), len(currentFiles)) + w.updateFiles(currentFiles) + return true, nil + } + + // Check for modified, added, or removed files + for path, currentInfo := range currentFiles { + prevInfo, exists := previousFiles[path] + if !exists { + // New file + w.log.Debug("New file detected: %s", path) + w.updateFiles(currentFiles) + return true, nil + } + + if currentInfo.ModTime != prevInfo.ModTime || currentInfo.Size != prevInfo.Size { + // File modified + w.log.Debug("File modified: %s", path) + w.updateFiles(currentFiles) + return true, nil + } + } + + // Check for deleted files + for path := range previousFiles { + if _, exists := currentFiles[path]; !exists { + // File deleted + w.log.Debug("File deleted: %s", path) + w.updateFiles(currentFiles) + return true, nil + } + } + + // No changes detected + return false, nil +} + +// updateFiles updates the internal file list +func (w *Watcher) updateFiles(newFiles map[string]FileInfo) { + w.filesMu.Lock() + w.files = newFiles + w.filesMu.Unlock() +} diff --git a/moonshark.go b/moonshark.go index ce69bd1..c14ee51 100644 --- a/moonshark.go +++ b/moonshark.go @@ -13,9 +13,37 @@ import ( "git.sharkk.net/Sky/Moonshark/core/logger" "git.sharkk.net/Sky/Moonshark/core/routers" "git.sharkk.net/Sky/Moonshark/core/utils" + "git.sharkk.net/Sky/Moonshark/core/watchers" "git.sharkk.net/Sky/Moonshark/core/workers" ) +// initRouters sets up the Lua and static routers +func initRouters(routesDir, staticDir string, log *logger.Logger) (*routers.LuaRouter, *routers.StaticRouter, error) { + // Ensure directories exist + if err := utils.EnsureDir(routesDir); err != nil { + return nil, nil, fmt.Errorf("routes directory doesn't exist, and could not create it: %v", err) + } + if err := utils.EnsureDir(staticDir); err != nil { + return nil, nil, fmt.Errorf("static directory doesn't exist, and could not create it: %v", err) + } + + // Initialize Lua router for dynamic routes + luaRouter, err := routers.NewLuaRouter(routesDir) + if err != nil { + return nil, nil, fmt.Errorf("failed to initialize Lua router: %v", err) + } + log.Info("Lua router initialized with routes from %s", routesDir) + + // Initialize static file router + staticRouter, err := routers.NewStaticRouter(staticDir) + if err != nil { + return nil, nil, fmt.Errorf("failed to initialize static router: %v", err) + } + log.Info("Static router initialized with files from %s", staticDir) + + return luaRouter, staticRouter, nil +} + func main() { // Initialize logger log := logger.New(logger.LevelDebug, true) @@ -37,18 +65,31 @@ func main() { // Initialize routers routesDir := cfg.GetString("routes_dir", "./routes") staticDir := cfg.GetString("static_dir", "./static") + luaRouter, staticRouter, err := initRouters(routesDir, staticDir, log) + if err != nil { + log.Fatal("Router initialization failed: %v", err) + } + + // Set up file watchers for automatic reloading + luaWatcher, err := watchers.WatchLuaRouter(luaRouter, routesDir, log) + if err != nil { + log.Warning("Failed to watch routes directory: %v", err) + } else { + defer luaWatcher.Close() + log.Info("File watcher active for Lua routes") + } + + staticWatcher, err := watchers.WatchStaticRouter(staticRouter, staticDir, log) + if err != nil { + log.Warning("Failed to watch static directory: %v", err) + } else { + defer staticWatcher.Close() + log.Info("File watcher active for static files") + } // Get worker pool size from config or use default workerPoolSize := cfg.GetInt("worker_pool_size", 4) - // Ensure directories exist - if err = utils.EnsureDir(routesDir); err != nil { - log.Fatal("Routes directory doesn't exist, and could not create it: %v", err) - } - if err = utils.EnsureDir(staticDir); err != nil { - log.Fatal("Static directory doesn't exist, and could not create it: %v", err) - } - // Initialize worker pool pool, err := workers.NewPool(workerPoolSize) if err != nil { @@ -57,20 +98,6 @@ func main() { log.Info("Worker pool initialized with %d workers", workerPoolSize) defer pool.Shutdown() - // Initialize Lua router for dynamic routes - luaRouter, err := routers.NewLuaRouter(routesDir) - if err != nil { - log.Fatal("Failed to initialize Lua router: %v", err) - } - log.Info("Lua router initialized with routes from %s", routesDir) - - // Initialize static file router - staticRouter, err := routers.NewStaticRouter(staticDir) - if err != nil { - log.Fatal("Failed to initialize static router: %v", err) - } - log.Info("Static router initialized with files from %s", staticDir) - // Create HTTP server server := http.New(luaRouter, staticRouter, pool, log) @@ -80,15 +107,15 @@ func main() { // Start server in a goroutine go func() { - addr := fmt.Sprintf(":%d", port) - log.Info("Server listening on http://localhost%s", addr) - if err := server.ListenAndServe(addr); err != nil { + if err := server.ListenAndServe(fmt.Sprintf(":%d", port)); err != nil { if err.Error() != "http: Server closed" { log.Error("Server error: %v", err) } } }() + log.Info("Server started on port %d", port) + // Wait for interrupt signal <-stop log.Info("Shutdown signal received")