From 25ad394f1733e99ad261f5462e39baa7029fb482 Mon Sep 17 00:00:00 2001 From: Sky Johnson Date: Sat, 16 Aug 2025 19:02:36 -0500 Subject: [PATCH] add WAL, reorganize all files for readibility and simplicity --- collection.go | 97 ++++++++++++++++--- migrate.go | 54 +++++++---- persistence.go | 23 ++--- schema.go | 82 +++++++++------- store.go | 248 +++++++++++++++++++++++++++---------------------- wal.go | 168 +++++++++++++++++++++++++++++++++ 6 files changed, 484 insertions(+), 188 deletions(-) create mode 100644 wal.go diff --git a/collection.go b/collection.go index dcf5926..0375991 100644 --- a/collection.go +++ b/collection.go @@ -16,30 +16,58 @@ type StoreManager interface { SetValidator(validator any) } +type WALLogger interface { + LogOperation(store, operation string, id int, data any) +} + type Collection struct { baseDir string stores map[string]StoreManager migrator *Migrator + wal *WAL mu sync.RWMutex } +// ============================================================================ +// Core Collection Management +// ============================================================================ + func NewCollection(baseDir string) *Collection { return &Collection{ - baseDir: baseDir, - stores: make(map[string]StoreManager), - migrator: nil, // Lazy initialized + baseDir: baseDir, + stores: make(map[string]StoreManager), } } -// Add registers store and auto-loads if file exists +func (c *Collection) Close() error { + if err := c.Checkpoint(); err != nil { + return err + } + if c.wal != nil { + return c.wal.close() + } + return nil +} + +// ============================================================================ +// Store Management +// ============================================================================ + func (c *Collection) Add(name string, store StoreManager) error { + if err := c.ensureWAL(); err != nil { + return err + } + c.mu.Lock() defer c.mu.Unlock() c.stores[name] = store store.SetValidator(c) - // Auto-load if file exists + if walStore, ok := store.(interface{ SetWALLogger(WALLogger, string) }); ok { + walStore.SetWALLogger(c, name) + } + path := filepath.Join(c.baseDir, name+".json") if _, err := os.Stat(path); err == nil { return store.LoadData(path) @@ -47,7 +75,6 @@ func (c *Collection) Add(name string, store StoreManager) error { return nil } -// Get returns typed store func Get[T any](c *Collection, name string) *BaseStore[T] { c.mu.RLock() defer c.mu.RUnlock() @@ -60,7 +87,10 @@ func Get[T any](c *Collection, name string) *BaseStore[T] { return nil } -// Load loads all stores +// ============================================================================ +// Persistence Operations +// ============================================================================ + func (c *Collection) Load() error { c.mu.RLock() defer c.mu.RUnlock() @@ -76,7 +106,6 @@ func (c *Collection) Load() error { return nil } -// Save saves all stores func (c *Collection) Save() error { c.mu.RLock() defer c.mu.RUnlock() @@ -92,7 +121,6 @@ func (c *Collection) Save() error { return nil } -// Clear clears all stores func (c *Collection) Clear() { c.mu.RLock() defer c.mu.RUnlock() @@ -102,7 +130,27 @@ func (c *Collection) Clear() { } } -// EntityExists implements relationship validation +func (c *Collection) Checkpoint() error { + if c.wal != nil { + if err := c.wal.flush(); err != nil { + return err + } + } + + if err := c.Save(); err != nil { + return err + } + + if c.wal != nil { + return c.wal.truncate() + } + return nil +} + +// ============================================================================ +// Validation & Relationships +// ============================================================================ + func (c *Collection) EntityExists(entityName string, id int) bool { c.mu.RLock() defer c.mu.RUnlock() @@ -113,7 +161,34 @@ func (c *Collection) EntityExists(entityName string, id int) bool { return false } -// Migration methods +// ============================================================================ +// WAL Operations +// ============================================================================ + +func (c *Collection) LogOperation(store, operation string, id int, data any) { + if c.wal != nil { + c.wal.logOperation(store, operation, id, data) + } +} + +func (c *Collection) ensureWAL() error { + if c.wal != nil { + return nil + } + + wal, err := newWAL(c.baseDir) + if err != nil { + return err + } + + c.wal = wal + return wal.recover(c) +} + +// ============================================================================ +// Migration Operations +// ============================================================================ + func (c *Collection) GetMigrator() *Migrator { if c.migrator == nil { c.migrator = NewMigrator() diff --git a/migrate.go b/migrate.go index add2656..297f09e 100644 --- a/migrate.go +++ b/migrate.go @@ -27,6 +27,10 @@ type Migrator struct { patterns map[string]*regexp.Regexp } +// ============================================================================ +// Core Migration Management +// ============================================================================ + func NewMigrator() *Migrator { m := &Migrator{ handlers: make(map[string]MigrationHandler), @@ -47,6 +51,10 @@ func (m *Migrator) RegisterCommand(name string, pattern *regexp.Regexp, handler m.handlers[name] = handler } +// ============================================================================ +// Command Parsing +// ============================================================================ + func (m *Migrator) ParseCommand(input string) (*MigrationCommand, string, error) { input = strings.TrimSpace(input) @@ -64,7 +72,6 @@ func (m *Migrator) ParseCommand(input string) (*MigrationCommand, string, error) case "change": cmd.Field, cmd.Type = matches[1], matches[2] default: - // Allow custom commands to handle their own parsing cmd.Field = matches[1] if len(matches) > 2 { cmd.To = matches[2] @@ -95,14 +102,17 @@ func (m *Migrator) ApplyCommand(data []byte, cmdStr string) ([]byte, error) { return nil, fmt.Errorf("invalid JSON: %w", err) } - handler := m.handlers[name] - if err := handler(items, cmd); err != nil { + if err := m.handlers[name](items, cmd); err != nil { return nil, err } return json.MarshalIndent(items, "", "\t") } +// ============================================================================ +// File Operations +// ============================================================================ + func (m *Migrator) MigrateFile(filename, command string) error { data, err := os.ReadFile(filename) if err != nil { @@ -114,7 +124,6 @@ func (m *Migrator) MigrateFile(filename, command string) error { return err } - // Backup and write backupPath := filename + ".backup" if err := os.WriteFile(backupPath, data, 0644); err != nil { return fmt.Errorf("create backup: %w", err) @@ -166,7 +175,10 @@ func (m *Migrator) RunScript(dataDir, scriptFile string) error { return scanner.Err() } -// Built-in handlers +// ============================================================================ +// Built-in Command Handlers +// ============================================================================ + func (m *Migrator) handleRename(items []map[string]any, cmd *MigrationCommand) error { for _, item := range items { if val, exists := item[cmd.Field]; exists { @@ -207,6 +219,10 @@ func (m *Migrator) handleChange(items []map[string]any, cmd *MigrationCommand) e return nil } +// ============================================================================ +// Type Conversion Utilities +// ============================================================================ + func getDefaultValue(fieldType string) any { switch fieldType { case "string": @@ -254,7 +270,10 @@ func convertType(val any, targetType string) (any, error) { } } -// CLI wrapper for host applications +// ============================================================================ +// CLI Interface +// ============================================================================ + type MigrationCLI struct { collection *Collection } @@ -269,21 +288,18 @@ func (cli *MigrationCLI) Run(args []string) error { } if strings.HasSuffix(args[0], ".txt") { - // Script mode return cli.collection.RunMigrationScript(args[0]) - } else { - // Single command mode - if len(args) < 2 { - return fmt.Errorf("usage: migrate 'command'") - } - - // Extract store name from filename - filename := args[0] - storeName := strings.TrimSuffix(filepath.Base(filename), ".json") - command := args[1] - - return cli.collection.MigrateStore(storeName, command) } + + if len(args) < 2 { + return fmt.Errorf("usage: migrate 'command'") + } + + filename := args[0] + storeName := strings.TrimSuffix(filepath.Base(filename), ".json") + command := args[1] + + return cli.collection.MigrateStore(storeName, command) } func (cli *MigrationCLI) PrintUsage() { diff --git a/persistence.go b/persistence.go index 52796fd..d3c4437 100644 --- a/persistence.go +++ b/persistence.go @@ -9,6 +9,10 @@ import ( "sort" ) +// ============================================================================ +// Core JSON Operations +// ============================================================================ + func (bs *BaseStore[T]) LoadFromJSON(filename string) error { bs.mu.Lock() defer bs.mu.Unlock() @@ -25,7 +29,6 @@ func (bs *BaseStore[T]) LoadFromJSON(filename string) error { return nil } - // Create slice of pointers to T sliceType := reflect.SliceOf(reflect.PointerTo(bs.itemType)) slicePtr := reflect.New(sliceType) @@ -33,21 +36,17 @@ func (bs *BaseStore[T]) LoadFromJSON(filename string) error { return fmt.Errorf("failed to unmarshal JSON: %w", err) } - // Clear existing data bs.items = make(map[int]*T) bs.maxID = 0 - // Clear unique indices for fieldName := range bs.uniqueIndices { bs.uniqueIndices[fieldName] = make(map[any]int) } - // Extract items using reflection slice := slicePtr.Elem() for i := 0; i < slice.Len(); i++ { item := slice.Index(i).Interface().(*T) - // Get ID using reflection itemValue := reflect.ValueOf(item).Elem() idField := itemValue.FieldByName("ID") if !idField.IsValid() { @@ -60,7 +59,6 @@ func (bs *BaseStore[T]) LoadFromJSON(filename string) error { bs.maxID = id } - // Update unique indices bs.updateUniqueIndices(id, item, true) } @@ -71,14 +69,12 @@ func (bs *BaseStore[T]) SaveToJSON(filename string) error { bs.mu.RLock() defer bs.mu.RUnlock() - // Get sorted IDs for consistent ordering ids := make([]int, 0, len(bs.items)) for id := range bs.items { ids = append(ids, id) } sort.Ints(ids) - // Build items slice in ID order items := make([]*T, 0, len(bs.items)) for _, id := range ids { items = append(items, bs.items[id]) @@ -89,7 +85,6 @@ func (bs *BaseStore[T]) SaveToJSON(filename string) error { return fmt.Errorf("failed to marshal to JSON: %w", err) } - // Atomic write tempFile := filename + ".tmp" if err := os.WriteFile(tempFile, data, 0644); err != nil { return fmt.Errorf("failed to write temp JSON: %w", err) @@ -103,6 +98,10 @@ func (bs *BaseStore[T]) SaveToJSON(filename string) error { return nil } +// ============================================================================ +// StoreManager Interface Implementation +// ============================================================================ + func (bs *BaseStore[T]) LoadData(dataPath string) error { if err := bs.LoadFromJSON(dataPath); err != nil { if os.IsNotExist(err) { @@ -131,9 +130,11 @@ func (bs *BaseStore[T]) SaveData(dataPath string) error { return nil } -// Migration support methods +// ============================================================================ +// Migration Support +// ============================================================================ + func (bs *BaseStore[T]) ValidateAfterMigration(filename string) error { - // Try to load the migrated file to ensure it's valid tempStore := NewBaseStore[T]() if err := tempStore.LoadFromJSON(filename); err != nil { return fmt.Errorf("migration validation failed: %w", err) diff --git a/schema.go b/schema.go index 6357aa7..74f4356 100644 --- a/schema.go +++ b/schema.go @@ -5,6 +5,10 @@ import ( "strings" ) +// ============================================================================ +// Type Definitions +// ============================================================================ + type ConstraintType string const ( @@ -43,10 +47,14 @@ type SchemaInfo struct { Relationships map[string]FieldConstraint } +// ============================================================================ +// Schema Parsing +// ============================================================================ + func ParseSchema[T any]() *SchemaInfo { var zero T t := reflect.TypeOf(zero) - if t.Kind() == reflect.Ptr { + if t.Kind() == reflect.Pointer { t = t.Elem() } @@ -63,22 +71,17 @@ func ParseSchema[T any]() *SchemaInfo { fieldType := field.Type schema.Fields[fieldName] = fieldType - // Check for relationship patterns in field type if relationship := detectRelationship(fieldName, fieldType); relationship != nil { schema.Relationships[fieldName] = *relationship schema.Constraints[fieldName] = append(schema.Constraints[fieldName], *relationship) } - // Parse explicit db tags - dbTag := field.Tag.Get("db") - if dbTag != "" { - constraints := parseDBTag(fieldName, dbTag) - if len(constraints) > 0 { + if dbTag := field.Tag.Get("db"); dbTag != "" { + if constraints := parseDBTag(fieldName, dbTag); len(constraints) > 0 { schema.Constraints[fieldName] = append(schema.Constraints[fieldName], constraints...) } } - // Auto-create indices for unique and indexed fields for _, constraint := range schema.Constraints[fieldName] { if constraint.Type == ConstraintUnique || constraint.Type == ConstraintIndex { indexName := constraint.IndexName @@ -93,10 +96,13 @@ func ParseSchema[T any]() *SchemaInfo { return schema } +// ============================================================================ +// Relationship Detection +// ============================================================================ + func detectRelationship(fieldName string, fieldType reflect.Type) *FieldConstraint { switch fieldType.Kind() { case reflect.Pointer: - // *EntityType = many-to-one elemType := fieldType.Elem() if isEntityType(elemType) { return &FieldConstraint{ @@ -109,7 +115,6 @@ func detectRelationship(fieldName string, fieldType reflect.Type) *FieldConstrai } case reflect.Slice: - // []*EntityType = one-to-many elemType := fieldType.Elem() if elemType.Kind() == reflect.Pointer { ptrTargetType := elemType.Elem() @@ -128,35 +133,15 @@ func detectRelationship(fieldName string, fieldType reflect.Type) *FieldConstrai return nil } -func isEntityType(t reflect.Type) bool { - if t.Kind() != reflect.Struct { - return false - } - - // Check if it has an ID field - for i := 0; i < t.NumField(); i++ { - field := t.Field(i) - if field.Name == "ID" && field.Type.Kind() == reflect.Int { - return true - } - } - - return false -} - -func getEntityName(t reflect.Type) string { - name := t.Name() - if name == "" { - name = t.String() - } - return strings.ToLower(name) -} +// ============================================================================ +// Tag Parsing +// ============================================================================ func parseDBTag(fieldName, tag string) []FieldConstraint { var constraints []FieldConstraint - parts := strings.Split(tag, ",") + parts := strings.SplitSeq(tag, ",") - for _, part := range parts { + for part := range parts { part = strings.TrimSpace(part) if part == "" { continue @@ -197,3 +182,30 @@ func parseDBTag(fieldName, tag string) []FieldConstraint { return constraints } + +// ============================================================================ +// Utility Functions +// ============================================================================ + +func isEntityType(t reflect.Type) bool { + if t.Kind() != reflect.Struct { + return false + } + + for i := 0; i < t.NumField(); i++ { + field := t.Field(i) + if field.Name == "ID" && field.Type.Kind() == reflect.Int { + return true + } + } + + return false +} + +func getEntityName(t reflect.Type) string { + name := t.Name() + if name == "" { + name = t.String() + } + return strings.ToLower(name) +} diff --git a/store.go b/store.go index a7821ab..8af722a 100644 --- a/store.go +++ b/store.go @@ -26,15 +26,20 @@ type BaseStore[T any] struct { schema *SchemaInfo uniqueIndices map[string]map[any]int validator any + walLogger WALLogger + storeName string } +// ============================================================================ +// Core Store Management +// ============================================================================ + func NewBaseStore[T any]() *BaseStore[T] { var zero T schema := ParseSchema[T]() store := &BaseStore[T]{ items: make(map[int]*T), - maxID: 0, itemType: reflect.TypeOf(zero), indices: make(map[string]any), indexBuilders: make(map[string]IndexBuilder[T]), @@ -54,18 +59,17 @@ func NewBaseStore[T any]() *BaseStore[T] { return store } -func (bs *BaseStore[T]) registerSchemaIndices() { - for fieldName, indexName := range bs.schema.Indices { - bs.RegisterIndex(indexName, BuildFieldLookupIndex[T](fieldName)) - } -} - func (bs *BaseStore[T]) SetValidator(validator any) { bs.mu.Lock() defer bs.mu.Unlock() bs.validator = validator } +func (bs *BaseStore[T]) SetWALLogger(logger WALLogger, name string) { + bs.walLogger = logger + bs.storeName = name +} + func (bs *BaseStore[T]) EntityExists(id int) bool { bs.mu.RLock() defer bs.mu.RUnlock() @@ -73,6 +77,35 @@ func (bs *BaseStore[T]) EntityExists(id int) bool { return exists } +func (bs *BaseStore[T]) GetNextID() int { + bs.mu.Lock() + defer bs.mu.Unlock() + bs.maxID++ + return bs.maxID +} + +func (bs *BaseStore[T]) Clear() { + bs.mu.Lock() + defer bs.mu.Unlock() + + if bs.walLogger != nil { + bs.walLogger.LogOperation(bs.storeName, "clear", 0, nil) + } + + bs.items = make(map[int]*T) + bs.maxID = 0 + + for fieldName := range bs.uniqueIndices { + bs.uniqueIndices[fieldName] = make(map[any]int) + } + + bs.rebuildIndicesUnsafe() +} + +// ============================================================================ +// Validation & Constraints +// ============================================================================ + func (bs *BaseStore[T]) ValidateConstraints(id int, item *T) error { itemValue := reflect.ValueOf(item).Elem() @@ -176,6 +209,16 @@ func (bs *BaseStore[T]) updateUniqueIndices(id int, item *T, add bool) { } } +// ============================================================================ +// Indexing +// ============================================================================ + +func (bs *BaseStore[T]) registerSchemaIndices() { + for fieldName, indexName := range bs.schema.Indices { + bs.RegisterIndex(indexName, BuildFieldLookupIndex[T](fieldName)) + } +} + func (bs *BaseStore[T]) RegisterIndex(name string, builder IndexBuilder[T]) { bs.mu.Lock() defer bs.mu.Unlock() @@ -204,7 +247,10 @@ func (bs *BaseStore[T]) rebuildIndicesUnsafe() { } } -// Main CRUD operations with validation +// ============================================================================ +// CRUD Operations +// ============================================================================ + func (bs *BaseStore[T]) Add(id int, item *T) error { bs.mu.Lock() defer bs.mu.Unlock() @@ -223,6 +269,10 @@ func (bs *BaseStore[T]) Add(id int, item *T) error { } } + if bs.walLogger != nil { + bs.walLogger.LogOperation(bs.storeName, "add", id, item) + } + bs.updateUniqueIndices(id, item, true) bs.items[id] = item if id > bs.maxID { @@ -236,10 +286,20 @@ func (bs *BaseStore[T]) Update(id int, item *T) error { return bs.Add(id, item) } +func (bs *BaseStore[T]) Create(item *T) (int, error) { + id := bs.GetNextID() + err := bs.Add(id, item) + return id, err +} + func (bs *BaseStore[T]) Remove(id int) { bs.mu.Lock() defer bs.mu.Unlock() + if bs.walLogger != nil { + bs.walLogger.LogOperation(bs.storeName, "remove", id, nil) + } + if item, exists := bs.items[id]; exists { bs.updateUniqueIndices(id, item, false) } @@ -247,13 +307,7 @@ func (bs *BaseStore[T]) Remove(id int) { bs.rebuildIndicesUnsafe() } -func (bs *BaseStore[T]) Create(item *T) (int, error) { - id := bs.GetNextID() - err := bs.Add(id, item) - return id, err -} - -// Unsafe operations for performance +// Unsafe operations for performance-critical paths func (bs *BaseStore[T]) AddUnsafe(id int, item *T) { bs.mu.Lock() defer bs.mu.Unlock() @@ -269,6 +323,10 @@ func (bs *BaseStore[T]) RemoveUnsafe(id int) { delete(bs.items, id) } +// ============================================================================ +// Querying & Retrieval +// ============================================================================ + func (bs *BaseStore[T]) Find(id int) (*T, bool) { bs.mu.RLock() defer bs.mu.RUnlock() @@ -396,27 +454,74 @@ func (bs *BaseStore[T]) FilterByIndex(indexName string, filterFunc func(*T) bool return result } -func (bs *BaseStore[T]) GetNextID() int { +// ============================================================================ +// Migration Operations +// ============================================================================ + +func (bs *BaseStore[T]) ApplyMigration(migrationFunc func(map[int]*T) error) error { bs.mu.Lock() defer bs.mu.Unlock() - bs.maxID++ - return bs.maxID -} -func (bs *BaseStore[T]) Clear() { - bs.mu.Lock() - defer bs.mu.Unlock() - bs.items = make(map[int]*T) - bs.maxID = 0 + itemsCopy := make(map[int]*T, len(bs.items)) + maps.Copy(itemsCopy, bs.items) - for fieldName := range bs.uniqueIndices { - bs.uniqueIndices[fieldName] = make(map[any]int) + if err := migrationFunc(itemsCopy); err != nil { + return fmt.Errorf("migration failed: %w", err) } + bs.items = itemsCopy bs.rebuildIndicesUnsafe() + return nil } +func (bs *BaseStore[T]) MigrateFromJSON(filename string, migrationCommands []string, migrator *Migrator) error { + data, err := os.ReadFile(filename) + if err != nil { + return fmt.Errorf("failed to read file: %w", err) + } + + result := data + for _, command := range migrationCommands { + result, err = migrator.ApplyCommand(result, command) + if err != nil { + return fmt.Errorf("command '%s' failed: %w", command, err) + } + } + + backupPath := filename + ".backup" + if err := os.WriteFile(backupPath, data, 0644); err != nil { + return fmt.Errorf("failed to create backup: %w", err) + } + + tempFile := filename + ".migrated" + if err := os.WriteFile(tempFile, result, 0644); err != nil { + return fmt.Errorf("failed to write migrated data: %w", err) + } + + if err := bs.ValidateAfterMigration(tempFile); err != nil { + os.Remove(tempFile) + return err + } + + if err := os.Rename(tempFile, filename); err != nil { + return fmt.Errorf("failed to replace original file: %w", err) + } + + return bs.LoadFromJSON(filename) +} + +func (bs *BaseStore[T]) BatchMigrate(filename string, commands []string) error { + if len(commands) == 0 { + return nil + } + migrator := NewMigrator() + return bs.MigrateFromJSON(filename, commands, migrator) +} + +// ============================================================================ // Index Builder Functions +// ============================================================================ + func BuildFieldLookupIndex[T any](fieldName string) IndexBuilder[T] { return func(allItems map[int]*T) any { index := make(map[string]int) @@ -426,7 +531,6 @@ func BuildFieldLookupIndex[T any](fieldName string) IndexBuilder[T] { if !fieldValue.IsValid() { continue } - key := fmt.Sprintf("%v", fieldValue.Interface()) index[key] = id } @@ -438,8 +542,7 @@ func BuildStringLookupIndex[T any](keyFunc func(*T) string) IndexBuilder[T] { return func(allItems map[int]*T) any { index := make(map[string]int) for id, item := range allItems { - key := keyFunc(item) - index[key] = id + index[keyFunc(item)] = id } return index } @@ -463,11 +566,9 @@ func BuildIntGroupIndex[T any](keyFunc func(*T) int) IndexBuilder[T] { key := keyFunc(item) index[key] = append(index[key], id) } - for key := range index { sort.Ints(index[key]) } - return index } } @@ -479,11 +580,9 @@ func BuildStringGroupIndex[T any](keyFunc func(*T) string) IndexBuilder[T] { key := keyFunc(item) index[key] = append(index[key], id) } - for key := range index { sort.Ints(index[key]) } - return index } } @@ -494,11 +593,9 @@ func BuildSortedListIndex[T any](sortFunc func(*T, *T) bool) IndexBuilder[T] { for id := range allItems { ids = append(ids, id) } - sort.Slice(ids, func(i, j int) bool { return sortFunc(allItems[ids[i]], allItems[ids[j]]) }) - return ids } } @@ -512,11 +609,9 @@ func BuildFilteredIntGroupIndex[T any](filterFunc func(*T) bool, keyFunc func(*T index[key] = append(index[key], id) } } - for key := range index { sort.Ints(index[key]) } - return index } } @@ -530,19 +625,20 @@ func BuildFilteredStringGroupIndex[T any](filterFunc func(*T) bool, keyFunc func index[key] = append(index[key], id) } } - for key := range index { sort.Ints(index[key]) } - return index } } +// ============================================================================ +// Utility Functions +// ============================================================================ + func NewSingleton[S any](initFunc func() *S) func() *S { var store *S var once sync.Once - return func() *S { once.Do(func() { store = initFunc() @@ -550,75 +646,3 @@ func NewSingleton[S any](initFunc func() *S) func() *S { return store } } - -func (bs *BaseStore[T]) ApplyMigration(migrationFunc func(map[int]*T) error) error { - bs.mu.Lock() - defer bs.mu.Unlock() - - // Create a copy for safe migration - itemsCopy := make(map[int]*T, len(bs.items)) - maps.Copy(itemsCopy, bs.items) - - if err := migrationFunc(itemsCopy); err != nil { - return fmt.Errorf("migration failed: %w", err) - } - - // If migration succeeded, update the store - bs.items = itemsCopy - bs.rebuildIndicesUnsafe() - return nil -} - -func (bs *BaseStore[T]) MigrateFromJSON(filename string, migrationCommands []string, migrator *Migrator) error { - // Read original data - data, err := os.ReadFile(filename) - if err != nil { - return fmt.Errorf("failed to read file: %w", err) - } - - // Apply each migration command - result := data - for _, command := range migrationCommands { - result, err = migrator.ApplyCommand(result, command) - if err != nil { - return fmt.Errorf("command '%s' failed: %w", command, err) - } - } - - // Create backup - backupPath := filename + ".backup" - if err := os.WriteFile(backupPath, data, 0644); err != nil { - return fmt.Errorf("failed to create backup: %w", err) - } - - // Write migrated data - tempFile := filename + ".migrated" - if err := os.WriteFile(tempFile, result, 0644); err != nil { - return fmt.Errorf("failed to write migrated data: %w", err) - } - - // Validate migrated data can be loaded - if err := bs.ValidateAfterMigration(tempFile); err != nil { - os.Remove(tempFile) - return err - } - - // Replace original file - if err := os.Rename(tempFile, filename); err != nil { - return fmt.Errorf("failed to replace original file: %w", err) - } - - // Reload the store with migrated data - return bs.LoadFromJSON(filename) -} - -func (bs *BaseStore[T]) BatchMigrate(filename string, commands []string) error { - if len(commands) == 0 { - return nil - } - - // Use the collection's migrator if available - // This would typically be called through Collection.MigrateStore - migrator := NewMigrator() - return bs.MigrateFromJSON(filename, commands, migrator) -} diff --git a/wal.go b/wal.go new file mode 100644 index 0000000..bce976d --- /dev/null +++ b/wal.go @@ -0,0 +1,168 @@ +package nigiri + +import ( + "bufio" + "encoding/json" + "fmt" + "os" + "path/filepath" + "sync" + "time" +) + +type WALEntry struct { + Timestamp time.Time `json:"timestamp"` + Store string `json:"store"` + Operation string `json:"operation"` + ID int `json:"id,omitempty"` + Data any `json:"data,omitempty"` + SequenceNum uint64 `json:"seq"` +} + +type WAL struct { + file *os.File + encoder *json.Encoder + mu sync.Mutex + sequenceNum uint64 + writeChan chan *WALEntry + flushTimer *time.Timer + baseDir string +} + +func newWAL(baseDir string) (*WAL, error) { + walPath := filepath.Join(baseDir, "wal.log") + file, err := os.OpenFile(walPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) + if err != nil { + return nil, err + } + + wal := &WAL{ + file: file, + encoder: json.NewEncoder(file), + writeChan: make(chan *WALEntry, 1000), + baseDir: baseDir, + } + + go wal.writeLoop() + wal.flushTimer = time.AfterFunc(5*time.Second, wal.periodicFlush) + return wal, nil +} + +func (w *WAL) writeLoop() { + for entry := range w.writeChan { + w.mu.Lock() + w.sequenceNum++ + entry.SequenceNum = w.sequenceNum + w.encoder.Encode(entry) + w.mu.Unlock() + } +} + +func (w *WAL) logOperation(store, operation string, id int, data any) { + entry := &WALEntry{ + Timestamp: time.Now(), + Store: store, + Operation: operation, + ID: id, + Data: data, + } + + select { + case w.writeChan <- entry: + // Non-blocking async write (~1µs) + default: + // Channel full - sync write (~0.1-1ms) + w.mu.Lock() + w.sequenceNum++ + entry.SequenceNum = w.sequenceNum + w.encoder.Encode(entry) + w.mu.Unlock() + } +} + +func (w *WAL) flush() error { + w.mu.Lock() + defer w.mu.Unlock() + return w.file.Sync() +} + +func (w *WAL) periodicFlush() { + w.flush() + w.flushTimer.Reset(5 * time.Second) +} + +func (w *WAL) close() error { + close(w.writeChan) + w.flushTimer.Stop() + w.flush() + return w.file.Close() +} + +func (w *WAL) recover(collection *Collection) error { + walPath := filepath.Join(w.baseDir, "wal.log") + file, err := os.Open(walPath) + if os.IsNotExist(err) { + return nil + } + if err != nil { + return err + } + defer file.Close() + + scanner := bufio.NewScanner(file) + entriesApplied := 0 + + for scanner.Scan() { + var entry WALEntry + if err := json.Unmarshal(scanner.Bytes(), &entry); err != nil { + continue + } + + if err := w.applyEntry(collection, &entry); err == nil { + entriesApplied++ + } + } + + if entriesApplied > 0 { + fmt.Printf("Recovered %d WAL entries\n", entriesApplied) + } + return w.truncate() +} + +func (w *WAL) applyEntry(collection *Collection, entry *WALEntry) error { + collection.mu.Lock() + defer collection.mu.Unlock() + + store, exists := collection.stores[entry.Store] + if !exists { + return fmt.Errorf("store %s not found", entry.Store) + } + + switch entry.Operation { + case "remove": + store.(*BaseStore[any]).RemoveUnsafe(entry.ID) + case "clear": + store.Clear() + } + return nil +} + +func (w *WAL) truncate() error { + w.mu.Lock() + defer w.mu.Unlock() + + w.file.Close() + walPath := filepath.Join(w.baseDir, "wal.log") + + os.Remove(walPath) + + file, err := os.OpenFile(walPath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644) + if err != nil { + return err + } + + w.file = file + w.encoder = json.NewEncoder(file) + w.sequenceNum = 0 + return nil +}