169 lines
3.2 KiB
Go
169 lines
3.2 KiB
Go
package nigiri
|
|
|
|
import (
|
|
"bufio"
|
|
"encoding/json"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
type WALEntry struct {
|
|
Timestamp time.Time `json:"timestamp"`
|
|
Store string `json:"store"`
|
|
Operation string `json:"operation"`
|
|
ID int `json:"id,omitempty"`
|
|
Data any `json:"data,omitempty"`
|
|
SequenceNum uint64 `json:"seq"`
|
|
}
|
|
|
|
type WAL struct {
|
|
file *os.File
|
|
encoder *json.Encoder
|
|
mu sync.Mutex
|
|
sequenceNum uint64
|
|
writeChan chan *WALEntry
|
|
flushTimer *time.Timer
|
|
baseDir string
|
|
}
|
|
|
|
func newWAL(baseDir string) (*WAL, error) {
|
|
walPath := filepath.Join(baseDir, "wal.log")
|
|
file, err := os.OpenFile(walPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
wal := &WAL{
|
|
file: file,
|
|
encoder: json.NewEncoder(file),
|
|
writeChan: make(chan *WALEntry, 1000),
|
|
baseDir: baseDir,
|
|
}
|
|
|
|
go wal.writeLoop()
|
|
wal.flushTimer = time.AfterFunc(5*time.Second, wal.periodicFlush)
|
|
return wal, nil
|
|
}
|
|
|
|
func (w *WAL) writeLoop() {
|
|
for entry := range w.writeChan {
|
|
w.mu.Lock()
|
|
w.sequenceNum++
|
|
entry.SequenceNum = w.sequenceNum
|
|
w.encoder.Encode(entry)
|
|
w.mu.Unlock()
|
|
}
|
|
}
|
|
|
|
func (w *WAL) logOperation(store, operation string, id int, data any) {
|
|
entry := &WALEntry{
|
|
Timestamp: time.Now(),
|
|
Store: store,
|
|
Operation: operation,
|
|
ID: id,
|
|
Data: data,
|
|
}
|
|
|
|
select {
|
|
case w.writeChan <- entry:
|
|
// Non-blocking async write (~1µs)
|
|
default:
|
|
// Channel full - sync write (~0.1-1ms)
|
|
w.mu.Lock()
|
|
w.sequenceNum++
|
|
entry.SequenceNum = w.sequenceNum
|
|
w.encoder.Encode(entry)
|
|
w.mu.Unlock()
|
|
}
|
|
}
|
|
|
|
func (w *WAL) flush() error {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
return w.file.Sync()
|
|
}
|
|
|
|
func (w *WAL) periodicFlush() {
|
|
w.flush()
|
|
w.flushTimer.Reset(5 * time.Second)
|
|
}
|
|
|
|
func (w *WAL) close() error {
|
|
close(w.writeChan)
|
|
w.flushTimer.Stop()
|
|
w.flush()
|
|
return w.file.Close()
|
|
}
|
|
|
|
func (w *WAL) recover(collection *Collection) error {
|
|
walPath := filepath.Join(w.baseDir, "wal.log")
|
|
file, err := os.Open(walPath)
|
|
if os.IsNotExist(err) {
|
|
return nil
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer file.Close()
|
|
|
|
scanner := bufio.NewScanner(file)
|
|
entriesApplied := 0
|
|
|
|
for scanner.Scan() {
|
|
var entry WALEntry
|
|
if err := json.Unmarshal(scanner.Bytes(), &entry); err != nil {
|
|
continue
|
|
}
|
|
|
|
if err := w.applyEntry(collection, &entry); err == nil {
|
|
entriesApplied++
|
|
}
|
|
}
|
|
|
|
if entriesApplied > 0 {
|
|
fmt.Printf("Recovered %d WAL entries\n", entriesApplied)
|
|
}
|
|
return w.truncate()
|
|
}
|
|
|
|
func (w *WAL) applyEntry(collection *Collection, entry *WALEntry) error {
|
|
collection.mu.Lock()
|
|
defer collection.mu.Unlock()
|
|
|
|
store, exists := collection.stores[entry.Store]
|
|
if !exists {
|
|
return fmt.Errorf("store %s not found", entry.Store)
|
|
}
|
|
|
|
switch entry.Operation {
|
|
case "remove":
|
|
store.(*BaseStore[any]).RemoveUnsafe(entry.ID)
|
|
case "clear":
|
|
store.Clear()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (w *WAL) truncate() error {
|
|
w.mu.Lock()
|
|
defer w.mu.Unlock()
|
|
|
|
w.file.Close()
|
|
walPath := filepath.Join(w.baseDir, "wal.log")
|
|
|
|
os.Remove(walPath)
|
|
|
|
file, err := os.OpenFile(walPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
w.file = file
|
|
w.encoder = json.NewEncoder(file)
|
|
w.sequenceNum = 0
|
|
return nil
|
|
}
|