use connection pooling in database wrapper
This commit is contained in:
parent
db2a95cd02
commit
58248ec339
@ -1,7 +1,9 @@
|
|||||||
package database
|
package database
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"runtime"
|
||||||
|
|
||||||
"zombiezen.com/go/sqlite"
|
"zombiezen.com/go/sqlite"
|
||||||
"zombiezen.com/go/sqlite/sqlitex"
|
"zombiezen.com/go/sqlite/sqlitex"
|
||||||
@ -9,61 +11,84 @@ import (
|
|||||||
|
|
||||||
const DefaultPath = "dk.db"
|
const DefaultPath = "dk.db"
|
||||||
|
|
||||||
// DB wraps a SQLite connection with simplified methods
|
// DB wraps a SQLite connection pool with simplified methods
|
||||||
type DB struct {
|
type DB struct {
|
||||||
conn *sqlite.Conn
|
pool *sqlitex.Pool
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open creates a new database connection
|
// Open creates a new database connection pool
|
||||||
func Open(path string) (*DB, error) {
|
func Open(path string) (*DB, error) {
|
||||||
if path == "" {
|
if path == "" {
|
||||||
path = DefaultPath
|
path = DefaultPath
|
||||||
}
|
}
|
||||||
|
|
||||||
conn, err := sqlite.OpenConn(path, sqlite.OpenCreate|sqlite.OpenReadWrite|sqlite.OpenWAL)
|
poolSize := max(runtime.GOMAXPROCS(0), 2)
|
||||||
|
|
||||||
|
pool, err := sqlitex.NewPool(path, sqlitex.PoolOptions{
|
||||||
|
PoolSize: poolSize,
|
||||||
|
Flags: sqlite.OpenCreate | sqlite.OpenReadWrite | sqlite.OpenWAL,
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to open database: %w", err)
|
return nil, fmt.Errorf("failed to open database pool: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set pragmas for performance
|
conn, err := pool.Take(context.Background())
|
||||||
|
if err != nil {
|
||||||
|
pool.Close()
|
||||||
|
return nil, fmt.Errorf("failed to get connection from pool: %w", err)
|
||||||
|
}
|
||||||
|
defer pool.Put(conn)
|
||||||
|
|
||||||
if err := sqlitex.ExecuteTransient(conn, "PRAGMA journal_mode = WAL", nil); err != nil {
|
if err := sqlitex.ExecuteTransient(conn, "PRAGMA journal_mode = WAL", nil); err != nil {
|
||||||
conn.Close()
|
pool.Close()
|
||||||
return nil, fmt.Errorf("failed to set WAL mode: %w", err)
|
return nil, fmt.Errorf("failed to set WAL mode: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := sqlitex.ExecuteTransient(conn, "PRAGMA synchronous = NORMAL", nil); err != nil {
|
if err := sqlitex.ExecuteTransient(conn, "PRAGMA synchronous = NORMAL", nil); err != nil {
|
||||||
conn.Close()
|
pool.Close()
|
||||||
return nil, fmt.Errorf("failed to set synchronous mode: %w", err)
|
return nil, fmt.Errorf("failed to set synchronous mode: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &DB{conn: conn}, nil
|
return &DB{pool: pool}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close closes the database connection
|
// Close closes the database connection pool
|
||||||
func (db *DB) Close() error {
|
func (db *DB) Close() error {
|
||||||
return db.conn.Close()
|
return db.pool.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Exec executes a SQL statement without returning results
|
// Exec executes a SQL statement without returning results
|
||||||
func (db *DB) Exec(query string, args ...any) error {
|
func (db *DB) Exec(query string, args ...any) error {
|
||||||
if len(args) == 0 {
|
conn, err := db.pool.Take(context.Background())
|
||||||
return sqlitex.ExecuteTransient(db.conn, query, nil)
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to get connection from pool: %w", err)
|
||||||
}
|
}
|
||||||
|
defer db.pool.Put(conn)
|
||||||
return sqlitex.ExecuteTransient(db.conn, query, &sqlitex.ExecOptions{
|
|
||||||
|
if len(args) == 0 {
|
||||||
|
return sqlitex.ExecuteTransient(conn, query, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
return sqlitex.ExecuteTransient(conn, query, &sqlitex.ExecOptions{
|
||||||
Args: args,
|
Args: args,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Query executes a SQL query and calls fn for each row
|
// Query executes a SQL query and calls fn for each row
|
||||||
func (db *DB) Query(query string, fn func(*sqlite.Stmt) error, args ...any) error {
|
func (db *DB) Query(query string, fn func(*sqlite.Stmt) error, args ...any) error {
|
||||||
|
conn, err := db.pool.Take(context.Background())
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to get connection from pool: %w", err)
|
||||||
|
}
|
||||||
|
defer db.pool.Put(conn)
|
||||||
|
|
||||||
if len(args) == 0 {
|
if len(args) == 0 {
|
||||||
return sqlitex.ExecuteTransient(db.conn, query, &sqlitex.ExecOptions{
|
return sqlitex.ExecuteTransient(conn, query, &sqlitex.ExecOptions{
|
||||||
ResultFunc: fn,
|
ResultFunc: fn,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
return sqlitex.ExecuteTransient(db.conn, query, &sqlitex.ExecOptions{
|
return sqlitex.ExecuteTransient(conn, query, &sqlitex.ExecOptions{
|
||||||
Args: args,
|
Args: args,
|
||||||
ResultFunc: fn,
|
ResultFunc: fn,
|
||||||
})
|
})
|
||||||
@ -71,11 +96,17 @@ func (db *DB) Query(query string, fn func(*sqlite.Stmt) error, args ...any) erro
|
|||||||
|
|
||||||
// Begin starts a new transaction
|
// Begin starts a new transaction
|
||||||
func (db *DB) Begin() (*Tx, error) {
|
func (db *DB) Begin() (*Tx, error) {
|
||||||
if err := sqlitex.ExecuteTransient(db.conn, "BEGIN", nil); err != nil {
|
conn, err := db.pool.Take(context.Background())
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to get connection from pool: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := sqlitex.ExecuteTransient(conn, "BEGIN", nil); err != nil {
|
||||||
|
db.pool.Put(conn)
|
||||||
return nil, fmt.Errorf("failed to begin transaction: %w", err)
|
return nil, fmt.Errorf("failed to begin transaction: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &Tx{db: db}, nil
|
return &Tx{conn: conn, pool: db.pool}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Transaction runs a function within a transaction
|
// Transaction runs a function within a transaction
|
||||||
@ -84,36 +115,54 @@ func (db *DB) Transaction(fn func(*Tx) error) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := fn(tx); err != nil {
|
if err := fn(tx); err != nil {
|
||||||
tx.Rollback()
|
tx.Rollback()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return tx.Commit()
|
return tx.Commit()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Tx represents a database transaction
|
// Tx represents a database transaction
|
||||||
type Tx struct {
|
type Tx struct {
|
||||||
db *DB
|
conn *sqlite.Conn
|
||||||
|
pool *sqlitex.Pool
|
||||||
}
|
}
|
||||||
|
|
||||||
// Exec executes a SQL statement within the transaction
|
// Exec executes a SQL statement within the transaction
|
||||||
func (tx *Tx) Exec(query string, args ...any) error {
|
func (tx *Tx) Exec(query string, args ...any) error {
|
||||||
return tx.db.Exec(query, args...)
|
if len(args) == 0 {
|
||||||
|
return sqlitex.ExecuteTransient(tx.conn, query, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
return sqlitex.ExecuteTransient(tx.conn, query, &sqlitex.ExecOptions{
|
||||||
|
Args: args,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Query executes a SQL query within the transaction
|
// Query executes a SQL query within the transaction
|
||||||
func (tx *Tx) Query(query string, fn func(*sqlite.Stmt) error, args ...any) error {
|
func (tx *Tx) Query(query string, fn func(*sqlite.Stmt) error, args ...any) error {
|
||||||
return tx.db.Query(query, fn, args...)
|
if len(args) == 0 {
|
||||||
|
return sqlitex.ExecuteTransient(tx.conn, query, &sqlitex.ExecOptions{
|
||||||
|
ResultFunc: fn,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return sqlitex.ExecuteTransient(tx.conn, query, &sqlitex.ExecOptions{
|
||||||
|
Args: args,
|
||||||
|
ResultFunc: fn,
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Commit commits the transaction
|
// Commit commits the transaction
|
||||||
func (tx *Tx) Commit() error {
|
func (tx *Tx) Commit() error {
|
||||||
return sqlitex.ExecuteTransient(tx.db.conn, "COMMIT", nil)
|
defer tx.pool.Put(tx.conn)
|
||||||
|
return sqlitex.ExecuteTransient(tx.conn, "COMMIT", nil)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Rollback rolls back the transaction
|
// Rollback rolls back the transaction
|
||||||
func (tx *Tx) Rollback() error {
|
func (tx *Tx) Rollback() error {
|
||||||
return sqlitex.ExecuteTransient(tx.db.conn, "ROLLBACK", nil)
|
defer tx.pool.Put(tx.conn)
|
||||||
}
|
return sqlitex.ExecuteTransient(tx.conn, "ROLLBACK", nil)
|
||||||
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user