package nigiri import ( "bufio" "encoding/json" "fmt" "os" "path/filepath" "sync" "time" ) type WALEntry struct { Timestamp time.Time `json:"timestamp"` Store string `json:"store"` Operation string `json:"operation"` ID int `json:"id,omitempty"` Data any `json:"data,omitempty"` SequenceNum uint64 `json:"seq"` } type WAL struct { file *os.File encoder *json.Encoder mu sync.Mutex sequenceNum uint64 writeChan chan *WALEntry flushTimer *time.Timer baseDir string } func newWAL(baseDir string) (*WAL, error) { walPath := filepath.Join(baseDir, "wal.log") file, err := os.OpenFile(walPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) if err != nil { return nil, err } wal := &WAL{ file: file, encoder: json.NewEncoder(file), writeChan: make(chan *WALEntry, 1000), baseDir: baseDir, } go wal.writeLoop() wal.flushTimer = time.AfterFunc(5*time.Second, wal.periodicFlush) return wal, nil } func (w *WAL) writeLoop() { for entry := range w.writeChan { w.mu.Lock() w.sequenceNum++ entry.SequenceNum = w.sequenceNum w.encoder.Encode(entry) w.mu.Unlock() } } func (w *WAL) logOperation(store, operation string, id int, data any) { entry := &WALEntry{ Timestamp: time.Now(), Store: store, Operation: operation, ID: id, Data: data, } select { case w.writeChan <- entry: // Non-blocking async write (~1µs) default: // Channel full - sync write (~0.1-1ms) w.mu.Lock() w.sequenceNum++ entry.SequenceNum = w.sequenceNum w.encoder.Encode(entry) w.mu.Unlock() } } func (w *WAL) flush() error { w.mu.Lock() defer w.mu.Unlock() return w.file.Sync() } func (w *WAL) periodicFlush() { w.flush() w.flushTimer.Reset(5 * time.Second) } func (w *WAL) close() error { close(w.writeChan) w.flushTimer.Stop() w.flush() return w.file.Close() } func (w *WAL) recover(collection *Collection) error { walPath := filepath.Join(w.baseDir, "wal.log") file, err := os.Open(walPath) if os.IsNotExist(err) { return nil } if err != nil { return err } defer file.Close() scanner := bufio.NewScanner(file) entriesApplied := 0 for scanner.Scan() { var entry WALEntry if err := json.Unmarshal(scanner.Bytes(), &entry); err != nil { continue } if err := w.applyEntry(collection, &entry); err == nil { entriesApplied++ } } if entriesApplied > 0 { fmt.Printf("Recovered %d WAL entries\n", entriesApplied) } return w.truncate() } func (w *WAL) applyEntry(collection *Collection, entry *WALEntry) error { collection.mu.Lock() defer collection.mu.Unlock() store, exists := collection.stores[entry.Store] if !exists { return fmt.Errorf("store %s not found", entry.Store) } switch entry.Operation { case "remove": store.(*BaseStore[any]).RemoveUnsafe(entry.ID) case "clear": store.Clear() } return nil } func (w *WAL) truncate() error { w.mu.Lock() defer w.mu.Unlock() w.file.Close() walPath := filepath.Join(w.baseDir, "wal.log") os.Remove(walPath) file, err := os.OpenFile(walPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) if err != nil { return err } w.file = file w.encoder = json.NewEncoder(file) w.sequenceNum = 0 return nil }