Nigiri/wal.go

169 lines
3.2 KiB
Go

package nigiri
import (
"bufio"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
"time"
)
// walFlushInterval is the period of the background fsync timer.
const walFlushInterval = 5 * time.Second

// WALEntry is a single record of the write-ahead log. Entries are written to
// the log file as one JSON object per line.
type WALEntry struct {
	// Timestamp is the wall-clock time at which logOperation built the entry.
	Timestamp time.Time `json:"timestamp"`
	// Store is the name of the store the operation targets.
	Store string `json:"store"`
	// Operation is the operation name, e.g. "remove" or "clear".
	Operation string `json:"operation"`
	// ID is the record id the operation refers to; omitted from JSON when zero.
	ID int `json:"id,omitempty"`
	// Data is an optional payload; omitted from JSON when nil.
	Data any `json:"data,omitempty"`
	// SequenceNum is assigned when the entry is written to the file and
	// increases by one per written entry.
	SequenceNum uint64 `json:"seq"`
}

// WAL is an append-only write-ahead log stored in <baseDir>/wal.log.
//
// Entries are queued on writeChan and written by a single background
// goroutine (writeLoop), so they reach the file in the order logOperation
// accepted them. A timer fsyncs the file every walFlushInterval.
type WAL struct {
	// file is the open log file; guarded by mu.
	file *os.File
	// encoder writes JSON lines to file; guarded by mu.
	encoder *json.Encoder
	// mu guards file, encoder, sequenceNum and writeErr.
	mu sync.Mutex
	// sequenceNum is the sequence number of the last entry handed to the encoder.
	sequenceNum uint64
	// writeChan carries entries from logOperation to writeLoop.
	writeChan chan *WALEntry
	// flushTimer drives periodicFlush; guarded by stateMu.
	flushTimer *time.Timer
	// baseDir is the directory that contains wal.log.
	baseDir string

	// stateMu guards closed and flushTimer. Senders hold it for reading while
	// sending on writeChan; close holds it for writing while closing
	// writeChan, so a send can never hit a closed channel.
	stateMu sync.RWMutex
	// closed is set once close has been called.
	closed bool
	// done is closed by writeLoop after writeChan has been fully drained.
	done chan struct{}
	// writeErr is the first error returned by the encoder, reported by close.
	writeErr error
}

// newWAL opens (creating if necessary) <baseDir>/wal.log for appending,
// starts the background writer goroutine and arms the periodic fsync timer.
// The error from os.OpenFile is returned unchanged.
func newWAL(baseDir string) (*WAL, error) {
	walPath := filepath.Join(baseDir, "wal.log")
	file, err := os.OpenFile(walPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
	if err != nil {
		return nil, err
	}
	wal := &WAL{
		file:      file,
		encoder:   json.NewEncoder(file),
		writeChan: make(chan *WALEntry, 1000),
		baseDir:   baseDir,
		done:      make(chan struct{}),
	}
	go wal.writeLoop()

	// Publish the timer under stateMu: periodicFlush reads flushTimer from
	// the timer goroutine and takes the same lock before doing so.
	wal.stateMu.Lock()
	wal.flushTimer = time.AfterFunc(walFlushInterval, wal.periodicFlush)
	wal.stateMu.Unlock()
	return wal, nil
}

// writeLoop drains writeChan, writing each entry to the log file. It returns
// once writeChan is closed and empty, and signals that by closing done.
func (w *WAL) writeLoop() {
	defer close(w.done)
	for entry := range w.writeChan {
		w.writeEntry(entry)
	}
}

// writeEntry stamps entry with the next sequence number and appends it to the
// log file. The first encoder error is remembered so close can report it.
func (w *WAL) writeEntry(entry *WALEntry) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.sequenceNum++
	entry.SequenceNum = w.sequenceNum
	if err := w.encoder.Encode(entry); err != nil && w.writeErr == nil {
		w.writeErr = err
	}
}

// logOperation queues an entry describing operation on store for writing.
//
// The call normally returns as soon as the entry is queued. When the queue is
// full it blocks until the writer goroutine frees a slot, which keeps the log
// in call order instead of letting the new entry overtake queued ones.
// Calls made after close are ignored rather than panicking.
//
// data is encoded later by the writer goroutine, so the caller must not
// mutate it after this call returns.
func (w *WAL) logOperation(store, operation string, id int, data any) {
	entry := &WALEntry{
		Timestamp: time.Now(),
		Store:     store,
		Operation: operation,
		ID:        id,
		Data:      data,
	}

	w.stateMu.RLock()
	defer w.stateMu.RUnlock()
	if w.closed {
		return
	}
	// writeChan cannot be closed while the read lock is held, and writeLoop
	// keeps draining until it is closed, so this send always completes.
	w.writeChan <- entry
}

// flush fsyncs the log file and returns the error from Sync, if any.
func (w *WAL) flush() error {
	w.mu.Lock()
	defer w.mu.Unlock()
	return w.file.Sync()
}

// periodicFlush is the timer callback: it fsyncs the file and re-arms the
// timer. It does nothing once the WAL has been closed, so the timer is never
// re-armed after close.
func (w *WAL) periodicFlush() {
	w.stateMu.RLock()
	defer w.stateMu.RUnlock()
	if w.closed {
		return
	}
	// Best-effort: there is no caller to report to here, and the next tick
	// or close will sync again.
	_ = w.flush()
	w.flushTimer.Reset(walFlushInterval)
}

// close stops accepting entries, waits for every queued entry to be written,
// fsyncs and closes the log file. It returns the first error encountered
// among: an earlier write failure, the final Sync, and the file Close.
// Calling close more than once is safe; later calls return nil.
func (w *WAL) close() error {
	w.stateMu.Lock()
	if w.closed {
		w.stateMu.Unlock()
		return nil
	}
	w.closed = true
	close(w.writeChan)
	w.flushTimer.Stop()
	w.stateMu.Unlock()

	// Wait for writeLoop to finish draining before touching the file;
	// closing it earlier would drop whatever was still queued.
	<-w.done

	w.mu.Lock()
	defer w.mu.Unlock()
	err := w.writeErr
	if syncErr := w.file.Sync(); err == nil {
		err = syncErr
	}
	if closeErr := w.file.Close(); err == nil {
		err = closeErr
	}
	return err
}
// recover replays <baseDir>/wal.log into collection and then truncates the
// log.
//
// Each line is decoded as a WALEntry and passed to applyEntry. Lines that do
// not decode (for example a torn final line) and entries applyEntry rejects
// are skipped; only successfully applied entries are counted. If the file
// does not exist, recover returns nil without doing anything.
//
// If reading the file fails part-way (I/O error or an over-long line), the
// error is returned and the log is NOT truncated, so entries that were never
// replayed are not discarded.
func (w *WAL) recover(collection *Collection) error {
	walPath := filepath.Join(w.baseDir, "wal.log")
	file, err := os.Open(walPath)
	if os.IsNotExist(err) {
		return nil
	}
	if err != nil {
		return err
	}

	entriesApplied, scanErr := w.replayEntries(collection, file)
	// Release the read handle before truncating: some platforms refuse to
	// remove or replace a file that is still open. The handle is read-only,
	// so a close failure loses nothing.
	_ = file.Close()
	if scanErr != nil {
		return fmt.Errorf("reading WAL %q: %w", walPath, scanErr)
	}

	if entriesApplied > 0 {
		fmt.Printf("Recovered %d WAL entries\n", entriesApplied)
	}
	return w.truncate()
}

// replayEntries reads JSON lines from file, applies each decodable entry to
// collection and returns how many were applied, together with any error that
// stopped the scan early.
func (w *WAL) replayEntries(collection *Collection, file *os.File) (int, error) {
	// bufio.Scanner rejects lines longer than 64 KiB by default; entries
	// carrying a large Data payload can exceed that, so raise the limit.
	const maxEntrySize = 64 << 20

	scanner := bufio.NewScanner(file)
	scanner.Buffer(make([]byte, 0, 64*1024), maxEntrySize)

	applied := 0
	for scanner.Scan() {
		var entry WALEntry
		if err := json.Unmarshal(scanner.Bytes(), &entry); err != nil {
			continue // undecodable line: skip it and keep replaying
		}
		if err := w.applyEntry(collection, &entry); err == nil {
			applied++
		}
	}
	return applied, scanner.Err()
}
// applyEntry replays a single WAL entry against collection while holding
// collection.mu.
//
// It returns an error when the entry names a store that is not in
// collection.stores, or when a "remove" entry targets a store that is not a
// *BaseStore[any]. "remove" calls RemoveUnsafe(entry.ID) and "clear" calls
// Clear on the store. Any other operation is a no-op and returns nil.
//
// NOTE(review): unknown operations are reported as success, so recover counts
// them as applied — confirm that is intended.
func (w *WAL) applyEntry(collection *Collection, entry *WALEntry) error {
	collection.mu.Lock()
	defer collection.mu.Unlock()

	store, exists := collection.stores[entry.Store]
	if !exists {
		return fmt.Errorf("store %s not found", entry.Store)
	}

	switch entry.Operation {
	case "remove":
		// Checked assertion: a store with a different dynamic type must
		// surface as an error, not as a panic in the middle of recovery.
		base, ok := store.(*BaseStore[any])
		if !ok {
			return fmt.Errorf("store %s has type %T, cannot replay remove", entry.Store, store)
		}
		// presumably RemoveUnsafe skips the store's own locking — verify
		// against BaseStore.
		base.RemoveUnsafe(entry.ID)
	case "clear":
		store.Clear()
	}
	return nil
}
// truncate empties <baseDir>/wal.log and resets the sequence counter to zero.
//
// The replacement handle is opened (with O_TRUNC) before the current one is
// released. If the open fails, the error from os.OpenFile is returned and the
// WAL keeps its existing, still-usable file and encoder instead of being left
// pointing at a closed file.
func (w *WAL) truncate() error {
	w.mu.Lock()
	defer w.mu.Unlock()

	walPath := filepath.Join(w.baseDir, "wal.log")
	fresh, err := os.OpenFile(walPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND|os.O_TRUNC, 0644)
	if err != nil {
		return err
	}

	previous := w.file
	w.file = fresh
	w.encoder = json.NewEncoder(fresh)
	w.sequenceNum = 0

	// The old handle's contents were just discarded, so a failure while
	// closing it is not actionable.
	_ = previous.Close()
	return nil
}